/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Callable;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public class KafkaTopicProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>,
InitializingBean {
    private final Log logger = LogFactory.getLog(this.getClass());
    private final KafkaBinderConfigurationProperties configurationProperties;
    private final AdminUtilsOperation adminUtilsOperation;
    private RetryOperations metadataRetryOperations;

    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, AdminUtilsOperation adminUtilsOperation) {
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.adminUtilsOperation = adminUtilsOperation;
    }

    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.metadataRetryOperations == null) {
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(10);
            retryTemplate.setRetryPolicy((RetryPolicy)simpleRetryPolicy);
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(100L);
            backOffPolicy.setMultiplier(2.0);
            backOffPolicy.setMaxInterval(1000L);
            retryTemplate.setBackOffPolicy((BackOffPolicy)backOffPolicy);
            this.metadataRetryOperations = retryTemplate;
        }
    }

    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info((Object)("Using kafka topic for outbound: " + name));
        }
        KafkaTopicUtils.validateTopicName(name);
        this.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
        if (this.configurationProperties.isAutoCreateTopics() && this.adminUtilsOperation != null) {
            ZkUtils zkUtils = ZkUtils.apply((String)this.configurationProperties.getZkConnectionString(), (int)this.configurationProperties.getZkSessionTimeout(), (int)this.configurationProperties.getZkConnectionTimeout(), (boolean)JaasUtils.isZkSecurityEnabled());
            int partitions = this.adminUtilsOperation.partitionSize(name, zkUtils);
            return new KafkaProducerDestination(name, partitions);
        }
        return new KafkaProducerDestination(name);
    }

    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaTopicUtils.validateTopicName(name);
        boolean anonymous = !StringUtils.hasText((String)group);
        Assert.isTrue((!anonymous || !((KafkaConsumerProperties)properties.getExtension()).isEnableDlq() ? 1 : 0) != 0, (String)"DLQ support is not available for anonymous subscriptions");
        if (properties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
        this.createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
        if (this.configurationProperties.isAutoCreateTopics() && this.adminUtilsOperation != null) {
            ZkUtils zkUtils = ZkUtils.apply((String)this.configurationProperties.getZkConnectionString(), (int)this.configurationProperties.getZkSessionTimeout(), (int)this.configurationProperties.getZkConnectionTimeout(), (boolean)JaasUtils.isZkSecurityEnabled());
            int partitions = this.adminUtilsOperation.partitionSize(name, zkUtils);
            if (((KafkaConsumerProperties)properties.getExtension()).isEnableDlq() && !anonymous) {
                String dlqTopic = StringUtils.hasText((String)((KafkaConsumerProperties)properties.getExtension()).getDlqName()) ? ((KafkaConsumerProperties)properties.getExtension()).getDlqName() : "error." + name + "." + group;
                this.createTopicAndPartitions(dlqTopic, partitions);
                return new KafkaConsumerDestination(name, partitions, dlqTopic);
            }
            return new KafkaConsumerDestination(name, partitions);
        }
        return new KafkaConsumerDestination(name);
    }

    private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(String topicName, int partitionCount) {
        if (this.configurationProperties.isAutoCreateTopics() && this.adminUtilsOperation != null) {
            this.createTopicAndPartitions(topicName, partitionCount);
        } else if (this.configurationProperties.isAutoCreateTopics() && this.adminUtilsOperation == null) {
            this.logger.warn((Object)"Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. No topic will be created by the binder");
        } else if (!this.configurationProperties.isAutoCreateTopics()) {
            this.logger.info((Object)"Auto creation of topics is disabled.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createTopicAndPartitions(final String topicName, int partitionCount) {
        block6: {
            try (final ZkUtils zkUtils = ZkUtils.apply((String)this.configurationProperties.getZkConnectionString(), (int)this.configurationProperties.getZkSessionTimeout(), (int)this.configurationProperties.getZkConnectionTimeout(), (boolean)JaasUtils.isZkSecurityEnabled());){
                short errorCode = this.adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
                if (errorCode == ErrorMapping.NoError()) {
                    int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions() ? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount;
                    int partitionSize = this.adminUtilsOperation.partitionSize(topicName, zkUtils);
                    if (partitionSize >= effectivePartitionCount) break block6;
                    if (this.configurationProperties.isAutoAddPartitions()) {
                        this.adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
                        break block6;
                    }
                    throw new ProvisioningException("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead." + "Consider either increasing the partition count of the topic or enabling " + "`autoAddPartitions`");
                }
                if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
                    final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount);
                    this.metadataRetryOperations.execute((RetryCallback)new RetryCallback<Object, RuntimeException>(){

                        public Object doWithRetry(RetryContext context) throws RuntimeException {
                            try {
                                KafkaTopicProvisioner.this.adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount, KafkaTopicProvisioner.this.configurationProperties.getReplicationFactor(), new Properties());
                            }
                            catch (Exception e) {
                                String exceptionClass = e.getClass().getName();
                                if (exceptionClass.equals("kafka.common.TopicExistsException") || exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
                                    if (KafkaTopicProvisioner.this.logger.isWarnEnabled()) {
                                        KafkaTopicProvisioner.this.logger.warn((Object)("Attempt to create topic: " + topicName + ". Topic already exists."));
                                    }
                                }
                                throw e;
                            }
                            return null;
                        }
                    });
                    break block6;
                }
                throw new ProvisioningException("Error fetching Kafka topic metadata: ", ErrorMapping.exceptionFor((short)errorCode));
            }
        }
    }

    public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount, final Callable<Collection<PartitionInfo>> callable) {
        try {
            return (Collection)this.metadataRetryOperations.execute((RetryCallback)new RetryCallback<Collection<PartitionInfo>, Exception>(){

                public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
                    Collection partitions = (Collection)callable.call();
                    if (partitions.size() < partitionCount) {
                        throw new IllegalStateException("The number of expected partitions was: " + partitionCount + ", but " + partitions.size() + (partitions.size() > 1 ? " have " : " has ") + "been found instead");
                    }
                    return partitions;
                }
            });
        }
        catch (Exception e) {
            this.logger.error((Object)"Cannot initialize Binder", (Throwable)e);
            throw new BinderException("Cannot initialize binder:", (Throwable)e);
        }
    }

    private static final class KafkaConsumerDestination
    implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        KafkaConsumerDestination(String consumerDestinationName) {
            this(consumerDestinationName, 0, null);
        }

        KafkaConsumerDestination(String consumerDestinationName, int partitions) {
            this(consumerDestinationName, partitions, null);
        }

        KafkaConsumerDestination(String consumerDestinationName, Integer partitions, String dlqName) {
            this.consumerDestinationName = consumerDestinationName;
            this.partitions = partitions;
            this.dlqName = dlqName;
        }

        public String getName() {
            return this.consumerDestinationName;
        }

        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + '\'' + ", partitions=" + this.partitions + ", dlqName='" + this.dlqName + '\'' + '}';
        }
    }

    private static final class KafkaProducerDestination
    implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String destinationName) {
            this(destinationName, 0);
        }

        KafkaProducerDestination(String destinationName, Integer partitions) {
            this.producerDestinationName = destinationName;
            this.partitions = partitions;
        }

        public String getName() {
            return this.producerDestinationName;
        }

        public String getNameForPartition(int partition) {
            return this.producerDestinationName;
        }

        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + '\'' + ", partitions=" + this.partitions + '}';
        }
    }
}

