/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
 * Kafka {@link ProvisioningProvider} that validates destination names and, when the binder's
 * {@code autoCreateTopics} flag is on, creates topics (and optionally adds partitions) through a
 * Kafka {@link AdminClient}.
 *
 * <p>Admin-client settings start from {@link KafkaProperties#buildAdminProperties()} and are then
 * overlaid with the binder's own connection string and {@code configuration} entries.
 *
 * <p>{@link #afterPropertiesSet()} must run (or {@link #setMetadataRetryOperations} must be
 * called) before topics are created or partitions are queried, because those paths use the
 * retry operations.
 */
public class KafkaTopicProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>,
InitializingBean {
    /** Default number of seconds to wait on any admin-client future. */
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;

    /** Admin-client property key that holds the broker list. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Class name used to recognise a "topic already exists" failure from the broker. */
    private static final String TOPIC_EXISTS_EXCEPTION = "org.apache.kafka.common.errors.TopicExistsException";

    private final Log logger = LogFactory.getLog(this.getClass());
    private final KafkaBinderConfigurationProperties configurationProperties;
    /** Seconds to wait on each admin-client future. */
    private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
    private final Map<String, Object> adminClientProperties;
    private RetryOperations metadataRetryOperations;

    /**
     * Builds the admin-client configuration from Boot's Kafka properties and the binder
     * properties.
     *
     * @param kafkaBinderConfigurationProperties binder-level configuration
     * @param kafkaProperties Spring Boot Kafka properties; must not be {@code null}
     * @throws IllegalArgumentException if {@code kafkaProperties} is {@code null}
     * @throws IllegalStateException if the binder {@code configuration} map contains
     *         {@code bootstrap.servers}
     */
    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
        Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        this.configurationProperties = kafkaBinderConfigurationProperties;
        normalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
    }

    /**
     * Overrides the retry operations used for topic creation and partition lookup.
     *
     * @param metadataRetryOperations the retry operations to use
     */
    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    /**
     * Installs a default {@link RetryTemplate} when none was set: at most 10 attempts with an
     * exponential back-off starting at 100, doubling each time, capped at 1000 (interval values
     * as passed to {@link ExponentialBackOffPolicy}).
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.metadataRetryOperations != null) {
            return;
        }
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(10);

        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(100L);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(1000L);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        this.metadataRetryOperations = retryTemplate;
    }

    /**
     * Provisions the outbound topic {@code name}.
     *
     * <p>When auto-creation is enabled the topic is created if needed and its actual partition
     * count is read back; otherwise the returned destination reports 0 partitions.
     *
     * @param name topic name; validated by {@link KafkaTopicUtils#validateTopicName}
     * @param properties producer properties supplying the partition count and admin settings
     * @return the producer destination for {@code name}
     * @throws ProvisioningException if topic creation or the partition lookup fails
     */
    @Override
    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Using kafka topic for outbound: " + name);
        }
        KafkaTopicUtils.validateTopicName(name);
        // Use the same factory method as the consumer path so both obtain clients the same way.
        try (AdminClient adminClient = createAdminClient()) {
            createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getAdmin());
            int partitions = 0;
            if (this.configurationProperties.isAutoCreateTopics()) {
                KafkaFuture<Map<String, TopicDescription>> all = adminClient.describeTopics(Collections.singletonList(name)).all();
                Map<String, TopicDescription> topicDescriptions;
                try {
                    topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    restoreInterruptIfNeeded(e);
                    throw new ProvisioningException("Problems encountered with partitions finding", e);
                }
                partitions = topicDescriptions.get(name).partitions().size();
            }
            return new KafkaProducerDestination(name, partitions);
        }
    }

    /**
     * Provisions the inbound topic {@code name} and, if enabled for a named group, its DLQ topic.
     *
     * <p>The requested partition count is {@code instanceCount * concurrency}. When auto-creation
     * is disabled the returned destination carries only the name.
     *
     * @param name topic name; validated by {@link KafkaTopicUtils#validateTopicName}
     * @param group consumer group; blank or {@code null} means an anonymous subscription
     * @param properties consumer properties
     * @return the consumer destination for {@code name}
     * @throws IllegalArgumentException if DLQ is enabled for an anonymous subscription or the
     *         instance count is zero
     * @throws ProvisioningException if topic creation, the partition lookup or DLQ creation fails
     */
    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaTopicUtils.validateTopicName(name);
        boolean anonymous = !StringUtils.hasText(group);
        Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(), "DLQ support is not available for anonymous subscriptions");
        if (properties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
        // Declared as the interface type: createDlqIfNeedBe returns ConsumerDestination.
        ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
        try (AdminClient adminClient = createAdminClient()) {
            createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getAdmin());
            if (this.configurationProperties.isAutoCreateTopics()) {
                KafkaFuture<Map<String, TopicDescription>> all = adminClient.describeTopics(Collections.singletonList(name)).all();
                try {
                    Map<String, TopicDescription> topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
                    int partitions = topicDescriptions.get(name).partitions().size();
                    consumerDestination = createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
                    if (consumerDestination == null) {
                        consumerDestination = new KafkaConsumerDestination(name, partitions);
                    }
                }
                catch (Exception e) {
                    restoreInterruptIfNeeded(e);
                    throw new ProvisioningException("provisioning exception", e);
                }
            }
        }
        return consumerDestination;
    }

    /**
     * Creates a new admin client from the merged admin properties. The caller is responsible
     * for closing it. Package-private; presumably so tests can substitute a client (not
     * verifiable from this file).
     *
     * @return a new {@link AdminClient}
     */
    AdminClient createAdminClient() {
        return AdminClient.create(this.adminClientProperties);
    }

    /**
     * Overlays binder settings onto the Boot-derived admin properties (mutates {@code adminProps}).
     *
     * <p>The binder connection string replaces {@code bootstrap.servers} when Boot supplied none
     * or when the binder value differs from the binder default. Binder {@code configuration}
     * entries that are valid admin-client config names override the Boot values.
     *
     * @param adminProps admin-client properties to update in place
     * @param bootProps Spring Boot Kafka properties (not read here)
     * @param binderProps binder configuration
     * @throws IllegalStateException if the binder {@code configuration} contains
     *         {@code bootstrap.servers}
     */
    private void normalizeBootPropsWithBinder(Map<String, Object> adminProps, KafkaProperties bootProps, KafkaBinderConfigurationProperties binderProps) {
        String kafkaConnectionString = binderProps.getKafkaConnectionString();
        boolean bootHasNoServers = ObjectUtils.isEmpty(adminProps.get(BOOTSTRAP_SERVERS));
        if (bootHasNoServers || !kafkaConnectionString.equals(binderProps.getDefaultKafkaConnectionString())) {
            adminProps.put(BOOTSTRAP_SERVERS, kafkaConnectionString);
        }
        Map<String, String> binderProperties = binderProps.getConfiguration();
        Set<String> adminConfigNames = AdminClientConfig.configNames();
        binderProperties.forEach((key, value) -> {
            if (key.equals(BOOTSTRAP_SERVERS)) {
                throw new IllegalStateException("Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
            }
            if (adminConfigNames.contains(key)) {
                // The map holds Object values, so the previous value is not necessarily a String.
                Object replaced = adminProps.put(key, value);
                if (replaced != null && this.logger.isDebugEnabled()) {
                    this.logger.debug("Overrode boot property: [" + key + "], from: [" + replaced + "] to: [" + value + "]");
                }
            }
        });
    }

    /**
     * Creates the DLQ topic when DLQ is enabled and the subscription is not anonymous.
     *
     * <p>The DLQ topic name is the configured {@code dlqName} if it has text, otherwise
     * {@code "error." + name + "." + group}.
     *
     * @return a destination carrying the DLQ name, or {@code null} when no DLQ applies
     * @throws ProvisioningException if creating the DLQ topic fails with anything but an
     *         {@link Error}
     */
    private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties, boolean anonymous, int partitions) {
        KafkaConsumerProperties extension = properties.getExtension();
        if (!extension.isEnableDlq() || anonymous) {
            return null;
        }
        String dlqTopic = StringUtils.hasText(extension.getDlqName()) ? extension.getDlqName() : "error." + name + "." + group;
        try {
            createTopicAndPartitions(adminClient, dlqTopic, partitions, extension.isAutoRebalanceEnabled(), extension.getAdmin());
        }
        catch (Error error) {
            throw error;
        }
        catch (Throwable throwable) {
            restoreInterruptIfNeeded(throwable);
            throw new ProvisioningException("provisioning exception", throwable);
        }
        return new KafkaConsumerDestination(name, partitions, dlqTopic);
    }

    /**
     * Creates the topic if auto-creation is enabled, translating any failure other than an
     * {@link Error} into a {@link ProvisioningException}.
     */
    private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) {
        try {
            createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
        }
        catch (Error error) {
            throw error;
        }
        catch (Throwable throwable) {
            restoreInterruptIfNeeded(throwable);
            throw new ProvisioningException("provisioning exception", throwable);
        }
    }

    /** Delegates to {@link #createTopicAndPartitions} when auto-creation is on; otherwise only logs. */
    private void createTopicIfNecessary(AdminClient adminClient, String topicName, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) throws Throwable {
        if (this.configurationProperties.isAutoCreateTopics()) {
            createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker, properties);
        }
        else {
            this.logger.info("Auto creation of topics is disabled.");
        }
    }

    /**
     * Ensures {@code topicName} exists with enough partitions: an existing topic has its
     * partition count checked (and possibly increased); a missing topic is created.
     *
     * @throws Throwable whatever the admin-client calls or the retry callback raise
     */
    private void createTopicAndPartitions(AdminClient adminClient, String topicName, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties adminProperties) throws Throwable {
        Set<String> names = adminClient.listTopics().names().get(this.operationTimeout, TimeUnit.SECONDS);
        if (names.contains(topicName)) {
            ensureEnoughPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker);
        }
        else {
            createNewTopic(adminClient, topicName, partitionCount, adminProperties);
        }
    }

    /**
     * Checks an existing topic's partition count against the expected one.
     *
     * <p>If too few partitions exist: adds partitions when {@code autoAddPartitions} is on,
     * otherwise warns when lower counts are tolerated, otherwise fails.
     *
     * @throws ProvisioningException if partitions are missing, cannot be added automatically and
     *         are not tolerated
     */
    private void ensureEnoughPartitions(AdminClient adminClient, String topicName, int partitionCount, boolean tolerateLowerPartitionsOnBroker) throws Throwable {
        boolean autoAddPartitions = this.configurationProperties.isAutoAddPartitions();
        // minPartitionCount only matters when we are allowed to grow the topic.
        int effectivePartitionCount = autoAddPartitions ? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount;
        KafkaFuture<Map<String, TopicDescription>> topicDescriptionsFuture = adminClient.describeTopics(Collections.singletonList(topicName)).all();
        Map<String, TopicDescription> topicDescriptions = topicDescriptionsFuture.get(this.operationTimeout, TimeUnit.SECONDS);
        int partitionSize = topicDescriptions.get(topicName).partitions().size();
        if (partitionSize >= effectivePartitionCount) {
            return;
        }
        if (autoAddPartitions) {
            adminClient.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(effectivePartitionCount)))
                    .all().get(this.operationTimeout, TimeUnit.SECONDS);
            return;
        }
        String found = "The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ");
        if (!tolerateLowerPartitionsOnBroker) {
            throw new ProvisioningException(found + "been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
        }
        this.logger.warn(found + "been found instead.There will be " + (effectivePartitionCount - partitionSize) + " idle consumers");
    }

    /**
     * Creates a topic that does not exist yet, inside the retry operations.
     *
     * <p>Uses the explicit replica assignments when configured; otherwise uses
     * {@code max(minPartitionCount, partitionCount)} partitions and the per-binding replication
     * factor, falling back to the binder-wide one. A "topic already exists" failure is logged
     * and treated as success; any other failure is logged and rethrown so the retry policy can
     * act on it.
     */
    private void createNewTopic(AdminClient adminClient, String topicName, int partitionCount, KafkaAdminProperties adminProperties) throws Throwable {
        int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount);
        this.metadataRetryOperations.execute(context -> {
            Map<Integer, List<Integer>> replicasAssignments = adminProperties.getReplicasAssignments();
            NewTopic newTopic;
            if (replicasAssignments != null && !replicasAssignments.isEmpty()) {
                newTopic = new NewTopic(topicName, replicasAssignments);
            }
            else {
                short replicationFactor = adminProperties.getReplicationFactor() != null
                        ? adminProperties.getReplicationFactor().shortValue()
                        : this.configurationProperties.getReplicationFactor();
                newTopic = new NewTopic(topicName, effectivePartitionCount, replicationFactor);
            }
            if (adminProperties.getConfiguration().size() > 0) {
                newTopic.configs(adminProperties.getConfiguration());
            }
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            try {
                createTopicsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
            }
            catch (ExecutionException e) {
                if (isTopicExistsFailure(e)) {
                    // Someone else created the topic in the meantime: nothing left to do.
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
                    }
                    return null;
                }
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                this.logger.error("Failed to create topics", cause);
                throw cause;
            }
            catch (Exception e) {
                // Timeouts and interrupts carry no cause; rethrow the exception itself.
                restoreInterruptIfNeeded(e);
                this.logger.error("Failed to create topics", e);
                throw e;
            }
            return null;
        });
    }

    /**
     * Tells whether a failed creation was caused by the topic already existing, judged by the
     * cause's class name or, failing that, by the exception message mentioning that class.
     */
    private static boolean isTopicExistsFailure(ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause != null && TOPIC_EXISTS_EXCEPTION.equals(cause.getClass().getName())) {
            return true;
        }
        String message = e.getMessage();
        return message != null && message.contains(TOPIC_EXISTS_EXCEPTION);
    }

    /** Re-asserts the thread's interrupt flag when {@code throwable} is an {@link InterruptedException}. */
    private static void restoreInterruptIfNeeded(Throwable throwable) {
        if (throwable instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Fetches a topic's partitions through {@code callable}, inside the retry operations, and
     * checks that at least {@code partitionCount} were found.
     *
     * @param partitionCount minimum number of partitions expected
     * @param tolerateLowerPartitionsOnBroker when {@code true}, fewer partitions only produce a
     *        warning instead of a failure
     * @param callable supplies the partitions
     * @return the partitions returned by {@code callable}
     * @throws BinderException if the lookup fails or too few partitions exist and that is not
     *         tolerated
     */
    public Collection<PartitionInfo> getPartitionsForTopic(int partitionCount, boolean tolerateLowerPartitionsOnBroker, Callable<Collection<PartitionInfo>> callable) {
        try {
            return this.metadataRetryOperations.execute(context -> {
                Collection<PartitionInfo> partitions = callable.call();
                int partitionSize = partitions.size();
                if (partitionSize < partitionCount) {
                    String found = "The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ");
                    if (!tolerateLowerPartitionsOnBroker) {
                        throw new IllegalStateException(found + "been found instead");
                    }
                    this.logger.warn(found + "been found instead.There will be " + (partitionCount - partitionSize) + " idle consumers");
                }
                return partitions;
            });
        }
        catch (Exception e) {
            restoreInterruptIfNeeded(e);
            this.logger.error("Cannot initialize Binder", e);
            throw new BinderException("Cannot initialize binder:", e);
        }
    }

    /** Immutable {@link ConsumerDestination} holding the topic name, partition count and optional DLQ name. */
    private static final class KafkaConsumerDestination
    implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        /** Destination with 0 partitions and no DLQ. */
        KafkaConsumerDestination(String consumerDestinationName) {
            this(consumerDestinationName, 0, null);
        }

        /** Destination with the given partition count and no DLQ. */
        KafkaConsumerDestination(String consumerDestinationName, int partitions) {
            this(consumerDestinationName, partitions, null);
        }

        /** Destination with the given partition count and DLQ topic name ({@code null} for none). */
        KafkaConsumerDestination(String consumerDestinationName, int partitions, String dlqName) {
            this.consumerDestinationName = consumerDestinationName;
            this.partitions = partitions;
            this.dlqName = dlqName;
        }

        @Override
        public String getName() {
            return this.consumerDestinationName;
        }

        @Override
        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + '\'' + ", partitions=" + this.partitions + ", dlqName='" + this.dlqName + '\'' + '}';
        }
    }

    /** Immutable {@link ProducerDestination} holding the topic name and partition count. */
    private static final class KafkaProducerDestination
    implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String destinationName, int partitions) {
            this.producerDestinationName = destinationName;
            this.partitions = partitions;
        }

        @Override
        public String getName() {
            return this.producerDestinationName;
        }

        /** Returns the same topic name for every partition. */
        @Override
        public String getNameForPartition(int partition) {
            return this.producerDestinationName;
        }

        @Override
        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + '\'' + ", partitions=" + this.partitions + '}';
        }
    }
}

