package com.github.ltsopensource.tasktracker.processor;

import com.github.ltsopensource.core.commons.utils.Callable;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.ExtConfig;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.domain.JobRunResult;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
import com.github.ltsopensource.core.exception.RequestTimeoutException;
import com.github.ltsopensource.core.failstore.FailStorePathBuilder;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.JobCompletedRequest;
import com.github.ltsopensource.core.protocol.command.JobPushRequest;
import com.github.ltsopensource.core.protocol.command.JobPushResponse;
import com.github.ltsopensource.core.remoting.RemotingClientDelegate;
import com.github.ltsopensource.core.support.NodeShutdownHook;
import com.github.ltsopensource.core.support.RetryScheduler;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.Channel;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.exception.RemotingCommandException;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.remoting.protocol.RemotingProtos;
import com.github.ltsopensource.tasktracker.domain.Response;
import com.github.ltsopensource.tasktracker.domain.TaskTrackerAppContext;
import com.github.ltsopensource.tasktracker.expcetion.NoAvailableJobRunnerException;
import com.github.ltsopensource.tasktracker.runner.RunnerCallback;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/github/ltsopensource/tasktracker/processor/JobPushProcessor.class */
/**
 * Handles job-push requests on the task tracker side.
 *
 * <p>Every pushed {@link JobMeta} is handed to the runner pool together with a
 * {@link RunnerCallback}. When a job finishes, the callback reports the result back with a
 * {@code JOB_COMPLETED} request through the remoting client. If that report cannot be delivered
 * and the fail store is enabled, the result is handed to a {@link RetryScheduler} which re-sends
 * it later via {@link #retrySendJobResults(List)}.
 */
public class JobPushProcessor extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobPushProcessor.class);

    /** Maximum time, in milliseconds, to wait for the answer to a job-completed report. */
    private static final long FEEDBACK_TIMEOUT_MILLIS = 60000L;

    /** Only created (and started) by the constructor when the fail store is enabled. */
    private RetryScheduler<JobRunResult> retryScheduler;
    private final JobRunnerCallback jobRunnerCallback;
    private final RemotingClientDelegate remotingClient;

    /**
     * Callback passed to the runner pool; reports a finished job's result and optionally
     * picks up a new job from the answer.
     */
    private class JobRunnerCallback implements RunnerCallback {
        private JobRunnerCallback() {
        }

        /**
         * Sends the result of a finished job asynchronously and waits (bounded by
         * {@link #FEEDBACK_TIMEOUT_MILLIS}) for the answer.
         *
         * @param response outcome of the job that just finished
         * @return the first job meta contained in a successful answer, if any; otherwise the
         *         job meta of an untouched {@link Response} (presumably {@code null})
         * @throws RequestTimeoutException if the waiting thread is interrupted
         */
        @Override // com.github.ltsopensource.tasktracker.runner.RunnerCallback
        public JobMeta runComplete(Response response) {
            final JobRunResult jobRunResult = new JobRunResult();
            jobRunResult.setTime(Long.valueOf(SystemClock.now()));
            jobRunResult.setJobMeta(response.getJobMeta());
            jobRunResult.setAction(response.getAction());
            jobRunResult.setMsg(response.getMsg());

            JobCompletedRequest jobCompletedRequest = (JobCompletedRequest) appContext.getCommandBodyWrapper().wrapper(new JobCompletedRequest());
            jobCompletedRequest.addJobResult(jobRunResult);
            jobCompletedRequest.setReceiveNewJob(response.isReceiveNewJob());
            RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_COMPLETED.code(), jobCompletedRequest);

            // Filled in by the async callback when the answer carries a new job.
            final Response nextJobHolder = new Response();
            try {
                final CountDownLatch latch = new CountDownLatch(1);
                remotingClient.invokeAsync(request, new AsyncCallback() {
                    @Override // com.github.ltsopensource.remoting.AsyncCallback
                    public void operationComplete(ResponseFuture responseFuture) {
                        try {
                            RemotingCommand responseCommand = responseFuture.getResponseCommand();
                            if (responseCommand != null && responseCommand.getCode() == RemotingProtos.ResponseCode.SUCCESS.code()) {
                                JobPushRequest jobPushRequest = (JobPushRequest) responseCommand.getBody();
                                if (jobPushRequest != null) {
                                    if (LOGGER.isDebugEnabled()) {
                                        LOGGER.debug("Get new job :{}", JSON.toJSONString(jobPushRequest.getJobMetaList()));
                                    }
                                    if (CollectionUtils.isNotEmpty(jobPushRequest.getJobMetaList())) {
                                        nextJobHolder.setJobMeta(jobPushRequest.getJobMetaList().get(0));
                                    }
                                }
                            } else {
                                if (LOGGER.isInfoEnabled()) {
                                    LOGGER.info("Job feedback failed, save local files。{}", jobRunResult);
                                }
                                try {
                                    if (isEnableFailStore()) {
                                        retryScheduler.inSchedule(jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(), jobRunResult);
                                    } else {
                                        Integer code = responseCommand != null ? Integer.valueOf(responseCommand.getCode()) : null;
                                        LOGGER.error("Send Job Result to JobTracker Error, code={}, jobRunResult={}", code, JSON.toJSONString(jobRunResult));
                                    }
                                } catch (Exception e) {
                                    LOGGER.error("Job feedback failed", e);
                                }
                            }
                        } finally {
                            // Always release the waiting runner thread, whatever happened above.
                            latch.countDown();
                        }
                    }
                });

                // Wait only when the request was actually submitted. If invokeAsync threw,
                // nobody would ever count the latch down and the runner would stall for the
                // whole timeout.
                try {
                    if (!latch.await(FEEDBACK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                        LOGGER.warn("Wait for job feedback response timed out, {}", jobRunResult);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can see it.
                    Thread.currentThread().interrupt();
                    throw new RequestTimeoutException(e);
                }
            } catch (JobTrackerNotFoundException e) {
                try {
                    LOGGER.warn("No job tracker available! save local files.");
                    if (isEnableFailStore()) {
                        retryScheduler.inSchedule(jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(), jobRunResult);
                    } else {
                        LOGGER.error("Send Job Result to JobTracker Error, server is down, jobRunResult={}", JSON.toJSONString(jobRunResult));
                    }
                } catch (Exception e2) {
                    LOGGER.error("Save files failed, {}", jobRunResult.getJobMeta(), e2);
                }
            }
            return nextJobHolder.getJobMeta();
        }
    }

    /**
     * Creates the processor and, when the fail store is enabled, starts a retry scheduler for
     * undelivered job results and registers a shutdown hook that stops it.
     *
     * @param taskTrackerAppContext application context supplying the remoting client and config
     */
    public JobPushProcessor(TaskTrackerAppContext taskTrackerAppContext) {
        super(taskTrackerAppContext);
        this.remotingClient = taskTrackerAppContext.getRemotingClient();
        this.jobRunnerCallback = new JobRunnerCallback();
        if (isEnableFailStore()) {
            // Enclosing-instance members are referenced explicitly here so that they can never
            // be confused with same-named members of RetryScheduler.
            this.retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), taskTrackerAppContext, FailStorePathBuilder.getJobFeedbackPath(taskTrackerAppContext), 3) {
                @Override // com.github.ltsopensource.core.support.RetryScheduler
                protected boolean isRemotingEnable() {
                    return JobPushProcessor.this.remotingClient.isServerEnable();
                }

                @Override // com.github.ltsopensource.core.support.RetryScheduler
                protected boolean retry(List<JobRunResult> list) {
                    return JobPushProcessor.this.retrySendJobResults(list);
                }
            };
            this.retryScheduler.start();
            NodeShutdownHook.registerHook(taskTrackerAppContext, getClass().getName(), new Callable() {
                @Override // com.github.ltsopensource.core.commons.utils.Callable
                public void call() throws Exception {
                    JobPushProcessor.this.retryScheduler.stop();
                }
            });
        }
    }

    /**
     * Submits every job of the push request to the runner pool.
     *
     * @param channel        channel the request arrived on (not used here)
     * @param remotingCommand request whose body is a {@link JobPushRequest}
     * @return a {@code JOB_PUSH_SUCCESS} response when all jobs were accepted, otherwise a
     *         {@code NO_AVAILABLE_JOB_RUNNER} response listing the ids of the rejected jobs
     */
    @Override // com.github.ltsopensource.remoting.RemotingProcessor
    public RemotingCommand processRequest(Channel channel, RemotingCommand remotingCommand) throws RemotingCommandException {
        JobPushRequest jobPushRequest = (JobPushRequest) remotingCommand.getBody();

        // Created lazily: the common case is that every job finds a runner.
        List<String> failedJobIds = null;
        for (JobMeta jobMeta : jobPushRequest.getJobMetaList()) {
            try {
                this.appContext.getRunnerPool().execute(jobMeta, this.jobRunnerCallback);
            } catch (NoAvailableJobRunnerException e) {
                // Not an error here: the rejected job id is reported back to the sender below.
                if (failedJobIds == null) {
                    failedJobIds = new ArrayList<String>();
                }
                failedJobIds.add(jobMeta.getJobId());
            }
        }

        if (CollectionUtils.isNotEmpty(failedJobIds)) {
            JobPushResponse jobPushResponse = new JobPushResponse();
            jobPushResponse.setFailedJobIds(failedJobIds);
            return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code(), jobPushResponse);
        }
        return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code(), "job push success!");
    }

    /**
     * Tells whether undelivered job results should be kept for a later retry.
     *
     * @return {@code true} unless the config parameter
     *         {@link ExtConfig#TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE} is {@code true}
     *         (it defaults to {@code false})
     */
    public boolean isEnableFailStore() {
        return !this.appContext.getConfig().getParameter(ExtConfig.TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE, false);
    }

    /**
     * Synchronously re-sends previously undelivered job results, flagged as a re-send.
     *
     * @param list job results to send in a single {@code JOB_COMPLETED} request
     * @return {@code true} only if a response with code {@code SUCCESS} was received;
     *         {@code false} on any other response or when no job tracker could be found
     */
    public boolean retrySendJobResults(List<JobRunResult> list) {
        JobCompletedRequest jobCompletedRequest = (JobCompletedRequest) this.appContext.getCommandBodyWrapper().wrapper(new JobCompletedRequest());
        jobCompletedRequest.setJobRunResults(list);
        jobCompletedRequest.setReSend(true);
        RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_COMPLETED.code(), jobCompletedRequest);
        try {
            RemotingCommand answer = this.remotingClient.invokeSync(request);
            if (answer != null && answer.getCode() == RemotingProtos.ResponseCode.SUCCESS.code()) {
                return true;
            }
            LOGGER.warn("Send job failed, {}", answer);
            return false;
        } catch (JobTrackerNotFoundException e) {
            LOGGER.error("Retry send job result failed! jobResults={}", list, e);
            return false;
        }
    }
}
