package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThread.class */
public class GlobalStreamThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
    private final StreamsConfig config;
    private final Consumer<byte[], byte[]> consumer;
    private final StateDirectory stateDirectory;
    private final Time time;
    private final ThreadCache cache;
    private final StreamsMetrics streamsMetrics;
    private final ProcessorTopology topology;
    private volatile boolean running;
    private volatile StreamsException startupException;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThread$StateConsumer.class */
    public static class StateConsumer {
        private final Consumer<byte[], byte[]> consumer;
        private final GlobalStateMaintainer stateMaintainer;
        private final Time time;
        private final long pollMs;
        private final long flushInterval;
        private long lastFlush;

        StateConsumer(Consumer<byte[], byte[]> consumer, GlobalStateMaintainer globalStateMaintainer, Time time, long j, long j2) {
            this.consumer = consumer;
            this.stateMaintainer = globalStateMaintainer;
            this.time = time;
            this.pollMs = j;
            this.flushInterval = j2;
        }

        void initialize() {
            Map<TopicPartition, Long> initialize = this.stateMaintainer.initialize();
            this.consumer.assign(initialize.keySet());
            for (Map.Entry<TopicPartition, Long> entry : initialize.entrySet()) {
                this.consumer.seek(entry.getKey(), entry.getValue().longValue());
            }
            this.lastFlush = this.time.milliseconds();
        }

        void pollAndUpdate() {
            Iterator it = this.consumer.poll(this.pollMs).iterator();
            while (it.hasNext()) {
                this.stateMaintainer.update((ConsumerRecord) it.next());
            }
            long milliseconds = this.time.milliseconds();
            if (this.flushInterval < 0 || milliseconds < this.lastFlush + this.flushInterval) {
                return;
            }
            this.stateMaintainer.flushState();
            this.lastFlush = milliseconds;
        }

        public void close() throws IOException {
            try {
                this.consumer.close();
            } catch (Exception e) {
                GlobalStreamThread.log.error("Failed to cleanly close GlobalStreamThread consumer", e);
            }
            this.stateMaintainer.close();
        }
    }

    public GlobalStreamThread(ProcessorTopology processorTopology, StreamsConfig streamsConfig, Consumer<byte[], byte[]> consumer, StateDirectory stateDirectory, Metrics metrics, Time time, String str) {
        super(str);
        this.running = false;
        this.time = time;
        this.config = streamsConfig;
        this.topology = processorTopology;
        this.consumer = consumer;
        this.stateDirectory = stateDirectory;
        long max = Math.max(0L, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).longValue() / (streamsConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue() + 1));
        this.streamsMetrics = new StreamsMetricsImpl(metrics, str, Collections.singletonMap("client-id", str));
        this.cache = new ThreadCache(str, max, this.streamsMetrics);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        StateConsumer initialize = initialize();
        if (initialize == null) {
            return;
        }
        while (this.running) {
            try {
                initialize.pollAndUpdate();
            } finally {
                try {
                    initialize.close();
                } catch (IOException e) {
                    log.error("Failed to cleanly shutdown GlobalStreamThread", e);
                }
            }
        }
        log.debug("Shutting down GlobalStreamThread at user request");
    }

    private StateConsumer initialize() {
        try {
            GlobalStateManagerImpl globalStateManagerImpl = new GlobalStateManagerImpl(this.topology, this.consumer, this.stateDirectory);
            StateConsumer stateConsumer = new StateConsumer(this.consumer, new GlobalStateUpdateTask(this.topology, new GlobalProcessorContextImpl(this.config, globalStateManagerImpl, this.streamsMetrics, this.cache), globalStateManagerImpl), this.time, this.config.getLong(StreamsConfig.POLL_MS_CONFIG).longValue(), this.config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue());
            stateConsumer.initialize();
            this.running = true;
            return stateConsumer;
        } catch (Exception e) {
            this.startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
            return null;
        } catch (StreamsException e2) {
            this.startupException = e2;
            return null;
        }
    }

    @Override // java.lang.Thread
    public synchronized void start() {
        super.start();
        while (!this.running) {
            Utils.sleep(1L);
            if (this.startupException != null) {
                throw this.startupException;
            }
        }
    }

    public void close() {
        this.running = false;
    }

    public boolean stillRunning() {
        return this.running;
    }
}
