/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.jobtracker.processor;

import com.github.ltsopensource.core.cluster.NodeType;
import com.github.ltsopensource.core.commons.concurrent.limiter.RateLimiter;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.AbstractRemotingCommandBody;
import com.github.ltsopensource.jobtracker.channel.ChannelWrapper;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.jobtracker.processor.AbstractRemotingProcessor;
import com.github.ltsopensource.jobtracker.processor.JobBizLogProcessor;
import com.github.ltsopensource.jobtracker.processor.JobCancelProcessor;
import com.github.ltsopensource.jobtracker.processor.JobCompletedProcessor;
import com.github.ltsopensource.jobtracker.processor.JobPullProcessor;
import com.github.ltsopensource.jobtracker.processor.JobSubmitProcessor;
import com.github.ltsopensource.remoting.Channel;
import com.github.ltsopensource.remoting.RemotingProcessor;
import com.github.ltsopensource.remoting.exception.RemotingCommandException;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.remoting.protocol.RemotingProtos;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Routes each incoming remoting request to the {@link RemotingProcessor}
 * registered for its request code.
 *
 * <p>Heartbeat requests are answered directly by this class. Every other
 * request is looked up in the processor table and, when request limiting is
 * enabled in the configuration, must first obtain a permit from the rate
 * limiter.
 *
 * <p>All state is assigned in the constructor and only read afterwards, so the
 * fields are declared {@code final}.
 */
public class RemotingDispatcher extends AbstractRemotingProcessor {
    /** Default for {@code remoting.req.limit.enable}. */
    private static final boolean DEFAULT_REQ_LIMIT_ENABLE = false;
    /** Default for {@code remoting.req.limit.maxQPS}. */
    private static final int DEFAULT_REQ_LIMIT_MAX_QPS = 5000;
    /** Default for {@code remoting.req.limit.acquire.timeout}; passed to the limiter as milliseconds. */
    private static final int DEFAULT_REQ_LIMIT_ACQUIRE_TIMEOUT = 50;

    /** Request code to processor table; filled once in the constructor. */
    private final Map<JobProtos.RequestCode, RemotingProcessor> processors = new HashMap<JobProtos.RequestCode, RemotingProcessor>();
    /** Limiter consulted by {@link #doBizWithReqLimit} when request limiting is enabled. */
    private final RateLimiter rateLimiter;
    /** Maximum time, in milliseconds, to wait for a rate-limiter permit. */
    private final int reqLimitAcquireTimeout;
    /** Whether non-heartbeat requests must pass through the rate limiter. */
    private final boolean reqLimitEnable;

    /**
     * Registers the built-in job processors and reads the request-limit
     * settings from the application configuration.
     *
     * @param appContext the job tracker application context, handed to the
     *                   superclass and to every registered processor
     */
    public RemotingDispatcher(JobTrackerAppContext appContext) {
        super(appContext);
        this.processors.put(JobProtos.RequestCode.SUBMIT_JOB, new JobSubmitProcessor(appContext));
        this.processors.put(JobProtos.RequestCode.JOB_COMPLETED, new JobCompletedProcessor(appContext));
        this.processors.put(JobProtos.RequestCode.JOB_PULL, new JobPullProcessor(appContext));
        this.processors.put(JobProtos.RequestCode.BIZ_LOG_SEND, new JobBizLogProcessor(appContext));
        this.processors.put(JobProtos.RequestCode.CANCEL_JOB, new JobCancelProcessor(appContext));

        this.reqLimitEnable = appContext.getConfig().getParameter("remoting.req.limit.enable", DEFAULT_REQ_LIMIT_ENABLE);
        int maxQps = appContext.getConfig().getParameter("remoting.req.limit.maxQPS", DEFAULT_REQ_LIMIT_MAX_QPS);
        // The limiter is created even when limiting is disabled, exactly as before.
        this.rateLimiter = RateLimiter.create(maxQps);
        this.reqLimitAcquireTimeout = appContext.getConfig().getParameter("remoting.req.limit.acquire.timeout", DEFAULT_REQ_LIMIT_ACQUIRE_TIMEOUT);
    }

    /**
     * Handles one incoming request.
     *
     * <p>A heartbeat records the sender's channel and is answered immediately
     * with {@code HEART_BEAT_SUCCESS}; it never goes through the rate limiter.
     * Any other request is dispatched to its processor, through the rate
     * limiter when request limiting is enabled.
     *
     * @param channel the channel the request arrived on
     * @param request the request command
     * @return the response command to send back
     * @throws RemotingCommandException if the selected processor throws it
     */
    @Override
    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
        if (request.getCode() == JobProtos.RequestCode.HEART_BEAT.code()) {
            this.offerHandler(channel, request);
            return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.HEART_BEAT_SUCCESS.code(), "");
        }
        if (this.reqLimitEnable) {
            return this.doBizWithReqLimit(channel, request);
        }
        return this.doBiz(channel, request);
    }

    /**
     * Dispatches the request only if a rate-limiter permit is obtained within
     * {@link #reqLimitAcquireTimeout} milliseconds; otherwise answers with
     * {@code SYSTEM_BUSY}.
     */
    private RemotingCommand doBizWithReqLimit(Channel channel, RemotingCommand request) throws RemotingCommandException {
        if (this.rateLimiter.tryAcquire(this.reqLimitAcquireTimeout, TimeUnit.MILLISECONDS)) {
            return this.doBiz(channel, request);
        }
        return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.SYSTEM_BUSY.code(), "remoting server is busy!");
    }

    /**
     * Looks up the processor for the request code and delegates to it. When no
     * processor is registered, answers with {@code REQUEST_CODE_NOT_SUPPORTED}
     * and does not record the channel.
     */
    private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
        // NOTE(review): what RequestCode.valueOf does for an unknown numeric code
        // (return null vs. throw) is not visible here - confirm that unknown codes
        // actually reach the "not supported" response below.
        JobProtos.RequestCode code = JobProtos.RequestCode.valueOf(request.getCode());
        RemotingProcessor processor = this.processors.get(code);
        if (processor == null) {
            return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
        }
        // Record the sender's channel before handing the request to the processor.
        this.offerHandler(channel, request);
        return processor.processRequest(channel, request);
    }

    /**
     * Wraps the channel together with the sender's node type, node group and
     * identity (all read from the request body) and offers it to the channel
     * manager.
     */
    private void offerHandler(Channel channel, RemotingCommand request) {
        AbstractRemotingCommandBody commandBody = (AbstractRemotingCommandBody)request.getBody();
        String nodeGroup = commandBody.getNodeGroup();
        String identity = commandBody.getIdentity();
        NodeType nodeType = NodeType.valueOf(commandBody.getNodeType());
        this.appContext.getChannelManager().offerChannel(new ChannelWrapper(channel, nodeType, nodeGroup, identity));
    }
}

