/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.tools.monitor;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.MQVersion;
import com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.alibaba.rocketmq.common.admin.ConsumeStats;
import com.alibaba.rocketmq.common.admin.OffsetWrapper;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.Connection;
import com.alibaba.rocketmq.common.protocol.body.ConsumerConnection;
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.alibaba.rocketmq.common.protocol.body.TopicList;
import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
import com.alibaba.rocketmq.tools.monitor.DefaultMonitorListener;
import com.alibaba.rocketmq.tools.monitor.DeleteMsgsEvent;
import com.alibaba.rocketmq.tools.monitor.MonitorConfig;
import com.alibaba.rocketmq.tools.monitor.MonitorListener;
import com.alibaba.rocketmq.tools.monitor.UndoneMsgs;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

public class MonitorService {
    private final Logger log = ClientLogger.getLog();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ThreadFactoryImpl("MonitorService"));
    private final MonitorConfig monitorConfig;
    private final MonitorListener monitorListener;
    private final DefaultMQAdminExt defaultMQAdminExt;
    private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("TOOLS_CONSUMER");
    private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("__MONITOR_CONSUMER");

    public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) {
        this.monitorConfig = monitorConfig;
        this.monitorListener = monitorListener;
        this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
        this.defaultMQAdminExt.setInstanceName(this.instanceName());
        this.defaultMQAdminExt.setNamesrvAddr(monitorConfig.getNamesrvAddr());
        this.defaultMQPullConsumer.setInstanceName(this.instanceName());
        this.defaultMQPullConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
        this.defaultMQPushConsumer.setInstanceName(this.instanceName());
        this.defaultMQPushConsumer.setNamesrvAddr(monitorConfig.getNamesrvAddr());
        try {
            this.defaultMQPushConsumer.setConsumeThreadMin(1);
            this.defaultMQPushConsumer.setConsumeThreadMax(1);
            this.defaultMQPushConsumer.subscribe("OFFSET_MOVED_EVENT", "*");
            this.defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently(){

                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        OffsetMovedEvent ome = (OffsetMovedEvent)OffsetMovedEvent.decode((byte[])msgs.get(0).getBody(), OffsetMovedEvent.class);
                        DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
                        deleteMsgsEvent.setOffsetMovedEvent(ome);
                        deleteMsgsEvent.setEventTimestamp(msgs.get(0).getStoreTimestamp());
                        MonitorService.this.monitorListener.reportDeleteMsgsEvent(deleteMsgsEvent);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        }
        catch (MQClientException e) {
            // empty catch block
        }
    }

    private String instanceName() {
        String name = System.currentTimeMillis() + (long)new Random().nextInt() + this.monitorConfig.getNamesrvAddr();
        return "MonitorService_" + name.hashCode();
    }

    private void startScheduleTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MonitorService.this.doMonitorWork();
                }
                catch (Exception e) {
                    MonitorService.this.log.error("doMonitorWork Exception", (Throwable)e);
                }
            }
        }, 20000L, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
    }

    public void start() throws MQClientException {
        this.defaultMQPullConsumer.start();
        this.defaultMQAdminExt.start();
        this.defaultMQPushConsumer.start();
        this.startScheduleTask();
    }

    public void shutdown() {
        this.defaultMQPullConsumer.shutdown();
        this.defaultMQAdminExt.shutdown();
        this.defaultMQPushConsumer.shutdown();
    }

    public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException {
        long beginTime = System.currentTimeMillis();
        this.monitorListener.beginRound();
        TopicList topicList = this.defaultMQAdminExt.fetchAllTopicList();
        for (String topic : topicList.getTopicList()) {
            if (!topic.startsWith("%RETRY%")) continue;
            String consumerGroup = topic.substring("%RETRY%".length());
            try {
                this.reportUndoneMsgs(consumerGroup);
            }
            catch (Exception e) {
                // empty catch block
            }
            try {
                this.reportConsumerRunningInfo(consumerGroup);
            }
            catch (Exception e) {}
        }
        this.monitorListener.endRound();
        long spentTimeMills = System.currentTimeMillis() - beginTime;
        this.log.info("Execute one round monitor work, spent timemills: {}", (Object)spentTimeMills);
    }

    public void reportConsumerRunningInfo(String consumerGroup) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        ConsumerConnection cc = this.defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
        TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
        for (Connection c : cc.getConnectionSet()) {
            String clientId = c.getClientId();
            if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) continue;
            try {
                ConsumerRunningInfo info = this.defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
                infoMap.put(clientId, info);
            }
            catch (Exception e) {}
        }
        if (!infoMap.isEmpty()) {
            this.monitorListener.reportConsumerRunningInfo(infoMap);
        }
    }

    private void reportFailedMsgs(String consumerGroup, String topic) {
    }

    private void reportUndoneMsgs(String consumerGroup) {
        ConsumeStats cs = null;
        try {
            cs = this.defaultMQAdminExt.examineConsumeStats(consumerGroup);
        }
        catch (Exception e) {
            return;
        }
        ConsumerConnection cc = null;
        try {
            cc = this.defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
        }
        catch (Exception e) {
            return;
        }
        if (cs != null) {
            HashMap<String, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>();
            for (Map.Entry next : cs.getOffsetTable().entrySet()) {
                MessageQueue mq = (MessageQueue)next.getKey();
                OffsetWrapper ow = (OffsetWrapper)next.getValue();
                ConsumeStats csTmp = (ConsumeStats)csByTopic.get(mq.getTopic());
                if (null == csTmp) {
                    csTmp = new ConsumeStats();
                    csByTopic.put(mq.getTopic(), csTmp);
                }
                csTmp.getOffsetTable().put(mq, ow);
            }
            for (Map.Entry next : csByTopic.entrySet()) {
                UndoneMsgs undoneMsgs = new UndoneMsgs();
                undoneMsgs.setConsumerGroup(consumerGroup);
                undoneMsgs.setTopic((String)next.getKey());
                this.computeUndoneMsgs(undoneMsgs, (ConsumeStats)next.getValue());
                this.monitorListener.reportUndoneMsgs(undoneMsgs);
                this.reportFailedMsgs(consumerGroup, (String)next.getKey());
            }
        }
    }

    private void computeUndoneMsgs(UndoneMsgs undoneMsgs, ConsumeStats consumeStats) {
        long total = 0L;
        long singleMax = 0L;
        long delayMax = 0L;
        for (Map.Entry next : consumeStats.getOffsetTable().entrySet()) {
            MessageQueue mq = (MessageQueue)next.getKey();
            OffsetWrapper ow = (OffsetWrapper)next.getValue();
            long diff = ow.getBrokerOffset() - ow.getConsumerOffset();
            if (diff > singleMax) {
                singleMax = diff;
            }
            if (diff > 0L) {
                total += diff;
            }
            if (ow.getLastTimestamp() <= 0L) continue;
            try {
                long maxOffset = this.defaultMQPullConsumer.maxOffset(mq);
                if (maxOffset <= 0L) continue;
                PullResult pull = this.defaultMQPullConsumer.pull(mq, "*", maxOffset - 1L, 1);
                switch (pull.getPullStatus()) {
                    case FOUND: {
                        long delay = ((MessageExt)pull.getMsgFoundList().get(0)).getStoreTimestamp() - ow.getLastTimestamp();
                        if (delay <= delayMax) break;
                        delayMax = delay;
                        break;
                    }
                    case NO_MATCHED_MSG: 
                    case NO_NEW_MSG: 
                    case OFFSET_ILLEGAL: {
                        break;
                    }
                }
            }
            catch (Exception e) {}
        }
        undoneMsgs.setUndoneMsgsTotal(total);
        undoneMsgs.setUndoneMsgsSingleMQ(singleMax);
        undoneMsgs.setUndoneMsgsDelayTimeMills(delayMax);
    }

    public static void main(String[] args) throws MQClientException {
        MonitorService.main0(args, null);
    }

    public static void main0(String[] args, RPCHook rpcHook) throws MQClientException {
        final MonitorService monitorService = new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
        monitorService.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable(){
            private volatile boolean hasShutdown = false;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                3 var1_1 = this;
                synchronized (var1_1) {
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        monitorService.shutdown();
                    }
                }
            }
        }, "ShutdownHook"));
    }
}

