/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.spark.streaming;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.Validate;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spark.RocketMQConfig;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RocketMQReceiver
extends Receiver<Message> {
    protected MQPushConsumer consumer;
    protected boolean ordered;
    protected Properties properties;

    public RocketMQReceiver(Properties properties, StorageLevel storageLevel) {
        super(storageLevel);
        this.properties = properties;
    }

    public void onStart() {
        Validate.notEmpty((Map)this.properties, (String)"Consumer properties can not be empty");
        this.ordered = RocketMQConfig.getBoolean(this.properties, "consumer.messages.orderly", false);
        this.consumer = new DefaultMQPushConsumer();
        RocketMQConfig.buildConsumerConfigs(this.properties, (DefaultMQPushConsumer)this.consumer);
        if (this.ordered) {
            this.consumer.registerMessageListener(new MessageListenerOrderly(){

                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    if (RocketMQReceiver.this.process(msgs)) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
        } else {
            this.consumer.registerMessageListener(new MessageListenerConcurrently(){

                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    if (RocketMQReceiver.this.process(msgs)) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
        }
        try {
            this.consumer.start();
        }
        catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean process(List<MessageExt> msgs) {
        if (msgs.isEmpty()) {
            return true;
        }
        try {
            for (MessageExt msg : msgs) {
                this.store(msg);
            }
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    public void onStop() {
        this.consumer.shutdown();
    }
}

