/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.log.producer.inner;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.TagContent;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.inner.BlockedData;
import com.aliyun.openservices.log.producer.inner.ClientPool;
import com.aliyun.openservices.log.producer.inner.NamedThreadFactory;
import com.aliyun.openservices.log.producer.inner.PackageData;
import com.aliyun.openservices.log.producer.inner.PackageManager;
import com.aliyun.openservices.log.request.PutLogsRequest;
import com.aliyun.openservices.log.response.PutLogsResponse;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IOThread
extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(IOThread.class);
    private static final String IO_THREAD_NAME = "log-producer-io-thread";
    private static final String IO_WORKER_BASE_NAME = "log-producer-io-worker-";
    private ExecutorService cachedThreadPool;
    private BlockingQueue<BlockedData> dataQueue = new LinkedBlockingQueue<BlockedData>();
    private ClientPool clientPool;
    private PackageManager packageManager;
    private ProducerConfig producerConfig;
    private AtomicLong sendLogBytes = new AtomicLong(0L);
    private AtomicLong sendLogTimeWindowInMillis = new AtomicLong(0L);

    public static IOThread launch(ClientPool cltPool, PackageManager packageManager, ProducerConfig producerConfig) {
        IOThread ioThread = new IOThread(cltPool, packageManager, producerConfig);
        ioThread.setName(IO_THREAD_NAME);
        ioThread.setDaemon(true);
        ioThread.start();
        return ioThread;
    }

    private IOThread(ClientPool cltPool, PackageManager packageManager, ProducerConfig producerConfig) {
        this.clientPool = cltPool;
        this.packageManager = packageManager;
        this.producerConfig = producerConfig;
        this.cachedThreadPool = new ThreadPoolExecutor(0, producerConfig.maxIOThreadSizeInPool, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory(IO_WORKER_BASE_NAME));
    }

    public void addPackage(PackageData data, int bytes) {
        data.markAddToIOBeginTime();
        try {
            this.dataQueue.put(new BlockedData(data, bytes));
        }
        catch (InterruptedException e) {
            LOGGER.error("Failed to put data into dataQueue.", (Throwable)e);
        }
        data.markAddToIOEndTime();
    }

    public void shutdown() {
        this.interrupt();
        try {
            this.join();
        }
        catch (InterruptedException e) {
            LOGGER.warn("Failed to waiting for the IOThread to die. This may lead to data loss.", (Throwable)e);
        }
        while (!this.dataQueue.isEmpty()) {
            BlockedData bd;
            try {
                bd = this.dataQueue.poll(this.producerConfig.packageTimeoutInMS / 2, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.error("Failed to poll data from dataQueue.", (Throwable)e);
                break;
            }
            if (bd == null) continue;
            this.sendData(bd);
        }
        this.cachedThreadPool.shutdown();
        try {
            if (this.cachedThreadPool.awaitTermination(2 * this.producerConfig.packageTimeoutInMS, TimeUnit.MILLISECONDS)) {
                LOGGER.info("All submitted tasks in cachedThreadPool are executed.");
            } else {
                LOGGER.warn("The cachedThreadPool is not terminated. This may lead to data loss.");
            }
        }
        catch (InterruptedException e) {
            LOGGER.warn("The thread has been interrupted during shutdown. This may lead to data loss.", (Throwable)e);
        }
    }

    public void shutdownNow() {
        this.interrupt();
        this.cachedThreadPool.shutdownNow();
    }

    private void sendData(BlockedData bd) {
        try {
            LOGGER.debug("Before execute doSendData(), blockedData={}", (Object)bd);
            this.doSendData(bd);
            LOGGER.debug("After execute doSendData(), blockedData={}", (Object)bd);
        }
        catch (Exception e) {
            LOGGER.error("Failed to send data.", (Throwable)e);
        }
        catch (Error e) {
            LOGGER.error("Failed to send data.", (Throwable)e);
        }
        finally {
            this.packageManager.releaseBytes(bd.bytes);
        }
    }

    private void doSendData(BlockedData bd) {
        Client clt = this.clientPool.getClient(bd.data.project);
        if (clt == null) {
            bd.data.callback(null, new LogException("ProjectConfigNotExist", "the config of project " + bd.data.project + " is not exist", ""), 0.0f);
        } else {
            int retry = 0;
            LogException excep = null;
            PutLogsResponse response = null;
            while (retry++ <= this.producerConfig.retryTimes) {
                try {
                    PutLogsRequest request;
                    if (bd.data.shardHash != null && !bd.data.shardHash.isEmpty()) {
                        request = new PutLogsRequest(bd.data.project, bd.data.logstore, bd.data.topic, bd.data.source, bd.data.items, bd.data.shardHash);
                        ArrayList<TagContent> tags = new ArrayList<TagContent>();
                        tags.add(new TagContent("__pack_id__", bd.data.getPackageId()));
                        request.SetTags(tags);
                        request.setContentType(this.producerConfig.logsFormat.equals("protobuf") ? "application/x-protobuf" : "application/json");
                        response = clt.PutLogs(request);
                    } else {
                        request = new PutLogsRequest(bd.data.project, bd.data.logstore, bd.data.topic, bd.data.source, bd.data.items);
                        ArrayList<TagContent> tags = new ArrayList<TagContent>();
                        tags.add(new TagContent("__pack_id__", bd.data.getPackageId()));
                        request.SetTags(tags);
                        request.setContentType(this.producerConfig.logsFormat.equals("protobuf") ? "application/x-protobuf" : "application/json");
                        response = clt.PutLogs(request);
                    }
                    long tmpBytes = this.sendLogBytes.get();
                    this.sendLogBytes.set(tmpBytes + (long)bd.bytes);
                    break;
                }
                catch (LogException e) {
                    excep = new LogException(e.GetErrorCode(), e.GetErrorMessage() + ", itemscount: " + bd.data.items.size(), e.GetRequestId());
                }
            }
            long currTime = System.currentTimeMillis();
            float sec = (float)(currTime - this.sendLogTimeWindowInMillis.get()) / 1000.0f;
            float outflow = 0.0f;
            if (sec > 0.0f) {
                outflow = (float)this.sendLogBytes.get() / sec;
            }
            bd.data.callback(response, excep, outflow);
        }
    }

    @Override
    public void run() {
        try {
            LOGGER.info("The IOThread is going to work.");
            this.handleBlockedData();
            LOGGER.info("The IOThread terminated.");
        }
        catch (Exception e) {
            LOGGER.error("Failed to handle BlockedData.", (Throwable)e);
        }
    }

    private void handleBlockedData() {
        while (!this.isInterrupted()) {
            BlockedData bd;
            long currTime = System.currentTimeMillis();
            if (currTime - this.sendLogTimeWindowInMillis.get() > 60000L) {
                this.sendLogBytes.set(0L);
                this.sendLogTimeWindowInMillis.set(currTime);
            }
            try {
                bd = this.dataQueue.poll(this.producerConfig.packageTimeoutInMS / 2, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOGGER.info("The IOThread has been interrupted when poll data from dataQueue.");
                break;
            }
            if (bd == null) continue;
            bd.data.markCompleteIOBeginTimeInMillis(this.dataQueue.size());
            try {
                this.cachedThreadPool.submit(new Runnable(){

                    @Override
                    public void run() {
                        IOThread.this.sendData(bd);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                try {
                    this.dataQueue.put(bd);
                }
                catch (InterruptedException e1) {
                    LOGGER.info("Failed to put blockedData into data Queue. Try to send it in IOThread.");
                    this.sendData(bd);
                    break;
                }
            }
        }
    }
}

