/*
 * Decompiled with CFR 0.152.
 */
package com.bizvane.utils.kafkautils;

import com.bizvane.utils.kafkautils.ConsumerConfig;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@ConditionalOnProperty(name={"spring.kafka.consumer.group-id"})
@Component
public class KafkaConsumerClient<K> {
    private KafkaConsumer<K, String> kafkaConsumer;
    Properties props = null;
    private ConsumerConfig consumerConfig;

    @Autowired
    public KafkaConsumerClient(ConsumerConfig consumerConfig) {
        this.consumerConfig = consumerConfig;
        this.init();
    }

    void init() {
        this.props = new Properties();
        this.props.setProperty("bootstrap.servers", this.consumerConfig.getBootstrapservers());
        this.props.setProperty("group.id", this.consumerConfig.getGroupid());
        this.props.setProperty("enable.auto.commit", this.consumerConfig.getEnableautocommit());
        this.props.setProperty("auto.offset.reset", this.consumerConfig.getAutooffsetreset());
        this.props.setProperty("key.deserializer", this.consumerConfig.getKeydeserializer());
        this.props.setProperty("value.deserializer", this.consumerConfig.getValuedeserializer());
        this.props.setProperty("session.timeout.ms", String.valueOf(this.consumerConfig.getSessiontimeout()));
        this.props.setProperty("request.timeout.ms", String.valueOf(this.consumerConfig.getRequesttimeout()));
        this.props.setProperty("max.partition.fetch.bytes", String.valueOf(this.consumerConfig.getMaxpartitionfetchbytes()));
        this.kafkaConsumer = new KafkaConsumer(this.props);
    }

    public void receive(List<String> topic, long timeOut, Consumer<ConsumerRecords<K, String>> predicate) {
        this.kafkaConsumer.subscribe(topic);
        while (true) {
            ConsumerRecords records;
            if ((records = this.kafkaConsumer.poll(timeOut)).isEmpty()) {
                continue;
            }
            predicate.accept(records);
            this.kafkaConsumer.commitSync();
        }
    }
}

