package org.apache.rocketmq.store;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.index.QueryOffsetResult;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.aspectj.lang.JoinPoint;

/* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore.class */
public class DefaultMessageStore implements MessageStore {
    // Logger shared by the store and all of its nested services.
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // Store configuration handed to the constructor.
    private final MessageStoreConfig messageStoreConfig;
    // Commit log; the constructor picks DLedgerCommitLog or CommitLog based on the configuration.
    private final CommitLog commitLog;
    // Consume queues, indexed by an outer String key and an inner Integer key
    // (presumably topic -> queueId; confirm against the code that populates the table).
    private final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable;
    // Background thread that periodically flushes every consume queue.
    private final FlushConsumeQueueService flushConsumeQueueService;
    // Deletes expired commit log files.
    private final CleanCommitLogService cleanCommitLogService;
    // Deletes consume queue and index files that fall behind the commit log minimum offset.
    private final CleanConsumeQueueService cleanConsumeQueueService;
    private final IndexService indexService;
    // Background thread that reads the commit log and feeds the dispatchers.
    private final ReputMessageService reputMessageService;
    // Null when the DLedger commit log is enabled (see the constructor).
    private final HAService haService;
    private final ScheduleMessageService scheduleMessageService;
    private final StoreStatsService storeStatsService;
    // Initialised in the constructor only when the transient store pool is enabled.
    private final TransientStorePool transientStorePool;
    private final BrokerStatsManager brokerStatsManager;
    // Notified by ReputMessageService after a message has been dispatched.
    private final MessageArrivingListener messageArrivingListener;
    private final BrokerConfig brokerConfig;
    // Created in load(); flushed and closed in shutdown().
    private StoreCheckpoint storeCheckpoint;
    // Dispatchers registered by the constructor: consume-queue builder first, then index builder.
    private final LinkedList<CommitLogDispatcher> dispatcherList;
    // Lock file opened in the constructor; closed in shutdown().
    private RandomAccessFile lockFile;
    // Lock on lockFile acquired in start(); released in shutdown().
    private FileLock lock;
    private final RunningFlags runningFlags = new RunningFlags();
    private final SystemClock systemClock = new SystemClock();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
    // True until start() completes; set back to true at the beginning of shutdown().
    private volatile boolean shutdown = true;
    // Counter used by putMessage to emit its "forbidden" warnings only once every 50000 rejections.
    private AtomicLong printTimes = new AtomicLong(0);
    // Set to true by shutdown() when the abort file could be removed.
    boolean shutDownNormal = false;
    // Started in the constructor; shut down by load() on failure and by shutdown().
    private final AllocateMappedFileService allocateMappedFileService = new AllocateMappedFileService(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$CleanCommitLogService.class */
    public class CleanCommitLogService {
        private static final int MAX_MANUAL_DELETE_FILE_TIMES = 20;
        private final double diskSpaceWarningLevelRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
        private final double diskSpaceCleanForciblyRatio = Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
        private long lastRedeleteTimestamp = 0;
        private volatile int manualDeleteFileSeveralTimes = 0;
        private volatile boolean cleanImmediately = false;

        CleanCommitLogService() {
        }

        public void excuteDeleteFilesManualy() {
            this.manualDeleteFileSeveralTimes = 20;
            DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
        }

        public void run() {
            try {
                deleteExpiredFiles();
                redeleteHangedFile();
            } catch (Throwable th) {
                DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", th);
            }
        }

        private void deleteExpiredFiles() {
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            int deleteCommitLogFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            boolean isTimeToDelete = isTimeToDelete();
            boolean isSpaceToDelete = isSpaceToDelete();
            boolean z = this.manualDeleteFileSeveralTimes > 0;
            if (isTimeToDelete || isSpaceToDelete || z) {
                if (z) {
                    this.manualDeleteFileSeveralTimes--;
                }
                boolean z2 = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
                DefaultMessageStore.log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", Long.valueOf(fileReservedTime), Boolean.valueOf(isTimeToDelete), Boolean.valueOf(isSpaceToDelete), Integer.valueOf(this.manualDeleteFileSeveralTimes), Boolean.valueOf(z2));
                if (DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime * 3600000, deleteCommitLogFilesInterval, destroyMapedFileIntervalForcibly, z2) <= 0 && isSpaceToDelete) {
                    DefaultMessageStore.log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }

        private void redeleteHangedFile() {
            int redeleteHangedFileInterval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastRedeleteTimestamp > redeleteHangedFileInterval) {
                this.lastRedeleteTimestamp = currentTimeMillis;
                if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly())) {
                }
            }
        }

        public String getServiceName() {
            return CleanCommitLogService.class.getSimpleName();
        }

        private boolean isTimeToDelete() {
            String deleteWhen = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
            if (!UtilAll.isItTimeToDo(deleteWhen)) {
                return false;
            }
            DefaultMessageStore.log.info("it's time to reclaim disk space, " + deleteWhen);
            return true;
        }

        private boolean isSpaceToDelete() {
            double diskMaxUsedSpaceRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0d;
            this.cleanImmediately = false;
            double diskPartitionSpaceUsedPercent = UtilAll.getDiskPartitionSpaceUsedPercent(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
            if (diskPartitionSpaceUsedPercent > this.diskSpaceWarningLevelRatio) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.log.error("physic disk maybe full soon " + diskPartitionSpaceUsedPercent + ", so mark disk full");
                }
                this.cleanImmediately = true;
            } else if (diskPartitionSpaceUsedPercent > this.diskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.log.info("physic disk space OK " + diskPartitionSpaceUsedPercent + ", so mark disk ok");
            }
            if (diskPartitionSpaceUsedPercent < 0.0d || diskPartitionSpaceUsedPercent > diskMaxUsedSpaceRatio) {
                DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + diskPartitionSpaceUsedPercent);
                return true;
            }
            double diskPartitionSpaceUsedPercent2 = UtilAll.getDiskPartitionSpaceUsedPercent(StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()));
            if (diskPartitionSpaceUsedPercent2 > this.diskSpaceWarningLevelRatio) {
                if (DefaultMessageStore.this.runningFlags.getAndMakeDiskFull()) {
                    DefaultMessageStore.log.error("logics disk maybe full soon " + diskPartitionSpaceUsedPercent2 + ", so mark disk full");
                }
                this.cleanImmediately = true;
            } else if (diskPartitionSpaceUsedPercent2 > this.diskSpaceCleanForciblyRatio) {
                this.cleanImmediately = true;
            } else if (!DefaultMessageStore.this.runningFlags.getAndMakeDiskOK()) {
                DefaultMessageStore.log.info("logics disk space OK " + diskPartitionSpaceUsedPercent2 + ", so mark disk ok");
            }
            if (diskPartitionSpaceUsedPercent2 >= 0.0d && diskPartitionSpaceUsedPercent2 <= diskMaxUsedSpaceRatio) {
                return false;
            }
            DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + diskPartitionSpaceUsedPercent2);
            return true;
        }

        public int getManualDeleteFileSeveralTimes() {
            return this.manualDeleteFileSeveralTimes;
        }

        public void setManualDeleteFileSeveralTimes(int i) {
            this.manualDeleteFileSeveralTimes = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$CleanConsumeQueueService.class */
    public class CleanConsumeQueueService {
        /** Commit log minimum offset seen by the previous cleaning round. */
        private long lastPhysicalMinOffset = 0;

        CleanConsumeQueueService() {
        }

        /** Runs one cleaning round; any failure is logged and never propagated. */
        public void run() {
            try {
                deleteExpiredFiles();
            } catch (Throwable failure) {
                DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", failure);
            }
        }

        /**
         * When the commit log minimum offset has advanced since the last round, asks every
         * consume queue and the index service to drop files below that offset.
         */
        private void deleteExpiredFiles() {
            int pauseMillis = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
            long minPhyOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minPhyOffset <= this.lastPhysicalMinOffset) {
                return;
            }
            this.lastPhysicalMinOffset = minPhyOffset;

            for (ConcurrentMap<Integer, ConsumeQueue> queues : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue queue : queues.values()) {
                    boolean deletedSomething = queue.deleteExpiredFile(minPhyOffset) > 0;
                    if (deletedSomething && pauseMillis > 0) {
                        // Pause after a queue that actually deleted files.
                        try {
                            Thread.sleep(pauseMillis);
                        } catch (InterruptedException ignored) {
                            // Interruption only shortens the pause; cleaning continues.
                        }
                    }
                }
            }
            DefaultMessageStore.this.indexService.deleteExpiredFile(minPhyOffset);
        }

        public String getServiceName() {
            return CleanConsumeQueueService.class.getSimpleName();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$CommitLogDispatcherBuildConsumeQueue.class */
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
        CommitLogDispatcherBuildConsumeQueue() {
        }

        /**
         * Records the message position only when the transaction value extracted from the
         * system flag is 0 or 8; every other value (including 4 and 12) is ignored.
         * NOTE(review): 0/8 presumably mean "not transactional" and "commit", 4/12 "prepared"
         * and "rollback" — confirm against the MessageSysFlag constants.
         */
        @Override // org.apache.rocketmq.store.CommitLogDispatcher
        public void dispatch(DispatchRequest dispatchRequest) {
            int transactionValue = MessageSysFlag.getTransactionValue(dispatchRequest.getSysFlag());
            if (transactionValue == 0 || transactionValue == 8) {
                DefaultMessageStore.this.putMessagePositionInfo(dispatchRequest);
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$CommitLogDispatcherBuildIndex.class */
    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
        CommitLogDispatcherBuildIndex() {
        }

        /** Forwards the request to the index service, unless message indexing is disabled. */
        @Override // org.apache.rocketmq.store.CommitLogDispatcher
        public void dispatch(DispatchRequest dispatchRequest) {
            boolean indexEnabled = DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable();
            if (!indexEnabled) {
                return;
            }
            DefaultMessageStore.this.indexService.buildIndex(dispatchRequest);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$FlushConsumeQueueService.class */
    class FlushConsumeQueueService extends ServiceThread {
        /** Retry count used for the final flush; it also forces a flush regardless of dirty pages. */
        private static final int RETRY_TIMES_OVER = 3;
        private long lastFlushTimestamp = 0;

        FlushConsumeQueueService() {
        }

        /**
         * Flushes every consume queue, trying each one up to {@code retryTimes} times until its
         * flush reports true.
         *
         * @param retryTimes maximum flush attempts per queue
         */
        private void doFlush(int retryTimes) {
            int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
            if (retryTimes == RETRY_TIMES_OVER) {
                leastPages = 0;
            }

            long logicsMsgTimestamp = 0;
            int thoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
            long now = System.currentTimeMillis();
            if (now >= this.lastFlushTimestamp + thoroughInterval) {
                // Thorough flush is due: flush everything and remember the checkpoint timestamp
                // as it was before the queues are flushed.
                this.lastFlushTimestamp = now;
                leastPages = 0;
                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
            }

            for (ConcurrentMap<Integer, ConsumeQueue> queues : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue queue : queues.values()) {
                    int attempt = 0;
                    boolean flushed = false;
                    while (attempt < retryTimes && !flushed) {
                        flushed = queue.flush(leastPages);
                        attempt++;
                    }
                }
            }

            if (leastPages == 0) {
                if (logicsMsgTimestamp > 0) {
                    DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                }
                DefaultMessageStore.this.getStoreCheckpoint().flush();
            }
        }

        /** Flushes periodically until stopped, then performs one final forced flush. */
        @Override // java.lang.Runnable
        public void run() {
            DefaultMessageStore.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    int flushInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
                    waitForRunning(flushInterval);
                    doFlush(1);
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            doFlush(RETRY_TIMES_OVER);
            DefaultMessageStore.log.info(getServiceName() + " service end");
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public String getServiceName() {
            return FlushConsumeQueueService.class.getSimpleName();
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public long getJointime() {
            return 1000L * 60;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.5.1.jar:org/apache/rocketmq/store/DefaultMessageStore$ReputMessageService.class */
    public class ReputMessageService extends ServiceThread {
        /** Commit log offset from which the next dispatch pass starts reading. */
        private volatile long reputFromOffset = 0;

        ReputMessageService() {
        }

        public long getReputFromOffset() {
            return this.reputFromOffset;
        }

        public void setReputFromOffset(long j) {
            this.reputFromOffset = j;
        }

        /**
         * Waits for up to 50 rounds of 100 ms while commit log data is still undispatched,
         * warns if some remains, then stops the thread.
         */
        @Override // org.apache.rocketmq.common.ServiceThread
        public void shutdown() {
            int rounds = 0;
            while (rounds < 50 && isCommitLogAvailable()) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Keep waiting for the remaining rounds.
                }
                rounds++;
            }
            if (isCommitLogAvailable()) {
                DefaultMessageStore.log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}", DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
            }
            super.shutdown();
        }

        /** Returns the distance between the commit log maximum offset and the reput offset. */
        public long behind() {
            return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
        }

        private boolean isCommitLogAvailable() {
            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
        }

        /**
         * Reads commit log data starting at {@code reputFromOffset}, dispatches every valid
         * message, and advances the offset accordingly.
         */
        private void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                DefaultMessageStore.log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }

            for (boolean doNext = true; isCommitLogAvailable() && doNext; ) {
                // With duplication enabled, never dispatch past the confirmed offset.
                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    return;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(this.reputFromOffset);
                if (result == null) {
                    doNext = false;
                    continue;
                }

                try {
                    this.reputFromOffset = result.getStartOffset();
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest request = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = request.getBufferSize() == -1 ? request.getMsgSize() : request.getBufferSize();

                        if (request.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(request);

                                boolean notSlave = BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole();
                                if (notSlave && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(request.getTopic(), request.getQueueId(), request.getConsumeQueueOffset() + 1, request.getTagsCode(), request.getStoreTimestamp(), request.getBitMap(), request.getPropertiesMap());
                                }

                                this.reputFromOffset += size;
                                readSize += size;

                                // On a slave, the put statistics are updated here.
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(request.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(request.getTopic()).addAndGet(request.getMsgSize());
                                }
                            } else if (size == 0) {
                                // Size 0: continue from the next commit log file and end this buffer.
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else {
                            if (size > 0) {
                                DefaultMessageStore.log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", this.reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // Only with the DLedger commit log, or broker id 0, is the rest
                                // of this buffer skipped.
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() || DefaultMessageStore.this.brokerConfig.getBrokerId() == 0) {
                                    DefaultMessageStore.log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            }
        }

        /** Runs a dispatch pass roughly every millisecond until the thread is stopped. */
        @Override // java.lang.Runnable
        public void run() {
            DefaultMessageStore.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    Thread.sleep(1L);
                    doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            DefaultMessageStore.log.info(getServiceName() + " service end");
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public String getServiceName() {
            return ReputMessageService.class.getSimpleName();
        }
    }

    /**
     * Wires up the store: commit log, consume queue table, background services, dispatchers
     * and the lock file. Also starts the mapped-file allocation service and the index service.
     *
     * @throws IOException if the lock file cannot be opened
     */
    public DefaultMessageStore(MessageStoreConfig messageStoreConfig, BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener, BrokerConfig brokerConfig) throws IOException {
        // Collaborators first: the components created below receive "this" and may read them.
        this.messageArrivingListener = messageArrivingListener;
        this.brokerConfig = brokerConfig;
        this.messageStoreConfig = messageStoreConfig;
        this.brokerStatsManager = brokerStatsManager;

        this.commitLog = messageStoreConfig.isEnableDLegerCommitLog() ? new DLedgerCommitLog(this) : new CommitLog(this);
        this.consumeQueueTable = new ConcurrentHashMap<>(32);

        this.flushConsumeQueueService = new FlushConsumeQueueService();
        this.cleanCommitLogService = new CleanCommitLogService();
        this.cleanConsumeQueueService = new CleanConsumeQueueService();
        this.storeStatsService = new StoreStatsService();
        this.indexService = new IndexService(this);
        // No HA service is created when the DLedger commit log is enabled.
        this.haService = messageStoreConfig.isEnableDLegerCommitLog() ? null : new HAService(this);
        this.reputMessageService = new ReputMessageService();
        this.scheduleMessageService = new ScheduleMessageService(this);

        this.transientStorePool = new TransientStorePool(messageStoreConfig);
        if (messageStoreConfig.isTransientStorePoolEnable()) {
            this.transientStorePool.init();
        }

        this.allocateMappedFileService.start();
        this.indexService.start();

        // Dispatch order: consume queue first, then index.
        this.dispatcherList = new LinkedList<>();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

        String lockFilePath = StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir());
        File lockFileHandle = new File(lockFilePath);
        MappedFile.ensureDirOK(lockFileHandle.getParent());
        this.lockFile = new RandomAccessFile(lockFileHandle, "rw");
    }

    /**
     * Asks every consume queue to truncate its dirty logic files, passing the given value through.
     *
     * @param j value handed to each queue's {@code truncateDirtyLogicFiles}
     */
    public void truncateDirtyLogicFiles(long j) {
        for (ConcurrentMap<Integer, ConsumeQueue> queues : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queues.values()) {
                queue.truncateDirtyLogicFiles(j);
            }
        }
    }

    /**
     * Loads schedule state, the commit log and the consume queues, then opens the checkpoint,
     * loads the index and runs recovery. Each stage runs only if the previous one succeeded.
     *
     * @return true when every stage succeeded; on failure the mapped-file allocation service
     *         is shut down and false is returned
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean load() {
        boolean result = true;
        try {
            boolean lastExitOk = !isTempFileExist();
            log.info("last shutdown {}", lastExitOk ? "normally" : "abnormally");

            if (this.scheduleMessageService != null) {
                result = this.scheduleMessageService.load();
            }
            result = result && this.commitLog.load();
            result = result && loadConsumeQueue();

            if (result) {
                String checkpointPath = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
                this.storeCheckpoint = new StoreCheckpoint(checkpointPath);
                this.indexService.load(lastExitOk);
                recover(lastExitOk);
                log.info("load over, and the max phy offset = {}", getMaxPhyOffset());
            }
        } catch (Exception e) {
            log.error("load exception", e);
            result = false;
        }

        if (!result) {
            this.allocateMappedFileService.shutdown();
        }
        return result;
    }

    /**
     * Acquires the store lock, positions and starts the reput service, waits until dispatching
     * has caught up with the commit log, then starts the remaining services.
     *
     * @throws RuntimeException if the exclusive store lock cannot be obtained
     * @throws Exception        if locking, writing the lock file or starting a service fails
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void start() throws Exception {
        this.lock = this.lockFile.getChannel().tryLock(0L, 1L, false);
        if (this.lock == null || this.lock.isShared() || !this.lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }
        // The payload is the plain marker "lock". The decompiled source spelled it as an
        // unrelated AspectJ constant with the same value; use the literal with an explicit
        // charset instead.
        this.lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        this.lockFile.getChannel().force(true);

        // Start from the commit log minimum and raise it to the largest physical offset
        // recorded by any consume queue.
        long maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> queues : this.consumeQueueTable.values()) {
            for (ConsumeQueue consumeQueue : queues.values()) {
                if (consumeQueue.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = consumeQueue.getMaxPhysicOffset();
                }
            }
        }
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        long clMinOffset = this.commitLog.getMinOffset();
        if (maxPhysicalPosInLogicQueue < clMinOffset) {
            // Log before overwriting, so the warning shows the too-small value rather than
            // printing the commit log minimum twice.
            log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, clMinOffset);
            maxPhysicalPosInLogicQueue = clMinOffset;
        }
        log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
        this.reputMessageService.start();

        // Block until the reput service has dispatched everything already in the commit log.
        while (dispatchBehindBytes() > 0) {
            Thread.sleep(1000L);
            log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), getMaxPhyOffset(), dispatchBehindBytes());
        }
        recoverTopicQueueTable();

        // haService is null when the DLedger commit log is enabled (see the constructor).
        if (!this.messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService.start();
            handleScheduleMessageService(this.messageStoreConfig.getBrokerRole());
        }
        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();
        createTempFile();
        addScheduleTask();
        this.shutdown = false;
    }

    /**
     * Stops all services, flushes the checkpoint and removes the abort file when the store is
     * in a consistent state, then destroys the transient store pool and releases the store
     * lock. The service-stopping part runs only if the store was started.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;
            this.scheduledExecutorService.shutdown();
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored, as in the original;
                // restoring it might disturb the file flushes below — confirm before changing.
                log.error("shutdown Exception, ", e);
            }
            if (this.scheduleMessageService != null) {
                this.scheduleMessageService.shutdown();
            }
            if (this.haService != null) {
                this.haService.shutdown();
            }
            this.storeStatsService.shutdown();
            this.indexService.shutdown();
            this.commitLog.shutdown();
            this.reputMessageService.shutdown();
            this.flushConsumeQueueService.shutdown();
            this.allocateMappedFileService.shutdown();
            this.storeCheckpoint.flush();
            this.storeCheckpoint.shutdown();
            // The abort file is removed only for a writeable store with nothing left to
            // dispatch; otherwise it is kept.
            if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
                deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
                this.shutDownNormal = true;
            } else {
                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
            }
        }
        this.transientStorePool.destroy();
        if (this.lockFile == null || this.lock == null) {
            return;
        }
        // Release and close independently, so a failed release cannot leak the file handle.
        try {
            this.lock.release();
        } catch (IOException e) {
            log.warn("release store lock failed", e);
        }
        try {
            this.lockFile.close();
        } catch (IOException e) {
            log.warn("close store lock file failed", e);
        }
    }

    /**
     * Destroys the consume queues, the commit log and the index, then deletes the abort and
     * checkpoint files under the store root directory.
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void destroy() {
        destroyLogics();
        this.commitLog.destroy();
        this.indexService.destroy();

        String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        deleteFile(abortFilePath);
        String checkpointPath = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
        deleteFile(checkpointPath);
    }

    /** Calls {@code destroy()} on every consume queue in the table. */
    public void destroyLogics() {
        for (ConcurrentMap<Integer, ConsumeQueue> queues : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queues.values()) {
                queue.destroy();
            }
        }
    }

    /**
     * Validates and appends a single message to the commit log.
     *
     * <p>The request is rejected with {@code SERVICE_NOT_AVAILABLE} when the store is shut down,
     * runs as a slave, or is not writeable; with {@code MESSAGE_ILLEGAL} when the topic is longer
     * than 127 characters; with {@code PROPERTIES_SIZE_EXCEEDED} when the properties string is
     * longer than 32767 characters; and with {@code OS_PAGECACHE_BUSY} when
     * {@link #isOSPageCacheBusy()} reports true.
     *
     * @param messageExtBrokerInner the message to store
     * @return the result produced by the commit log, or a rejection result as described above
     */
    @Override
    public PutMessageResult putMessage(MessageExtBrokerInner messageExtBrokerInner) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            // Only every 50000th consecutive rejection is logged.
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // The store accepts writes again, so restart the rejection counter.
        this.printTimes.set(0L);

        if (messageExtBrokerInner.getTopic().length() > 127) {
            log.warn("putMessage message topic length too long " + messageExtBrokerInner.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (messageExtBrokerInner.getPropertiesString() != null
            && messageExtBrokerInner.getPropertiesString().length() > 32767) {
            log.warn("putMessage message properties length too long " + messageExtBrokerInner.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

        if (isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long startMillis = getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(messageExtBrokerInner);
        long elapsedMillis = getSystemClock().now() - startMillis;

        if (elapsedMillis > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", Long.valueOf(elapsedMillis), Integer.valueOf(messageExtBrokerInner.getBody().length));
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedMillis);

        boolean succeeded = result != null && result.isOk();
        if (!succeeded) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }

    /**
     * Validates and appends a message batch to the commit log.
     *
     * <p>The request is rejected with {@code SERVICE_NOT_AVAILABLE} when the store is shut down,
     * runs as a slave, or is not writeable; with {@code MESSAGE_ILLEGAL} when the topic is longer
     * than 127 characters or the batch body exceeds the configured maximum message size; and with
     * {@code OS_PAGECACHE_BUSY} when {@link #isOSPageCacheBusy()} reports true.
     *
     * @param messageExtBatch the batch to store
     * @return the result produced by the commit log, or a rejection result as described above
     */
    @Override
    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
        if (this.shutdown) {
            log.warn("DefaultMessageStore has shutdown, so putMessages is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            // Only every 50000th consecutive rejection is logged.
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("DefaultMessageStore is in slave mode, so putMessages is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long rejected = this.printTimes.getAndIncrement();
            if (rejected % 50000 == 0) {
                log.warn("DefaultMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // The store accepts writes again, so restart the rejection counter.
        this.printTimes.set(0L);

        if (messageExtBatch.getTopic().length() > 127) {
            log.warn("PutMessages topic length too long " + messageExtBatch.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (messageExtBatch.getBody().length > this.messageStoreConfig.getMaxMessageSize()) {
            log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long startMillis = getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
        long elapsedMillis = getSystemClock().now() - startMillis;

        if (elapsedMillis > 500) {
            log.warn("not in lock eclipse time(ms)={}, bodyLength={}", Long.valueOf(elapsedMillis), Integer.valueOf(messageExtBatch.getBody().length));
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedMillis);

        boolean succeeded = result != null && result.isOk();
        if (!succeeded) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }

    /**
     * Reports whether the difference between the current clock value and the commit log's
     * begin-time-in-lock exceeds the configured OS page cache busy timeout. Differences of
     * 10000000 or more are never treated as busy.
     *
     * @return {@code true} when the difference is above the timeout and below 10000000
     */
    @Override
    public boolean isOSPageCacheBusy() {
        long diff = this.systemClock.now() - getCommitLog().getBeginTimeInLock();
        if (diff >= 10000000) {
            return false;
        }
        return diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }

    /**
     * Delegates to the commit log's {@code lockTimeMills()}.
     *
     * @return the value reported by the commit log
     */
    @Override
    public long lockTimeMills() {
        long lockMillis = this.commitLog.lockTimeMills();
        return lockMillis;
    }

    /**
     * @return the clock this store uses for its time measurements
     */
    public SystemClock getSystemClock() {
        return systemClock;
    }

    /**
     * @return the commit log backing this store
     */
    public CommitLog getCommitLog() {
        return commitLog;
    }

    /**
     * Pulls messages from one consume queue, starting at a logical queue offset.
     *
     * <p>Each consume-queue entry is read as a commit-log offset (long), a message size (int) and
     * a tags code (long), i.e. 20 bytes. Entries rejected by
     * {@link MessageFilter#isMatchedByConsumeQueue} are skipped without touching the commit log;
     * the remaining entries are loaded from the commit log and offered to
     * {@link MessageFilter#isMatchedByCommitLog} before being added to the result.
     *
     * @param str           consumer group (used for logging and disk-fall-behind statistics)
     * @param str2          topic
     * @param i             queue id
     * @param j             logical queue offset to start pulling from
     * @param i2            maximum number of messages requested
     * @param messageFilter optional filter; {@code null} accepts every message
     * @return the pull result with status, next begin offset and min/max queue offsets, or
     *         {@code null} when the store is shut down or not readable
     */
    @Override
    public GetMessageResult getMessage(String str, String str2, int i, long j, int i2, MessageFilter messageFilter) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }
        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = getSystemClock().now();
        GetMessageStatus status;
        long nextBeginOffset;
        long minOffset = 0;
        long maxOffset = 0;
        GetMessageResult getResult = new GetMessageResult();
        long maxOffsetPy = this.commitLog.getMaxOffset();

        ConsumeQueue consumeQueue = findConsumeQueue(str2, i);
        if (consumeQueue == null) {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(j, 0L);
        } else {
            minOffset = consumeQueue.getMinOffsetInQueue();
            maxOffset = consumeQueue.getMaxOffsetInQueue();
            if (maxOffset == 0) {
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(j, 0L);
            } else if (j < minOffset) {
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(j, minOffset);
            } else if (j == maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(j, j);
            } else if (j > maxOffset) {
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                nextBeginOffset = 0 == minOffset ? nextOffsetCorrection(j, minOffset) : nextOffsetCorrection(j, maxOffset);
            } else {
                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(j);
                if (bufferConsumeQueue == null) {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(j, consumeQueue.rollNextFile(j));
                    log.warn("consumer request topic: " + str2 + "offset: " + j + " minOffset: " + minOffset + " maxOffset: " + maxOffset + ", but access logic queue failed.");
                } else {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
                        // Long.MIN_VALUE means "no commit-log file is being skipped".
                        long nextPhyFileStartOffset = Long.MIN_VALUE;
                        long maxPhyOffsetPulling = 0;
                        int pos = 0;
                        int maxFilterMessageCount = Math.max(16000, i2 * 20);
                        boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

                        // Every iteration consumes exactly one 20-byte consume-queue entry.
                        for (; pos < bufferConsumeQueue.getSize() && pos < maxFilterMessageCount; pos += 20) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
                            maxPhyOffsetPulling = offsetPy;

                            // A previous entry pointed into a missing commit-log file: skip
                            // everything that still belongs to that file.
                            if (nextPhyFileStartOffset != Long.MIN_VALUE && offsetPy < nextPhyFileStartOffset) {
                                continue;
                            }

                            int bufferTotal = getResult.getBufferTotalSize();
                            int messageTotal = getResult.getMessageCount();
                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                            if (isTheBatchFull(sizePy, i2, bufferTotal, messageTotal, isInDisk)) {
                                break;
                            }

                            boolean extRet = false;
                            boolean isTagsCodeLegal = true;
                            if (consumeQueue.isExtAddr(tagsCode)) {
                                extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                if (extRet) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", Long.valueOf(tagsCode), Long.valueOf(offsetPy), Integer.valueOf(sizePy), str2, str);
                                    isTagsCodeLegal = false;
                                }
                            }

                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? Long.valueOf(tagsCode) : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // Rejected by the consume-queue filter: do not load or return
                                // this message, move on to the next entry.
                                continue;
                            }

                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                            if (selectResult == null) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                }
                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                continue;
                            }

                            if (messageFilter != null
                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }
                                // The buffer is not handed to the result, so release it here.
                                selectResult.release();
                                continue;
                            }

                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                            getResult.addMessage(selectResult);
                            status = GetMessageStatus.FOUND;
                            nextPhyFileStartOffset = Long.MIN_VALUE;
                        }

                        if (diskFallRecorded) {
                            this.brokerStatsManager.recordDiskFallBehindSize(str, str2, i, maxOffsetPy - maxPhyOffsetPulling);
                        }
                        nextBeginOffset = j + (pos / 20);

                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (((double) StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE) * (((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {
                        bufferConsumeQueue.release();
                    }
                }
            }
        }

        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        this.storeStatsService.setGetMessageEntireTimeMax(getSystemClock().now() - beginTime);

        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

    /**
     * @param str topic
     * @param i   queue id
     * @return the consume queue's max offset, or {@code 0} when no consume queue is found
     */
    @Override
    public long getMaxOffsetInQueue(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? 0L : queue.getMaxOffsetInQueue();
    }

    /**
     * @param str topic
     * @param i   queue id
     * @return the consume queue's min offset, or {@code -1} when no consume queue is found
     */
    @Override
    public long getMinOffsetInQueue(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? -1L : queue.getMinOffsetInQueue();
    }

    /**
     * Reads the first long of the consume-queue index buffer located at the given queue offset.
     *
     * @param str topic
     * @param i   queue id
     * @param j   logical offset inside the consume queue
     * @return the long read from the index buffer, or {@code 0} when the queue or the buffer
     *         is unavailable
     */
    @Override
    public long getCommitLogOffsetInQueue(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return 0L;
        }
        SelectMappedBufferResult entry = queue.getIndexBuffer(j);
        if (entry == null) {
            return 0L;
        }
        try {
            return entry.getByteBuffer().getLong();
        } finally {
            entry.release();
        }
    }

    /**
     * @param str topic
     * @param i   queue id
     * @param j   timestamp forwarded to the consume queue lookup
     * @return the offset reported by the consume queue, or {@code 0} when no consume queue is found
     */
    @Override
    public long getOffsetInQueueByTime(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? 0L : queue.getOffsetInQueueByTime(j);
    }

    /**
     * Looks up a message by its commit-log offset. The first 4 bytes at that offset are read as
     * an int and used as the size for the actual lookup.
     *
     * @param j commit-log offset of the message
     * @return the decoded message, or {@code null} when nothing can be read at that offset
     */
    @Override
    public MessageExt lookMessageByOffset(long j) {
        SelectMappedBufferResult sizeHeader = this.commitLog.getMessage(j, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            int size = sizeHeader.getByteBuffer().getInt();
            return lookMessageByOffset(j, size);
        } finally {
            sizeHeader.release();
        }
    }

    /**
     * Selects the raw buffer of the message at a commit-log offset. The first 4 bytes at that
     * offset are read as an int and used as the size of the selection.
     *
     * @param j commit-log offset of the message
     * @return the selected buffer (not released by this method), or {@code null} when nothing
     *         can be read at that offset
     */
    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long j) {
        SelectMappedBufferResult sizeHeader = this.commitLog.getMessage(j, 4);
        if (sizeHeader == null) {
            return null;
        }
        try {
            int size = sizeHeader.getByteBuffer().getInt();
            return this.commitLog.getMessage(j, size);
        } finally {
            sizeHeader.release();
        }
    }

    /**
     * Delegates to the commit log's {@code getMessage(offset, size)}.
     *
     * @param j commit-log offset
     * @param i number of bytes to select
     * @return whatever the commit log returns for that range
     */
    @Override
    public SelectMappedBufferResult selectOneMessageByOffset(long j, int i) {
        SelectMappedBufferResult selected = this.commitLog.getMessage(j, i);
        return selected;
    }

    /**
     * @return the string form of the store statistics service
     */
    @Override
    public String getRunningDataInfo() {
        String statsText = this.storeStatsService.toString();
        return statsText;
    }

    /**
     * Returns the statistics service's runtime info map, extended with the disk usage of the
     * commit-log and consume-queue partitions, the schedule service's stats (when such a service
     * exists) and the commit log's min/max offsets.
     *
     * @return the map obtained from the statistics service with the extra entries added
     */
    @Override
    public HashMap<String, String> getRuntimeInfo() {
        HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();

        String commitLogPath = getMessageStoreConfig().getStorePathCommitLog();
        info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath)));

        String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath)));

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.buildRunningStats(info);
        }

        info.put(RunningStats.commitLogMinOffset.name(), String.valueOf(getMinPhyOffset()));
        info.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(getMaxPhyOffset()));
        return info;
    }

    /**
     * @return the commit log's max offset
     */
    @Override
    public long getMaxPhyOffset() {
        long maxOffset = this.commitLog.getMaxOffset();
        return maxOffset;
    }

    /**
     * @return the commit log's min offset
     */
    @Override
    public long getMinPhyOffset() {
        long minOffset = this.commitLog.getMinOffset();
        return minOffset;
    }

    /**
     * Returns the store time of the entry at the consume queue's min logic offset divided by 20.
     *
     * @param str topic
     * @param i   queue id
     * @return the store timestamp, or {@code -1} when the queue or its index buffer is unavailable
     */
    @Override
    public long getEarliestMessageTime(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        long firstEntryIndex = queue.getMinLogicOffset() / 20;
        return getStoreTime(queue.getIndexBuffer(firstEntryIndex));
    }

    /**
     * Reads a long and an int from the given buffer and asks the commit log for the store
     * timestamp of that (offset, size) pair. The buffer is always released before returning.
     *
     * @param selectMappedBufferResult consume-queue index buffer, may be {@code null}
     * @return the store timestamp, or {@code -1} when the buffer is {@code null} or an
     *         {@link Exception} occurs while reading it
     */
    private long getStoreTime(SelectMappedBufferResult selectMappedBufferResult) {
        if (selectMappedBufferResult == null) {
            return -1L;
        }
        try {
            long phyOffset = selectMappedBufferResult.getByteBuffer().getLong();
            int size = selectMappedBufferResult.getByteBuffer().getInt();
            return getCommitLog().pickupStoreTimestamp(phyOffset, size);
        } catch (Exception ignored) {
            // A failed read is reported to the caller as "no timestamp available".
            return -1L;
        } finally {
            selectMappedBufferResult.release();
        }
    }

    /**
     * Asks the commit log for the store timestamp at its min offset, passing twice the
     * configured max message size as the size argument.
     *
     * @return the timestamp reported by the commit log
     */
    @Override
    public long getEarliestMessageTime() {
        CommitLog log = getCommitLog();
        long minPhyOffset = getMinPhyOffset();
        int size = this.messageStoreConfig.getMaxMessageSize() * 2;
        return log.pickupStoreTimestamp(minPhyOffset, size);
    }

    /**
     * @param str topic
     * @param i   queue id
     * @param j   logical offset inside the consume queue
     * @return the store timestamp of that entry, or {@code -1} when the queue or its index
     *         buffer is unavailable
     */
    @Override
    public long getMessageStoreTimeStamp(String str, int i, long j) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return -1L;
        }
        return getStoreTime(queue.getIndexBuffer(j));
    }

    /**
     * @param str topic
     * @param i   queue id
     * @return the consume queue's message total, or {@code -1} when no consume queue is found
     */
    @Override
    public long getMessageTotalInQueue(String str, int i) {
        ConsumeQueue queue = findConsumeQueue(str, i);
        return queue == null ? -1L : queue.getMessageTotalInQueue();
    }

    /**
     * @param j commit-log offset to read from
     * @return the data returned by the commit log, or {@code null} when the store is shut down
     */
    @Override
    public SelectMappedBufferResult getCommitLogData(long j) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getPhyQueueData is forbidden");
            return null;
        }
        return this.commitLog.getData(j);
    }

    /**
     * Appends raw bytes to the commit log and, on success, wakes up the reput service.
     *
     * @param j    offset passed to the commit log's {@code appendData}
     * @param bArr bytes to append
     * @return {@code true} when the commit log accepted the data; {@code false} when it did not
     *         or when the store is shut down
     */
    @Override
    public boolean appendToCommitLog(long j, byte[] bArr) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
            return false;
        }

        boolean appended = this.commitLog.appendData(j, bArr);
        if (!appended) {
            log.error("appendToPhyQueue failed " + j + " " + bArr.length);
            return false;
        }
        this.reputMessageService.wakeup();
        return true;
    }

    /**
     * Forwards a manual delete request to the commit-log clean service.
     */
    @Override
    public void executeDeleteFilesManually() {
        cleanCommitLogService.excuteDeleteFilesManualy();
    }

    /**
     * Queries messages through the index service and collects their commit-log data.
     *
     * <p>Up to three index lookups are made. After each lookup the upper time bound is replaced
     * by the store timestamp of the first (lowest-offset) message found; the loop stops as soon
     * as a lookup returns no offsets, any data has been collected, or that bound drops below
     * {@code j}.
     *
     * @param str  topic, forwarded to {@code IndexService#queryOffset}
     * @param str2 key, forwarded to {@code IndexService#queryOffset}
     * @param i    forwarded to {@code IndexService#queryOffset}
     *             (presumably the max number of results; verify against IndexService)
     * @param j    begin of the queried time range
     * @param j2   initial end of the queried time range
     * @return the collected result; never {@code null}
     */
    @Override
    public QueryMessageResult queryMessage(String str, String str2, int i, long j, long j2) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();
        long lastQueryMsgTime = j2;

        for (int attempt = 0; attempt < 3; attempt++) {
            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(str, str2, i, j, lastQueryMsgTime);
            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                break;
            }

            Collections.sort(queryOffsetResult.getPhyOffsets());
            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());

            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
                long offset = queryOffsetResult.getPhyOffsets().get(m).longValue();
                try {
                    MessageExt msg = lookMessageByOffset(offset);
                    if (0 == m) {
                        if (msg == null) {
                            // Previously this surfaced as a caught NullPointerException; the
                            // outcome is the same (the offset is skipped) but stated explicitly.
                            log.warn("queryMessage can not read message at commit log offset {}", Long.valueOf(offset));
                            continue;
                        }
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }

                    SelectMappedBufferResult data = this.commitLog.getData(offset, false);
                    if (data != null) {
                        // The first int of the data is used as the message size; trim the
                        // buffer to exactly that many bytes.
                        int size = data.getByteBuffer().getInt(0);
                        data.getByteBuffer().limit(size);
                        data.setSize(size);
                        queryMessageResult.addMessage(data);
                    }
                } catch (Exception e) {
                    log.error("queryMessage exception", e);
                }
            }

            if (queryMessageResult.getBufferTotalSize() > 0 || lastQueryMsgTime < j) {
                break;
            }
        }
        return queryMessageResult;
    }

    /**
     * Passes a new master address to the HA service.
     *
     * <p>The HA service may be absent (the shutdown path of this class checks it for
     * {@code null}); in that case the call is a no-op instead of failing with a
     * {@link NullPointerException}.
     *
     * @param str the new master address
     */
    @Override
    public void updateHaMasterAddress(String str) {
        if (this.haService == null) {
            return;
        }
        this.haService.updateMasterAddress(str);
    }

    /**
     * @return the commit log's max offset minus the HA service's push-to-slave max offset
     */
    // NOTE(review): haService is null-checked in the shutdown path but not here — confirm
    // this method is never called when no HA service exists.
    @Override
    public long slaveFallBehindMuch() {
        long commitLogMaxOffset = this.commitLog.getMaxOffset();
        long pushedToSlaveOffset = this.haService.getPush2SlaveMaxOffset().get();
        return commitLogMaxOffset - pushedToSlaveOffset;
    }

    /**
     * @return the current value of this store's system clock
     */
    @Override
    public long now() {
        long current = this.systemClock.now();
        return current;
    }

    /**
     * Destroys and unregisters the consume queues of every topic that is neither contained in
     * the given set nor the schedule topic.
     *
     * @param set topics to keep
     * @return always {@code 0}
     */
    @Override
    public int cleanUnusedTopic(Set<String> set) {
        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> topics = this.consumeQueueTable.entrySet().iterator();
        while (topics.hasNext()) {
            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueue>> topicEntry = topics.next();
            String topic = topicEntry.getKey();

            if (set.contains(topic) || topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                continue;
            }

            for (ConsumeQueue queue : topicEntry.getValue().values()) {
                queue.destroy();
                log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", queue.getTopic(), Integer.valueOf(queue.getQueueId()));
                this.commitLog.removeQueueFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
            }
            // Removal goes through the iterator so the table can be modified while iterating.
            topics.remove();
            log.info("cleanUnusedTopic: {},topic destroyed", topic);
        }
        return 0;
    }

    /**
     * Destroys every consume queue (schedule topic excluded) whose last offset is below the
     * commit log's min offset, and drops topics that are left without any queue.
     * Queues reporting a last offset of {@code -1} are only logged, not destroyed.
     */
    @Override
    public void cleanExpiredConsumerQueue() {
        long minCommitLogOffset = this.commitLog.getMinOffset();

        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> topics = this.consumeQueueTable.entrySet().iterator();
        while (topics.hasNext()) {
            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueue>> topicEntry = topics.next();
            String topic = topicEntry.getKey();
            if (topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                continue;
            }

            ConcurrentMap<Integer, ConsumeQueue> queues = topicEntry.getValue();
            Iterator<Map.Entry<Integer, ConsumeQueue>> queueIt = queues.entrySet().iterator();
            while (queueIt.hasNext()) {
                Map.Entry<Integer, ConsumeQueue> queueEntry = queueIt.next();
                long maxCLOffsetInConsumeQueue = queueEntry.getValue().getLastOffset();

                if (maxCLOffsetInConsumeQueue == -1) {
                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", queueEntry.getValue().getTopic(), Integer.valueOf(queueEntry.getValue().getQueueId()), Long.valueOf(queueEntry.getValue().getMaxPhysicOffset()), Long.valueOf(queueEntry.getValue().getMinLogicOffset()));
                    continue;
                }
                if (maxCLOffsetInConsumeQueue >= minCommitLogOffset) {
                    continue;
                }

                log.info("cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", topic, queueEntry.getKey(), Long.valueOf(minCommitLogOffset), Long.valueOf(maxCLOffsetInConsumeQueue));
                this.commitLog.removeQueueFromTopicQueueTable(queueEntry.getValue().getTopic(), queueEntry.getValue().getQueueId());
                queueEntry.getValue().destroy();
                queueIt.remove();
            }

            if (queues.isEmpty()) {
                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
                topics.remove();
            }
        }
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [java.util.Map<java.lang.String, java.lang.Long>, long, java.util.Map, java.util.HashMap] */
    public Map<String, Long> getMessageIds(String str, int i, long j, long j2, SocketAddress socketAddress) {
        ?? hashMap = new HashMap();
        if (this.shutdown) {
            return hashMap;
        }
        ConsumeQueue findConsumeQueue = findConsumeQueue(str, i);
        if (findConsumeQueue != null) {
            long max = Math.max(j, findConsumeQueue.getMinOffsetInQueue());
            long min = Math.min(j2, findConsumeQueue.getMaxOffsetInQueue());
            if (min == 0) {
                return hashMap;
            }
            long j3 = max;
            while (j3 < min) {
                SelectMappedBufferResult indexBuffer = findConsumeQueue.getIndexBuffer(j3);
                if (indexBuffer == null) {
                    return hashMap;
                }
                for (int i2 = 0; i2 < indexBuffer.getSize(); i2 += 20) {
                    try {
                        j3++;
                        hashMap.put(MessageDecoder.createMessageId(ByteBuffer.allocate(16), MessageExt.socketAddress2ByteBuffer(socketAddress), indexBuffer.getByteBuffer().getLong()), Long.valueOf((long) hashMap));
                        if (j3 > min) {
                            return hashMap;
                        }
                    } finally {
                        indexBuffer.release();
                    }
                }
                indexBuffer.release();
            }
        }
        return hashMap;
    }

    /**
     * Checks whether the message referenced by the consume-queue entry at the given offset is
     * considered to be on disk, using {@code checkInDiskByCommitOffset} with the first long of
     * the entry and the commit log's current max offset.
     *
     * @param str topic
     * @param i   queue id
     * @param j   logical offset inside the consume queue
     * @return the result of the commit-offset check, or {@code false} when the queue or its
     *         index buffer is unavailable or empty
     */
    @Override
    public boolean checkInDiskByConsumeOffset(String str, int i, long j) {
        long maxOffsetPy = this.commitLog.getMaxOffset();

        ConsumeQueue queue = findConsumeQueue(str, i);
        if (queue == null) {
            return false;
        }
        SelectMappedBufferResult entry = queue.getIndexBuffer(j);
        if (entry == null) {
            return false;
        }

        try {
            if (entry.getSize() <= 0) {
                return false;
            }
            long offsetPy = entry.getByteBuffer().getLong();
            return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
        } finally {
            entry.release();
        }
    }

    /**
     * @return the value of the reput service's {@code behind()}
     */
    @Override
    public long dispatchBehindBytes() {
        long behind = this.reputMessageService.behind();
        return behind;
    }

    /**
     * Flushes the commit log.
     *
     * @return the value returned by the commit log's {@code flush()}
     */
    @Override
    public long flush() {
        long flushed = this.commitLog.flush();
        return flushed;
    }

    /**
     * Delegates to the commit log's {@code resetOffset}.
     *
     * @param j the offset to reset to
     * @return the commit log's answer
     */
    @Override
    public boolean resetWriteOffset(long j) {
        boolean reset = this.commitLog.resetOffset(j);
        return reset;
    }

    /**
     * @return the commit log's confirm offset
     */
    @Override
    public long getConfirmOffset() {
        long confirmOffset = this.commitLog.getConfirmOffset();
        return confirmOffset;
    }

    /**
     * Stores a new confirm offset in the commit log.
     *
     * @param j the confirm offset
     */
    @Override
    public void setConfirmOffset(long j) {
        commitLog.setConfirmOffset(j);
    }

    /**
     * Reads {@code i} bytes at commit-log offset {@code j} and decodes them into a message.
     *
     * @param j commit-log offset of the message
     * @param i size of the message in bytes
     * @return the decoded message, or {@code null} when the commit log returns no buffer
     */
    public MessageExt lookMessageByOffset(long j, int i) {
        SelectMappedBufferResult raw = this.commitLog.getMessage(j, i);
        if (raw == null) {
            return null;
        }
        try {
            return MessageDecoder.decode(raw.getByteBuffer(), true, false);
        } finally {
            raw.release();
        }
    }

    /**
     * Returns the consume queue for a topic/queue id, creating and registering the per-topic
     * map and the queue itself when they do not exist yet. Registration uses
     * {@code putIfAbsent}, so an instance registered concurrently by another caller wins.
     *
     * @param str topic
     * @param i   queue id
     * @return the registered consume queue
     */
    public ConsumeQueue findConsumeQueue(String str, int i) {
        ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = this.consumeQueueTable.get(str);
        if (queuesOfTopic == null) {
            ConcurrentMap<Integer, ConsumeQueue> freshMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> previousMap = this.consumeQueueTable.putIfAbsent(str, freshMap);
            queuesOfTopic = previousMap == null ? freshMap : previousMap;
        }

        ConsumeQueue existing = queuesOfTopic.get(Integer.valueOf(i));
        if (existing != null) {
            return existing;
        }

        ConsumeQueue created = new ConsumeQueue(str, i, StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this);
        ConsumeQueue previous = queuesOfTopic.putIfAbsent(Integer.valueOf(i), created);
        return previous == null ? created : previous;
    }

    /**
     * Chooses between the requested offset and a corrected one.
     *
     * @param j  the offset as requested
     * @param j2 the corrected offset
     * @return {@code j} when the broker is a slave with offset checking disabled,
     *         otherwise {@code j2}
     */
    private long nextOffsetCorrection(long j, long j2) {
        boolean isSlave = getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE;
        if (isSlave && !getMessageStoreConfig().isOffsetCheckInSlave()) {
            return j;
        }
        return j2;
    }

    /**
     * Compares the distance between two commit-log offsets with a threshold derived from the
     * total physical memory size and the configured in-memory access ratio (a percentage).
     *
     * @param j  commit-log offset of the message
     * @param j2 max commit-log offset
     * @return {@code true} when {@code j2 - j} is larger than the threshold
     */
    private boolean checkInDiskByCommitOffset(long j, long j2) {
        double ratio = ((double) this.messageStoreConfig.getAccessMessageInMemoryMaxRatio()) / 100.0d;
        long memoryThreshold = (long) (((double) StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE) * ratio);
        long distance = j2 - j;
        return distance > memoryThreshold;
    }

    /**
     * Decides whether one more message may be added to the current pull batch.
     *
     * @param i  size in bytes of the candidate message
     * @param i2 maximum number of messages requested
     * @param i3 bytes already collected
     * @param i4 messages already collected
     * @param z  whether the candidate message is considered to be on disk
     * @return {@code false} while the batch is still empty; {@code true} when the requested
     *         message count is reached or the applicable (disk or memory) byte/count transfer
     *         limit would be exceeded
     */
    private boolean isTheBatchFull(int i, int i2, int i3, int i4, boolean z) {
        if (i3 == 0 || i4 == 0) {
            return false;
        }
        if (i2 <= i4) {
            return true;
        }

        if (z) {
            if (i3 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
                return true;
            }
            return i4 > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk() - 1;
        }

        if (i3 + i > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
            return true;
        }
        return i4 > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory() - 1;
    }

    /**
     * Deletes the file at the given path and logs whether the deletion succeeded.
     *
     * @param str path of the file to delete
     */
    private void deleteFile(String str) {
        boolean deleted = new File(str).delete();
        String outcome = deleted ? " delete OK" : " delete Failed";
        log.info(str + outcome);
    }

    /**
     * Creates the abort file under the store root directory (ensuring its parent directory
     * first) and logs whether it was created or already existed.
     *
     * @throws IOException if the file cannot be created
     */
    private void createTempFile() throws IOException {
        String abortFilePath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File abortFile = new File(abortFilePath);
        MappedFile.ensureDirOK(abortFile.getParent());

        boolean created = abortFile.createNewFile();
        String outcome = created ? " create OK" : " already exists";
        log.info(abortFilePath + outcome);
    }

    /**
     * Registers the store's periodic background tasks on the scheduled executor:
     * <ul>
     *   <li>file cleaning, first after 60000 ms and then at the configured clean-resource
     *       interval;</li>
     *   <li>a self check, first after 1 minute and then every 10 minutes;</li>
     *   <li>every second, when lock debugging is enabled: if the commit log's
     *       begin-time-in-lock is set and lies more than 1000 ms (but less than 10000000 ms)
     *       in the past, a thread dump is written below {@code ~/debug/lock/}.</li>
     * </ul>
     */
    private void addScheduleTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.store.DefaultMessageStore.1
            @Override // java.lang.Runnable
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 60000L, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.store.DefaultMessageStore.2
            @Override // java.lang.Runnable
            public void run() {
                DefaultMessageStore.this.checkSelf();
            }
        }, 1L, 10L, TimeUnit.MINUTES);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.rocketmq.store.DefaultMessageStore.3
            @Override // java.lang.Runnable
            public void run() {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                    try {
                        if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                            long currentTimeMillis = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                            if (currentTimeMillis > 1000 && currentTimeMillis < 10000000) {
                                MixAll.string2FileNotSafe(UtilAll.jstack(), System.getProperty("user.home") + File.separator + "debug/lock/stack-" + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + currentTimeMillis);
                            }
                        }
                    } catch (Exception e) {
                        // The dump is best effort and must not cancel the periodic task, but a
                        // failure should still be visible instead of being dropped silently.
                        log.warn("failed to write lock debug stack dump", e);
                    }
                }
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Runs the commit-log clean service and then the consume-queue clean service, synchronously on
     * the calling thread.
     *
     * <p>JADX INFO: access modifier changed from {@code private} by the decompiler.
     */
    public void cleanFilesPeriodically() {
        cleanCommitLogService.run();
        cleanConsumeQueueService.run();
    }

    /**
     * Invokes {@code checkSelf()} on the commit log and then on every consume queue registered in
     * {@code consumeQueueTable}.
     *
     * <p>JADX INFO: access modifier changed from {@code private} by the decompiler.
     */
    public void checkSelf() {
        this.commitLog.checkSelf();

        for (ConcurrentMap<Integer, ConsumeQueue> queuesById : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesById.values()) {
                queue.checkSelf();
            }
        }
    }

    /**
     * Reports whether the abort file exists under the configured store root directory.
     *
     * @return {@code true} if the abort file is present on disk
     */
    private boolean isTempFileExist() {
        final String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        final File abortFile = new File(abortPath);
        return abortFile.exists();
    }

    /**
     * Scans the consume-queue storage directory and loads every consume queue found there.
     *
     * <p>The expected layout is {@code <consumeQueueRoot>/<name>/<queueId>}. Each first-level
     * directory name is used as the outer key of {@code consumeQueueTable} (the topic, judging by
     * {@code ConsumeQueue#getTopic}), and each second-level directory name must parse as an integer
     * queue id. Second-level entries whose name is not a valid integer are skipped.
     *
     * @return {@code false} as soon as one queue fails to {@code load()}; {@code true} otherwise,
     *         including when the root directory is missing or empty
     */
    private boolean loadConsumeQueue() {
        final String consumeQueueRoot =
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        final File[] topicDirs = new File(consumeQueueRoot).listFiles();
        if (topicDirs != null) {
            for (File topicDir : topicDirs) {
                final String topic = topicDir.getName();
                final File[] queueDirs = topicDir.listFiles();
                if (queueDirs == null) {
                    // Not a directory (or unreadable): nothing to load for this entry.
                    continue;
                }
                for (File queueDir : queueDirs) {
                    final int queueId;
                    try {
                        queueId = Integer.parseInt(queueDir.getName());
                    } catch (NumberFormatException e) {
                        // Not a queue directory. Skip it instead of falling through to load(),
                        // which would otherwise operate on an unassigned or stale queue reference.
                        continue;
                    }

                    final ConsumeQueue consumeQueue = new ConsumeQueue(
                            topic,
                            queueId,
                            consumeQueueRoot,
                            getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                            this);
                    putConsumeQueue(topic, queueId, consumeQueue);
                    if (!consumeQueue.load()) {
                        return false;
                    }
                }
            }
        }
        log.info("load logics queue all over, OK");
        return true;
    }

    /**
     * Recovers the store: first the consume queues, then the commit log, then the topic-queue table.
     *
     * @param z {@code true} to recover the commit log with {@code recoverNormally}, {@code false} to
     *          use {@code recoverAbnormally} (presumably "last shutdown was clean"; confirm against callers)
     */
    private void recover(boolean z) {
        final long maxPhysicOffsetOfQueues = recoverConsumeQueue();

        if (!z) {
            this.commitLog.recoverAbnormally(maxPhysicOffsetOfQueues);
        } else {
            this.commitLog.recoverNormally(maxPhysicOffsetOfQueues);
        }

        recoverTopicQueueTable();
    }

    /**
     * @return the store configuration this message store was built with
     */
    public MessageStoreConfig getMessageStoreConfig() {
        return messageStoreConfig;
    }

    /**
     * @return the transient store pool held by this message store
     */
    public TransientStorePool getTransientStorePool() {
        return transientStorePool;
    }

    /**
     * Registers a consume queue in {@code consumeQueueTable} under the given outer key and queue id,
     * creating the inner per-key map on first use. An existing entry for the same id is replaced.
     *
     * <p>This is a plain check-then-put and is not atomic.
     *
     * @param str          outer key of {@code consumeQueueTable} (the topic)
     * @param i            queue id used as the inner key
     * @param consumeQueue queue to register
     */
    private void putConsumeQueue(String str, int i, ConsumeQueue consumeQueue) {
        ConcurrentMap<Integer, ConsumeQueue> queuesById = this.consumeQueueTable.get(str);
        if (queuesById == null) {
            // Parameterized map instead of a raw ConcurrentHashMap, so the puts below are type-checked.
            queuesById = new ConcurrentHashMap<>();
            // Populate before publishing so the table never exposes an empty inner map.
            queuesById.put(Integer.valueOf(i), consumeQueue);
            this.consumeQueueTable.put(str, queuesById);
        } else {
            queuesById.put(Integer.valueOf(i), consumeQueue);
        }
    }

    /**
     * Calls {@code recover()} on every registered consume queue and tracks the largest
     * {@code getMaxPhysicOffset()} seen.
     *
     * @return the largest max-physic-offset across all queues, or {@code -1} if there are no queues
     *         (or none reports a value above {@code -1})
     */
    private long recoverConsumeQueue() {
        long maxPhysicOffset = -1;
        for (ConcurrentMap<Integer, ConsumeQueue> queuesById : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesById.values()) {
                queue.recover();
                if (queue.getMaxPhysicOffset() > maxPhysicOffset) {
                    maxPhysicOffset = queue.getMaxPhysicOffset();
                }
            }
        }
        return maxPhysicOffset;
    }

    /**
     * Rebuilds the commit log's topic-queue table from the registered consume queues.
     *
     * <p>For every consume queue an entry {@code "<topic>-<queueId>" -> getMaxOffsetInQueue()} is
     * recorded, and the queue's minimum offset is corrected against the commit log's current minimum
     * offset. The resulting table is then handed to the commit log.
     */
    public void recoverTopicQueueTable() {
        final HashMap<String, Long> maxOffsetByTopicQueue = new HashMap<>(1024);
        final long commitLogMinOffset = this.commitLog.getMinOffset();

        for (ConcurrentMap<Integer, ConsumeQueue> queuesById : this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesById.values()) {
                final String key = queue.getTopic() + "-" + queue.getQueueId();
                maxOffsetByTopicQueue.put(key, Long.valueOf(queue.getMaxOffsetInQueue()));
                queue.correctMinOffset(commitLogMinOffset);
            }
        }

        this.commitLog.setTopicQueueTable(maxOffsetByTopicQueue);
    }

    /**
     * @return the service instance stored in {@code allocateMappedFileService}
     */
    public AllocateMappedFileService getAllocateMappedFileService() {
        return allocateMappedFileService;
    }

    /**
     * @return the store statistics service held by this message store
     */
    public StoreStatsService getStoreStatsService() {
        return storeStatsService;
    }

    /**
     * @return the {@code runningFlags} instance; the same object {@link #getRunningFlags()} returns
     */
    public RunningFlags getAccessRights() {
        return runningFlags;
    }

    /**
     * @return the live (not copied) two-level table of consume queues, keyed first by a string
     *         (the topic) and then by queue id
     */
    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
        return consumeQueueTable;
    }

    /**
     * @return the store checkpoint held by this message store
     */
    public StoreCheckpoint getStoreCheckpoint() {
        return storeCheckpoint;
    }

    /**
     * @return the HA service held by this message store
     */
    public HAService getHaService() {
        return haService;
    }

    /**
     * @return the schedule message service; may be {@code null}, as
     *         {@code handleScheduleMessageService} null-checks the same field
     */
    public ScheduleMessageService getScheduleMessageService() {
        return scheduleMessageService;
    }

    /**
     * @return the running flags of this message store
     */
    public RunningFlags getRunningFlags() {
        return runningFlags;
    }

    /**
     * Passes the request to every dispatcher in {@code dispatcherList}, in list order.
     *
     * @param dispatchRequest request forwarded unchanged to each {@code CommitLogDispatcher}
     */
    public void doDispatch(DispatchRequest dispatchRequest) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(dispatchRequest);
        }
    }

    /**
     * Looks up the consume queue for the request's topic and queue id via {@code findConsumeQueue}
     * and hands the request to that queue's {@code putMessagePositionInfoWrapper}.
     *
     * @param dispatchRequest request whose topic and queue id select the target consume queue
     */
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        final ConsumeQueue targetQueue = findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        targetQueue.putMessagePositionInfoWrapper(dispatchRequest);
    }

    /**
     * @return the broker statistics manager held by this message store
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public BrokerStatsManager getBrokerStatsManager() {
        return brokerStatsManager;
    }

    /**
     * Starts or stops the schedule message service according to the given broker role.
     *
     * <p>Does nothing when no schedule message service is set. For {@code BrokerRole.SLAVE} the
     * service is shut down; for any other role it is started.
     *
     * @param brokerRole role that decides between {@code shutdown()} and {@code start()}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public void handleScheduleMessageService(BrokerRole brokerRole) {
        if (this.scheduleMessageService == null) {
            return;
        }
        if (brokerRole != BrokerRole.SLAVE) {
            this.scheduleMessageService.start();
        } else {
            this.scheduleMessageService.shutdown();
        }
    }

    /**
     * @return the value of {@code transientStorePool.remainBufferNumbs()}
     */
    public int remainTransientStoreBufferNumbs() {
        return transientStorePool.remainBufferNumbs();
    }

    /**
     * @return {@code true} when {@link #remainTransientStoreBufferNumbs()} reports exactly zero
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public boolean isTransientStorePoolDeficient() {
        final int remaining = remainTransientStoreBufferNumbs();
        return remaining == 0;
    }

    /**
     * @return the live (not copied) list of commit-log dispatchers used by {@code doDispatch}
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public LinkedList<CommitLogDispatcher> getDispatcherList() {
        return dispatcherList;
    }

    /**
     * Looks up an already registered consume queue without creating one.
     *
     * @param str outer key of {@code consumeQueueTable} (the topic)
     * @param i   queue id
     * @return the registered queue, or {@code null} if either the outer key or the queue id is unknown
     */
    @Override // org.apache.rocketmq.store.MessageStore
    public ConsumeQueue getConsumeQueue(String str, int i) {
        final ConcurrentMap<Integer, ConsumeQueue> queuesById = this.consumeQueueTable.get(str);
        return queuesById == null ? null : queuesById.get(Integer.valueOf(i));
    }

    /**
     * Schedules a single call to {@code mappedFile.munlock()} on {@code scheduledExecutorService},
     * to run 6 seconds from now. Returns immediately.
     *
     * @param mappedFile mapped file whose {@code munlock()} is invoked by the delayed task
     */
    public void unlockMappedFile(final MappedFile mappedFile) {
        final Runnable munlockTask = new Runnable() {
            @Override
            public void run() {
                mappedFile.munlock();
            }
        };
        this.scheduledExecutorService.schedule(munlockTask, 6L, TimeUnit.SECONDS);
    }
}
