package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MergeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueueGroup;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.google.common.base.Objects;
import com.aliyun.openservices.shade.io.netty.util.internal.ConcurrentSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:BOOT-INF/lib/ons-client-1.8.7.1.Final.jar:com/aliyun/openservices/shade/com/alibaba/rocketmq/client/impl/consumer/MergeThreadExecutor.class */
/**
 * Runs {@link MergeRequest}s for message queue groups on a dedicated thread pool and
 * re-schedules them according to the interrupt code each request reports.
 *
 * <p>Requests that are currently tracked are kept in {@code mergeRequestSet}; {@link #submit}
 * uses that set to avoid handing an equal request to the pool twice unless the caller forces it.
 */
public class MergeThreadExecutor {
    private static final InternalLogger LOG = ClientLogger.getLog();

    /**
     * Upper bound, in milliseconds, on how long one pool task keeps re-running the same request
     * before it re-submits itself. Read from the system property
     * {@code rocketmq.client.maxTimeConsumeContinuously}, defaulting to 5000.
     */
    private static final long MAX_TIME_CONSUME_CONTINUOUSLY = Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "5000"));

    private final ConsumeMessageOrderlyByGroupService cs;
    private final MessageQueueGroupLock queueGroupLock = new MessageQueueGroupLock();

    /** Requests currently tracked by this executor; membership is decided by the wrapped {@link MergeRequest}. */
    private final ConcurrentSet<ContinuouslyMergeRequest> mergeRequestSet = new ConcurrentSet<>();

    private final int mergeThreadMin = 10;
    private final int mergeThreadMax = 32;

    /**
     * Pool that executes the merge tasks. The work queue is an unbounded
     * {@link LinkedBlockingQueue}, so the pool only ever grows up to its core size.
     */
    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            this.mergeThreadMin,
            this.mergeThreadMax,
            60000,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryImpl("MergeMessageThread_"));

    /**
     * Pool task that repeatedly runs one {@link MergeRequest} until the request reports that it
     * was interrupted, then dispatches on the request's interrupt code.
     *
     * <p>Two instances are equal when their wrapped requests are equal.
     */
    public class ContinuouslyMergeRequest implements Runnable {
        private final MergeRequest mergeRequest;

        /**
         * @param mergeRequest the request this task drives
         */
        public ContinuouslyMergeRequest(MergeRequest mergeRequest) {
            this.mergeRequest = mergeRequest;
        }

        /**
         * Drives the wrapped request.
         *
         * <ul>
         *   <li>If the queue group has no queue pairs, or its process queue group is
         *       {@code ALL_DROPPED}, the task is removed from the tracking set and stops.</li>
         *   <li>If the status is {@code PARTIALLY_DROPPED}, an update-and-retry is scheduled in
         *       1000 ms.</li>
         *   <li>Otherwise the request is run under the group's lock object until it is
         *       interrupted or the continuous-consume time budget is exceeded.</li>
         * </ul>
         *
         * <p>Any {@link Exception} is logged and the task is removed from the tracking set.
         */
        @Override
        public void run() {
            try {
                if (this.mergeRequest.getQueueGroup().getQueuePairs() == null) {
                    MergeThreadExecutor.this.remove(this);
                    return;
                }
                ProcessQueueGroup.ProcessQueueGroupStatus status =
                        this.mergeRequest.getQueueGroup().getProcessQueueGroup().getProcessQueueStatus();
                if (status == ProcessQueueGroup.ProcessQueueGroupStatus.ALL_DROPPED) {
                    LOG.warn("run, the message queue group not be able to consume, because it's dropped. {}", this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                    MergeThreadExecutor.this.remove(this);
                    return;
                }
                if (status == ProcessQueueGroup.ProcessQueueGroupStatus.PARTIALLY_DROPPED) {
                    LOG.warn("run, the message queue group not be able to consume, because some of its message queues are dropped. {}", this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                    MergeThreadExecutor.this.updateLaterAndMergeAgain(this.mergeRequest, 1000L);
                    return;
                }

                Object lock = MergeThreadExecutor.this.queueGroupLock.fetchLockObject(this.mergeRequest.getQueueGroup().getMessageQueueGroup());
                long startMillis = System.currentTimeMillis();
                int rounds = 0;
                while (!this.mergeRequest.isInterrupted()) {
                    long elapsedMillis = System.currentTimeMillis() - startMillis;
                    // Always run at least once; after that, yield the pool thread once the time
                    // budget is used up and queue this request again.
                    if (rounds > 0 && elapsedMillis > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        MergeThreadExecutor.this.submit(this.mergeRequest, true);
                        return;
                    }
                    synchronized (lock) {
                        this.mergeRequest.run();
                    }
                    rounds++;
                }

                MergeRequest.InterruptCode code = this.mergeRequest.getInterruptCode();
                if (code == MergeRequest.InterruptCode.REMOVE_REQUEST) {
                    // Stop tracking first, then give the request one final run under the lock.
                    MergeThreadExecutor.this.remove(this);
                    synchronized (lock) {
                        this.mergeRequest.run();
                    }
                } else if (code == MergeRequest.InterruptCode.LOCK_LATER) {
                    MergeThreadExecutor.this.tryLockLaterAndMergeAgain(this.mergeRequest, 10L);
                } else if (code == MergeRequest.InterruptCode.UPDATE_LATER) {
                    MergeThreadExecutor.this.updateLaterAndMergeAgain(this.mergeRequest, 10L);
                } else if (code == MergeRequest.InterruptCode.MERGE_LATER) {
                    MergeThreadExecutor.this.submitLater(this.mergeRequest, 1L);
                } else {
                    LOG.error("unexpected interrupt code in mergeRequest {}", code);
                    MergeThreadExecutor.this.remove(this);
                }
            } catch (Exception e) {
                LOG.error("Run merge request exception", e);
                // Nothing was re-scheduled on this path. Stop tracking the request so that a later
                // non-forced submit() for the same request is not silently dropped as a duplicate.
                MergeThreadExecutor.this.remove(this);
            }
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equal(this.mergeRequest, ((ContinuouslyMergeRequest) obj).mergeRequest);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.mergeRequest);
        }
    }

    /**
     * @param consumeMessageOrderlyByGroupService service that supplies the scheduler, the push
     *        consumer configuration and the queue group map used by this executor
     */
    public MergeThreadExecutor(ConsumeMessageOrderlyByGroupService consumeMessageOrderlyByGroupService) {
        this.cs = consumeMessageOrderlyByGroupService;
    }

    /** Initiates an orderly shutdown of the merge thread pool. */
    public void shutdown() {
        this.threadPoolExecutor.shutdown();
    }

    /**
     * Sets the pool's core size to {@code i}, but never below the built-in minimum.
     *
     * <p>If the requested size is above the pool's current maximum, the maximum is raised first.
     * On JDK 9 and later {@link ThreadPoolExecutor#setCorePoolSize(int)} throws
     * {@link IllegalArgumentException} when the core size would exceed the maximum pool size.
     *
     * @param i requested core pool size
     */
    public void setCorePoolSize(int i) {
        int target = Math.max(this.mergeThreadMin, i);
        if (target > this.threadPoolExecutor.getMaximumPoolSize()) {
            this.threadPoolExecutor.setMaximumPoolSize(target);
        }
        this.threadPoolExecutor.setCorePoolSize(target);
    }

    /**
     * @return the current core size of the merge thread pool
     */
    public int getCorePoolSize() {
        return this.threadPoolExecutor.getCorePoolSize();
    }

    /**
     * @return the lock registry used to serialize runs of requests for the same queue group
     */
    public MessageQueueGroupLock getQueueGroupLock() {
        return this.queueGroupLock;
    }

    /**
     * Stops tracking the given task.
     *
     * @param continuouslyMergeRequest task to drop from the tracking set
     */
    public void remove(ContinuouslyMergeRequest continuouslyMergeRequest) {
        this.mergeRequestSet.remove(continuouslyMergeRequest);
    }

    /**
     * Clears the request's interrupt state and hands it to the thread pool.
     *
     * <p>The request is queued when it was not tracked yet, or unconditionally when {@code z} is
     * {@code true}. A failure to queue is logged and not propagated.
     *
     * @param mergeRequest request to run
     * @param z            {@code true} to queue the request even if an equal one is already tracked
     */
    public void submit(MergeRequest mergeRequest, boolean z) {
        mergeRequest.setInterrupted(false);
        mergeRequest.setInterruptCode(null);
        ContinuouslyMergeRequest task = new ContinuouslyMergeRequest(mergeRequest);
        boolean newlyTracked = this.mergeRequestSet.add(task);
        if (z || newlyTracked) {
            try {
                this.threadPoolExecutor.submit(task);
            } catch (Exception e) {
                LOG.error("error submit merge request: {}, mq: {}", e.toString(), mergeRequest.getQueueGroup().getMessageQueueGroup());
            }
        }
    }

    /**
     * Schedules a forced {@link #submit} of the request after a delay.
     *
     * @param mergeRequest request to submit
     * @param j            delay in milliseconds; {@code -1} selects the push consumer's
     *                     suspend-current-queue time, and values above 30000 are capped at 30000
     */
    public void submitLater(final MergeRequest mergeRequest, long j) {
        long delayMillis = j;
        if (delayMillis == -1) {
            delayMillis = this.cs.getDefaultMQPushConsumer().getSuspendCurrentQueueTimeMillis();
        }
        if (delayMillis > 30000) {
            delayMillis = 30000;
        }
        this.cs.getScheduledExecutorService().schedule(new Runnable() {
            @Override
            public void run() {
                MergeThreadExecutor.this.submit(mergeRequest, true);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * After {@code j} milliseconds, tries to lock the request's message queue group and then
     * re-submits the request: 10 ms later when the lock was obtained, 3000 ms later otherwise.
     *
     * @param mergeRequest request to retry
     * @param j            delay in milliseconds before the lock attempt
     */
    public void tryLockLaterAndMergeAgain(final MergeRequest mergeRequest, long j) {
        this.cs.getScheduledExecutorService().schedule(new Runnable() {
            @Override
            public void run() {
                boolean locked = MergeThreadExecutor.this.cs.lockMQGroup(mergeRequest.getQueueGroup().getMessageQueueGroup());
                MergeThreadExecutor.this.submitLater(mergeRequest, locked ? 10L : 3000L);
            }
        }, j, TimeUnit.MILLISECONDS);
    }

    /**
     * After {@code j} milliseconds, refreshes the topic route, triggers a rebalance, points the
     * request at the queue group currently registered for its topic and group id, and re-submits
     * it: 10 ms later when {@code doRebalance()} returned {@code true}, 3000 ms later otherwise.
     *
     * @param mergeRequest request to refresh and retry
     * @param j            delay in milliseconds before the refresh
     */
    public void updateLaterAndMergeAgain(final MergeRequest mergeRequest, long j) {
        this.cs.getScheduledExecutorService().schedule(new Runnable() {
            @Override
            public void run() {
                MergeThreadExecutor.this.cs.defaultMQPushConsumerImpl.getmQClientFactory().updateTopicRouteInfoFromNameServer(mergeRequest.getQueueGroup().getTopic());
                boolean rebalanced = MergeThreadExecutor.this.cs.defaultMQPushConsumerImpl.doRebalance();
                QueueGroup previous = mergeRequest.getQueueGroup();
                // NOTE(review): if the topic or group id is no longer present in the queue group
                // map, this lookup presumably throws or yields null and the request is never
                // re-submitted - confirm whether that is intended.
                mergeRequest.setQueueGroup(MergeThreadExecutor.this.cs.getQueueGroupMap().get(previous.getTopic()).get(Integer.valueOf(previous.getQueueGroupId())));
                MergeThreadExecutor.this.submitLater(mergeRequest, rebalanced ? 10L : 3000L);
            }
        }, j, TimeUnit.MILLISECONDS);
    }
}
