/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ons.open.trace.core.dispatch.impl;

import com.alibaba.ons.open.trace.core.common.OnsTraceContext;
import com.alibaba.ons.open.trace.core.common.OnsTraceDataEncoder;
import com.alibaba.ons.open.trace.core.common.OnsTraceTransferBean;
import com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher;
import com.alibaba.ons.open.trace.core.hook.ClientRPCHook;
import com.aliyun.openservices.ons.api.impl.authority.SessionCredentials;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.SendCallback;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.SendResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.namesrv.TopAddressing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;

public class AsyncArrayDispatcher
implements AsyncDispatcher {
    private static final Logger clientlog = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecuter;
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<OnsTraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;

    public AsyncArrayDispatcher(Properties properties) throws MQClientException {
        int queueSize = Integer.parseInt(properties.getProperty("AsyncBufferSize", "2048"));
        this.queueSize = queueSize = 1 << 32 - Integer.numberOfLeadingZeros(queueSize - 1);
        this.batchSize = Integer.parseInt(properties.getProperty("MaxBatchNum", "1"));
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue(1024);
        this.appenderQueue = new ArrayBlockingQueue(queueSize);
        this.traceExecuter = new ThreadPoolExecutor(10, 20, 60000L, TimeUnit.MILLISECONDS, this.appenderQueue, new ThreadFactoryImpl("MQTraceSendThread_"));
        SessionCredentials sessionCredentials = new SessionCredentials();
        Properties sessionProperties = new Properties();
        String accessKey = properties.getProperty("AccessKey");
        String secretKey = properties.getProperty("SecretKey");
        sessionProperties.put("AccessKey", accessKey);
        sessionProperties.put("SecretKey", secretKey);
        sessionCredentials.updateContent(sessionProperties);
        this.traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
        this.traceProducer.setProducerGroup(accessKey + "_INNER_TRACE_PRODUCER");
        this.traceProducer.setSendMsgTimeout(5000);
        this.traceProducer.setInstanceName(properties.getProperty("InstanceName", String.valueOf(System.currentTimeMillis())));
        String nameSrv = properties.getProperty("NAMESRV_ADDR");
        if (nameSrv == null) {
            TopAddressing topAddressing = new TopAddressing(properties.getProperty("ADDRSRV_URL"));
            nameSrv = topAddressing.fetchNSAddr();
        }
        this.traceProducer.setNamesrvAddr(nameSrv);
        this.traceProducer.setVipChannelEnabled(false);
        int maxSize = Integer.parseInt(properties.getProperty("MaxMsgSize", "128000"));
        this.traceProducer.setMaxMessageSize(maxSize - 10000);
        this.traceProducer.start();
    }

    @Override
    public void start(String workName) {
        this.worker = new Thread((Runnable)new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + workName);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    @Override
    public boolean append(Object ctx) {
        boolean result = this.traceContextQueue.offer((OnsTraceContext)ctx);
        if (!result) {
            clientlog.info("buffer full" + this.discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }

    @Override
    public void flush() throws IOException {
        long end = System.currentTimeMillis() + 500L;
        while (this.traceContextQueue.size() > 0 || this.appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                // empty catch block
                break;
            }
        }
        clientlog.info("------end trace send " + this.traceContextQueue.size() + "   " + this.appenderQueue.size());
    }

    @Override
    public void shutdown() {
        this.stopped = true;
        this.traceExecuter.shutdown();
        if (null != this.traceProducer) {
            this.traceProducer.shutdown();
        }
        this.removeShutdownHook();
    }

    public void registerShutDownHook() {
        if (this.shutDownHook == null) {
            this.shutDownHook = new Thread(new Runnable(){
                private volatile boolean hasShutdown = false;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    1 var1_1 = this;
                    synchronized (var1_1) {
                        if (!this.hasShutdown) {
                            try {
                                AsyncArrayDispatcher.this.flush();
                            }
                            catch (IOException e) {
                                clientlog.error("system mqtrace hook shutdown failed ,maybe loss some trace data");
                            }
                        }
                    }
                }
            }, "ShutdownHookMQTrace");
            Runtime.getRuntime().addShutdownHook(this.shutDownHook);
        }
    }

    public void removeShutdownHook() {
        if (this.shutDownHook != null) {
            Runtime.getRuntime().removeShutdownHook(this.shutDownHook);
        }
    }

    class AsyncAppenderRequest
    implements Runnable {
        List<OnsTraceContext> contextList;

        public AsyncAppenderRequest(List<OnsTraceContext> contextList) {
            this.contextList = contextList != null ? contextList : new ArrayList<OnsTraceContext>(1);
        }

        @Override
        public void run() {
            this.sendTraceData(this.contextList);
        }

        public void sendTraceData(List<OnsTraceContext> contextList) {
            ArrayList<OnsTraceTransferBean> transBeanList = new ArrayList<OnsTraceTransferBean>();
            String currentRegionId = "DefaultRegion";
            for (OnsTraceContext context : contextList) {
                currentRegionId = context.getRegionId();
                OnsTraceTransferBean traceData = OnsTraceDataEncoder.encoderFromContextBean(context);
                transBeanList.add(traceData);
            }
            this.flushData(transBeanList, currentRegionId);
        }

        private void flushData(List<OnsTraceTransferBean> transBeanList, String currentRegionId) {
            if (transBeanList.size() == 0) {
                return;
            }
            StringBuilder buffer = new StringBuilder(1024);
            int count = 0;
            HashSet<String> keySet = new HashSet<String>();
            for (OnsTraceTransferBean bean : transBeanList) {
                keySet.addAll(bean.getTransKey());
                buffer.append(bean.getTransData());
                ++count;
                if (buffer.length() < AsyncArrayDispatcher.this.traceProducer.getMaxMessageSize()) continue;
                this.sendTraceDataByMQ(keySet, buffer.toString(), currentRegionId);
                buffer.delete(0, buffer.length());
                keySet.clear();
                count = 0;
            }
            if (count > 0) {
                this.sendTraceDataByMQ(keySet, buffer.toString(), currentRegionId);
            }
            transBeanList.clear();
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String currentRegionId) {
            String topic = "rmq_sys_TRACE_DATA_" + currentRegionId;
            Message message = new Message(topic, data.getBytes());
            message.setKeys(keySet);
            try {
                AsyncArrayDispatcher.this.traceProducer.send(message, new SendCallback(){

                    @Override
                    public void onSuccess(SendResult sendResult) {
                    }

                    @Override
                    public void onException(Throwable e) {
                        clientlog.info("send trace data ,the traceData is " + data);
                    }
                }, 5000L);
            }
            catch (Exception e) {
                clientlog.info("send trace data,the traceData is" + data);
            }
        }
    }

    class AsyncRunnable
    implements Runnable {
        private boolean stopped;

        AsyncRunnable() {
        }

        @Override
        public void run() {
            while (!this.stopped) {
                ArrayList<OnsTraceContext> contexts = new ArrayList<OnsTraceContext>(AsyncArrayDispatcher.this.batchSize);
                for (int i = 0; i < AsyncArrayDispatcher.this.batchSize; ++i) {
                    OnsTraceContext context = null;
                    try {
                        context = (OnsTraceContext)AsyncArrayDispatcher.this.traceContextQueue.poll(5L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                    }
                    if (context == null) break;
                    contexts.add(context);
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    AsyncArrayDispatcher.this.traceExecuter.submit(request);
                    continue;
                }
                if (!AsyncArrayDispatcher.this.stopped) continue;
                this.stopped = true;
            }
        }
    }
}

