package org.apache.rocketmq.store.schedule;

import ch.qos.logback.core.pattern.color.ANSIConstants;
import ch.qos.logback.core.rolling.helper.DateTokenConverter;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/schedule/ScheduleMessageService.class */
/**
 * Re-delivers delayed messages once their delay has elapsed.
 *
 * <p>Delayed messages are read from the consume queues of the internal topic
 * {@link #SCHEDULE_TOPIC}; delay level {@code n} maps to queue id {@code n - 1}
 * (see {@link #delayLevel2QueueId(int)}). After {@link #start()}, one
 * {@link DeliverDelayedMessageTimerTask} per configured delay level runs on a single
 * daemon {@link Timer}, and the per-level scan offsets held in {@code offsetTable}
 * are written out periodically through {@code persist()}.
 */
public class ScheduleMessageService extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    /** Delay in milliseconds before the first scan of each delay level after {@link #start()}. */
    private static final long FIRST_DELAY_TIME = 1000;
    /** Short re-scan delay in milliseconds, used after a normal pass or when nothing could be read. */
    private static final long DELAY_FOR_A_WHILE = 100;
    /** Long retry delay in milliseconds, used after a failed re-put or an unexpected exception. */
    private static final long DELAY_FOR_A_PERIOD = 10000;
    /** Bytes consumed per consume-queue entry by the scan loop: one long, one int, one long. */
    private static final int CQ_UNIT_SIZE = 8 + 4 + 8;

    private final DefaultMessageStore defaultMessageStore;
    private Timer timer;
    private MessageStore writeMessageStore;
    private int maxDelayLevel;
    /** Delay level to delay duration in milliseconds; filled by {@link #parseDelayLevel()}. */
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<>(32);
    /** Delay level to the next consume-queue offset to scan. */
    private final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>(32);
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * One scan pass over the consume queue of a single delay level, starting at a given
     * offset. Each pass schedules a fresh task for the follow-up pass on the shared timer.
     */
    public class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;

        /**
         * @param delayLevel delay level whose queue this task scans
         * @param offset     consume-queue offset at which the scan starts
         */
        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        /**
         * Runs one scan pass if the service is started. Any exception is logged and the
         * same level/offset is retried after {@code DELAY_FOR_A_PERIOD} milliseconds.
         */
        @Override
        public void run() {
            try {
                if (isStarted()) {
                    executeOnTimeup();
                }
            } catch (Exception e) {
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

        /**
         * Caps a deliver timestamp: if it lies further in the future than {@code now} plus
         * this level's configured delay, the message is treated as due immediately.
         *
         * @param now              current time in milliseconds
         * @param deliverTimestamp timestamp read from the consume queue entry
         * @return {@code deliverTimestamp}, or {@code now} when it exceeds the cap
         */
        private long correctDeliverTimestamp(long now, long deliverTimestamp) {
            long maxTimestamp = now + delayLevelTable.get(this.delayLevel);
            return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
        }

        /**
         * Scans the level's consume queue from {@code offset}, re-putting every message
         * that is due, and schedules the follow-up task.
         *
         * <p>When the queue or its index buffer is unavailable, the scan is retried after
         * {@code DELAY_FOR_A_WHILE} milliseconds; if the offset is below the queue's minimum
         * offset, the retry starts from that minimum instead.
         */
        public void executeOnTimeup() {
            ConsumeQueue cq = defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(this.delayLevel));
            long failScheduleOffset = this.offset;
            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        scanAndDeliver(cq, bufferCQ);
                    } finally {
                        // Single release point: every exit path of the scan goes through here exactly once.
                        bufferCQ.release();
                    }
                    return;
                }
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (this.offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + this.offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId());
                }
            }
            timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
        }

        /**
         * Walks the buffer entry by entry. Stops at the first entry that is not yet due
         * (rescheduling for the remaining countdown) or whose re-put fails (rescheduling
         * after {@code DELAY_FOR_A_PERIOD}); otherwise reschedules past the last entry.
         */
        private void scanAndDeliver(ConsumeQueue cq, SelectMappedBufferResult bufferCQ) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            int i = 0;
            for (; i < bufferCQ.getSize(); i += CQ_UNIT_SIZE) {
                long offsetPy = bufferCQ.getByteBuffer().getLong();
                int sizePy = bufferCQ.getByteBuffer().getInt();
                // For this topic the third field is used as the deliver timestamp.
                long tagsCode = bufferCQ.getByteBuffer().getLong();
                if (cq.isExtAddr(tagsCode)) {
                    if (cq.getExt(tagsCode, cqExtUnit)) {
                        tagsCode = cqExtUnit.getTagsCode();
                    } else {
                        // Ext content is missing: recompute the timestamp from the stored message.
                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy);
                        long storeTimestamp = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                        tagsCode = computeDeliverTimestamp(this.delayLevel, storeTimestamp);
                    }
                }

                long now = System.currentTimeMillis();
                long deliverTimestamp = correctDeliverTimestamp(now, tagsCode);
                long nextOffset = this.offset + (i / CQ_UNIT_SIZE);
                long countdown = deliverTimestamp - now;
                if (countdown > 0) {
                    // Not due yet: come back to this same entry when it is.
                    reschedule(nextOffset, countdown);
                    return;
                }

                MessageExt msgExt = defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                if (msgExt == null) {
                    continue;
                }

                PutMessageResult putMessageResult;
                try {
                    putMessageResult = writeMessageStore.putMessage(messageTimeup(msgExt));
                } catch (Exception e) {
                    // The message is dropped, as the log text states; carry on with the next entry.
                    log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                    continue;
                }
                if (putMessageResult == null || putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK) {
                    log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
                    reschedule(nextOffset, DELAY_FOR_A_PERIOD);
                    return;
                }
                // Delivered: the loop increment advances exactly one entry.
            }
            reschedule(this.offset + (i / CQ_UNIT_SIZE), DELAY_FOR_A_WHILE);
        }

        /** Schedules the next scan of this level at {@code nextOffset} and records that offset. */
        private void reschedule(long nextOffset, long delayMillis) {
            timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), delayMillis);
            updateOffset(this.delayLevel, nextOffset);
        }

        /**
         * Builds the message to re-put: copies body, flag, properties and origin metadata,
         * clears the {@code DELAY} property, and takes topic and queue id from the
         * {@code REAL_TOPIC} / {@code REAL_QID} properties.
         *
         * @param msgExt the delayed message read from the store
         * @return the message to hand to the write store
         * @throws NumberFormatException if the {@code REAL_QID} property is not an integer
         */
        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            // NOTE(review): the filter type is derived from msgInner's sysFlag before it is
            // copied from msgExt below; order kept as-is to preserve behavior - confirm intent.
            long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(
                MessageExt.parseTopicFilterType(msgInner.getSysFlag()), msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
            msgInner.setWaitStoreMsgOK(false);

            MessageAccessor.clearProperty(msgInner, "DELAY");
            msgInner.setTopic(msgInner.getProperty("REAL_TOPIC"));
            msgInner.setQueueId(Integer.parseInt(msgInner.getProperty("REAL_QID")));
            return msgInner;
        }
    }

    /**
     * @param defaultMessageStore store that is read for delayed messages and, until
     *                            {@link #setWriteMessageStore(MessageStore)} is called,
     *                            also receives the re-put messages
     */
    public ScheduleMessageService(DefaultMessageStore defaultMessageStore) {
        this.defaultMessageStore = defaultMessageStore;
        this.writeMessageStore = defaultMessageStore;
    }

    /** Maps a schedule-topic queue id to its delay level ({@code queueId + 1}). */
    public static int queueId2DelayLevel(int i) {
        return i + 1;
    }

    /** Maps a delay level to its schedule-topic queue id ({@code delayLevel - 1}). */
    public static int delayLevel2QueueId(int i) {
        return i - 1;
    }

    /** Replaces the store that due messages are re-put into. */
    public void setWriteMessageStore(MessageStore messageStore) {
        this.writeMessageStore = messageStore;
    }

    /**
     * Adds one entry per tracked delay level to {@code hashMap}. The key is
     * {@code <scheduleMessageOffset name>_<level>} and the value is
     * {@code <current scan offset>,<max offset in that level's queue>}.
     *
     * @param hashMap map that receives the statistics
     */
    public void buildRunningStats(HashMap<String, String> hashMap) {
        for (Map.Entry<Integer, Long> entry : this.offsetTable.entrySet()) {
            int queueId = delayLevel2QueueId(entry.getKey());
            long delayOffset = entry.getValue();
            long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(SCHEDULE_TOPIC, queueId);
            String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), entry.getKey());
            hashMap.put(key, String.format("%d,%d", delayOffset, maxOffset));
        }
    }

    /** Records {@code j} as the next offset to scan for delay level {@code i}. */
    public void updateOffset(int i, long j) {
        this.offsetTable.put(i, j);
    }

    /**
     * Computes when a message stored at time {@code j} becomes due.
     *
     * @param i delay level
     * @param j store timestamp in milliseconds
     * @return {@code j} plus the level's configured delay, or {@code j + 1000} when the
     *         level is not configured
     */
    public long computeDeliverTimestamp(int i, long j) {
        Long delay = this.delayLevelTable.get(i);
        return delay != null ? delay + j : j + 1000;
    }

    /**
     * Starts the service once; further calls while started are no-ops. Creates the daemon
     * timer, schedules a first scan for every configured delay level (from its saved offset,
     * or 0 when none is saved), and schedules the periodic {@code persist()} of the offsets.
     */
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long savedOffset = this.offsetTable.get(level);
            long startOffset = savedOffset == null ? 0L : savedOffset;
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, startOffset), FIRST_DELAY_TIME);
            }
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (started.get()) {
                        persist();
                    }
                } catch (Throwable th) {
                    // Catch everything so that a failed flush cannot terminate the timer thread.
                    log.error("scheduleAtFixedRate flush exception", th);
                }
            }
        }, 10000L, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

    /** Stops the service if it is started and cancels the timer. */
    public void shutdown() {
        if (this.started.compareAndSet(true, false) && this.timer != null) {
            this.timer.cancel();
        }
    }

    /** @return whether {@link #start()} has been called without a subsequent {@link #shutdown()} */
    public boolean isStarted() {
        return this.started.get();
    }

    /** @return the highest delay level seen by {@link #parseDelayLevel()} */
    public int getMaxDelayLevel() {
        return this.maxDelayLevel;
    }

    /** Equivalent to {@code encode(false)}. */
    @Override
    public String encode() {
        return encode(false);
    }

    /**
     * Loads the persisted state through the superclass and then parses the configured
     * delay levels.
     *
     * @return {@code true} only if both steps succeed
     */
    @Override
    public boolean load() {
        return super.load() && parseDelayLevel();
    }

    /** @return the delay-offset store path under the configured store root directory */
    @Override
    public String configFilePath() {
        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    }

    /**
     * Merges the offsets contained in a JSON document into {@code offsetTable}.
     * A {@code null} string, or one that deserializes to {@code null}, is ignored.
     *
     * @param str JSON form of a {@code DelayOffsetSerializeWrapper}
     */
    @Override
    public void decode(String str) {
        if (str == null) {
            return;
        }
        DelayOffsetSerializeWrapper wrapper =
            (DelayOffsetSerializeWrapper) DelayOffsetSerializeWrapper.fromJson(str, DelayOffsetSerializeWrapper.class);
        if (wrapper != null) {
            this.offsetTable.putAll(wrapper.getOffsetTable());
        }
    }

    /**
     * Serializes the current offset table to JSON.
     *
     * @param z forwarded unchanged to the wrapper's {@code toJson}
     * @return JSON form of the offset table
     */
    @Override
    public String encode(boolean z) {
        DelayOffsetSerializeWrapper wrapper = new DelayOffsetSerializeWrapper();
        wrapper.setOffsetTable(this.offsetTable);
        return wrapper.toJson(z);
    }

    /**
     * Parses the configured delay-level string into {@code delayLevelTable}.
     *
     * <p>The string is split on single spaces; each token is a number followed by a
     * one-character unit: {@code s}, {@code m}, {@code h} or {@code d}. Tokens are
     * numbered from 1 in order of appearance, and that number is the delay level.
     *
     * @return {@code true} on success; {@code false} if any token cannot be parsed
     *         (the failure is logged)
     */
    public boolean parseDelayLevel() {
        Map<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 60000L);
        timeUnitTable.put("h", 3600000L);
        timeUnitTable.put("d", 86400000L);
        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                int unitIndex = value.length() - 1;
                // An unknown unit yields null here; the resulting NPE is reported by the catch below.
                Long unitMillis = timeUnitTable.get(value.substring(unitIndex));
                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long amount = Long.parseLong(value.substring(0, unitIndex));
                this.delayLevelTable.put(level, unitMillis * amount);
            }
            return true;
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }
    }
}
