/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.kafka.MessageDeserializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCanalConnector
implements CanalMQConnector {
    private KafkaConsumer<String, Message> kafkaConsumer;
    private KafkaConsumer<String, String> kafkaConsumer2;
    private String topic;
    private Integer partition;
    private Properties properties;
    private volatile boolean connected = false;
    private volatile boolean running = false;
    private boolean flatMessage;

    public KafkaCanalConnector(String servers, String topic, Integer partition, String groupId, Integer batchSize, boolean flatMessage) {
        this.topic = topic;
        this.partition = partition;
        this.flatMessage = flatMessage;
        this.properties = new Properties();
        this.properties.put("bootstrap.servers", servers);
        this.properties.put("group.id", groupId);
        this.properties.put("enable.auto.commit", (Object)false);
        this.properties.put("auto.commit.interval.ms", "1000");
        this.properties.put("auto.offset.reset", "latest");
        this.properties.put("request.timeout.ms", "40000");
        this.properties.put("session.timeout.ms", "30000");
        if (batchSize == null) {
            batchSize = 100;
        }
        this.properties.put("max.poll.records", batchSize.toString());
        this.properties.put("key.deserializer", StringDeserializer.class.getName());
        if (!flatMessage) {
            this.properties.put("value.deserializer", MessageDeserializer.class.getName());
        } else {
            this.properties.put("value.deserializer", StringDeserializer.class.getName());
        }
    }

    @Override
    public void connect() {
        if (this.connected) {
            return;
        }
        this.connected = true;
        if (this.kafkaConsumer == null && !this.flatMessage) {
            this.kafkaConsumer = new KafkaConsumer(this.properties);
        }
        if (this.kafkaConsumer2 == null && this.flatMessage) {
            this.kafkaConsumer2 = new KafkaConsumer(this.properties);
        }
    }

    @Override
    public void disconnect() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
            this.kafkaConsumer = null;
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.close();
            this.kafkaConsumer2 = null;
        }
        this.connected = false;
    }

    private void waitClientRunning() {
        this.running = true;
    }

    @Override
    public boolean checkValid() {
        return true;
    }

    @Override
    public void subscribe() {
        this.waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.partition == null) {
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.subscribe(Collections.singletonList(this.topic));
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.subscribe(Collections.singletonList(this.topic));
            }
        } else {
            TopicPartition topicPartition = new TopicPartition(this.topic, this.partition.intValue());
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.assign(Collections.singletonList(topicPartition));
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.assign(Collections.singletonList(topicPartition));
            }
        }
    }

    @Override
    public void unsubscribe() {
        this.waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.unsubscribe();
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.unsubscribe();
        }
    }

    @Override
    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
        this.waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        List<Message> messages = this.getListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    @Override
    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        this.waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        ConsumerRecords records = this.kafkaConsumer.poll(unit.toMillis(timeout));
        if (!records.isEmpty()) {
            ArrayList<Message> messages = new ArrayList<Message>();
            for (ConsumerRecord record : records) {
                messages.add((Message)record.value());
            }
            return messages;
        }
        return Lists.newArrayList();
    }

    @Override
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        this.waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        List<FlatMessage> messages = this.getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            this.ack();
        }
        return messages;
    }

    @Override
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        this.waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        ConsumerRecords records = this.kafkaConsumer2.poll(unit.toMillis(timeout));
        if (!records.isEmpty()) {
            ArrayList<FlatMessage> flatMessages = new ArrayList<FlatMessage>();
            for (ConsumerRecord record : records) {
                String flatMessageJson = (String)record.value();
                FlatMessage flatMessage = (FlatMessage)JSON.parseObject((String)flatMessageJson, FlatMessage.class);
                flatMessages.add(flatMessage);
            }
            return flatMessages;
        }
        return Lists.newArrayList();
    }

    @Override
    public void rollback() throws CanalClientException {
    }

    @Override
    public void ack() {
        this.waitClientRunning();
        if (!this.running) {
            return;
        }
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.commitSync();
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.commitSync();
        }
    }

    @Override
    public void subscribe(String filter) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public Message get(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public void ack(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override
    public void rollback(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    public void setSessionTimeout(Long timeout, TimeUnit unit) {
        long t = unit.toMillis(timeout);
        this.properties.put("request.timeout.ms", String.valueOf(t + 60000L));
        this.properties.put("session.timeout.ms", String.valueOf(t));
    }
}

