/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.Lifecycle;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Message-channel binder that creates Kafka-backed producer message handlers
 * and consumer endpoints for Spring Cloud Stream bindings.
 *
 * <p>Keys and payloads are handled as raw {@code byte[]} throughout (see the
 * serializer/deserializer settings in the factory methods of this class).
 */
public class KafkaMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    /** Binder-wide configuration: headers, required acks, connection string and extra client properties. */
    private final KafkaBinderConfigurationProperties configurationProperties;
    /** Optional listener attached to every producer-side {@link KafkaTemplate}; may be {@code null}. */
    private ProducerListener<byte[], byte[]> producerListener;
    /** Source of the per-channel Kafka consumer/producer extension properties. */
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
    /** Partitions recorded per destination name whenever a producer handler or consumer endpoint is created. */
    private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<String, Collection<PartitionInfo>>();

    /**
     * Creates the binder.
     *
     * @param configurationProperties binder-wide Kafka configuration; its configured
     *        headers are merged with the standard binder headers and passed to the superclass
     * @param provisioningProvider provisioner handed to the superclass and later used to
     *        look up topic partitions
     */
    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioningProvider) {
        super(false, KafkaMessageChannelBinder.headersToMap(configurationProperties), (ProvisioningProvider)provisioningProvider);
        this.configurationProperties = configurationProperties;
    }

    /**
     * Computes the header names to be mapped by the binder.
     *
     * @param configurationProperties binder configuration supplying optional extra headers
     * @return {@link BinderHeaders#STANDARD_HEADERS} itself when no extra headers are
     *         configured; otherwise a new array holding the standard headers followed by
     *         the configured ones
     */
    private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
        String[] extraHeaders = configurationProperties.getHeaders();
        if (ObjectUtils.isEmpty((Object[])extraHeaders)) {
            return BinderHeaders.STANDARD_HEADERS;
        }
        int standardCount = BinderHeaders.STANDARD_HEADERS.length;
        // Grow a copy of the standard headers, then append the configured ones to the tail.
        String[] merged = Arrays.copyOf(BinderHeaders.STANDARD_HEADERS, standardCount + extraHeaders.length);
        System.arraycopy(extraHeaders, 0, merged, standardCount, extraHeaders.length);
        return merged;
    }

    /**
     * Replaces the source of per-channel Kafka extension properties.
     *
     * @param extendedBindingProperties the properties holder consulted by
     *        {@link #getExtendedConsumerProperties(String)} and
     *        {@link #getExtendedProducerProperties(String)}
     */
    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    /**
     * Sets the listener to attach to the {@link KafkaTemplate} of producer handlers
     * created afterwards; when left {@code null} no listener is attached.
     *
     * @param producerListener the producer listener, or {@code null}
     */
    public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
        this.producerListener = producerListener;
    }

    /**
     * Returns the live (not copied) map of destination name to the partitions recorded
     * for it when its producer handler or consumer endpoint was created.
     *
     * @return the internal topics-in-use map
     */
    Map<String, Collection<PartitionInfo>> getTopicsInUse() {
        return this.topicsInUse;
    }

    /**
     * Looks up the Kafka-specific consumer properties for a channel by delegating to
     * the configured {@link KafkaExtendedBindingProperties}.
     *
     * @param channelName name of the channel
     * @return the consumer extension properties reported for that channel
     */
    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    /**
     * Looks up the Kafka-specific producer properties for a channel by delegating to
     * the configured {@link KafkaExtendedBindingProperties}.
     *
     * @param channelName name of the channel
     * @return the producer extension properties reported for that channel
     */
    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    /**
     * Builds the outbound message handler for a producer destination.
     *
     * <p>A producer factory is created from the supplied properties, the topic's
     * partitions are resolved through the provisioner and recorded in the
     * topics-in-use map, and a {@link KafkaTemplate} (with the optional producer
     * listener) is wrapped in a {@code ProducerConfigurationMessageHandler}.
     *
     * @param destination the provisioned producer destination
     * @param producerProperties producer properties including the Kafka extension
     * @return the message handler that publishes to the destination's topic
     * @throws Exception declared by the overridden contract
     */
    protected MessageHandler createProducerMessageHandler(final ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
        final DefaultKafkaProducerFactory<byte[], byte[]> factory = this.getProducerFactory(producerProperties);
        Callable<Collection<PartitionInfo>> partitionLookup = new Callable<Collection<PartitionInfo>>(){

            @Override
            public Collection<PartitionInfo> call() throws Exception {
                return factory.createProducer().partitionsFor(destination.getName());
            }
        };
        Collection<PartitionInfo> topicPartitions = ((KafkaTopicProvisioner)this.provisioningProvider).getPartitionsForTopic(producerProperties.getPartitionCount(), partitionLookup);
        this.topicsInUse.put(destination.getName(), topicPartitions);

        // Only informational: the topic already has more partitions than were requested.
        if (producerProperties.getPartitionCount() < topicPartitions.size()) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is " + producerProperties.getPartitionCount() + ", smaller than the actual partition count of " + topicPartitions.size() + " of the topic. The larger number will be used instead.");
            }
        }

        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<byte[], byte[]>(factory);
        if (this.producerListener != null) {
            template.setProducerListener(this.producerListener);
        }
        return new ProducerConfigurationMessageHandler(template, destination.getName(), producerProperties, factory);
    }

    /**
     * Assembles the Kafka producer configuration and wraps it in a producer factory.
     *
     * <p>Layering, lowest to highest precedence: built-in defaults, binder-level
     * configuration, binding-derived fallbacks for settings that are still empty
     * (bootstrap servers, batch size, linger, compression), and finally the
     * binding's own producer configuration map.
     *
     * @param producerProperties producer properties carrying the Kafka extension
     * @return a producer factory using byte-array key and value serializers
     */
    private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
        KafkaProducerProperties extension = producerProperties.getExtension();
        Map<String, Object> config = new HashMap<String, Object>();

        // Built-in defaults.
        config.put("retries", 0);
        config.put("buffer.memory", 33554432);
        config.put("key.serializer", ByteArraySerializer.class);
        config.put("value.serializer", ByteArraySerializer.class);
        config.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));

        // Binder-wide overrides.
        if (!ObjectUtils.isEmpty((Object)this.configurationProperties.getConfiguration())) {
            config.putAll(this.configurationProperties.getConfiguration());
        }

        // Fallbacks applied only where nothing has been set so far.
        if (ObjectUtils.isEmpty(config.get("bootstrap.servers"))) {
            config.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(config.get("batch.size"))) {
            config.put("batch.size", String.valueOf(extension.getBufferSize()));
        }
        if (ObjectUtils.isEmpty(config.get("linger.ms"))) {
            config.put("linger.ms", String.valueOf(extension.getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(config.get("compression.type"))) {
            config.put("compression.type", extension.getCompressionType().toString());
        }

        // Per-binding overrides win over everything above.
        if (!ObjectUtils.isEmpty((Object)extension.getConfiguration())) {
            config.putAll(extension.getConfiguration());
        }
        return new DefaultKafkaProducerFactory<byte[], byte[]>(config);
    }

    /**
     * Builds the inbound endpoint (a Kafka message-driven channel adapter) for a
     * consumer destination.
     *
     * <p>Steps performed:
     * <ol>
     * <li>Derive the consumer group; a blank {@code group} yields a random
     *     {@code anonymous.<uuid>} group, for which DLQ is rejected.</li>
     * <li>Resolve the topic's partitions through the provisioner, using a short-lived
     *     consumer that is closed as soon as the lookup finishes.</li>
     * <li>Unless auto-rebalancing is enabled or there is a single instance, keep only
     *     the partitions whose number modulo the instance count equals this
     *     instance's index.</li>
     * <li>Create a concurrent listener container subscribed either by topic name
     *     (anonymous or auto-rebalance) or by explicit partitions, with concurrency
     *     capped at the number of listened partitions.</li>
     * <li>When DLQ is enabled, install an error handler that republishes the failed
     *     record's key and payload to the DLQ topic on the same partition number.</li>
     * </ol>
     *
     * @param destination the provisioned consumer destination
     * @param group the consumer group; blank means an anonymous subscription
     * @param extendedConsumerProperties consumer properties carrying the Kafka extension
     * @return the configured message-driven channel adapter
     * @throws IllegalArgumentException if DLQ is enabled for an anonymous subscription,
     *         or if no partitions remain to listen on
     */
    protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group, final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        boolean anonymous = !StringUtils.hasText(group);
        Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(), "DLQ support is not available for anonymous subscriptions");
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
        final ConsumerFactory<?, ?> consumerFactory = this.createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
        int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        Collection<PartitionInfo> allPartitions = ((KafkaTopicProvisioner)this.provisioningProvider).getPartitionsForTopic(partitionCount, new Callable<Collection<PartitionInfo>>(){

            @Override
            public Collection<PartitionInfo> call() throws Exception {
                // The consumer exists only for this metadata lookup; close it so its
                // network resources are released instead of leaking per invocation.
                Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer();
                try {
                    return metadataConsumer.partitionsFor(destination.getName());
                }
                finally {
                    metadataConsumer.close();
                }
            }
        });

        Collection<PartitionInfo> listenedPartitions;
        if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() || extendedConsumerProperties.getInstanceCount() == 1) {
            listenedPartitions = allPartitions;
        } else {
            // Static assignment: this instance takes every partition whose number maps to its index.
            listenedPartitions = new ArrayList<PartitionInfo>();
            for (PartitionInfo partition : allPartitions) {
                if (partition.partition() % extendedConsumerProperties.getInstanceCount() == extendedConsumerProperties.getInstanceIndex()) {
                    listenedPartitions.add(partition);
                }
            }
        }
        this.topicsInUse.put(destination.getName(), listenedPartitions);
        Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");

        TopicPartitionInitialOffset[] topicPartitionInitialOffsets = this.getTopicPartitionInitialOffsets(listenedPartitions);
        ContainerProperties containerProperties = anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(new String[]{destination.getName()}) : new ContainerProperties(topicPartitionInitialOffsets);
        // Never run more listener threads than there are partitions to read.
        int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
        ConcurrentMessageListenerContainer messageListenerContainer = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties){

            // NOTE(review): delegates straight to super; kept as in the original subclass.
            public void stop(Runnable callback) {
                super.stop(callback);
            }
        };
        messageListenerContainer.setConcurrency(concurrency);
        messageListenerContainer.getContainerProperties().setAckOnError(this.isAutoCommitOnError(extendedConsumerProperties));
        if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
            messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
        }

        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter((AbstractMessageListenerContainer)messageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        RetryTemplate retryTemplate = this.buildRetryTemplate((ConsumerProperties)extendedConsumerProperties);
        kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);

        if (extendedConsumerProperties.getExtension().isEnableDlq()) {
            // The DLQ publisher uses a producer factory built from default producer properties.
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = this.getProducerFactory(new ExtendedProducerProperties<KafkaProducerProperties>(new KafkaProducerProperties()));
            final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<byte[], byte[]>(producerFactory);
            messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler(){

                public void handle(Exception thrownException, final ConsumerRecord message) {
                    final byte[] key = message.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[])message.key())) : null;
                    final byte[] payload = message.value() != null ? Utils.toArray(ByteBuffer.wrap((byte[])message.value())) : null;
                    // An explicitly configured DLQ name wins; otherwise derive one from destination and group.
                    String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) ? extendedConsumerProperties.getExtension().getDlqName() : "error." + destination.getName() + "." + group;
                    ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName, message.partition(), key, payload);
                    // Truncated description of the record, shared by both log statements below.
                    final String description = new StringBuilder().append(" a message with key='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'").append(" and payload='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString(payload), 50)).append("'").append(" received from ").append(message.partition()).toString();
                    sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>(){

                        public void onFailure(Throwable ex) {
                            KafkaMessageChannelBinder.this.logger.error("Error sending to DLQ" + description, ex);
                        }

                        public void onSuccess(SendResult<byte[], byte[]> result) {
                            if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                                KafkaMessageChannelBinder.this.logger.debug("Sent to DLQ " + description);
                            }
                        }
                    });
                }
            });
        }
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Assembles the Kafka consumer configuration and wraps it in a consumer factory.
     *
     * <p>Built-in defaults are applied first, then binder-level configuration, then a
     * bootstrap-servers fallback when still empty, and finally the binding's own
     * consumer configuration map, which takes precedence over everything else.
     *
     * @param anonymous whether the subscription is anonymous; selects the
     *        {@code auto.offset.reset} default ({@code latest} when anonymous,
     *        {@code earliest} otherwise)
     * @param consumerGroup value used for {@code group.id}
     * @param consumerProperties consumer properties carrying the Kafka extension
     * @return a consumer factory using byte-array key and value deserializers
     */
    private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
        String offsetReset;
        if (anonymous) {
            offsetReset = "latest";
        } else {
            offsetReset = "earliest";
        }

        Map<String, Object> config = new HashMap<String, Object>();
        config.put("key.deserializer", ByteArrayDeserializer.class);
        config.put("value.deserializer", ByteArrayDeserializer.class);
        config.put("enable.auto.commit", false);
        config.put("group.id", consumerGroup);
        config.put("auto.offset.reset", offsetReset);
        config.put("auto.commit.interval.ms", 100);

        // Binder-wide overrides.
        if (!ObjectUtils.isEmpty((Object)this.configurationProperties.getConfiguration())) {
            config.putAll(this.configurationProperties.getConfiguration());
        }
        if (ObjectUtils.isEmpty(config.get("bootstrap.servers"))) {
            config.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }

        // Per-binding overrides are applied last.
        KafkaConsumerProperties extension = consumerProperties.getExtension();
        if (!ObjectUtils.isEmpty((Object)extension.getConfiguration())) {
            config.putAll(extension.getConfiguration());
        }
        return new DefaultKafkaConsumerFactory(config);
    }

    /**
     * Decides whether offsets should be acknowledged when message handling fails.
     *
     * @param properties consumer properties carrying the Kafka extension
     * @return the explicitly configured {@code autoCommitOnError} value when one is
     *         set; otherwise {@code true} only if both offset auto-commit and DLQ
     *         are enabled
     */
    private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaConsumerProperties extension = properties.getExtension();
        if (extension.getAutoCommitOnError() != null) {
            return extension.getAutoCommitOnError();
        }
        return extension.isAutoCommitOffset() && extension.isEnableDlq();
    }

    /**
     * Converts partition metadata into the topic/partition descriptors expected by
     * the listener container, preserving iteration order.
     *
     * @param listenedPartitions partitions this instance will listen on
     * @return one {@link TopicPartitionInitialOffset} per supplied partition
     */
    private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(Collection<PartitionInfo> listenedPartitions) {
        ArrayList<TopicPartitionInitialOffset> offsets = new ArrayList<TopicPartitionInitialOffset>(listenedPartitions.size());
        for (PartitionInfo info : listenedPartitions) {
            offsets.add(new TopicPartitionInitialOffset(info.topic(), info.partition()));
        }
        return offsets.toArray(new TopicPartitionInitialOffset[listenedPartitions.size()]);
    }

    /**
     * Shortens a string for logging.
     *
     * @param original the text to display; must not be {@code null}
     * @param maxCharacters maximum number of characters kept from {@code original}
     * @return {@code original} unchanged when it fits, otherwise its first
     *         {@code maxCharacters} characters followed by {@code "..."}
     */
    private String toDisplayString(String original, int maxCharacters) {
        boolean fits = original.length() <= maxCharacters;
        return fits ? original : original.substring(0, maxCharacters) + "...";
    }

    /**
     * Producer-side message handler bound to a fixed topic, with lifecycle control
     * over the producer factory backing its {@link KafkaTemplate}.
     *
     * <p>The handler reports itself as running from construction. {@link #stop()}
     * stops the producer factory and clears the flag; {@link #start()} re-runs
     * initialization and sets the flag again so that {@link #isRunning()} stays
     * accurate across a stop/start cycle.
     */
    private final class ProducerConfigurationMessageHandler
    extends KafkaProducerMessageHandler<byte[], byte[]>
    implements Lifecycle {
        private boolean running = true;
        private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;

        /**
         * @param kafkaTemplate template used for sending
         * @param topic destination topic, applied as a literal topic expression
         * @param producerProperties producer properties; when partitioned, the partition
         *        id is read from the {@code scst_partition} message header, and when the
         *        Kafka extension requests it, sends are made synchronous
         * @param producerFactory factory behind {@code kafkaTemplate}, stopped on {@link #stop()}
         */
        private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
            super(kafkaTemplate);
            this.setTopicExpression((Expression)new LiteralExpression(topic));
            this.setBeanFactory((BeanFactory)KafkaMessageChannelBinder.this.getBeanFactory());
            if (producerProperties.isPartitioned()) {
                SpelExpressionParser parser = new SpelExpressionParser();
                this.setPartitionIdExpression(parser.parseExpression("headers.scst_partition"));
            }
            if (((KafkaProducerProperties)producerProperties.getExtension()).isSync()) {
                this.setSync(true);
            }
            this.producerFactory = producerFactory;
        }

        /**
         * Runs the handler's initialization and marks it as running.
         *
         * @throws RuntimeException wrapping any exception raised during initialization
         */
        public void start() {
            try {
                super.onInit();
            }
            catch (Exception e) {
                this.logger.error((Object)"Initialization errors: ", (Throwable)e);
                throw new RuntimeException(e);
            }
            // Without this, isRunning() would stay false forever after the first stop().
            this.running = true;
        }

        /** Stops the underlying producer factory and marks the handler as not running. */
        public void stop() {
            this.producerFactory.stop();
            this.running = false;
        }

        /** @return whether the handler is currently marked as running */
        public boolean isRunning() {
            return this.running;
        }
    }
}

