/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.rocketmq.client.impl;

import com.alibaba.rocketmq.client.impl.factory.MQClientFactory;
import com.alibaba.rocketmq.client.impl.producer.MQProducerInner;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.message.MessageDecoder;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;

public class ClientRemotingProcessor
implements NettyRequestProcessor {
    private final Logger log = ClientLogger.getLog();
    private final MQClientFactory mqClientFactory;

    public ClientRemotingProcessor(MQClientFactory mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case 39: {
                return this.checkTransactionState(ctx, request);
            }
            case 40: {
                return this.notifyConsumerIdsChanged(ctx, request);
            }
            case 220: {
                return this.resetOffset(ctx, request);
            }
            case 221: {
                return this.getConsumeStatus(ctx, request);
            }
        }
        return null;
    }

    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader)request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        MessageExt messageExt = MessageDecoder.decode((ByteBuffer)byteBuffer);
        if (messageExt != null) {
            String group = messageExt.getProperty("PGROUP");
            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    String addr = RemotingHelper.parseChannelRemoteAddr((Channel)ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    this.log.debug("checkTransactionState, pick producer by group[{}] failed", (Object)group);
                }
            } else {
                this.log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            this.log.warn("checkTransactionState, decode message failed");
        }
        return null;
    }

    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        NotifyConsumerIdsChangedRequestHeader requestHeader = (NotifyConsumerIdsChangedRequestHeader)request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
        this.log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately", (Object)RemotingHelper.parseChannelRemoteAddr((Channel)ctx.channel()), (Object)requestHeader.getConsumerGroup());
        this.mqClientFactory.rebalanceImmediately();
        return null;
    }

    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
        this.log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", new Object[]{RemotingHelper.parseChannelRemoteAddr((Channel)ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp()});
        Map<Object, Long> offsetTable = new HashMap<MessageQueue, Long>();
        if (request.getBody() != null) {
            ResetOffsetBody body = (ResetOffsetBody)ResetOffsetBody.decode((byte[])request.getBody(), ResetOffsetBody.class);
            offsetTable = body.getOffsetTable();
        }
        this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
        return null;
    }

    public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        GetConsumerStatusRequestHeader requestHeader = (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
        Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
        GetConsumerStatusBody body = new GetConsumerStatusBody();
        body.setMessageQueueTable(offsetTable);
        response.setBody(body.encode());
        response.setCode(0);
        return response;
    }
}

