/*
 * Decompiled with CFR 0.152.
 */
package com.bizvane.comsumer.config;

import com.bizvane.comsumer.config.ConsumerConfig;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@ConditionalOnProperty(name={"spring.kafka.consumer.group-id"})
@Component
public class KafkaConsumerClient<K> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerClient.class);
    private KafkaConsumer<K, String> kafkaConsumer;
    Properties props = null;
    private ConsumerConfig consumerConfig;

    @Autowired
    public KafkaConsumerClient(ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
        this.init();
    }

    void init() {
        this.props = new Properties();
        this.props.setProperty("bootstrap.servers", this.consumerConfig.getBootstrapservers());
        this.props.setProperty("group.id", this.consumerConfig.getGroupid());
        this.props.setProperty("enable.auto.commit", this.consumerConfig.getEnableautocommit());
        this.props.setProperty("auto.offset.reset", this.consumerConfig.getAutooffsetreset());
        this.props.setProperty("key.deserializer", this.consumerConfig.getKeydeserializer());
        this.props.setProperty("value.deserializer", this.consumerConfig.getValuedeserializer());
        this.props.setProperty("session.timeout.ms", String.valueOf(this.consumerConfig.getSessiontimeout()));
        this.props.setProperty("request.timeout.ms", String.valueOf(this.consumerConfig.getRequesttimeout()));
        this.props.setProperty("max.partition.fetch.bytes", String.valueOf(this.consumerConfig.getMaxpartitionfetchbytes()));
        this.kafkaConsumer = new KafkaConsumer(this.props);
    }

    public void receive(List<String> topic, long timeOut, Consumer<ConsumerRecords<K, String>> predicate) {
        log.info("KafkaConsumerClient_receive");
        this.kafkaConsumer.subscribe(topic);
        log.info("kafkaConsumer_subscribe_{}_success", topic);
        try {
            while (true) {
                long start = System.currentTimeMillis();
                ConsumerRecords records = this.kafkaConsumer.poll(timeOut);
                int count = records.count();
                long end = System.currentTimeMillis();
                log.info("\u672c\u6b21\u62c9\u53d6\u6570\u636e\u6761\u6570:{},\u5f00\u59cb\u65f6\u95f4:{},\u7ed3\u675f\u65f6\u95f4:{},\u8017\u65f6:{}ms", new Object[]{count, start, end, end - start});
                if (records.isEmpty()) continue;
                predicate.accept(records);
                this.kafkaConsumer.commitAsync();
            }
        }
        catch (Exception e) {
            log.error("KafkaConsumerClient_receive_error_msg:{}", (Throwable)e);
            e.printStackTrace();
            return;
        }
    }
}

