package com.github.ltsopensource.jobtracker.support;

import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.DotLogUtils;
import com.github.ltsopensource.core.commons.utils.Holder;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.constant.ExtConfig;
import com.github.ltsopensource.core.exception.RemotingSendException;
import com.github.ltsopensource.core.exception.RequestTimeoutException;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.JobPullRequest;
import com.github.ltsopensource.core.protocol.command.JobPushRequest;
import com.github.ltsopensource.core.protocol.command.JobPushResponse;
import com.github.ltsopensource.core.remoting.RemotingServerDelegate;
import com.github.ltsopensource.core.support.JobDomainConverter;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.jobtracker.domain.TaskTrackerNode;
import com.github.ltsopensource.jobtracker.monitor.JobTrackerMStatReporter;
import com.github.ltsopensource.jobtracker.sender.JobPushResult;
import com.github.ltsopensource.jobtracker.sender.JobSender;
import com.github.ltsopensource.queue.domain.JobPo;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/github/ltsopensource/jobtracker/support/JobPusher.class */
public class JobPusher {
    private JobTrackerAppContext appContext;
    private final ExecutorService pushExecutorService;
    private JobTrackerMStatReporter stat;
    private RemotingServerDelegate remotingServer;
    private int jobPushBatchSize;
    private final Logger LOGGER = LoggerFactory.getLogger((Class<?>) JobPusher.class);
    private ConcurrentHashMap<String, AtomicBoolean> PUSHING_FLAG = new ConcurrentHashMap<>();
    private final ExecutorService executorService = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(JobPusher.class.getSimpleName() + "-Executor", true));

    public JobPusher(JobTrackerAppContext jobTrackerAppContext) {
        this.jobPushBatchSize = 10;
        this.appContext = jobTrackerAppContext;
        this.pushExecutorService = Executors.newFixedThreadPool(jobTrackerAppContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_PUSHER_THREAD_NUM, Constants.DEFAULT_JOB_TRACKER_PUSHER_THREAD_NUM), new NamedThreadFactory(JobPusher.class.getSimpleName() + "-AsyncPusher", true));
        this.stat = (JobTrackerMStatReporter) jobTrackerAppContext.getMStatReporter();
        this.remotingServer = jobTrackerAppContext.getRemotingServer();
        this.jobPushBatchSize = jobTrackerAppContext.getConfig().getParameter(ExtConfig.JOB_TRACKER_PUSH_BATCH_SIZE, 10);
    }

    public void push(final JobPullRequest jobPullRequest) {
        this.executorService.submit(new Runnable() { // from class: com.github.ltsopensource.jobtracker.support.JobPusher.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    JobPusher.this.push0(jobPullRequest);
                } catch (Exception e) {
                    JobPusher.this.LOGGER.error("Job push failed!", e);
                }
            }
        });
    }

    private AtomicBoolean getPushingFlag(TaskTrackerNode taskTrackerNode) {
        AtomicBoolean atomicBoolean = this.PUSHING_FLAG.get(taskTrackerNode.getIdentity());
        if (atomicBoolean == null) {
            atomicBoolean = new AtomicBoolean(false);
            AtomicBoolean putIfAbsent = this.PUSHING_FLAG.putIfAbsent(taskTrackerNode.getIdentity(), atomicBoolean);
            if (putIfAbsent != null) {
                atomicBoolean = putIfAbsent;
            }
        }
        return atomicBoolean;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void push0(JobPullRequest jobPullRequest) {
        String nodeGroup = jobPullRequest.getNodeGroup();
        String identity = jobPullRequest.getIdentity();
        this.appContext.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup, identity, jobPullRequest.getAvailableThreads(), jobPullRequest.getTimestamp());
        final TaskTrackerNode taskTrackerNode = this.appContext.getTaskTrackerManager().getTaskTrackerNode(nodeGroup, identity);
        if (taskTrackerNode == null) {
            if (this.LOGGER.isDebugEnabled()) {
                this.LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , didn't have node.", nodeGroup, identity);
                return;
            }
            return;
        }
        int i = taskTrackerNode.getAvailableThread().get();
        if (i <= 0) {
            return;
        }
        AtomicBoolean pushingFlag = getPushingFlag(taskTrackerNode);
        if (pushingFlag.compareAndSet(false, true)) {
            try {
                int i2 = this.jobPushBatchSize;
                int i3 = i % i2 == 0 ? i / i2 : (i / i2) + 1;
                final CountDownLatch countDownLatch = new CountDownLatch(i3);
                for (int i4 = 1; i4 <= i3; i4++) {
                    int i5 = i2;
                    if (i4 == i3) {
                        i5 = i - (i2 * (i3 - 1));
                    }
                    final int i6 = i5;
                    this.pushExecutorService.execute(new Runnable() { // from class: com.github.ltsopensource.jobtracker.support.JobPusher.2
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                try {
                                    JobPusher.this.send(JobPusher.this.remotingServer, i6, taskTrackerNode);
                                    countDownLatch.countDown();
                                } catch (Throwable th) {
                                    JobPusher.this.LOGGER.error("Error on Push Job to {}", taskTrackerNode, th);
                                    countDownLatch.countDown();
                                }
                            } catch (Throwable th2) {
                                countDownLatch.countDown();
                                throw th2;
                            }
                        }
                    });
                }
                try {
                    countDownLatch.await();
                    DotLogUtils.dot("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , pushing finished. batchTimes:{}, size:{}", nodeGroup, identity, Integer.valueOf(i3), Integer.valueOf(i));
                    pushingFlag.compareAndSet(true, false);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                pushingFlag.compareAndSet(true, false);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public JobPushResult send(final RemotingServerDelegate remotingServerDelegate, int i, final TaskTrackerNode taskTrackerNode) {
        final String nodeGroup = taskTrackerNode.getNodeGroup();
        final String identity = taskTrackerNode.getIdentity();
        return (JobPushResult) this.appContext.getJobSender().send(nodeGroup, identity, i, new JobSender.SendInvoker() { // from class: com.github.ltsopensource.jobtracker.support.JobPusher.3
            @Override // com.github.ltsopensource.jobtracker.sender.JobSender.SendInvoker
            public JobSender.SendResult invoke(final List<JobPo> list) {
                JobPushRequest jobPushRequest = (JobPushRequest) JobPusher.this.appContext.getCommandBodyWrapper().wrapper(new JobPushRequest());
                jobPushRequest.setJobMetaList(JobDomainConverter.convert(list));
                RemotingCommand createRequestCommand = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), jobPushRequest);
                final Holder holder = new Holder(false);
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                try {
                    remotingServerDelegate.invokeAsync(taskTrackerNode.getChannel().getChannel(), createRequestCommand, new AsyncCallback() { // from class: com.github.ltsopensource.jobtracker.support.JobPusher.3.1
                        @Override // com.github.ltsopensource.remoting.AsyncCallback
                        public void operationComplete(ResponseFuture responseFuture) {
                            try {
                                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                                if (responseCommand == null) {
                                    JobPusher.this.LOGGER.warn("Job push failed! response command is null!");
                                    countDownLatch.countDown();
                                    return;
                                }
                                if (responseCommand.getCode() == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
                                    if (JobPusher.this.LOGGER.isDebugEnabled()) {
                                        JobPusher.this.LOGGER.debug("Job push success! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobList=" + JSON.toJSONString(list));
                                    }
                                    holder.set(true);
                                    JobPusher.this.stat.incPushJobNum(list.size());
                                } else if (responseCommand.getCode() == JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code()) {
                                    JobPushResponse jobPushResponse = (JobPushResponse) responseCommand.getBody();
                                    if (jobPushResponse == null || !CollectionUtils.isNotEmpty(jobPushResponse.getFailedJobIds())) {
                                        JobPusher.this.stat.incPushJobNum(list.size());
                                    } else {
                                        for (String str : jobPushResponse.getFailedJobIds()) {
                                            Iterator it = list.iterator();
                                            while (true) {
                                                if (it.hasNext()) {
                                                    JobPo jobPo = (JobPo) it.next();
                                                    if (str.equals(jobPo.getJobId())) {
                                                        JobPusher.this.resumeJob(jobPo);
                                                        break;
                                                    }
                                                }
                                            }
                                        }
                                        JobPusher.this.stat.incPushJobNum(list.size() - jobPushResponse.getFailedJobIds().size());
                                    }
                                    holder.set(true);
                                }
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                    try {
                        countDownLatch.await(60000L, TimeUnit.MILLISECONDS);
                        if (((Boolean) holder.get()).booleanValue()) {
                            return new JobSender.SendResult(true, JobPushResult.SUCCESS);
                        }
                        if (JobPusher.this.LOGGER.isDebugEnabled()) {
                            JobPusher.this.LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobs=" + JSON.toJSONObject(list));
                        }
                        Iterator<JobPo> it = list.iterator();
                        while (it.hasNext()) {
                            JobPusher.this.resumeJob(it.next());
                        }
                        return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
                    } catch (InterruptedException e) {
                        throw new RequestTimeoutException(e);
                    }
                } catch (RemotingSendException e2) {
                    JobPusher.this.LOGGER.error("Remoting send error, jobPos={}", JSON.toJSONObject(list), e2);
                    return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
                }
            }
        }).getReturnValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resumeJob(JobPo jobPo) {
        boolean z = true;
        try {
            jobPo.setIsRunning(true);
            this.appContext.getExecutableJobQueue().add(jobPo);
        } catch (DupEntryException e) {
            this.LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
            z = false;
        }
        this.appContext.getExecutingJobQueue().remove(jobPo.getJobId());
        if (z) {
            this.appContext.getExecutableJobQueue().resume(jobPo);
        }
    }
}
