/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.spark.streaming;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.rocketmq.spark.streaming.DefaultMessageRetryManager;
import org.apache.rocketmq.spark.streaming.MessageRetryManager;
import org.apache.rocketmq.spark.streaming.MessageSet;
import org.apache.rocketmq.spark.streaming.RocketMQReceiver;
import org.apache.spark.storage.StorageLevel;

public class ReliableRocketMQReceiver
extends RocketMQReceiver {
    private BlockingQueue<MessageSet> queue;
    private MessageRetryManager messageRetryManager;
    private MessageSender sender;

    public ReliableRocketMQReceiver(Properties properties, StorageLevel storageLevel) {
        super(properties, storageLevel);
    }

    @Override
    public void onStart() {
        int queueSize = RocketMQConfig.getInteger(this.properties, "spout.queue.size", 500);
        this.queue = new LinkedBlockingQueue<MessageSet>(queueSize);
        int maxRetry = RocketMQConfig.getInteger(this.properties, "spout.messages.max.retry", 3);
        int ttl = RocketMQConfig.getInteger(this.properties, "spout.messages.ttl", 300000);
        this.messageRetryManager = new DefaultMessageRetryManager(this.queue, maxRetry, ttl);
        this.sender = new MessageSender();
        this.sender.setName("MessageSender");
        this.sender.setDaemon(true);
        this.sender.start();
        super.onStart();
    }

    @Override
    public boolean process(List<MessageExt> msgs) {
        if (msgs.isEmpty()) {
            return true;
        }
        MessageSet messageSet = new MessageSet(msgs);
        try {
            this.queue.put(messageSet);
            return true;
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void ack(Object msgId) {
        String id = msgId.toString();
        this.messageRetryManager.ack(id);
    }

    public void fail(Object msgId) {
        String id = msgId.toString();
        this.messageRetryManager.fail(id);
    }

    @Override
    public void onStop() {
        this.consumer.shutdown();
    }

    class MessageSender
    extends Thread {
        MessageSender() {
        }

        @Override
        public void run() {
            while (ReliableRocketMQReceiver.this.isStarted()) {
                MessageSet messageSet = null;
                try {
                    messageSet = (MessageSet)ReliableRocketMQReceiver.this.queue.take();
                }
                catch (InterruptedException e) {
                    continue;
                }
                if (messageSet == null) continue;
                ReliableRocketMQReceiver.this.messageRetryManager.mark(messageSet);
                try {
                    ReliableRocketMQReceiver.this.store(messageSet);
                    ReliableRocketMQReceiver.this.ack(messageSet.getId());
                }
                catch (Exception e) {
                    ReliableRocketMQReceiver.this.fail(messageSet.getId());
                }
            }
        }
    }
}

