/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ThreadMetadataProvider;
import org.slf4j.Logger;

class TaskManager {
    private final Logger log;
    private final AssignedTasks active;
    private final AssignedTasks standby;
    private final ChangelogReader changelogReader;
    private final String logPrefix;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final StreamThread.AbstractTaskCreator taskCreator;
    private final StreamThread.AbstractTaskCreator standbyTaskCreator;
    private ThreadMetadataProvider threadMetadataProvider;
    private Consumer<byte[], byte[]> consumer;

    TaskManager(ChangelogReader changelogReader, String logPrefix, Consumer<byte[], byte[]> restoreConsumer, StreamThread.AbstractTaskCreator taskCreator, StreamThread.AbstractTaskCreator standbyTaskCreator, AssignedTasks active, AssignedTasks standby) {
        this.changelogReader = changelogReader;
        this.logPrefix = logPrefix;
        this.restoreConsumer = restoreConsumer;
        this.taskCreator = taskCreator;
        this.standbyTaskCreator = standbyTaskCreator;
        this.active = active;
        this.standby = standby;
        LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(this.getClass());
    }

    void createTasks(Collection<TopicPartition> assignment) {
        if (this.threadMetadataProvider == null) {
            throw new IllegalStateException(this.logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
        }
        if (this.consumer == null) {
            throw new IllegalStateException(this.logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
        }
        this.changelogReader.reset();
        this.standby.closeNonAssignedSuspendedTasks(this.threadMetadataProvider.standbyTasks());
        Map<TaskId, Set<TopicPartition>> assignedActiveTasks = this.threadMetadataProvider.activeTasks();
        this.active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
        this.addStreamTasks(assignment);
        this.addStandbyTasks();
        Set<TopicPartition> partitions = this.active.uninitializedPartitions();
        this.log.trace("pausing partitions: {}", partitions);
        this.consumer.pause(partitions);
    }

    void setThreadMetadataProvider(ThreadMetadataProvider threadMetadataProvider) {
        this.threadMetadataProvider = threadMetadataProvider;
    }

    private void addStreamTasks(Collection<TopicPartition> assignment) {
        Map<TaskId, Set<TopicPartition>> assignedTasks = this.threadMetadataProvider.activeTasks();
        if (assignedTasks.isEmpty()) {
            return;
        }
        HashMap<TaskId, Set<TopicPartition>> newTasks = new HashMap<TaskId, Set<TopicPartition>>();
        this.log.debug("Adding assigned tasks as active: {}", assignedTasks);
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : assignedTasks.entrySet()) {
            TaskId taskId = entry.getKey();
            Set<TopicPartition> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                try {
                    if (this.active.maybeResumeSuspendedTask(taskId, partitions)) continue;
                    newTasks.put(taskId, partitions);
                    continue;
                }
                catch (StreamsException e) {
                    this.log.error("Failed to resume an active task {} due to the following error:", (Object)taskId, (Object)e);
                    throw e;
                }
            }
            this.log.warn("Task {} owned partitions {} are not contained in the assignment {}", new Object[]{taskId, partitions, assignment});
        }
        if (newTasks.isEmpty()) {
            return;
        }
        this.log.trace("New active tasks to be created: {}", newTasks);
        for (Task task : this.taskCreator.createTasks(this.consumer, newTasks)) {
            this.active.addNewTask(task);
        }
    }

    private void addStandbyTasks() {
        Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.threadMetadataProvider.standbyTasks();
        if (assignedStandbyTasks.isEmpty()) {
            return;
        }
        this.log.debug("Adding assigned standby tasks {}", assignedStandbyTasks);
        HashMap<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : assignedStandbyTasks.entrySet()) {
            Set<TopicPartition> partitions;
            TaskId taskId = entry.getKey();
            if (this.standby.maybeResumeSuspendedTask(taskId, partitions = entry.getValue())) continue;
            newStandbyTasks.put(taskId, partitions);
        }
        if (newStandbyTasks.isEmpty()) {
            return;
        }
        this.log.trace("New standby tasks to be created: {}", newStandbyTasks);
        for (Task task : this.standbyTaskCreator.createTasks(this.consumer, newStandbyTasks)) {
            this.standby.addNewTask(task);
        }
    }

    Set<TaskId> activeTaskIds() {
        return this.active.allAssignedTaskIds();
    }

    Set<TaskId> standbyTaskIds() {
        return this.standby.allAssignedTaskIds();
    }

    Set<TaskId> prevActiveTaskIds() {
        return this.active.previousTaskIds();
    }

    void suspendTasksAndState() {
        this.log.debug("Suspending all active tasks {} and standby tasks {}", this.active.runningTaskIds(), this.standby.runningTaskIds());
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        firstException.compareAndSet(null, this.active.suspend());
        firstException.compareAndSet(null, this.standby.suspend());
        this.restoreConsumer.assign(Collections.emptyList());
        Exception exception = firstException.get();
        if (exception != null) {
            throw new StreamsException(this.logPrefix + "failed to suspend stream tasks", exception);
        }
    }

    void shutdown(boolean clean) {
        this.log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", new Object[]{this.active.runningTaskIds(), this.standby.runningTaskIds(), this.active.previousTaskIds(), this.standby.previousTaskIds()});
        this.active.close(clean);
        this.standby.close(clean);
        try {
            this.threadMetadataProvider.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close KafkaStreamClient due to the following error:", e);
        }
        this.restoreConsumer.assign(Collections.emptyList());
        this.taskCreator.close();
        this.standbyTaskCreator.close();
    }

    Set<TaskId> suspendedActiveTaskIds() {
        return this.active.previousTaskIds();
    }

    Set<TaskId> suspendedStandbyTaskIds() {
        return this.standby.previousTaskIds();
    }

    Task activeTask(TopicPartition partition) {
        return this.active.runningTaskFor(partition);
    }

    Task standbyTask(TopicPartition partition) {
        return this.standby.runningTaskFor(partition);
    }

    Map<TaskId, Task> activeTasks() {
        return this.active.runningTaskMap();
    }

    Map<TaskId, Task> standbyTasks() {
        return this.standby.runningTaskMap();
    }

    void setConsumer(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    boolean updateNewAndRestoringTasks() {
        Set<TopicPartition> resumed = this.active.initializeNewTasks();
        this.standby.initializeNewTasks();
        Collection<TopicPartition> restored = this.changelogReader.restore(this.active);
        resumed.addAll(this.active.updateRestored(restored));
        if (!resumed.isEmpty()) {
            this.log.trace("resuming partitions {}", resumed);
            this.consumer.resume(resumed);
        }
        if (this.active.allTasksRunning()) {
            this.assignStandbyPartitions();
            return true;
        }
        return false;
    }

    boolean hasActiveRunningTasks() {
        return this.active.hasRunningTasks();
    }

    boolean hasStandbyRunningTasks() {
        return this.standby.hasRunningTasks();
    }

    private void assignStandbyPartitions() {
        Collection<Task> running = this.standby.running();
        HashMap<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();
        for (Task task : running) {
            checkpointedOffsets.putAll(task.checkpointedOffsets());
        }
        this.restoreConsumer.assign(checkpointedOffsets.keySet());
        for (Map.Entry entry : checkpointedOffsets.entrySet()) {
            TopicPartition partition = (TopicPartition)entry.getKey();
            long offset = (Long)entry.getValue();
            if (offset >= 0L) {
                this.restoreConsumer.seek(partition, offset);
                continue;
            }
            this.restoreConsumer.seekToBeginning(Collections.singleton(partition));
        }
    }

    int commitAll() {
        int committed = this.active.commit();
        return committed + this.standby.commit();
    }

    int process() {
        return this.active.process();
    }

    int punctuate() {
        return this.active.punctuate();
    }

    int maybeCommitActiveTasks() {
        return this.active.maybeCommit();
    }

    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        builder.append(indent).append("\tActive tasks:\n");
        builder.append(this.active.toString(indent + "\t\t"));
        builder.append(indent).append("\tStandby tasks:\n");
        builder.append(this.standby.toString(indent + "\t\t"));
        return builder.toString();
    }
}

