/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateManager} implementation for a single task.
 *
 * <p>On construction it acquires the task's state directory lock from the supplied
 * {@link StateDirectory}, resolves the task directory and loads previously checkpointed
 * offsets from the {@value #CHECKPOINT_FILE_NAME} file inside it. Stores are then registered
 * one by one; a store that has a changelog topic is either handed to the
 * {@link ChangelogReader} for restoration (non-standby) or has its restore callback retained
 * so that {@link #updateStandbyStates} can feed it records (standby).
 *
 * <p>{@link #close(Map)} always attempts to release the state directory lock.
 */
public class ProcessorStateManager implements StateManager {
    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final File baseDir;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final StateDirectory stateDirectory;
    private final ChangelogReader changelogReader;
    // Insertion-ordered so flush/close visit stores in registration order.
    private final Map<String, StateStore> stores;
    private final Map<String, StateStore> globalStores;
    private final Map<TopicPartition, Long> offsetLimits;
    // Next offset to restore per changelog partition, as recorded by updateStandbyStates.
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    // Keyed by changelog topic; only populated when this manager is a standby. Never null.
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, String> storeToChangelogTopic;
    private final boolean eosEnabled;
    private final Map<String, TopicPartition> partitionForTopic;
    // Null while no checkpoint file handle exists (e.g. after deletion under EOS); recreated lazily.
    private OffsetCheckpoint checkpoint;

    /**
     * Creates the manager, locking the task's state directory and loading checkpointed offsets.
     *
     * <p>If anything fails after the lock has been acquired, the lock is released again before
     * the exception propagates, so a failed construction does not leave the directory locked.
     *
     * @param taskId                task whose state is managed
     * @param sources               source partitions; used to resolve the partition number per topic
     * @param isStandby             whether the manager serves a standby replica
     * @param stateDirectory        provider of the task directory and its lock
     * @param storeToChangelogTopic store name to changelog topic; stores absent from it are not logged
     * @param changelogReader       receives the restorers of non-standby stores
     * @param eosEnabled            when true the checkpoint file is deleted right after being read
     * @throws LockException if the lock cannot be acquired or the task directory cannot be obtained
     * @throws IOException   if locking or reading/deleting the checkpoint file fails
     */
    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, Map<String, String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled) throws LockException, IOException {
        this.taskId = taskId;
        this.stateDirectory = stateDirectory;
        this.changelogReader = changelogReader;
        this.logPrefix = String.format("task [%s]", taskId);
        this.partitionForTopic = new HashMap<>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.stores = new LinkedHashMap<>();
        this.globalStores = new HashMap<>();
        this.offsetLimits = new HashMap<>();
        this.restoredOffsets = new HashMap<>();
        this.isStandby = isStandby;
        this.restoreCallbacks = new HashMap<>();
        this.storeToChangelogTopic = storeToChangelogTopic;
        this.eosEnabled = eosEnabled;

        // NOTE(review): the second argument is presumably a retry count - confirm against StateDirectory.lock.
        if (!stateDirectory.lock(taskId, 5)) {
            throw new LockException(String.format("%s Failed to lock the state directory for task %s", this.logPrefix, taskId));
        }

        // From here on the lock is held; make sure it is given back if initialization fails.
        boolean initialized = false;
        try {
            try {
                this.baseDir = stateDirectory.directoryForTask(taskId);
            } catch (ProcessorStateException e) {
                throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", this.logPrefix, taskId, e));
            }
            this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
            this.checkpointedOffsets = new HashMap<TopicPartition, Long>(this.checkpoint.read());
            if (eosEnabled) {
                // The offsets stay in memory; the file itself is removed and recreated on the next checkpoint.
                this.checkpoint.delete();
                this.checkpoint = null;
            }
            initialized = true;
        } finally {
            if (!initialized) {
                this.releaseLockAfterFailedInit();
            }
        }
        log.info("{} Created state store manager for task {} with the acquired state dir lock", this.logPrefix, taskId);
    }

    /**
     * Best-effort release of the state directory lock when construction fails.
     * Failures are only logged so that the original initialization error is not masked.
     */
    private void releaseLockAfterFailedInit() {
        try {
            this.stateDirectory.unlock(this.taskId);
        } catch (IOException | RuntimeException e) {
            log.error("{} Failed to release state dir lock: ", this.logPrefix, e);
        }
    }

    /**
     * Builds a changelog topic name as {@code <applicationId>-<storeName>-changelog}.
     *
     * @param applicationId application id used as the prefix
     * @param storeName     name of the store
     * @return the concatenated changelog topic name
     */
    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /** @return the task directory obtained from the state directory at construction time */
    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /**
     * Registers a store with this manager.
     *
     * <p>A store without a changelog topic is simply recorded. Otherwise the changelog partition
     * is validated through the changelog reader, and then either the restore callback is kept
     * (standby, persistent stores only) or a {@link StateRestorer} is registered with the
     * changelog reader (non-standby).
     *
     * @param store                the store to register
     * @param loggingEnabled       not consulted by this implementation
     * @param stateRestoreCallback callback that applies changelog records to the store
     * @throws IllegalArgumentException if the store is named like the checkpoint file or its name
     *                                  has already been registered
     */
    @Override
    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
        String storeName = store.name();
        log.debug("{} Registering state store {} to its state manager", this.logPrefix, storeName);
        if (storeName.equals(CHECKPOINT_FILE_NAME)) {
            throw new IllegalArgumentException(String.format("%s Illegal store name: %s", this.logPrefix, CHECKPOINT_FILE_NAME));
        }
        if (this.stores.containsKey(storeName)) {
            throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", this.logPrefix, storeName));
        }

        String topic = this.storeToChangelogTopic.get(storeName);
        if (topic == null) {
            // No changelog: nothing to validate or restore.
            this.stores.put(storeName, store);
            return;
        }

        TopicPartition storePartition = new TopicPartition(topic, this.getPartition(topic));
        this.changelogReader.validatePartitionExists(storePartition, storeName);
        if (this.isStandby) {
            if (store.persistent()) {
                log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", this.logPrefix, storeName, topic);
                this.restoreCallbacks.put(topic, stateRestoreCallback);
            }
        } else {
            log.trace("{} Restoring state store {} from changelog topic {}", this.logPrefix, storeName, topic);
            StateRestorer restorer = new StateRestorer(storePartition, stateRestoreCallback, this.checkpointedOffsets.get(storePartition), this.offsetLimit(storePartition), store.persistent());
            this.changelogReader.register(restorer);
        }
        this.stores.put(storeName, store);
    }

    /**
     * Returns the checkpointed offset of every changelog partition that has a retained restore
     * callback, using {@code -1} for partitions without a checkpointed offset.
     *
     * <p>Callbacks are only retained for standby managers, so the result is empty otherwise.
     *
     * @return a new map from changelog partition to checkpointed offset (or {@code -1})
     */
    @Override
    public Map<TopicPartition, Long> checkpointed() {
        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
        for (String topicName : this.restoreCallbacks.keySet()) {
            TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
            if (this.checkpointedOffsets.containsKey(storePartition)) {
                partitionsAndOffsets.put(storePartition, this.checkpointedOffsets.get(storePartition));
            } else {
                partitionsAndOffsets.put(storePartition, -1L);
            }
        }
        return partitionsAndOffsets;
    }

    /**
     * Applies changelog records below the partition's offset limit to the restore callback
     * registered for the partition's topic.
     *
     * <p>The restored offset for the partition is advanced to one past the last applied record.
     * When no record of the batch is applied, the previously recorded restored offset is left
     * untouched rather than being reset.
     *
     * @param storePartition changelog partition the records come from
     * @param records        records to apply
     * @return the records at or beyond the offset limit, or {@code null} if there are none
     * @throws ProcessorStateException if applying a record fails
     */
    List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        long limit = this.offsetLimit(storePartition);
        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
        StateRestoreCallback restoreCallback = this.restoreCallbacks.get(storePartition.topic());
        long lastOffset = -1L;
        int count = 0;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit) {
                try {
                    restoreCallback.restore(record.key(), record.value());
                } catch (Exception e) {
                    throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", this.logPrefix, storePartition), e);
                }
                lastOffset = record.offset();
            } else {
                if (remainingRecords == null) {
                    // Size for the records not yet visited, which is the most that can remain.
                    remainingRecords = new ArrayList<>(records.size() - count);
                }
                remainingRecords.add(record);
            }
            ++count;
        }
        // Only record progress when something was applied; otherwise an earlier restored offset
        // would be overwritten with 0 and later written to the checkpoint file.
        if (lastOffset >= 0L) {
            this.restoredOffsets.put(storePartition, lastOffset + 1L);
        }
        return remainingRecords;
    }

    /**
     * Sets the offset limit for a changelog partition.
     *
     * @param partition changelog partition
     * @param limit     exclusive upper bound for offsets applied by {@link #updateStandbyStates}
     */
    void putOffsetLimit(TopicPartition partition, long limit) {
        log.trace("{} Updating store offset limit for partition {} to {}", this.logPrefix, partition, limit);
        this.offsetLimits.put(partition, limit);
    }

    /** Returns the configured offset limit of the partition, or {@link Long#MAX_VALUE} if none is set. */
    private long offsetLimit(TopicPartition partition) {
        Long limit = this.offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    /**
     * @param name store name
     * @return the registered store with that name, or {@code null} if there is none
     */
    @Override
    public StateStore getStore(String name) {
        return this.stores.get(name);
    }

    /**
     * Flushes every registered store in registration order.
     *
     * @throws ProcessorStateException wrapping the first failure; later stores are not flushed
     */
    @Override
    public void flush() {
        if (this.stores.isEmpty()) {
            return;
        }
        log.debug("{} Flushing all stores registered in the state manager", this.logPrefix);
        for (StateStore store : this.stores.values()) {
            try {
                log.trace("{} Flushing store={}", this.logPrefix, store.name());
                store.flush();
            } catch (Exception e) {
                throw new ProcessorStateException(String.format("%s Failed to flush state store %s", this.logPrefix, store.name()), e);
            }
        }
    }

    /**
     * Closes all registered stores, optionally writes a checkpoint, and releases the state
     * directory lock.
     *
     * <p>Every store is closed even if an earlier one fails. The lock release is attempted in
     * all cases, including when closing or checkpointing throws unexpectedly.
     *
     * @param ackedOffsets offsets to checkpoint after closing the stores; {@code null} skips the
     *                     checkpoint. The checkpoint is also skipped when no store is registered.
     * @throws ProcessorStateException for the first store-close or lock-release failure
     */
    @Override
    public void close(Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException {
        ProcessorStateException firstException = null;
        try {
            if (!this.stores.isEmpty()) {
                log.debug("{} Closing its state manager and all the registered state stores", this.logPrefix);
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    log.debug("{} Closing storage engine {}", this.logPrefix, entry.getKey());
                    try {
                        entry.getValue().close();
                    } catch (Exception e) {
                        // Remember only the first failure, but keep closing the remaining stores.
                        if (firstException == null) {
                            firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", this.logPrefix, entry.getKey()), e);
                        }
                        log.error("{} Failed to close state store {}: ", this.logPrefix, entry.getKey(), e);
                    }
                }
                if (ackedOffsets != null) {
                    this.checkpoint(ackedOffsets);
                }
            }
        } finally {
            try {
                this.stateDirectory.unlock(this.taskId);
            } catch (IOException e) {
                if (firstException == null) {
                    firstException = new ProcessorStateException(String.format("%s Failed to release state dir lock", this.logPrefix), e);
                }
                log.error("{} Failed to release state dir lock: ", this.logPrefix, e);
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Merges the known offsets and writes them to the checkpoint file.
     *
     * <p>Offsets restored by the changelog reader are merged first. Then, for each persistent
     * store with a changelog topic, the acked offset plus one is used if present, otherwise the
     * offset recorded by {@link #updateStandbyStates} if present. A failure to write the file is
     * logged and not propagated.
     *
     * @param ackedOffsets last acknowledged offset per changelog partition; must not be {@code null}
     */
    @Override
    public void checkpoint(Map<TopicPartition, Long> ackedOffsets) {
        log.trace("{} Writing checkpoint: {}", this.logPrefix, ackedOffsets);
        this.checkpointedOffsets.putAll(this.changelogReader.restoredOffsets());
        for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
            String storeName = entry.getKey();
            // Only persistent stores that are backed by a changelog are checkpointed.
            if (!entry.getValue().persistent() || !this.storeToChangelogTopic.containsKey(storeName)) {
                continue;
            }
            String changelogTopic = this.storeToChangelogTopic.get(storeName);
            // Resolve the partition by topic, exactly as register() does, so both use the same key.
            TopicPartition topicPartition = new TopicPartition(changelogTopic, this.getPartition(changelogTopic));
            if (ackedOffsets.containsKey(topicPartition)) {
                // Store the next offset to read, hence +1 on the last acked offset.
                this.checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1L);
            } else if (this.restoredOffsets.containsKey(topicPartition)) {
                this.checkpointedOffsets.put(topicPartition, this.restoredOffsets.get(topicPartition));
            }
        }
        try {
            if (this.checkpoint == null) {
                this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
            }
            this.checkpoint.write(this.checkpointedOffsets);
        } catch (IOException e) {
            // Deliberately best-effort: a failed checkpoint write must not fail the caller.
            log.warn("Failed to write checkpoint file to {}:", new File(this.baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }

    /**
     * Returns the partition number to use for a topic: the partition of the matching source
     * partition if the topic is one of the sources, otherwise the task's own partition.
     */
    private int getPartition(String topic) {
        TopicPartition partition = this.partitionForTopic.get(topic);
        return partition == null ? this.taskId.partition : partition.partition();
    }

    /**
     * Makes the given global stores available through {@link #getGlobalStore(String)}.
     *
     * @param stateStores global stores, keyed internally by their name
     */
    void registerGlobalStateStores(List<StateStore> stateStores) {
        log.info("{} Register global stores {}", this.logPrefix, stateStores);
        for (StateStore stateStore : stateStores) {
            this.globalStores.put(stateStore.name(), stateStore);
        }
    }

    /**
     * @param name global store name
     * @return the registered global store with that name, or {@code null} if there is none
     */
    @Override
    public StateStore getGlobalStore(String name) {
        return this.globalStores.get(name);
    }
}

