/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.consumer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.Exception.ClientExceptionCode;
import com.jcloud.jcq.client.common.QueueRebalanceListener;
import com.jcloud.jcq.client.consumer.ConsumeService;
import com.jcloud.jcq.client.consumer.ConsumerConfig;
import com.jcloud.jcq.client.consumer.MessageListener;
import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.client.consumer.impl.ConsumeServiceImpl;
import com.jcloud.jcq.client.consumer.impl.DefaultConsumerImpl;
import com.jcloud.jcq.common.Pair;
import com.jcloud.jcq.common.filter.FilterExpression;
import com.jcloud.jcq.common.queue.QueueRouteInfo;
import com.jcloud.jcq.common.service.ServiceState;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.trace.TracePoint;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.communication.core.ChannelWrapper;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.client.SubscribeTopicRequest;
import com.jcloud.jcq.protocol.client.SubscribeTopicResponse;
import com.jcloud.jcq.protocol.client.UnsubscribeTopicRequest;
import com.jcloud.jcq.protocol.client.UnsubscribeTopicResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DefaultSubscribeConsumerImpl
extends DefaultConsumerImpl
implements SubscribeConsumer {
    private Map<String, FilterExpression> subscribedTopics = new HashMap<String, FilterExpression>();
    private ConsumeService consumeService;
    private ScheduledExecutorService connectionCheckScheduledExecutor;
    private ConcurrentMap<String, ConcurrentMap<String, ChannelWrapper>> subscribedTopicBrokerMap = new ConcurrentHashMap<String, ConcurrentMap<String, ChannelWrapper>>();
    private ConcurrentMap<String, Set<QueueRouteInfo>> topicSubscribedQueuesMap = new ConcurrentHashMap<String, Set<QueueRouteInfo>>();
    private ExecutorService executorService;

    public DefaultSubscribeConsumerImpl(String accessKey, String secretKey, String consumerId, ConsumerConfig consumerConfig) {
        super(accessKey, secretKey, consumerId, consumerConfig);
        this.consumeService = new ConsumeServiceImpl(this);
        this.connectionCheckScheduledExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("ConnectionCheckScheduleThread_"));
    }

    @Override
    public void subscribeTopic(String topic, MessageListener messageListener, FilterExpression filterExpression) throws ClientException {
        if (this.state != ServiceState.CREATE && this.state != ServiceState.RUNNING) {
            throw new ClientException(String.format("consumer is in state[%s], not allowing subscribeTopic operation.", this.state.name()), ClientExceptionCode.OPERATION_NOT_SUPPORTED_BY_CURRENT_CLIENT_STATE.getCode());
        }
        if (StringUtils.isEmpty(topic)) {
            throw new ClientException("topic is empty", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (messageListener == null) {
            throw new ClientException("messageListener is null", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (this.subscribedTopics.containsKey(topic)) {
            return;
        }
        this.subscribedTopics.put(topic, filterExpression);
        this.consumeService.registerTopic(topic, messageListener);
        if (this.state == ServiceState.CREATE) {
            return;
        }
        this.privateSubscribeTopic(topic, filterExpression, null, null);
    }

    @Override
    public void unsubscribeTopic(String topic) throws ClientException {
        if (this.state != ServiceState.CREATE && this.state != ServiceState.RUNNING) {
            throw new ClientException(String.format("consumer is in state[%s], not allowing unsubscribeTopic operation.", this.state.name()), ClientExceptionCode.OPERATION_NOT_SUPPORTED_BY_CURRENT_CLIENT_STATE.getCode());
        }
        if (StringUtils.isEmpty(topic)) {
            throw new ClientException("topic is empty", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (this.state == ServiceState.RUNNING) {
            this.privateUnsubscribeTopic(topic, null, null);
        }
        this.subscribedTopics.remove(topic);
        this.queueSelector.removeTopic(topic);
        this.consumeService.unregisterTopic(topic);
    }

    @Override
    public void receiveMessages(List<Message> messages, long ackIndex, String brokerGroupId) {
        if (messages == null || messages.isEmpty()) {
            this.logger.warn("messages is empty");
            return;
        }
        try {
            this.tryDecompressMessages(messages);
        }
        catch (ClientException e) {
            this.logger.warn("got exception:{} when decompressing messages of ackIndex:{}", (Object)e, (Object)ackIndex);
            return;
        }
        if (this.isMessageTraceOn()) {
            List<TracePoint> tracePoints = this.getTracePoints(messages);
            this.completeAndAppendBeforeConsumeTracePoints(tracePoints, (String)this.queueSelector.getBrokerGroupAddressMap().get(brokerGroupId), null, ackIndex);
        }
        HashMap<String, ArrayList<Message>> topicMessagesMap = new HashMap<String, ArrayList<Message>>();
        for (Message message : messages) {
            ArrayList<Message> msgs = (ArrayList<Message>)topicMessagesMap.get(message.getTopic());
            if (msgs == null) {
                msgs = new ArrayList<Message>();
                topicMessagesMap.put(message.getTopic(), msgs);
            }
            msgs.add(message);
        }
        for (Map.Entry entry : topicMessagesMap.entrySet()) {
            this.consumeService.submitMessages((String)entry.getKey(), (List)entry.getValue(), ackIndex, brokerGroupId);
        }
    }

    @Override
    protected void doBeforeStart() throws ClientException {
        super.doBeforeStart();
        try {
            for (Map.Entry<String, FilterExpression> entry : this.subscribedTopics.entrySet()) {
                this.privateSubscribeTopic(entry.getKey(), entry.getValue(), null, null);
            }
        }
        catch (ClientException e) {
            this.logger.error("consumer:{} got exception:{} during subscribe topic", (Object)this.consumerId, (Object)e);
            for (String topic : this.subscribedTopics.keySet()) {
                if (!this.subscribedTopicBrokerMap.containsKey(topic)) continue;
                try {
                    this.privateUnsubscribeTopic(topic, null, null);
                }
                catch (ClientException e1) {
                    this.logger.error("consumer:{} got exception:{} during unsubscribe topic:{}", new Object[]{this.consumerId, e, topic});
                }
            }
            throw e;
        }
        this.connectionCheckScheduledExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    DefaultSubscribeConsumerImpl.this.checkConnectionToBroker();
                }
                catch (Exception e) {
                    DefaultSubscribeConsumerImpl.this.logger.error("consumer:{} got exception:{} when check connection to broker", (Object)DefaultSubscribeConsumerImpl.this.consumerId, (Object)e);
                }
            }
        }, 5L, 1L, TimeUnit.SECONDS);
        this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(), new ThreadFactoryImpl(this.getClass().getSimpleName() + "Thread_"));
        this.queueSelector.registerQueueRebalanceListener(new QueueRebalanceListener(){

            @Override
            public void queueRebalanced(final Map<String, List<QueueRouteInfo>> topicQueuesMap) {
                DefaultSubscribeConsumerImpl.this.executorService.submit(new Runnable(){

                    @Override
                    public void run() {
                        DefaultSubscribeConsumerImpl.this.applyQueueRebalancedResult(topicQueuesMap);
                    }
                });
            }
        });
        this.consumeService.start();
    }

    @Override
    protected void doBeforeShutdown() throws ClientException {
        ArrayList<String> topics = new ArrayList<String>(this.subscribedTopics.keySet());
        for (String topic : topics) {
            this.unsubscribeTopic(topic);
        }
        this.consumeService.shutdown();
        this.connectionCheckScheduledExecutor.shutdown();
        this.executorService.shutdown();
        super.doBeforeShutdown();
    }

    public ConcurrentMap<String, ConcurrentMap<String, ChannelWrapper>> getSubscribedTopicBrokerMap() {
        return this.subscribedTopicBrokerMap;
    }

    private void privateSubscribeTopic(String topic, FilterExpression filterExpression, Pair<String, String> brokerPair, List<QueueRouteInfo> queues) throws ClientException {
        Map<Object, Object> brokerGroupQueuesMap = new HashMap<Pair<String, String>, List<QueueRouteInfo>>();
        if (brokerPair != null && queues != null && !queues.isEmpty()) {
            brokerGroupQueuesMap.put(brokerPair, queues);
        } else {
            List<QueueRouteInfo> queueRouteInfos = this.queueSelector.getQueuesByTopic(topic);
            if (queueRouteInfos == null || queueRouteInfos.isEmpty()) {
                this.logger.error("no queues for topic:{}", (Object)topic);
                if (this.state == ServiceState.CREATE) {
                    throw new ClientException(String.format("subscribe topic failed, cannot find queue for topic:%s", topic), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
                }
                return;
            }
            brokerGroupQueuesMap = this.getBrokerAddressQueuesMap(queueRouteInfos);
        }
        SubscribeTopicRequest subscribeTopicRequest = new SubscribeTopicRequest();
        subscribeTopicRequest.setTopic(topic);
        subscribeTopicRequest.setConsumerGroupId(this.consumerGroupId);
        subscribeTopicRequest.setConsumerId(this.consumerId);
        subscribeTopicRequest.setConsumeFromWhere(this.consumerConfig.getDefaultConsumePosition());
        subscribeTopicRequest.setMaxPushNums(this.consumerConfig.getRecommendedBatchSizePerPush());
        subscribeTopicRequest.setFilterExpression(filterExpression);
        for (Map.Entry<Object, Object> entry : brokerGroupQueuesMap.entrySet()) {
            String brokerGroup = (String)((Pair)entry.getKey()).getObject1();
            String brokerAddr = (String)((Pair)entry.getKey()).getObject2();
            ArrayList<Integer> queueIds = new ArrayList<Integer>();
            for (QueueRouteInfo queueRouteInfo : (List)entry.getValue()) {
                queueIds.add(queueRouteInfo.getQueueId());
            }
            subscribeTopicRequest.setQueueIds(queueIds);
            SubscribeTopicResponse subscribeTopicResponse = this.remotingApiWrapper.sync(this, brokerAddr, subscribeTopicRequest, SubscribeTopicResponse.class);
            if (subscribeTopicResponse.success()) {
                ConcurrentHashMap<String, ChannelWrapper> subscribedBrokers = (ConcurrentHashMap<String, ChannelWrapper>)this.subscribedTopicBrokerMap.get(topic);
                if (subscribedBrokers == null) {
                    subscribedBrokers = new ConcurrentHashMap<String, ChannelWrapper>();
                    this.subscribedTopicBrokerMap.put(topic, subscribedBrokers);
                }
                ChannelWrapper currentCW = this.remotingApiWrapper.getChannelWrapperByAddress(brokerAddr);
                if (brokerGroup == null || currentCW == null) {
                    this.logger.warn("Unexpected brokerGroup:{}, currentCW:{}", (Object)brokerGroup, (Object)currentCW);
                } else {
                    subscribedBrokers.put(brokerGroup, currentCW);
                }
                CopyOnWriteArraySet subscribedQueues = (CopyOnWriteArraySet)this.topicSubscribedQueuesMap.get(topic);
                if (subscribedQueues == null) {
                    subscribedQueues = new CopyOnWriteArraySet();
                    this.topicSubscribedQueuesMap.put(topic, subscribedQueues);
                }
                subscribedQueues.addAll((Collection)entry.getValue());
                this.logger.info("consumer:{} subscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, topic, brokerAddr, queueIds, subscribeTopicResponse});
                continue;
            }
            this.logger.error("consumer:{} subscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, topic, brokerAddr, queueIds, subscribeTopicResponse});
            if (this.state != ServiceState.CREATE) continue;
            throw new ClientException(String.format("subscribe topic:%s from broker:%s failed, response:%s", topic, brokerAddr, subscribeTopicResponse), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
        }
    }

    private void privateUnsubscribeTopic(String topic, Pair<String, String> brokerPair, List<QueueRouteInfo> queues) throws ClientException {
        Map<Object, Object> brokerGroupQueuesMap = new HashMap<Pair<String, String>, List<QueueRouteInfo>>();
        if (brokerPair != null && queues != null && !queues.isEmpty()) {
            brokerGroupQueuesMap.put(brokerPair, queues);
        } else {
            List<QueueRouteInfo> queueRouteInfos = this.queueSelector.getQueuesByTopic(topic);
            if (queueRouteInfos == null || queueRouteInfos.isEmpty()) {
                this.logger.error("no queues for topic:{}", (Object)topic);
                if (this.state == ServiceState.CREATE) {
                    throw new ClientException(String.format("subscribe topic failed, cannot find queue for topic:%s", topic), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
                }
                return;
            }
            brokerGroupQueuesMap = this.getBrokerAddressQueuesMap(queueRouteInfos);
        }
        UnsubscribeTopicRequest unsubscribeTopicRequest = new UnsubscribeTopicRequest();
        unsubscribeTopicRequest.setTopic(topic);
        unsubscribeTopicRequest.setConsumerGroupId(this.consumerGroupId);
        unsubscribeTopicRequest.setConsumerId(this.consumerId);
        ConcurrentMap subscribedBrokers = (ConcurrentMap)this.subscribedTopicBrokerMap.get(topic);
        Set subscribedQueues = (Set)this.topicSubscribedQueuesMap.get(topic);
        for (Map.Entry<Object, Object> entry : brokerGroupQueuesMap.entrySet()) {
            String brokerGroup = (String)((Pair)entry.getKey()).getObject1();
            String brokerAddr = (String)((Pair)entry.getKey()).getObject2();
            ArrayList<Integer> queueIds = new ArrayList<Integer>();
            for (QueueRouteInfo queueRouteInfo : (List)entry.getValue()) {
                queueIds.add(queueRouteInfo.getQueueId());
            }
            unsubscribeTopicRequest.setQueueIds(queueIds);
            UnsubscribeTopicResponse unsubscribeTopicResponse = this.remotingApiWrapper.sync(this, brokerAddr, unsubscribeTopicRequest, UnsubscribeTopicResponse.class);
            if (unsubscribeTopicResponse.success()) {
                this.logger.info("consumer:{}, unsubscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, topic, brokerAddr, queueIds, unsubscribeTopicResponse});
                boolean removeBroker = true;
                if (subscribedQueues != null) {
                    subscribedQueues.removeAll((Collection)entry.getValue());
                    for (QueueRouteInfo queueRouteInfo : subscribedQueues) {
                        if (!queueRouteInfo.getAddress().equals(brokerAddr)) continue;
                        removeBroker = false;
                        break;
                    }
                }
                if (subscribedBrokers == null || !removeBroker) continue;
                subscribedBrokers.remove(brokerGroup);
                continue;
            }
            this.logger.warn("consumer:{} unsubscribe topic:{} from broker:{} queues:{} got response:{}", new Object[]{this.consumerId, topic, brokerAddr, queueIds, unsubscribeTopicResponse});
        }
    }

    private void checkConnectionToBroker() {
        for (Map.Entry<String, FilterExpression> entry : this.subscribedTopics.entrySet()) {
            String topic = entry.getKey();
            FilterExpression filterExpression = entry.getValue();
            try {
                List<QueueRouteInfo> queueRouteInfos = this.queueSelector.getQueuesByTopic(topic);
                if (queueRouteInfos == null || queueRouteInfos.isEmpty()) {
                    this.subscribedTopicBrokerMap.remove(topic);
                    continue;
                }
                Map<Pair<String, String>, List<QueueRouteInfo>> brokerGroupQueuesMap = this.getBrokerAddressQueuesMap(queueRouteInfos);
                ConcurrentHashMap subscribedBrokers = (ConcurrentHashMap)this.subscribedTopicBrokerMap.get(topic);
                if (subscribedBrokers == null) {
                    subscribedBrokers = new ConcurrentHashMap();
                    this.subscribedTopicBrokerMap.put(topic, subscribedBrokers);
                }
                for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> mapEntry : brokerGroupQueuesMap.entrySet()) {
                    String brokerGroup = mapEntry.getKey().getObject1();
                    String brokerAddress = mapEntry.getKey().getObject2();
                    if (StringUtils.isEmpty(brokerGroup) || StringUtils.isEmpty(brokerAddress)) {
                        subscribedBrokers.remove(brokerGroup);
                        continue;
                    }
                    List<QueueRouteInfo> queues = mapEntry.getValue();
                    ChannelWrapper currentCW = this.remotingApiWrapper.getChannelWrapperByAddress(brokerAddress);
                    ChannelWrapper lastSavedCW = (ChannelWrapper)subscribedBrokers.get(brokerGroup);
                    if (lastSavedCW == null || !lastSavedCW.equals(currentCW)) {
                        this.logger.info("connect to broker:{} is abnormal, currentCW:{}, lastSavedCW:{}", new Object[]{brokerAddress, currentCW, lastSavedCW});
                        subscribedBrokers.remove(brokerGroup);
                        this.privateSubscribeTopic(topic, filterExpression, mapEntry.getKey(), queues);
                        continue;
                    }
                    if (this.remotingApiWrapper.isConnectionActive(brokerAddress)) continue;
                    subscribedBrokers.remove(brokerGroup);
                    String alternativeAddress = this.queueSelector.getAlternativeAddress(brokerAddress);
                    this.logger.info("connect to broker:{} is not active anymore, got alternative address:{}", (Object)brokerAddress, (Object)alternativeAddress);
                    if (alternativeAddress == null) continue;
                    this.privateSubscribeTopic(topic, filterExpression, new Pair<String, String>(brokerGroup, alternativeAddress), queues);
                }
            }
            catch (ClientException e) {
                this.logger.warn("consumer:{} got exception:{} when check connection for topic:{}", new Object[]{this.consumerId, e, topic});
            }
        }
    }

    private Map<Pair<String, String>, List<QueueRouteInfo>> getBrokerAddressQueuesMap(List<QueueRouteInfo> queueRouteInfos) {
        HashMap<Pair<String, String>, List<QueueRouteInfo>> brokerGroupQueuesMap = new HashMap<Pair<String, String>, List<QueueRouteInfo>>();
        for (QueueRouteInfo queueRouteInfo : queueRouteInfos) {
            Pair<String, String> key = new Pair<String, String>(queueRouteInfo.getBrokerGroup(), queueRouteInfo.getAddress());
            ArrayList<QueueRouteInfo> queues = (ArrayList<QueueRouteInfo>)brokerGroupQueuesMap.get(key);
            if (queues == null) {
                queues = new ArrayList<QueueRouteInfo>();
                brokerGroupQueuesMap.put(key, queues);
            }
            queues.add(queueRouteInfo);
        }
        return brokerGroupQueuesMap;
    }

    private void applyQueueRebalancedResult(Map<String, List<QueueRouteInfo>> topicQueuesMap) {
        Map<Pair<String, String>, List<QueueRouteInfo>> brokerGroupQueuesMap;
        Collection<QueueRouteInfo> queueRouteInfos;
        String topic;
        for (Map.Entry<String, List<QueueRouteInfo>> entry : topicQueuesMap.entrySet()) {
            topic = entry.getKey();
            queueRouteInfos = entry.getValue();
            ArrayList<QueueRouteInfo> queuesNeedSubscribe = new ArrayList<QueueRouteInfo>();
            if (this.topicSubscribedQueuesMap.get(topic) == null) {
                queuesNeedSubscribe.addAll(queueRouteInfos);
            } else {
                queuesNeedSubscribe.addAll(queueRouteInfos);
                queuesNeedSubscribe.removeAll((Collection)this.topicSubscribedQueuesMap.get(topic));
            }
            if (queuesNeedSubscribe.isEmpty()) continue;
            brokerGroupQueuesMap = this.getBrokerAddressQueuesMap(queuesNeedSubscribe);
            for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> e : brokerGroupQueuesMap.entrySet()) {
                try {
                    this.privateSubscribeTopic(topic, this.subscribedTopics.get(topic), e.getKey(), e.getValue());
                }
                catch (ClientException exception) {
                    this.logger.warn("consumer:{} got exception:{} when subscribe topic:{} from broker:{} queues:{}", new Object[]{this.consumerId, exception, topic, e.getKey(), e.getValue()});
                }
            }
        }
        for (Map.Entry<String, List<QueueRouteInfo>> entry : this.topicSubscribedQueuesMap.entrySet()) {
            topic = entry.getKey();
            queueRouteInfos = (Set)((Object)entry.getValue());
            ArrayList<QueueRouteInfo> queuesNeedUnSubscribe = new ArrayList<QueueRouteInfo>();
            if (topicQueuesMap.get(topic) == null) {
                queuesNeedUnSubscribe.addAll(queueRouteInfos);
            } else {
                queuesNeedUnSubscribe.addAll(queueRouteInfos);
                queuesNeedUnSubscribe.removeAll((Collection)topicQueuesMap.get(topic));
            }
            if (queuesNeedUnSubscribe.isEmpty()) continue;
            brokerGroupQueuesMap = this.getBrokerAddressQueuesMap(queuesNeedUnSubscribe);
            for (Map.Entry<Pair<String, String>, List<QueueRouteInfo>> e : brokerGroupQueuesMap.entrySet()) {
                try {
                    this.privateUnsubscribeTopic(topic, e.getKey(), e.getValue());
                }
                catch (ClientException exception) {
                    this.logger.warn("consumer:{} got exception:{} when unsubscribe topic:{} from broker:{} queues:{}", new Object[]{this.consumerId, exception, topic, e.getKey(), e.getValue()});
                }
            }
        }
    }
}

