package com.jcloud.jcq.client.common;

import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.common.utils.CommunicationUtils;
import com.jcloud.jcq.communication.core.ChannelWrapper;
import com.jcloud.jcq.communication.portal.CommunicationRequestHandler;
import com.jcloud.jcq.communication.protocol.CommunicationUnit;
import com.jcloud.jcq.protocol.ProtocolSerializer;
import com.jcloud.jcq.protocol.RequestCode;
import com.jcloud.jcq.protocol.broker.PushMessageRequest;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jcloud/jcq/client/common/RequestHandler.class */
/**
 * Process-wide singleton that receives requests pushed to this client and
 * routes them to the {@link ClientInstance} they are addressed to.
 *
 * <p>The only request code handled here is {@link #PUSH_MESSAGE_REQUEST_CODE};
 * every other code is ignored. Client instances are tracked in a
 * {@link ConcurrentHashMap} keyed by {@link ClientInstance#getInstanceId()}.
 */
public class RequestHandler implements CommunicationRequestHandler {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    /** Request code that {@link #processRequest} dispatches to {@link #handlePushMessage}. */
    private static final short PUSH_MESSAGE_REQUEST_CODE = 105;

    // Declared after the other static fields so they are initialised before the constructor runs.
    private static final RequestHandler instance = new RequestHandler();

    private final RemotingApiWrapper remotingApiWrapper = RemotingApiWrapper.getInstance();

    /** Registered client instances, keyed by instance id. */
    ConcurrentMap<String, ClientInstance> clientInstances = new ConcurrentHashMap<>();

    /** Creates the singleton and registers it for the request codes it handles. */
    private RequestHandler() {
        registerRequestCode2Processor();
    }

    /**
     * Returns the shared handler.
     *
     * @return the single {@code RequestHandler} instance
     */
    public static RequestHandler getInstance() {
        return instance;
    }

    /**
     * Dispatches an incoming request by its code.
     *
     * @param channelWrapper    channel the request arrived on; only read when debug logging is enabled
     * @param communicationUnit the incoming request
     * @return always {@code null}
     * @throws Exception if handling the request fails
     */
    @Override
    public CommunicationUnit processRequest(ChannelWrapper channelWrapper, CommunicationUnit communicationUnit) throws Exception {
        if (logger.isDebugEnabled()) {
            // The remote address is only used by this log line, so it is resolved inside the guard.
            String remoteAddress = CommunicationUtils.parseChannelRemoteAddr(channelWrapper.getChannel());
            logger.debug("got request code:{} from address:{}", RequestCode.getName(communicationUnit.getCode()), remoteAddress);
        }
        switch (communicationUnit.getCode()) {
            case PUSH_MESSAGE_REQUEST_CODE:
                handlePushMessage(communicationUnit);
                return null;
            default:
                // Any other request code is ignored.
                return null;
        }
    }

    /**
     * Tells the caller whether incoming requests should be rejected.
     *
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Adds a client instance under its instance id, replacing any instance
     * previously stored under the same id.
     *
     * @param clientInstance the instance to register
     */
    public void registerClientInstance(ClientInstance clientInstance) {
        this.clientInstances.put(clientInstance.getInstanceId(), clientInstance);
    }

    /**
     * Removes whatever instance is stored under the given instance's id.
     *
     * @param clientInstance the instance whose id should be unregistered
     */
    public void unregisterClientInstance(ClientInstance clientInstance) {
        this.clientInstances.remove(clientInstance.getInstanceId());
    }

    /**
     * Registers this handler with the remoting layer for
     * {@link #PUSH_MESSAGE_REQUEST_CODE}.
     */
    private void registerRequestCode2Processor() {
        // Left raw: the parameter type of registerRequestCode2Processor is declared elsewhere.
        // NOTE(review): the null value presumably means "no dedicated processor/executor" - confirm
        // against RemotingApiWrapper.
        HashMap codeToProcessor = new HashMap();
        codeToProcessor.put(PUSH_MESSAGE_REQUEST_CODE, null);
        this.remotingApiWrapper.registerRequestCode2Processor(codeToProcessor, this);
    }

    /**
     * Decodes a push request and hands its messages to the consumer it names.
     *
     * <p>The request is logged and dropped when no instance is registered under
     * the consumer id (including when the request carries no consumer id), or
     * when the registered instance is not a {@link SubscribeConsumer}.
     *
     * @param communicationUnit the request whose data holds an encoded {@link PushMessageRequest}
     */
    private void handlePushMessage(CommunicationUnit communicationUnit) {
        PushMessageRequest pushMessageRequest = (PushMessageRequest) ProtocolSerializer.decode(communicationUnit.getData(), PushMessageRequest.class);
        String consumerId = pushMessageRequest.getConsumerId();
        // ConcurrentHashMap.get rejects null keys with a NullPointerException, so a request
        // without a consumer id is treated the same as one naming an unknown consumer.
        ClientInstance clientInstance = consumerId == null ? null : this.clientInstances.get(consumerId);
        if (clientInstance == null) {
            logger.warn("requestHandler not have consumer:{} registered, cannot handle push request:{}", consumerId, pushMessageRequest);
            return;
        }
        if (!(clientInstance instanceof SubscribeConsumer)) {
            logger.warn("clientInstance:{} is not instanceof SubscribeConsumer, cannot handle push request:{}", consumerId, pushMessageRequest);
            return;
        }
        ((SubscribeConsumer) clientInstance).receiveMessages(pushMessageRequest.getMessages(), pushMessageRequest.getAckIndex(), pushMessageRequest.getBrokerGroupId());
    }
}
