package com.jcloud.jcq.client.consumer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.common.ClientConstants;
import com.jcloud.jcq.client.common.RemotingApiWrapper;
import com.jcloud.jcq.client.consumer.AsyncAckCallback;
import com.jcloud.jcq.client.consumer.ConsumeResult;
import com.jcloud.jcq.client.consumer.ConsumeService;
import com.jcloud.jcq.client.consumer.MessageListener;
import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.common.constants.Constants;
import com.jcloud.jcq.common.message.AckAction;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.ResponseCode;
import com.jcloud.jcq.protocol.client.AckMessageRequest;
import com.jcloud.jcq.protocol.client.AckMessageResponse;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jcloud/jcq/client/consumer/impl/ConsumeServiceImpl.class */
public class ConsumeServiceImpl implements ConsumeService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeServiceImpl.class);
    private SubscribeConsumer consumer;
    private BlockingDeque<Runnable> consumeQueue;
    private BlockingDeque<Runnable> ackQueue;
    private ExecutorService consumeExecutor;
    private ExecutorService ackExecutor;
    private ConcurrentMap<String, MessageListener> topicListenerMap = new ConcurrentHashMap();
    private RemotingApiWrapper remotingApiWrapper = RemotingApiWrapper.getInstance();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/jcloud/jcq/client/consumer/impl/ConsumeServiceImpl$AckTask.class */
    public class AckTask implements Runnable {
        private String topic;
        private long ackIndex;
        private AckAction ackAction;
        private String brokerGroupId;

        public AckTask(String str, AckAction ackAction, long j, String str2) {
            this.topic = str;
            this.ackAction = ackAction;
            this.ackIndex = j;
            this.brokerGroupId = str2;
        }

        @Override // java.lang.Runnable
        public void run() {
            AckMessageRequest ackMessageRequest = new AckMessageRequest();
            ackMessageRequest.setTopic(this.topic);
            ackMessageRequest.setAckAction(this.ackAction);
            ackMessageRequest.setAckIndex(this.ackIndex);
            ackMessageRequest.setConsumerGroupId(ConsumeServiceImpl.this.consumer.getConsumerGroupId());
            try {
                final String str = ConsumeServiceImpl.this.consumer.getQueueSelector().getBrokerGroupAddressMap().get(this.brokerGroupId);
                if (StringUtils.isEmpty(str)) {
                    ConsumeServiceImpl.logger.warn("cannot get address of broker group:{} when do ack for topic:{} ackIndex:{} ackAction:{}", new Object[]{this.brokerGroupId, this.topic, Long.valueOf(this.ackIndex), this.ackAction});
                } else {
                    ConsumeServiceImpl.this.remotingApiWrapper.async(ConsumeServiceImpl.this.consumer, str, ackMessageRequest, new AsyncAckCallback() { // from class: com.jcloud.jcq.client.consumer.impl.ConsumeServiceImpl.AckTask.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // com.jcloud.jcq.client.common.AsyncRequestCallback
                        public void onResponse(AckMessageResponse ackMessageResponse) {
                            ConsumeServiceImpl.logger.info("ackIndex:{}, ackAction:{}, ackMessage to broker:{} done, response:{}", new Object[]{Long.valueOf(AckTask.this.ackIndex), AckTask.this.ackAction.name(), str, ResponseCode.getName(ackMessageResponse.getResponseCode())});
                        }

                        @Override // com.jcloud.jcq.client.consumer.AsyncAckCallback, com.jcloud.jcq.client.common.AsyncRequestCallback
                        public void onException(Throwable th) {
                            ConsumeServiceImpl.logger.warn("got exception {} when ack ackIndex:{}", th, Long.valueOf(AckTask.this.ackIndex));
                        }
                    }, AckMessageResponse.class);
                }
            } catch (ClientException e) {
                ConsumeServiceImpl.logger.warn("got exception {} when ack ackIndex:{}", e, Long.valueOf(this.ackIndex));
            }
        }
    }

    /* loaded from: input_file:com/jcloud/jcq/client/consumer/impl/ConsumeServiceImpl$ConsumeTask.class */
    class ConsumeTask implements Runnable {
        private String topic;
        private List<Message> messages;
        private long ackIndex;
        private String brokerGroupId;
        private int retryTimes;

        public ConsumeTask(ConsumeServiceImpl consumeServiceImpl, String str, List<Message> list, long j, String str2) {
            this(str, list, j, str2, 0);
        }

        public ConsumeTask(String str, List<Message> list, long j, String str2, int i) {
            this.topic = str;
            this.messages = list;
            this.ackIndex = j;
            this.brokerGroupId = str2;
            this.retryTimes = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsumeResult consumeResult;
            AckAction ackAction = AckAction.SUCCESS;
            MessageListener messageListener = (MessageListener) ConsumeServiceImpl.this.topicListenerMap.get(this.topic);
            if (messageListener == null) {
                ConsumeServiceImpl.logger.warn("messageListener is null for topic {}", this.topic);
                return;
            }
            try {
                consumeResult = messageListener.consumeMessages(this.messages);
            } catch (Exception e) {
                ConsumeServiceImpl.logger.warn("got exception {} when consume messages {}", e, this.messages);
                consumeResult = ConsumeResult.FAILED;
            }
            if (consumeResult != ConsumeResult.SUCCESS) {
                ConsumeServiceImpl.logger.warn("consume {} when consume messages:{}, retryTimes:{}", new Object[]{consumeResult.name(), this.messages, Integer.valueOf(this.retryTimes)});
                if (this.retryTimes < ConsumeServiceImpl.this.consumer.getConsumerConfig().getConsumeMaxRetryTimes()) {
                    ConsumeServiceImpl.this.consumeExecutor.submit(new ConsumeTask(this.topic, this.messages, this.ackIndex, this.brokerGroupId, this.retryTimes + 1));
                    return;
                }
                ackAction = AckAction.CONSUME_FAILED;
            }
            ConsumeServiceImpl.this.ackExecutor.submit(new AckTask(this.topic, ackAction, this.ackIndex, this.brokerGroupId));
        }
    }

    public ConsumeServiceImpl(SubscribeConsumer subscribeConsumer) {
        this.consumer = subscribeConsumer;
        this.consumeQueue = new LinkedBlockingDeque(subscribeConsumer.getConsumerConfig().getMessageBufferSize());
        this.ackQueue = new LinkedBlockingDeque(subscribeConsumer.getConsumerConfig().getAckBufferSize());
        this.consumeExecutor = new ThreadPoolExecutor(this.consumer.getConsumerConfig().getConsumePoolCoreSize(), this.consumer.getConsumerConfig().getConsumePoolCoreSize() * 2, Constants.MINUTE, TimeUnit.MILLISECONDS, this.consumeQueue, new ThreadFactoryImpl(ClientConstants.CONSUME_THREAD_NAME));
        this.ackExecutor = new ThreadPoolExecutor(this.consumer.getConsumerConfig().getAckPoolCoreSize(), this.consumer.getConsumerConfig().getAckPoolCoreSize() * 2, Constants.MINUTE, TimeUnit.MILLISECONDS, this.ackQueue, new ThreadFactoryImpl(ClientConstants.ACK_THREAD_NAME));
    }

    @Override // com.jcloud.jcq.client.consumer.ConsumeService
    public void registerTopic(String str, MessageListener messageListener) {
        this.topicListenerMap.put(str, messageListener);
    }

    @Override // com.jcloud.jcq.client.consumer.ConsumeService
    public void unregisterTopic(String str) {
        this.topicListenerMap.remove(str);
    }

    @Override // com.jcloud.jcq.client.consumer.ConsumeService
    public void submitMessages(String str, List<Message> list, long j, String str2) {
        try {
            this.consumeExecutor.submit(new ConsumeTask(this, str, list, j, str2));
        } catch (RejectedExecutionException e) {
            logger.warn("rejected by consume executor when submitting consume task for messages:{}, possibly message buffer is full.", list);
        }
    }

    @Override // com.jcloud.jcq.client.consumer.ConsumeService
    public void start() {
    }

    @Override // com.jcloud.jcq.client.consumer.ConsumeService
    public void shutdown() {
        this.consumeExecutor.shutdown();
        this.ackExecutor.shutdown();
    }
}
