/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.slf4j.Logger;

public class InternalTopicManager {
    static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS);
    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
    public static final String RETENTION_MS = "retention.ms";
    private static final int MAX_TOPIC_READY_TRY = 5;
    private final Logger log;
    private final Time time;
    private final long windowChangeLogAdditionalRetention;
    private final int replicationFactor;
    private final StreamsKafkaClient streamsKafkaClient;

    public InternalTopicManager(StreamsKafkaClient streamsKafkaClient, int replicationFactor, long windowChangeLogAdditionalRetention, Time time) {
        this.streamsKafkaClient = streamsKafkaClient;
        this.replicationFactor = replicationFactor;
        this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
        this.time = time;
        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
        this.log = logContext.logger(this.getClass());
    }

    public void makeReady(Map<InternalTopicConfig, Integer> topics) {
        for (int i = 0; i < 5; ++i) {
            try {
                MetadataResponse metadata = this.streamsKafkaClient.fetchMetadata();
                Map<String, Integer> existingTopicPartitions = this.fetchExistingPartitionCountByTopic(metadata);
                Map<InternalTopicConfig, Integer> topicsToBeCreated = this.validateTopicPartitions(topics, existingTopicPartitions);
                if (topicsToBeCreated.size() > 0) {
                    if (metadata.brokers().size() < this.replicationFactor) {
                        throw new StreamsException("Found only " + metadata.brokers().size() + " brokers, " + " but replication factor is " + this.replicationFactor + "." + " Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\"" + " or add more brokers to your cluster.");
                    }
                    this.streamsKafkaClient.createTopics(topicsToBeCreated, this.replicationFactor, this.windowChangeLogAdditionalRetention, metadata);
                }
                return;
            }
            catch (StreamsException ex) {
                this.log.warn("Could not create internal topics: {} Retry #{}", (Object)ex.getMessage(), (Object)i);
                this.time.sleep(100L);
                continue;
            }
        }
        throw new StreamsException("Could not create internal topics.");
    }

    public Map<String, Integer> getNumPartitions(Set<String> topics) {
        for (int i = 0; i < 5; ++i) {
            try {
                MetadataResponse metadata = this.streamsKafkaClient.fetchMetadata();
                Map<String, Integer> existingTopicPartitions = this.fetchExistingPartitionCountByTopic(metadata);
                existingTopicPartitions.keySet().retainAll(topics);
                return existingTopicPartitions;
            }
            catch (StreamsException ex) {
                this.log.warn("Could not get number of partitions: {} Retry #{}", (Object)ex.getMessage(), (Object)i);
                this.time.sleep(100L);
                continue;
            }
        }
        throw new StreamsException("Could not get number of partitions.");
    }

    public void close() {
        try {
            this.streamsKafkaClient.close();
        }
        catch (IOException e) {
            this.log.warn("Could not close StreamsKafkaClient.");
        }
    }

    private Map<InternalTopicConfig, Integer> validateTopicPartitions(Map<InternalTopicConfig, Integer> topicsPartitionsMap, Map<String, Integer> existingTopicNamesPartitions) {
        HashMap<InternalTopicConfig, Integer> topicsToBeCreated = new HashMap<InternalTopicConfig, Integer>();
        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsPartitionsMap.entrySet()) {
            InternalTopicConfig topic = entry.getKey();
            Integer partition = entry.getValue();
            if (existingTopicNamesPartitions.containsKey(topic.name())) {
                if (existingTopicNamesPartitions.get(topic.name()).equals(partition)) continue;
                throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." + " Expected: " + partition + " Actual: " + existingTopicNamesPartitions.get(topic.name()) + ". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
            }
            topicsToBeCreated.put(topic, partition);
        }
        return topicsToBeCreated;
    }

    private Map<String, Integer> fetchExistingPartitionCountByTopic(MetadataResponse metadata) {
        HashMap<String, Integer> existingPartitionCountByTopic = new HashMap<String, Integer>();
        Collection topicsMetadata = metadata.topicMetadata();
        for (MetadataResponse.TopicMetadata topicMetadata : topicsMetadata) {
            existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
        }
        return existingPartitionCountByTopic;
    }
}

