/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer;

import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Callback;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Producer;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Result;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.ShardHashAdjuster;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.errors.MaxBatchCountExceedException;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.BatchHandler;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.IOThreadPool;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.LogAccumulator;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.Mover;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.ProducerBatch;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.RetryQueue;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.internals.Utils;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.log.common.LogItem;
import com.alibaba.schedulerx.shade.com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Producer} implementation that wires together a {@link LogAccumulator}, a {@link Mover}
 * thread, an {@link IOThreadPool} and two {@link BatchHandler} threads (one for successful and one
 * for failed batches).
 *
 * <p>The constructor starts the mover and both batch handlers; {@link #close()} or
 * {@link #close(long)} must be called to shut them down again.
 */
public class LogProducer
implements Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogProducer.class);
    private static final AtomicInteger INSTANCE_ID_GENERATOR = new AtomicInteger(0);
    private static final String LOG_PRODUCER_PREFIX = "aliyun-log-producer-";
    private static final String MOVER_SUFFIX = "-mover";
    private static final String SUCCESS_BATCH_HANDLER_SUFFIX = "-success-batch-handler";
    private static final String FAILURE_BATCH_HANDLER_SUFFIX = "-failure-batch-handler";
    /** Largest number of log items accepted by a single {@code send} call. */
    private static final int MAX_BATCH_COUNT = 40960;
    private final int instanceId;
    private final String name;
    private final String producerHash;
    private final ProducerConfig producerConfig;
    private final Semaphore memoryController;
    private final RetryQueue retryQueue;
    private final IOThreadPool ioThreadPool;
    private final LogAccumulator accumulator;
    private final Mover mover;
    private final BatchHandler successBatchHandler;
    private final BatchHandler failureBatchHandler;
    private final AtomicInteger batchCount = new AtomicInteger(0);
    private final ShardHashAdjuster adjuster;

    /**
     * Creates a producer and starts its background threads.
     *
     * @param producerConfig configuration read for the memory budget
     *     ({@code getTotalSizeInBytes}), the I/O thread count, and the shard-hash bucket count
     */
    public LogProducer(ProducerConfig producerConfig) {
        this.instanceId = INSTANCE_ID_GENERATOR.getAndIncrement();
        this.name = LOG_PRODUCER_PREFIX + this.instanceId;
        this.producerHash = Utils.generateProducerHash(this.instanceId);
        this.producerConfig = producerConfig;
        // Build the adjuster before any thread is started: if its construction fails, no
        // already-running mover/handler thread is left behind without an owner to close it.
        this.adjuster = new ShardHashAdjuster(producerConfig.getBuckets());
        this.memoryController = new Semaphore(producerConfig.getTotalSizeInBytes());
        this.retryQueue = new RetryQueue();
        LinkedBlockingQueue<ProducerBatch> successQueue = new LinkedBlockingQueue<ProducerBatch>();
        LinkedBlockingQueue<ProducerBatch> failureQueue = new LinkedBlockingQueue<ProducerBatch>();
        this.ioThreadPool = new IOThreadPool(producerConfig.getIoThreadCount(), this.name);
        this.accumulator = new LogAccumulator(this.producerHash, producerConfig, this.memoryController, this.retryQueue, successQueue, failureQueue, this.ioThreadPool, this.batchCount);
        this.mover = new Mover(this.name + MOVER_SUFFIX, producerConfig, this.accumulator, this.retryQueue, successQueue, failureQueue, this.ioThreadPool, this.batchCount);
        this.successBatchHandler = new BatchHandler(this.name + SUCCESS_BATCH_HANDLER_SUFFIX, successQueue, this.batchCount, this.memoryController);
        this.failureBatchHandler = new BatchHandler(this.name + FAILURE_BATCH_HANDLER_SUFFIX, failureQueue, this.batchCount, this.memoryController);
        this.mover.start();
        this.successBatchHandler.start();
        this.failureBatchHandler.start();
    }

    /** Sends one log item with empty topic and source, no shard hash and no callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, LogItem logItem) throws InterruptedException, ProducerException {
        return this.send(project, logStore, "", "", null, logItem, null);
    }

    /** Sends a list of log items with empty topic and source, no shard hash and no callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, List<LogItem> logItems) throws InterruptedException, ProducerException {
        return this.send(project, logStore, "", "", null, logItems, null);
    }

    /** Sends one log item with the given topic and source, no shard hash and no callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, LogItem logItem) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, null, logItem, null);
    }

    /** Sends a list of log items with the given topic and source, no shard hash and no callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, List<LogItem> logItems) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, null, logItems, null);
    }

    /** Sends one log item with the given topic, source and shard hash, without a callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, String shardHash, LogItem logItem) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, shardHash, logItem, null);
    }

    /** Sends a list of log items with the given topic, source and shard hash, without a callback. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, String shardHash, List<LogItem> logItems) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, shardHash, logItems, null);
    }

    /** Sends one log item with empty topic and source and no shard hash, reporting to {@code callback}. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        return this.send(project, logStore, "", "", null, logItem, callback);
    }

    /** Sends a list of log items with empty topic and source and no shard hash, reporting to {@code callback}. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, List<LogItem> logItems, Callback callback) throws InterruptedException, ProducerException {
        return this.send(project, logStore, "", "", null, logItems, callback);
    }

    /** Sends one log item with the given topic and source and no shard hash, reporting to {@code callback}. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, null, logItem, callback);
    }

    /** Sends a list of log items with the given topic and source and no shard hash, reporting to {@code callback}. */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, List<LogItem> logItems, Callback callback) throws InterruptedException, ProducerException {
        return this.send(project, logStore, topic, source, null, logItems, callback);
    }

    /**
     * Wraps a single log item in a new list and delegates to the list-based overload.
     *
     * @param logItem the item to send; checked with {@code Utils.assertArgumentNotNull}
     */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, String shardHash, LogItem logItem, Callback callback) throws InterruptedException, ProducerException {
        Utils.assertArgumentNotNull(logItem, "logItem");
        List<LogItem> singleItem = new ArrayList<LogItem>();
        singleItem.add(logItem);
        return this.send(project, logStore, topic, source, shardHash, singleItem, callback);
    }

    /**
     * Validates the arguments and hands the log items to the accumulator. Every other
     * {@code send} overload ends up here.
     *
     * @param project target project; checked with {@code Utils.assertArgumentNotNullOrEmpty}
     * @param logStore target log store; checked with {@code Utils.assertArgumentNotNullOrEmpty}
     * @param topic log topic; {@code null} is replaced by the empty string
     * @param source log source, passed through unchanged
     * @param shardHash optional shard hash; when non-null and the configuration enables
     *     {@code isAdjustShardHash()}, it is rewritten by the {@link ShardHashAdjuster}
     * @param logItems items to send; must be non-null, non-empty and contain at most
     *     {@value #MAX_BATCH_COUNT} elements
     * @param callback optional callback, passed through to the accumulator
     * @return the future returned by {@code LogAccumulator.append}
     * @throws IllegalArgumentException if {@code logItems} is empty
     * @throws MaxBatchCountExceedException if {@code logItems} has more than
     *     {@value #MAX_BATCH_COUNT} elements
     */
    @Override
    public ListenableFuture<Result> send(String project, String logStore, String topic, String source, String shardHash, List<LogItem> logItems, Callback callback) throws InterruptedException, ProducerException {
        Utils.assertArgumentNotNullOrEmpty(project, "project");
        Utils.assertArgumentNotNullOrEmpty(logStore, "logStore");
        if (topic == null) {
            topic = "";
        }
        Utils.assertArgumentNotNull(logItems, "logItems");
        if (logItems.isEmpty()) {
            throw new IllegalArgumentException("logItems cannot be empty");
        }
        int count = logItems.size();
        if (count > MAX_BATCH_COUNT) {
            throw new MaxBatchCountExceedException("the log list size is " + count + " which exceeds the MAX_BATCH_COUNT " + MAX_BATCH_COUNT);
        }
        if (shardHash != null && this.producerConfig.isAdjustShardHash()) {
            shardHash = this.adjuster.adjust(shardHash);
        }
        return this.accumulator.append(project, logStore, topic, source, shardHash, logItems, callback);
    }

    /** Closes the producer with a timeout of {@link Long#MAX_VALUE} milliseconds. */
    @Override
    public void close() throws InterruptedException, ProducerException {
        this.close(Long.MAX_VALUE);
    }

    /**
     * Shuts the producer down in four stages: mover, I/O thread pool, success batch handler,
     * failure batch handler. Every stage is attempted even if an earlier one failed.
     *
     * <p>Each stage that completes returns the time budget left over, which is handed to the next
     * stage. A stage that throws does not reduce the budget passed on to later stages.
     *
     * @param timeoutMs time budget in milliseconds; must not be negative
     * @throws IllegalArgumentException if {@code timeoutMs} is negative
     * @throws ProducerException the first failure reported by any stage, thrown after all stages
     *     have been attempted
     * @throws InterruptedException if the calling thread is interrupted while waiting; this
     *     aborts the remaining stages
     */
    @Override
    public void close(long timeoutMs) throws InterruptedException, ProducerException {
        if (timeoutMs < 0L) {
            throw new IllegalArgumentException("timeoutMs must be greater than or equal to 0, got " + timeoutMs);
        }
        ProducerException firstException = null;
        long remainingMs = timeoutMs;
        LOGGER.info("Closing the log producer, timeoutMs={}", remainingMs);

        try {
            remainingMs = this.closeMover(remainingMs);
        }
        catch (ProducerException e) {
            firstException = e;
        }
        LOGGER.debug("After close mover, timeoutMs={}", remainingMs);

        try {
            remainingMs = this.closeIOThreadPool(remainingMs);
        }
        catch (ProducerException e) {
            if (firstException == null) {
                firstException = e;
            }
        }
        LOGGER.debug("After close ioThreadPool, timeoutMs={}", remainingMs);

        try {
            remainingMs = this.closeSuccessBatchHandler(remainingMs);
        }
        catch (ProducerException e) {
            if (firstException == null) {
                firstException = e;
            }
        }
        LOGGER.debug("After close success batch handler, timeoutMs={}", remainingMs);

        try {
            remainingMs = this.closeFailureBatchHandler(remainingMs);
        }
        catch (ProducerException e) {
            if (firstException == null) {
                firstException = e;
            }
        }
        LOGGER.debug("After close failure batch handler, timeoutMs={}", remainingMs);

        if (firstException != null) {
            throw firstException;
        }
        LOGGER.info("The log producer has been closed");
    }

    /**
     * Computes the budget left after the time elapsed since {@code startNanos}.
     *
     * <p>Uses {@link System#nanoTime()} so the result is unaffected by wall-clock adjustments.
     *
     * @param timeoutMs the non-negative budget that was available at {@code startNanos}
     * @param startNanos a {@link System#nanoTime()} reading taken when the stage began
     * @return the remaining budget in milliseconds, never negative
     */
    private static long remainingAfter(long timeoutMs, long startNanos) {
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    /**
     * Closes the accumulator, the retry queue and the mover, then waits for the mover thread.
     *
     * @param timeoutMs budget in milliseconds; with a budget of 0 no waiting is done
     * @return the budget remaining after this stage
     * @throws ProducerException if the mover thread is still alive afterwards
     */
    private long closeMover(long timeoutMs) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.accumulator.close();
        this.retryQueue.close();
        this.mover.close();
        // Thread.join(0) means "wait forever", so an exhausted budget must skip the join.
        if (timeoutMs > 0L) {
            this.mover.join(timeoutMs);
        }
        if (this.mover.isAlive()) {
            LOGGER.warn("The mover thread is still alive");
            throw new ProducerException("the mover thread is still alive");
        }
        return remainingAfter(timeoutMs, startNanos);
    }

    /**
     * Shuts the I/O thread pool down and waits for its termination.
     *
     * @param timeoutMs budget in milliseconds
     * @return the budget remaining after this stage
     * @throws ProducerException if {@code awaitTermination} reports that the pool did not
     *     terminate in time
     */
    private long closeIOThreadPool(long timeoutMs) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.ioThreadPool.shutdown();
        if (!this.ioThreadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("The ioThreadPool is not fully terminated");
            throw new ProducerException("the ioThreadPool is not fully terminated");
        }
        LOGGER.debug("The ioThreadPool is terminated");
        return remainingAfter(timeoutMs, startNanos);
    }

    /**
     * Closes the success batch handler and waits for its thread, unless the caller is that thread.
     *
     * @param timeoutMs budget in milliseconds; with a budget of 0 no waiting is done
     * @return the budget remaining after this stage, or {@code timeoutMs} unchanged when the join
     *     is skipped because the current thread is the success batch handler
     * @throws ProducerException if the handler thread is still alive afterwards
     */
    private long closeSuccessBatchHandler(long timeoutMs) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.successBatchHandler.close();
        // A thread cannot usefully join itself; this happens when close is called from a callback.
        if (Thread.currentThread() == this.successBatchHandler) {
            LOGGER.warn("Skip join success batch handler since you have incorrectly invoked close from the producer call-back");
            return timeoutMs;
        }
        // Thread.join(0) means "wait forever", so an exhausted budget must skip the join.
        if (timeoutMs > 0L) {
            this.successBatchHandler.join(timeoutMs);
        }
        if (this.successBatchHandler.isAlive()) {
            LOGGER.warn("The success batch handler thread is still alive");
            throw new ProducerException("the success batch handler thread is still alive");
        }
        return remainingAfter(timeoutMs, startNanos);
    }

    /**
     * Closes the failure batch handler and waits for its thread, unless the caller is one of the
     * two batch handler threads.
     *
     * @param timeoutMs budget in milliseconds; with a budget of 0 no waiting is done
     * @return the budget remaining after this stage, or {@code timeoutMs} unchanged when the join
     *     is skipped because the current thread is the success or the failure batch handler
     * @throws ProducerException if the handler thread is still alive afterwards
     */
    private long closeFailureBatchHandler(long timeoutMs) throws InterruptedException, ProducerException {
        long startNanos = System.nanoTime();
        this.failureBatchHandler.close();
        Thread caller = Thread.currentThread();
        if (caller == this.successBatchHandler || caller == this.failureBatchHandler) {
            LOGGER.warn("Skip join failure batch handler since you have incorrectly invoked close from the producer call-back");
            return timeoutMs;
        }
        // Thread.join(0) means "wait forever", so an exhausted budget must skip the join.
        if (timeoutMs > 0L) {
            this.failureBatchHandler.join(timeoutMs);
        }
        if (this.failureBatchHandler.isAlive()) {
            LOGGER.warn("The failure batch handler thread is still alive");
            throw new ProducerException("the failure batch handler thread is still alive");
        }
        return remainingAfter(timeoutMs, startNanos);
    }

    /** Returns the configuration this producer was created with. */
    @Override
    public ProducerConfig getProducerConfig() {
        return this.producerConfig;
    }

    /** Returns the current value of the batch counter shared with the accumulator, mover and handlers. */
    @Override
    public int getBatchCount() {
        return this.batchCount.get();
    }

    /** Returns the number of permits currently available in the memory-controlling semaphore. */
    @Override
    public int availableMemoryInBytes() {
        return this.memoryController.availablePermits();
    }
}

