package com.jcloud.jcq.client.consumer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.Exception.ClientExceptionCode;
import com.jcloud.jcq.client.common.QueueRebalanceListener;
import com.jcloud.jcq.client.consumer.ConsumeService;
import com.jcloud.jcq.client.consumer.ConsumerConfig;
import com.jcloud.jcq.client.consumer.MessageListener;
import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.common.Pair;
import com.jcloud.jcq.common.filter.FilterExpression;
import com.jcloud.jcq.common.queue.QueueRouteInfo;
import com.jcloud.jcq.common.service.ServiceState;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.client.SubscribeTopicRequest;
import com.jcloud.jcq.protocol.client.SubscribeTopicResponse;
import com.jcloud.jcq.protocol.client.UnsubscribeTopicRequest;
import com.jcloud.jcq.protocol.client.UnsubscribeTopicResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/jcloud/jcq/client/consumer/impl/DefaultSubscribeConsumerImpl.class */
/**
 * Push-style consumer: topics are subscribed on the brokers that own their queues and
 * pushed messages are handed to a {@link ConsumeService} for listener dispatch.
 *
 * <p>Besides the user-facing subscribe/unsubscribe calls, two background activities keep
 * the broker-side subscriptions up to date once the consumer has been started:
 * <ul>
 *   <li>a scheduled connection check (first run after 5 seconds, then 1 second after each
 *       run completes) that re-subscribes on brokers whose address changed or whose
 *       connection is no longer active;</li>
 *   <li>a single-threaded executor that applies queue rebalance results delivered by the
 *       queue selector.</li>
 * </ul>
 */
public class DefaultSubscribeConsumerImpl extends DefaultConsumerImpl implements SubscribeConsumer {
    /**
     * Filter expression per subscribed topic. The value is stored exactly as passed to
     * {@link #subscribeTopic}, which does not reject {@code null}.
     *
     * NOTE(review): this plain HashMap is iterated by the connection-check task and read by
     * the rebalance task while subscribeTopic/unsubscribeTopic may mutate it - confirm
     * whether callers serialize access. (A ConcurrentHashMap cannot be dropped in directly
     * because it rejects null values.)
     */
    private final Map<String, FilterExpression> subscribedTopics;
    private final ConsumeService consumeService;
    private final ScheduledExecutorService connectionCheckScheduledExecutor;
    /** topic -> (broker group -> broker address the topic was successfully subscribed on). */
    private final ConcurrentMap<String, ConcurrentMap<String, String>> subscribedTopicBrokerMap;
    /** topic -> queues that were successfully subscribed. */
    private final ConcurrentMap<String, Set<QueueRouteInfo>> topicSubscribedQueuesMap;
    /** Runs queue rebalance handling; created in {@link #doBeforeStart()}, {@code null} before that. */
    private ExecutorService executorService;

    public DefaultSubscribeConsumerImpl(String str, String str2, String str3, ConsumerConfig consumerConfig) {
        super(str, str2, str3, consumerConfig);
        this.subscribedTopics = new HashMap<>();
        this.subscribedTopicBrokerMap = new ConcurrentHashMap<>();
        this.topicSubscribedQueuesMap = new ConcurrentHashMap<>();
        this.consumeService = new ConsumeServiceImpl(this);
        this.connectionCheckScheduledExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("ConnectionCheckScheduleThread_"));
    }

    /**
     * Registers a listener for a topic. In state CREATE the subscription is only recorded
     * (it is sent to the brokers in {@link #doBeforeStart()}); in state RUNNING it is sent
     * to the brokers immediately. Subscribing an already subscribed topic is a no-op.
     *
     * @param str              topic name, must not be empty
     * @param messageListener  listener receiving the topic's messages, must not be null
     * @param filterExpression filter forwarded to the broker in the subscribe request
     * @throws ClientException if the consumer state does not allow the operation or a
     *                         parameter is invalid
     */
    @Override
    public void subscribeTopic(String str, MessageListener messageListener, FilterExpression filterExpression) throws ClientException {
        if (this.state != ServiceState.CREATE && this.state != ServiceState.RUNNING) {
            throw new ClientException(String.format("consumer is in state[%s], not allowing subscribeTopic operation.", this.state.name()), ClientExceptionCode.OPERATION_NOT_SUPPORTED_BY_CURRENT_CLIENT_STATE.getCode());
        }
        if (StringUtils.isEmpty(str)) {
            throw new ClientException("topic is empty", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (messageListener == null) {
            throw new ClientException("messageListener is null", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (this.subscribedTopics.containsKey(str)) {
            return;
        }
        this.subscribedTopics.put(str, filterExpression);
        this.consumeService.registerTopic(str, messageListener);
        if (this.state == ServiceState.CREATE) {
            // Not started yet: doBeforeStart() sends the subscribe requests.
            return;
        }
        privateSubscribeTopic(str, filterExpression, null, null);
    }

    /**
     * Removes a topic subscription. When the consumer is RUNNING the brokers are told
     * first; afterwards the local registrations are dropped.
     *
     * @param str topic name, must not be empty
     * @throws ClientException if the consumer state does not allow the operation or the
     *                         topic is empty
     */
    @Override
    public void unsubscribeTopic(String str) throws ClientException {
        if (this.state != ServiceState.CREATE && this.state != ServiceState.RUNNING) {
            throw new ClientException(String.format("consumer is in state[%s], not allowing unsubscribeTopic operation.", this.state.name()), ClientExceptionCode.OPERATION_NOT_SUPPORTED_BY_CURRENT_CLIENT_STATE.getCode());
        }
        if (StringUtils.isEmpty(str)) {
            throw new ClientException("topic is empty", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (this.state == ServiceState.RUNNING) {
            privateUnsubscribeTopic(str, null, null);
        }
        this.subscribedTopics.remove(str);
        this.queueSelector.removeTopic(str);
        this.consumeService.unregisterTopic(str);
    }

    /**
     * Accepts a batch of pushed messages: decompresses them, groups them by topic and
     * submits each group to the consume service. The batch is dropped (with a warning)
     * when the consumer is not RUNNING, the batch is empty, or a ClientException is raised
     * while processing it.
     *
     * @param list messages pushed by the broker
     * @param j    ack index, forwarded unchanged to the consume service
     * @param str  forwarded unchanged to the consume service
     */
    @Override
    public void receiveMessages(List<Message> list, long j, String str) {
        if (this.state != ServiceState.RUNNING) {
            this.logger.warn("consumer:{} is not running", this.consumerId);
            return;
        }
        if (list == null || list.isEmpty()) {
            this.logger.warn("messages is empty");
            return;
        }
        try {
            tryDecompressMessages(list);
            Map<String, List<Message>> messagesByTopic = new HashMap<>();
            for (Message message : list) {
                List<Message> topicMessages = messagesByTopic.get(message.getTopic());
                if (topicMessages == null) {
                    topicMessages = new ArrayList<>();
                    messagesByTopic.put(message.getTopic(), topicMessages);
                }
                topicMessages.add(message);
            }
            for (Map.Entry<String, List<Message>> entry : messagesByTopic.entrySet()) {
                this.consumeService.submitMessages(entry.getKey(), entry.getValue(), j, str);
            }
        } catch (ClientException e) {
            this.logger.warn("got exception:{} when decompressing messages of ackIndex:{}", e, Long.valueOf(j));
        }
    }

    /**
     * Start hook: subscribes every recorded topic, schedules the periodic connection check,
     * creates the rebalance executor, registers the rebalance listener and starts the
     * consume service. If a ClientException is raised, topics that already have an entry in
     * the broker map are unsubscribed again (best effort) and the exception is rethrown.
     *
     * @throws ClientException if the superclass hook, a subscription or the consume
     *                         service start fails
     */
    @Override
    public void doBeforeStart() throws ClientException {
        super.doBeforeStart();
        try {
            for (Map.Entry<String, FilterExpression> entry : this.subscribedTopics.entrySet()) {
                privateSubscribeTopic(entry.getKey(), entry.getValue(), null, null);
            }
            this.connectionCheckScheduledExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        DefaultSubscribeConsumerImpl.this.checkConnectionToBroker();
                    } catch (Exception e) {
                        // Swallow so that the periodic task keeps being rescheduled.
                        DefaultSubscribeConsumerImpl.this.logger.error("consumer:{} got exception:{} when check connection to broker", DefaultSubscribeConsumerImpl.this.consumerId, e);
                    }
                }
            }, 5L, 1L, TimeUnit.SECONDS);
            this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadFactoryImpl(getClass().getSimpleName() + "Thread_"));
            this.queueSelector.registerQueueRebalanceListener(new QueueRebalanceListener() {
                @Override
                public void queueRebalanced(final Map<String, List<QueueRouteInfo>> map) {
                    // Hand off to the single worker thread so the notifying thread is not blocked.
                    DefaultSubscribeConsumerImpl.this.executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            DefaultSubscribeConsumerImpl.this.applyQueueRebalancedResult(map);
                        }
                    });
                }
            });
            this.consumeService.start();
        } catch (ClientException e) {
            this.logger.error("consumer:{} got exception:{} during subscribe topic", this.consumerId, e);
            for (String topic : this.subscribedTopics.keySet()) {
                if (this.subscribedTopicBrokerMap.containsKey(topic)) {
                    try {
                        privateUnsubscribeTopic(topic, null, null);
                    } catch (ClientException e2) {
                        // Log the rollback failure itself, not the exception that triggered the rollback.
                        this.logger.error("consumer:{} got exception:{} during unsubscribe topic:{}", new Object[]{this.consumerId, e2, topic});
                    }
                }
            }
            throw e;
        }
    }

    /**
     * Shutdown hook: unsubscribes every topic, then stops the consume service and both
     * executors before delegating to the superclass.
     *
     * @throws ClientException if unsubscribing or the superclass hook fails
     */
    @Override
    public void doBeforeShutdown() throws ClientException {
        // Iterate over a copy: unsubscribeTopic() removes entries from subscribedTopics.
        List<String> topics = new ArrayList<>(this.subscribedTopics.keySet());
        for (String topic : topics) {
            unsubscribeTopic(topic);
        }
        this.consumeService.shutdown();
        this.connectionCheckScheduledExecutor.shutdown();
        // The rebalance executor only exists once doBeforeStart() got far enough to create it.
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        super.doBeforeShutdown();
    }

    /**
     * Sends subscribe requests for a topic and records every successful subscription.
     *
     * @param str              topic
     * @param filterExpression filter forwarded in the request
     * @param pair             (broker group, broker address) to subscribe on; when this or
     *                         {@code list} is null/empty, all queues the queue selector
     *                         reports for the topic are used, grouped by broker
     * @param list             queues to subscribe on the given broker
     * @throws ClientException in state CREATE when no queue is found or a broker answers
     *                         unsuccessfully; otherwise such failures are only logged
     */
    private void privateSubscribeTopic(String str, FilterExpression filterExpression, Pair<String, String> pair, List<QueueRouteInfo> list) throws ClientException {
        Map<Pair<String, String>, List<QueueRouteInfo>> brokerQueues;
        if (pair == null || list == null || list.isEmpty()) {
            List<QueueRouteInfo> queuesByTopic = this.queueSelector.getQueuesByTopic(str);
            if (queuesByTopic == null || queuesByTopic.isEmpty()) {
                this.logger.error("no queues for topic:{}", str);
                if (this.state == ServiceState.CREATE) {
                    throw new ClientException(String.format("subscribe topic failed, cannot find queue for topic:%s", str), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
                }
                return;
            }
            brokerQueues = getBrokerAddressQueuesMap(queuesByTopic);
        } else {
            brokerQueues = new HashMap<>();
            brokerQueues.put(pair, list);
        }
        // One request object is reused for all brokers; only the queue ids differ.
        SubscribeTopicRequest subscribeTopicRequest = new SubscribeTopicRequest();
        subscribeTopicRequest.setTopic(str);
        subscribeTopicRequest.setConsumerGroupId(this.consumerGroupId);
        subscribeTopicRequest.setConsumerId(this.consumerId);
        subscribeTopicRequest.setConsumeFromWhere(this.consumerConfig.getDefaultConsumePosition());
        subscribeTopicRequest.setMaxPushNums(this.consumerConfig.getRecommendedBatchSizePerPush());
        subscribeTopicRequest.setFilterExpression(filterExpression);
        for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> entry : brokerQueues.entrySet()) {
            String brokerGroup = entry.getKey().getObject1();
            String brokerAddress = entry.getKey().getObject2();
            List<Integer> queueIds = collectQueueIds(entry.getValue());
            subscribeTopicRequest.setQueueIds(queueIds);
            SubscribeTopicResponse subscribeTopicResponse = (SubscribeTopicResponse) this.remotingApiWrapper.sync(this, brokerAddress, subscribeTopicRequest, SubscribeTopicResponse.class);
            if (subscribeTopicResponse.success()) {
                brokersOfTopic(str).put(brokerGroup, brokerAddress);
                subscribedQueuesOfTopic(str).addAll(entry.getValue());
                this.logger.info("consumer:{} subscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, str, brokerAddress, queueIds, subscribeTopicResponse});
            } else {
                this.logger.error("consumer:{} subscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, str, brokerAddress, queueIds, subscribeTopicResponse});
                if (this.state == ServiceState.CREATE) {
                    throw new ClientException(String.format("subscribe topic:%s from broker:%s failed, response:%s", str, brokerAddress, subscribeTopicResponse), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
                }
            }
        }
    }

    /**
     * Sends unsubscribe requests for a topic and updates the local bookkeeping for every
     * broker that answers successfully; unsuccessful answers are only logged.
     *
     * @param str  topic
     * @param pair (broker group, broker address) to unsubscribe from; when this or
     *             {@code list} is null/empty, all queues the queue selector reports for
     *             the topic are used, grouped by broker
     * @param list queues to unsubscribe on the given broker
     * @throws ClientException in state CREATE when no queue is found for the topic
     */
    private void privateUnsubscribeTopic(String str, Pair<String, String> pair, List<QueueRouteInfo> list) throws ClientException {
        Map<Pair<String, String>, List<QueueRouteInfo>> brokerQueues;
        if (pair == null || list == null || list.isEmpty()) {
            List<QueueRouteInfo> queuesByTopic = this.queueSelector.getQueuesByTopic(str);
            if (queuesByTopic == null || queuesByTopic.isEmpty()) {
                this.logger.error("no queues for topic:{}", str);
                if (this.state == ServiceState.CREATE) {
                    throw new ClientException(String.format("unsubscribe topic failed, cannot find queue for topic:%s", str), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
                }
                return;
            }
            brokerQueues = getBrokerAddressQueuesMap(queuesByTopic);
        } else {
            brokerQueues = new HashMap<>();
            brokerQueues.put(pair, list);
        }
        UnsubscribeTopicRequest unsubscribeTopicRequest = new UnsubscribeTopicRequest();
        unsubscribeTopicRequest.setTopic(str);
        unsubscribeTopicRequest.setConsumerGroupId(this.consumerGroupId);
        unsubscribeTopicRequest.setConsumerId(this.consumerId);
        ConcurrentMap<String, String> subscribedBrokers = this.subscribedTopicBrokerMap.get(str);
        Set<QueueRouteInfo> subscribedQueues = this.topicSubscribedQueuesMap.get(str);
        for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> entry : brokerQueues.entrySet()) {
            String brokerGroup = entry.getKey().getObject1();
            String brokerAddress = entry.getKey().getObject2();
            List<Integer> queueIds = collectQueueIds(entry.getValue());
            unsubscribeTopicRequest.setQueueIds(queueIds);
            UnsubscribeTopicResponse unsubscribeTopicResponse = (UnsubscribeTopicResponse) this.remotingApiWrapper.sync(this, brokerAddress, unsubscribeTopicRequest, UnsubscribeTopicResponse.class);
            if (unsubscribeTopicResponse.success()) {
                this.logger.info("consumer:{}, unsubscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, str, brokerAddress, queueIds, unsubscribeTopicResponse});
                // The broker entry is dropped only when no subscribed queue is left on that address.
                boolean brokerUnused = true;
                if (subscribedQueues != null) {
                    subscribedQueues.removeAll(entry.getValue());
                    for (QueueRouteInfo remaining : subscribedQueues) {
                        if (remaining.getAddress().equals(brokerAddress)) {
                            brokerUnused = false;
                            break;
                        }
                    }
                }
                if (subscribedBrokers != null && brokerUnused) {
                    subscribedBrokers.remove(brokerGroup);
                }
            } else {
                this.logger.warn("consumer:{} unsubscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, str, brokerAddress, queueIds, unsubscribeTopicResponse});
            }
        }
    }

    /**
     * Periodic task body: for every subscribed topic, compares the brokers reported by the
     * queue selector with the brokers the topic is recorded as subscribed on, and
     * re-subscribes where the recorded address is missing or different, or where the
     * connection to the recorded address is no longer active (using the alternative
     * address from the queue selector, if any). A ClientException for one topic is logged
     * and does not stop the check of the remaining topics.
     */
    public void checkConnectionToBroker() {
        for (Map.Entry<String, FilterExpression> entry : this.subscribedTopics.entrySet()) {
            String topic = entry.getKey();
            FilterExpression filter = entry.getValue();
            try {
                List<QueueRouteInfo> queuesByTopic = this.queueSelector.getQueuesByTopic(topic);
                if (queuesByTopic == null || queuesByTopic.isEmpty()) {
                    this.subscribedTopicBrokerMap.remove(topic);
                    continue;
                }
                ConcurrentMap<String, String> subscribedBrokers = brokersOfTopic(topic);
                for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> brokerEntry : getBrokerAddressQueuesMap(queuesByTopic).entrySet()) {
                    String brokerGroup = brokerEntry.getKey().getObject1();
                    String brokerAddress = brokerEntry.getKey().getObject2();
                    if (StringUtils.isEmpty(brokerGroup) || StringUtils.isEmpty(brokerAddress)) {
                        // ConcurrentHashMap rejects null keys, so only remove when there is a key to remove.
                        if (brokerGroup != null) {
                            subscribedBrokers.remove(brokerGroup);
                        }
                        continue;
                    }
                    List<QueueRouteInfo> queues = brokerEntry.getValue();
                    // Single lookup: brokerAddress is non-empty here, so equals() is null-safe
                    // and the entry cannot vanish between a null check and the comparison.
                    if (!brokerAddress.equals(subscribedBrokers.get(brokerGroup))) {
                        subscribedBrokers.remove(brokerGroup);
                        privateSubscribeTopic(topic, filter, brokerEntry.getKey(), queues);
                    } else if (!this.remotingApiWrapper.isConnectionActive(brokerAddress)) {
                        subscribedBrokers.remove(brokerGroup);
                        String alternativeAddress = this.queueSelector.getAlternativeAddress(brokerAddress);
                        this.logger.info("connect to broker:{} is not active anymore, got alternative address:{}", brokerAddress, alternativeAddress);
                        if (alternativeAddress != null) {
                            privateSubscribeTopic(topic, filter, new Pair<>(brokerGroup, alternativeAddress), queues);
                        }
                    }
                }
            } catch (ClientException e) {
                this.logger.warn("consumer:{} got exception:{} when check connection for topic:{}", new Object[]{this.consumerId, e, topic});
            }
        }
    }

    /**
     * Groups queues by their (broker group, broker address) pair.
     * Grouping relies on the equals/hashCode of Pair - presumably value-based; verify in Pair.
     */
    private Map<Pair<String, String>, List<QueueRouteInfo>> getBrokerAddressQueuesMap(List<QueueRouteInfo> list) {
        Map<Pair<String, String>, List<QueueRouteInfo>> grouped = new HashMap<>();
        for (QueueRouteInfo queueRouteInfo : list) {
            Pair<String, String> broker = new Pair<>(queueRouteInfo.getBrokerGroup(), queueRouteInfo.getAddress());
            List<QueueRouteInfo> queues = grouped.get(broker);
            if (queues == null) {
                queues = new ArrayList<>();
                grouped.put(broker, queues);
            }
            queues.add(queueRouteInfo);
        }
        return grouped;
    }

    /**
     * Applies a rebalance result: first subscribes queues that were assigned but are not
     * recorded as subscribed, then unsubscribes queues that are recorded as subscribed but
     * are no longer assigned. Failures for one broker are logged and do not stop the rest.
     *
     * @param map topic -> queues assigned to this consumer by the rebalance
     */
    public void applyQueueRebalancedResult(Map<String, List<QueueRouteInfo>> map) {
        for (Map.Entry<String, List<QueueRouteInfo>> entry : map.entrySet()) {
            String topic = entry.getKey();
            List<QueueRouteInfo> toSubscribe = new ArrayList<>(entry.getValue());
            Set<QueueRouteInfo> alreadySubscribed = this.topicSubscribedQueuesMap.get(topic);
            if (alreadySubscribed != null) {
                toSubscribe.removeAll(alreadySubscribed);
            }
            if (toSubscribe.isEmpty()) {
                continue;
            }
            for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> brokerEntry : getBrokerAddressQueuesMap(toSubscribe).entrySet()) {
                try {
                    privateSubscribeTopic(topic, this.subscribedTopics.get(topic), brokerEntry.getKey(), brokerEntry.getValue());
                } catch (ClientException e) {
                    this.logger.warn("consumer:{} got exception:{} when subscribe topic:{} from broker:{} queues:{}", new Object[]{this.consumerId, e, topic, brokerEntry.getKey(), brokerEntry.getValue()});
                }
            }
        }
        for (Map.Entry<String, Set<QueueRouteInfo>> entry : this.topicSubscribedQueuesMap.entrySet()) {
            String topic = entry.getKey();
            // Work on a copy: privateUnsubscribeTopic() removes queues from the live set.
            List<QueueRouteInfo> toUnsubscribe = new ArrayList<>(entry.getValue());
            List<QueueRouteInfo> stillAssigned = map.get(topic);
            if (stillAssigned != null) {
                toUnsubscribe.removeAll(stillAssigned);
            }
            if (toUnsubscribe.isEmpty()) {
                continue;
            }
            for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> brokerEntry : getBrokerAddressQueuesMap(toUnsubscribe).entrySet()) {
                try {
                    privateUnsubscribeTopic(topic, brokerEntry.getKey(), brokerEntry.getValue());
                } catch (ClientException e2) {
                    this.logger.warn("consumer:{} got exception:{} when unsubscribe topic:{} from broker:{} queues:{}", new Object[]{this.consumerId, e2, topic, brokerEntry.getKey(), brokerEntry.getValue()});
                }
            }
        }
    }

    /** Returns the queue ids of the given queues, in the same order. */
    private static List<Integer> collectQueueIds(List<QueueRouteInfo> queues) {
        List<Integer> queueIds = new ArrayList<>();
        for (QueueRouteInfo queue : queues) {
            queueIds.add(Integer.valueOf(queue.getQueueId()));
        }
        return queueIds;
    }

    /** Returns the broker map of a topic, atomically creating it when absent. */
    private ConcurrentMap<String, String> brokersOfTopic(String topic) {
        ConcurrentMap<String, String> brokers = this.subscribedTopicBrokerMap.get(topic);
        if (brokers == null) {
            ConcurrentMap<String, String> created = new ConcurrentHashMap<>();
            // putIfAbsent keeps a map that another thread may have installed in the meantime.
            brokers = this.subscribedTopicBrokerMap.putIfAbsent(topic, created);
            if (brokers == null) {
                brokers = created;
            }
        }
        return brokers;
    }

    /** Returns the subscribed-queue set of a topic, atomically creating it when absent. */
    private Set<QueueRouteInfo> subscribedQueuesOfTopic(String topic) {
        Set<QueueRouteInfo> queues = this.topicSubscribedQueuesMap.get(topic);
        if (queues == null) {
            Set<QueueRouteInfo> created = new CopyOnWriteArraySet<>();
            // putIfAbsent keeps a set that another thread may have installed in the meantime.
            queues = this.topicSubscribedQueuesMap.putIfAbsent(topic, created);
            if (queues == null) {
                queues = created;
            }
        }
        return queues;
    }
}
