package com.jcloud.jcq.client.trace;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.common.ClientInstance;
import com.jcloud.jcq.client.common.QueueSelectStrategy;
import com.jcloud.jcq.client.common.impl.HashQueueSelectTrategy;
import com.jcloud.jcq.client.producer.AsyncSendCallback;
import com.jcloud.jcq.client.producer.Producer;
import com.jcloud.jcq.client.producer.ProducerConfig;
import com.jcloud.jcq.client.producer.ProducerFactory;
import com.jcloud.jcq.common.constants.Constants;
import com.jcloud.jcq.common.constants.NormalConstants;
import com.jcloud.jcq.common.queue.QueueRouteInfo;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.trace.TracePoint;
import com.jcloud.jcq.common.trace.TraceStringHelper;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.common.utils.SystemUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.client.SendMessageResponse;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jcloud/jcq/client/trace/DefaultTraceDispatcherImpl.class */
public class DefaultTraceDispatcherImpl implements TraceDispatcher {
    private static final Logger logger = LoggerFactory.getLogger(DefaultTraceDispatcherImpl.class.getName());
    private String traceTopic;
    private String tenantId;
    private Producer traceProducer;
    private ClientInstance hostClient;
    private AtomicBoolean isRunning = new AtomicBoolean(false);
    private final int batchSize = 32;
    private BlockingQueue<TracePoint> traceQueue = new ArrayBlockingQueue(2048);
    private Map<String, List<TracePoint>> consumerTraceMap = new HashMap();
    private BlockingQueue<Runnable> sendTraceThreadPoolQueue = new LinkedBlockingQueue(NormalConstants.K);
    private QueueSelectStrategy queueSelectStrategy = new HashQueueSelectTrategy();
    private ExecutorService sendTraceExecutor = new ThreadPoolExecutor(2, 2, Constants.MINUTE, TimeUnit.MILLISECONDS, this.sendTraceThreadPoolQueue, new ThreadFactoryImpl("TraceProduceThread_"));
    private ExecutorService fetchTraceExecutor = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryImpl("FetchTraceThread_"));
    private String hostClientIp = SystemUtils.getLocalIpAddress();

    /* loaded from: input_file:com/jcloud/jcq/client/trace/DefaultTraceDispatcherImpl$FetchTraceTask.class */
    class FetchTraceTask implements Runnable {
        FetchTraceTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (DefaultTraceDispatcherImpl.this.isRunning.get()) {
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < DefaultTraceDispatcherImpl.this.batchSize; i++) {
                    TracePoint tracePoint = null;
                    try {
                        tracePoint = (TracePoint) DefaultTraceDispatcherImpl.this.traceQueue.poll(5L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        DefaultTraceDispatcherImpl.logger.warn("Exception happened when poll TracePoint from traceQueue.", e);
                    }
                    if (tracePoint == null) {
                        break;
                    }
                    arrayList.add(tracePoint);
                }
                if (!arrayList.isEmpty()) {
                    DefaultTraceDispatcherImpl.this.sendTraceExecutor.submit(new SendTraceMessageTask(arrayList));
                }
            }
        }
    }

    /* loaded from: input_file:com/jcloud/jcq/client/trace/DefaultTraceDispatcherImpl$SendTraceMessageTask.class */
    class SendTraceMessageTask implements Runnable {
        List<TracePoint> tracePoints;

        public SendTraceMessageTask(List<TracePoint> list) {
            this.tracePoints = list;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                sendTraceMessage();
            } catch (ClientException e) {
                DefaultTraceDispatcherImpl.logger.warn("Exception happened when send trace message.", e);
            }
        }

        private void sendTraceMessage() throws ClientException {
            ArrayList arrayList = new ArrayList();
            QueueRouteInfo selectQueue = DefaultTraceDispatcherImpl.this.queueSelectStrategy.selectQueue(DefaultTraceDispatcherImpl.this.hostClient.getInstanceId(), DefaultTraceDispatcherImpl.this.traceProducer.getQueueSelector().getTraceQueueRoutes());
            if (selectQueue == null) {
                DefaultTraceDispatcherImpl.logger.warn("Failed to get Trace queue in cluster.");
                return;
            }
            int queueId = selectQueue.getQueueId();
            for (TracePoint tracePoint : this.tracePoints) {
                tracePoint.setTenantId(DefaultTraceDispatcherImpl.this.tenantId);
                String traceStringFromTracePoint = TraceStringHelper.getTraceStringFromTracePoint(tracePoint);
                Message message = new Message();
                message.setTopic(DefaultTraceDispatcherImpl.this.traceTopic);
                message.setMessageId(tracePoint.getMessageId());
                message.setBody(traceStringFromTracePoint.getBytes(Charset.forName("UTF-8")));
                message.setQueueId(queueId);
                arrayList.add(message);
            }
            DefaultTraceDispatcherImpl.this.traceProducer.sendMessageAsync(arrayList, new AsyncSendCallback() { // from class: com.jcloud.jcq.client.trace.DefaultTraceDispatcherImpl.SendTraceMessageTask.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.jcloud.jcq.client.common.AsyncRequestCallback
                public void onResponse(SendMessageResponse sendMessageResponse) {
                    DefaultTraceDispatcherImpl.logger.debug("Got Send trace messages response: {}.", sendMessageResponse);
                }

                @Override // com.jcloud.jcq.client.producer.AsyncSendCallback, com.jcloud.jcq.client.common.AsyncRequestCallback
                public void onException(Throwable th) {
                    DefaultTraceDispatcherImpl.logger.warn("Exception happened when async send message.", th);
                }
            }, 0L, queueId);
        }
    }

    public DefaultTraceDispatcherImpl(ClientInstance clientInstance) {
        this.hostClient = clientInstance;
        ProducerConfig producerConfig = new ProducerConfig();
        producerConfig.setMaxMsgNumsPerBatch(this.batchSize);
        producerConfig.setEnableCompress(true);
        producerConfig.setForTrace(true);
        producerConfig.setMetaServerAddress(this.hostClient.getClientConfig().getMetaServerAddress());
        try {
            this.traceProducer = ProducerFactory.getInstance().createProducer(clientInstance.getAccessKey(), clientInstance.getSecretKey(), producerConfig);
            this.traceProducer.getClientConfig().setToken(clientInstance.getClientConfig().getToken());
        } catch (ClientException e) {
            logger.warn("failed to create trace producer.");
        }
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public void start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            logger.warn("trace dispatcher already started.");
            return;
        }
        try {
            this.traceProducer.start();
        } catch (ClientException e) {
            logger.warn("failed to start trace producer.");
        }
        this.traceTopic = this.traceProducer.getQueueSelector().getTraceTopic();
        this.tenantId = this.traceProducer.getQueueSelector().getTenantId();
        this.fetchTraceExecutor.submit(new FetchTraceTask());
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public boolean append(List<TracePoint> list) {
        boolean z = true;
        if (list == null || list.isEmpty()) {
            return true;
        }
        for (TracePoint tracePoint : list) {
            tracePoint.setClientAddress(this.hostClientIp);
            z &= this.traceQueue.offer(tracePoint);
        }
        if (z) {
            logger.debug("TracePoints: {} appended.", list);
        } else {
            logger.debug("TracePoints: {} append failed.", list);
        }
        return z;
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public boolean append(List<TracePoint> list, String str, long j) {
        if (!append(list)) {
            return false;
        }
        this.consumerTraceMap.put(StringUtils.join(str, String.valueOf(j)), list);
        logger.debug("TracePoints: {}, BrokerAddress: {}, AckIndex: {} appended.", new Object[]{list, str, Long.valueOf(j)});
        return true;
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public List<TracePoint> getAndRemoveTracePoints(String str, long j) {
        return this.consumerTraceMap.remove(StringUtils.join(str, String.valueOf(j)));
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public void waitForFlush() {
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        while (true) {
            if ((this.traceQueue.isEmpty() && this.sendTraceThreadPoolQueue.isEmpty()) || System.currentTimeMillis() >= currentTimeMillis) {
                break;
            } else {
                try {
                    Thread.sleep(5L);
                } catch (InterruptedException e) {
                }
            }
        }
        logger.info("Trace queue flush done. Left trace: {}, task: {}", Integer.valueOf(this.traceQueue.size()), Integer.valueOf(this.sendTraceThreadPoolQueue.size()));
    }

    @Override // com.jcloud.jcq.client.trace.TraceDispatcher
    public void shutdown() {
        waitForFlush();
        try {
            this.traceProducer.shutdown();
        } catch (ClientException e) {
            logger.warn("ClientException happened when shutdown trace producer.");
        }
        this.fetchTraceExecutor.shutdown();
        this.sendTraceExecutor.shutdown();
    }
}
