/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.log.producer.inner;

import com.aliyun.openservices.log.common.LogContent;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.producer.ILogCallback;
import com.aliyun.openservices.log.producer.ProducerConfig;
import com.aliyun.openservices.log.producer.inner.ClientPool;
import com.aliyun.openservices.log.producer.inner.ControlThreadPool;
import com.aliyun.openservices.log.producer.inner.IOThread;
import com.aliyun.openservices.log.producer.inner.PackageData;
import com.aliyun.openservices.log.producer.inner.PackageMeta;
import com.aliyun.openservices.log.producer.inner.ShardHashManager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers log items into per-destination packages and hands full, expired or
 * flushed packages to the {@link IOThread}.
 *
 * <p>Packages are keyed by {@code project|logStore|topic|shardHash|source}.
 * Each key has a {@link PackageMeta} (counters plus a per-package lock) in
 * {@code metaMap} and a {@link PackageData} (the buffered items) in
 * {@code dataMap}.
 *
 * <p>Locking protocol used throughout this class: {@code metaRWLock} guards the
 * structure of {@code metaMap}; {@code meta.lock} guards one package's counters
 * and its {@code dataMap} entry. A {@code meta.lock} is always acquired while
 * {@code metaRWLock} is still held, and every lock is released in a
 * {@code finally} block so that an exception cannot leave the manager
 * permanently locked.
 *
 * <p>A semaphore sized by {@code config.memPoolSizeInByte} bounds the estimated
 * number of buffered bytes; {@link #add} blocks until enough permits are free.
 */
public class PackageManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(PackageManager.class);
    private final ReadWriteLock metaRWLock = new ReentrantReadWriteLock();
    private final HashMap<String, PackageMeta> metaMap = new HashMap<String, PackageMeta>();
    private final ConcurrentHashMap<String, PackageData> dataMap = new ConcurrentHashMap<String, PackageData>();
    private final ProducerConfig config;
    private final Semaphore semaphore;
    private final IOThread ioThread;
    private final ControlThreadPool controlThreadPool;
    private final ShardHashManager shardHashManager;

    /**
     * Creates the manager and launches its I/O thread and control thread pool.
     *
     * @param config producer configuration (memory pool size, package limits, timeout)
     * @param pool   client pool passed on to the I/O thread and the shard hash manager
     */
    public PackageManager(ProducerConfig config, ClientPool pool) {
        this.config = config;
        this.semaphore = new Semaphore(config.memPoolSizeInByte);
        this.ioThread = IOThread.launch(pool, this, config);
        this.shardHashManager = new ShardHashManager(pool, config);
        this.controlThreadPool = ControlThreadPool.launch(this.shardHashManager, this, config);
    }

    /**
     * Estimates the memory footprint of a batch of log items: 4 per item plus
     * the {@code String.length()} of every key and value.
     */
    private static int LogItemListBytes(List<LogItem> logItems) {
        int b = 0;
        for (LogItem it : logItems) {
            b += 4;
            for (LogContent con : it.GetLogContents()) {
                b += con.mKey.length() + con.mValue.length();
            }
        }
        return b;
    }

    /** Blocks (uninterruptibly) until {@code b} permits of the memory pool are available. */
    void acquireBytes(int b) {
        this.semaphore.acquireUninterruptibly(b);
    }

    /** Returns {@code b} permits to the memory pool. */
    void releaseBytes(int b) {
        this.semaphore.release(b);
    }

    /** @return the number of memory-pool permits currently available */
    public int availablePermits() {
        return this.semaphore.availablePermits();
    }

    /**
     * Removes every package whose age has reached {@code config.packageTimeoutInMS}.
     * Expired packages that contain at least one log line are handed to the I/O
     * thread; empty ones are simply dropped.
     */
    void filterTimeoutPackage() {
        this.drainPackages(true);
    }

    /**
     * Hands every buffered package to the I/O thread, regardless of age or
     * size, and clears the bookkeeping for it.
     */
    public void flush() {
        LOGGER.debug("Try to flush PackageManager.");
        this.drainPackages(false);
    }

    /**
     * Shared implementation of {@link #filterTimeoutPackage()} and {@link #flush()}.
     *
     * @param expiredOnly when {@code true}, only packages past the timeout are
     *                    removed and only non-empty ones are sent; when
     *                    {@code false}, every package is removed and sent
     */
    private void drainPackages(boolean expiredOnly) {
        ArrayList<String> drainedKeys = new ArrayList<String>();
        this.metaRWLock.writeLock().lock();
        try {
            for (Map.Entry<String, PackageMeta> entry : this.metaMap.entrySet()) {
                PackageMeta meta = entry.getValue();
                meta.lock.lock();
                try {
                    if (expiredOnly) {
                        long currTime = System.currentTimeMillis();
                        if (currTime - meta.arriveTimeInMS < (long)this.config.packageTimeoutInMS) {
                            continue;
                        }
                    }
                    PackageData data = this.dataMap.remove(entry.getKey());
                    // Record the key before the hand-off so the meta entry is
                    // dropped together with its data even if the hand-off throws.
                    drainedKeys.add(entry.getKey());
                    if (!expiredOnly || meta.logLinesCount > 0) {
                        this.ioThread.addPackage(data, meta.packageBytes);
                    }
                } finally {
                    meta.lock.unlock();
                }
            }
        } finally {
            try {
                // Removal is deferred to here because metaMap must not be
                // modified while its entry set is being iterated.
                for (String key : drainedKeys) {
                    this.metaMap.remove(key);
                }
            } finally {
                this.metaRWLock.writeLock().unlock();
            }
        }
    }

    /** Shuts down the control thread pool and then the I/O thread. */
    public void close() {
        LOGGER.info("Try to close PackageManager.");
        this.controlThreadPool.shutdown();
        this.ioThread.shutdown();
    }

    /** Shuts down the control thread pool and the I/O thread via their {@code shutdownNow} variants. */
    public void closeNow() {
        LOGGER.info("Try to close PackageManager immediately.");
        this.controlThreadPool.shutdownNow();
        this.ioThread.shutdownNow();
    }

    /**
     * Appends log items to the package for the given destination, first handing
     * the current package to the I/O thread if adding these items would reach
     * the configured line-count or byte limit, or if the package has timed out.
     *
     * <p>Blocks until the memory pool has room for the estimated size of
     * {@code logItems}. If the call fails before the items are accounted for,
     * the reserved permits are returned to the pool.
     *
     * @param project   target project
     * @param logStore  target log store
     * @param topic     log topic
     * @param shardHash optional shard hash; when non-null it is replaced by the
     *                  value returned from {@code ShardHashManager.getBeginHash}
     * @param source    log source
     * @param logItems  items to buffer; must not be {@code null}
     * @param callback  optional callback; its begin/end timestamps are set here
     */
    public void add(String project, String logStore, String topic, String shardHash, String source, List<LogItem> logItems, ILogCallback callback) {
        if (callback != null) {
            callback.callSendBeginTimeInMillis = System.currentTimeMillis();
        }
        if (shardHash != null) {
            shardHash = this.shardHashManager.getBeginHash(project, logStore, shardHash);
        }
        String key = project + "|" + logStore + "|" + topic + "|" + shardHash + "|" + source;
        int linesCount = logItems.size();
        int logBytes = PackageManager.LogItemListBytes(logItems);
        this.acquireBytes(logBytes);
        boolean accounted = false;
        try {
            PackageMeta meta = this.lockMeta(key);
            try {
                PackageData data = this.dataMap.get(key);
                boolean full = meta.logLinesCount + linesCount >= this.config.logsCountPerPackage
                        || meta.packageBytes + logBytes >= this.config.logsBytesPerPackage
                        || System.currentTimeMillis() - meta.arriveTimeInMS >= (long)this.config.packageTimeoutInMS;
                if (meta.logLinesCount > 0 && full) {
                    // Ship the current package and start a fresh one for this key.
                    this.ioThread.addPackage(data, meta.packageBytes);
                    this.dataMap.remove(key);
                    data = null;
                    meta.clear();
                }
                if (data == null) {
                    data = new PackageData(project, logStore, topic, shardHash, source);
                    this.dataMap.put(key, data);
                }
                data.addItems(logItems, callback);
                meta.logLinesCount += linesCount;
                meta.packageBytes += logBytes;
                accounted = true;
            } finally {
                meta.lock.unlock();
            }
        } finally {
            if (!accounted) {
                // The bytes never made it into meta.packageBytes, so nothing in
                // this class would hand them off for release later.
                this.releaseBytes(logBytes);
            }
        }
        if (callback != null) {
            callback.callSendEndTimeInMillis = System.currentTimeMillis();
        }
    }

    /**
     * Looks up (or creates) the meta entry for {@code key} and returns it with
     * {@code meta.lock} already held by the calling thread. The caller must
     * release {@code meta.lock}.
     *
     * <p>The per-package lock is taken before the map lock is released so that
     * a concurrent drain, which needs the write lock, cannot remove the entry
     * in between.
     */
    private PackageMeta lockMeta(String key) {
        PackageMeta meta;
        this.metaRWLock.readLock().lock();
        try {
            meta = this.metaMap.get(key);
            if (meta != null) {
                meta.lock.lock();
                return meta;
            }
        } finally {
            this.metaRWLock.readLock().unlock();
        }
        // Not found under the read lock: re-check under the write lock, since
        // another thread may have inserted the entry in the meantime.
        this.metaRWLock.writeLock().lock();
        try {
            meta = this.metaMap.get(key);
            if (meta == null) {
                meta = new PackageMeta(0, 0);
                this.metaMap.put(key, meta);
            }
            meta.lock.lock();
            return meta;
        } finally {
            this.metaRWLock.writeLock().unlock();
        }
    }
}

