package cn.bizvane.rocketmq.spring.core.consumer;

import cn.bizvane.rocketmq.spring.annotation.RocketMQMessageListener;
import cn.bizvane.rocketmq.spring.autoconfigure.RocketMQProperties;
import cn.bizvane.rocketmq.spring.exception.ConsumerRegisterException;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:cn/bizvane/rocketmq/spring/core/consumer/ConsumerAnnotationBean.class */
public class ConsumerAnnotationBean implements InitializingBean, ConsumerBean {
    private static final Logger log = LoggerFactory.getLogger(ConsumerAnnotationBean.class);
    private RocketMQProperties properties;
    private ApplicationContext ctx;
    private MessageListenerOrderly listenerOrderly;
    private MessageListenerConcurrently listenerConcurrently;
    private boolean running;
    List<DefaultMQPushConsumer> consumerList = new ArrayList();
    final String SUBSCRIBE_SPLITTER = " || ";

    public ConsumerAnnotationBean(RocketMQProperties rocketMQProperties, ApplicationContext applicationContext, MessageListenerOrderly messageListenerOrderly, MessageListenerConcurrently messageListenerConcurrently) {
        this.properties = rocketMQProperties;
        this.ctx = applicationContext;
        this.listenerOrderly = messageListenerOrderly;
        this.listenerConcurrently = messageListenerConcurrently;
    }

    private void initConsumer() {
        HashMap newHashMap = Maps.newHashMap();
        RocketMQProperties.Consumer consumer = this.properties.getConsumer();
        for (Map.Entry entry : this.ctx.getBeansOfType(RocketMQListener.class).entrySet()) {
            String groupName = consumer.getGroupName();
            if (!StringUtils.hasText(groupName)) {
                throw new ConsumerRegisterException("consumerGroup empty.");
            }
            RocketMQListener rocketMQListener = (RocketMQListener) entry.getValue();
            Class<?> cls = rocketMQListener.getClass();
            if (AopUtils.isAopProxy(rocketMQListener)) {
                cls = AopProxyUtils.ultimateTargetClass(rocketMQListener);
            }
            RocketMQMessageListener rocketMQMessageListener = (RocketMQMessageListener) cls.getAnnotation(RocketMQMessageListener.class);
            if (rocketMQMessageListener == null) {
                throw new ConsumerRegisterException(String.format("class:%s 没有注解 @RocketMQMessageListener", cls.getName()));
            }
            String str = rocketMQMessageListener.topic();
            if (!StringUtils.hasText(str)) {
                throw new ConsumerRegisterException("topic empty.");
            }
            String groupName2 = rocketMQMessageListener.groupName();
            String str2 = StringUtils.isEmpty(groupName2) ? groupName : groupName + "-" + groupName2;
            List list = (List) newHashMap.get(str2);
            if (list == null) {
                list = new ArrayList();
            }
            list.add(str);
            newHashMap.put(str2, list);
            Map<String, RocketMQListener> map = SUBSCRIPTION_TABLE.get(str);
            if (map == null) {
                map = Maps.newHashMap();
                SUBSCRIPTION_TABLE.put(str, map);
            }
            for (String str3 : rocketMQMessageListener.tags()) {
                if (map.containsKey(str3)) {
                    throw new ConsumerRegisterException(String.format("重复的tag监听, %s:%s", str, str3));
                }
                map.put(str3, rocketMQListener);
            }
        }
        for (Map.Entry entry2 : newHashMap.entrySet()) {
            String str4 = (String) entry2.getKey();
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer((StringUtils.hasText(this.properties.getAccessKey()) && StringUtils.hasText(this.properties.getSecretKey())) ? new AclClientRPCHook(new SessionCredentials(this.properties.getAccessKey(), this.properties.getSecretKey())) : null);
            defaultMQPushConsumer.setNamesrvAddr(this.properties.getNameServer());
            defaultMQPushConsumer.setConsumerGroup(str4);
            defaultMQPushConsumer.setMaxReconsumeTimes(consumer.getMaxRetryCount());
            defaultMQPushConsumer.setConsumeTimeout(consumer.getTimeout());
            defaultMQPushConsumer.setConsumeFromWhere(consumer.getConsumeFromWhere());
            defaultMQPushConsumer.setUnitMode(true);
            defaultMQPushConsumer.setUnitName(str4);
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
            if (this.properties.isNamespaceEnable()) {
                defaultMQPushConsumer.setNamespace(this.properties.getNamespace());
            }
            if (consumer.getConsumeMode() == ConsumeMode.CONCURRENTLY) {
                defaultMQPushConsumer.setMessageListener(this.listenerConcurrently);
                defaultMQPushConsumer.setConsumeThreadMin(consumer.getMaxThread());
                defaultMQPushConsumer.setConsumeThreadMax(consumer.getMaxThread());
            } else {
                defaultMQPushConsumer.setMessageListener(this.listenerOrderly);
                defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(consumer.getSuspendCurrentQueueTimeMillis());
            }
            if (consumer.getPullBatchSize() > 0) {
                defaultMQPushConsumer.setPullBatchSize(consumer.getPullBatchSize());
            }
            if (consumer.getQueueCacheCount() > 0) {
                defaultMQPushConsumer.setPullThresholdForQueue(consumer.getQueueCacheCount());
            }
            if (consumer.getTopicCacheCount() > 0) {
                defaultMQPushConsumer.setPullThresholdForTopic(consumer.getTopicCacheCount());
            }
            if (consumer.getConsumeFromWhere() == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
                if (StringUtils.hasText(consumer.getConsumeTimestamp()) && consumer.getConsumeTimestamp().length() != 14) {
                    throw new ConsumerRegisterException("consumeTimestamp 格式不正确.");
                }
                defaultMQPushConsumer.setConsumeTimestamp(consumer.getConsumeTimestamp());
            }
            for (String str5 : (List) entry2.getValue()) {
                String join = Joiner.on(" || ").join(getSubscriptionTable().get(str5).keySet());
                try {
                    defaultMQPushConsumer.subscribe(str5, join.contains("*") ? "*" : join);
                    log.info("ConsumerGroup:{} 订阅 topic: {} tags: {} ", new Object[]{str4, str5, join});
                } catch (MQClientException e) {
                    log.error("ConsumerGroup:{} 订阅topic: {}, tags: {} 失败", new Object[]{str4, str5, join});
                }
            }
            this.consumerList.add(defaultMQPushConsumer);
        }
    }

    @Override // cn.bizvane.rocketmq.spring.core.consumer.ConsumerBean
    public void start() {
        if (CollectionUtils.isEmpty(this.consumerList) || !isClosed()) {
            return;
        }
        for (DefaultMQPushConsumer defaultMQPushConsumer : this.consumerList) {
            try {
                log.info("Consumer in the start ....");
                defaultMQPushConsumer.start();
                log.info("Consumer start finish....");
                this.running = true;
            } catch (MQClientException e) {
                throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
            }
        }
    }

    @Override // cn.bizvane.rocketmq.spring.core.consumer.ConsumerBean
    public void shutdown() {
        if (CollectionUtils.isEmpty(this.consumerList) || !isStarted()) {
            return;
        }
        for (DefaultMQPushConsumer defaultMQPushConsumer : this.consumerList) {
            log.info("Consumer in the shutdown ....");
            defaultMQPushConsumer.shutdown();
            log.info("Consumer shutdown finish ....");
            this.running = false;
        }
    }

    @Override // cn.bizvane.rocketmq.spring.core.consumer.ConsumerBean
    public boolean isStarted() {
        return this.running;
    }

    @Override // cn.bizvane.rocketmq.spring.core.consumer.ConsumerBean
    public boolean isClosed() {
        return !this.running;
    }

    @Override // cn.bizvane.rocketmq.spring.core.consumer.ConsumerBean
    public Map<String, Map<String, RocketMQListener>> getSubscriptionTable() {
        return SUBSCRIPTION_TABLE;
    }

    public void afterPropertiesSet() throws Exception {
        initConsumer();
    }
}
