/*
 * Decompiled with CFR 0.152.
 */
package com.github.ltsopensource.jobtracker.support.checker;

import com.github.ltsopensource.biz.logger.domain.JobLogPo;
import com.github.ltsopensource.biz.logger.domain.LogType;
import com.github.ltsopensource.core.cluster.NodeType;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.QuietUtils;
import com.github.ltsopensource.core.constant.Level;
import com.github.ltsopensource.core.exception.RemotingSendException;
import com.github.ltsopensource.core.factory.NamedThreadFactory;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.JobAskRequest;
import com.github.ltsopensource.core.protocol.command.JobAskResponse;
import com.github.ltsopensource.core.remoting.RemotingServerDelegate;
import com.github.ltsopensource.core.support.JobDomainConverter;
import com.github.ltsopensource.core.support.SystemClock;
import com.github.ltsopensource.jobtracker.channel.ChannelWrapper;
import com.github.ltsopensource.jobtracker.domain.JobTrackerAppContext;
import com.github.ltsopensource.jobtracker.monitor.JobTrackerMStatReporter;
import com.github.ltsopensource.queue.domain.JobPo;
import com.github.ltsopensource.remoting.AsyncCallback;
import com.github.ltsopensource.remoting.Channel;
import com.github.ltsopensource.remoting.ResponseFuture;
import com.github.ltsopensource.remoting.protocol.RemotingCommand;
import com.github.ltsopensource.remoting.protocol.RemotingProtos;
import com.github.ltsopensource.store.jdbc.exception.DupEntryException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutingDeadJobChecker {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutingDeadJobChecker.class);
    private final ScheduledExecutorService FIXED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LTS-ExecutingJobQueue-Fix-Executor", true));
    private JobTrackerAppContext appContext;
    private JobTrackerMStatReporter stat;
    private AtomicBoolean start = new AtomicBoolean(false);
    private ScheduledFuture<?> scheduledFuture;

    public ExecutingDeadJobChecker(JobTrackerAppContext appContext) {
        this.appContext = appContext;
        this.stat = (JobTrackerMStatReporter)appContext.getMStatReporter();
    }

    public void start() {
        try {
            if (this.start.compareAndSet(false, true)) {
                int fixCheckPeriodSeconds = this.appContext.getConfig().getParameter("jobtracker.executing.job.fix.check.interval.seconds", 30);
                if (fixCheckPeriodSeconds < 5) {
                    fixCheckPeriodSeconds = 5;
                } else if (fixCheckPeriodSeconds > 300) {
                    fixCheckPeriodSeconds = 300;
                }
                this.scheduledFuture = this.FIXED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            if (!ExecutingDeadJobChecker.this.appContext.getRegistryStatMonitor().isAvailable()) {
                                return;
                            }
                            ExecutingDeadJobChecker.this.checkAndFix();
                        }
                        catch (Throwable t) {
                            LOGGER.error("Check executing dead job error ", t);
                        }
                    }
                }, fixCheckPeriodSeconds, fixCheckPeriodSeconds, TimeUnit.SECONDS);
            }
            LOGGER.info("Executing dead job checker started!");
        }
        catch (Throwable e) {
            LOGGER.error("Executing dead job checker start failed!", e);
        }
    }

    private void checkAndFix() throws RemotingSendException {
        int maxDeadCheckTime = this.appContext.getConfig().getParameter("jobtracker.executing.job.fix.deadline.seconds", 20);
        if (maxDeadCheckTime < 10) {
            maxDeadCheckTime = 10;
        } else if (maxDeadCheckTime > 300) {
            maxDeadCheckTime = 300;
        }
        List<JobPo> maybeDeadJobPos = this.appContext.getExecutingJobQueue().getDeadJobs(SystemClock.now() - (long)(maxDeadCheckTime * 1000));
        if (CollectionUtils.isNotEmpty(maybeDeadJobPos)) {
            HashMap<String, ArrayList<JobPo>> jobMap = new HashMap<String, ArrayList<JobPo>>();
            for (JobPo jobPo : maybeDeadJobPos) {
                ArrayList<JobPo> jobPos = (ArrayList<JobPo>)jobMap.get(jobPo.getTaskTrackerIdentity());
                if (jobPos == null) {
                    jobPos = new ArrayList<JobPo>();
                    jobMap.put(jobPo.getTaskTrackerIdentity(), jobPos);
                }
                jobPos.add(jobPo);
            }
            for (Map.Entry entry : jobMap.entrySet()) {
                String taskTrackerNodeGroup = ((JobPo)((List)entry.getValue()).get(0)).getTaskTrackerNodeGroup();
                String taskTrackerIdentity = (String)entry.getKey();
                ChannelWrapper channelWrapper = this.appContext.getChannelManager().getChannel(taskTrackerNodeGroup, NodeType.TASK_TRACKER, taskTrackerIdentity);
                if (channelWrapper == null && taskTrackerIdentity != null) {
                    Long offlineTimestamp = this.appContext.getChannelManager().getOfflineTimestamp(taskTrackerIdentity);
                    if (offlineTimestamp != null && SystemClock.now() - offlineTimestamp <= 10000L) continue;
                    this.fixDeadJob((List)entry.getValue());
                    continue;
                }
                if (channelWrapper == null || channelWrapper.getChannel() == null || !channelWrapper.isOpen()) continue;
                this.askTimeoutJob(channelWrapper.getChannel(), (List)entry.getValue());
            }
        }
    }

    private void askTimeoutJob(Channel channel, final List<JobPo> jobPos) {
        try {
            RemotingServerDelegate remotingServer = this.appContext.getRemotingServer();
            ArrayList<String> jobIds = new ArrayList<String>(jobPos.size());
            for (JobPo jobPo : jobPos) {
                jobIds.add(jobPo.getJobId());
            }
            JobAskRequest requestBody = this.appContext.getCommandBodyWrapper().wrapper(new JobAskRequest());
            requestBody.setJobIds(jobIds);
            RemotingCommand request = RemotingCommand.createRequestCommand(JobProtos.RequestCode.JOB_ASK.code(), requestBody);
            remotingServer.invokeAsync(channel, request, new AsyncCallback(){

                @Override
                public void operationComplete(ResponseFuture responseFuture) {
                    JobAskResponse responseBody;
                    List<String> deadJobIds;
                    RemotingCommand response = responseFuture.getResponseCommand();
                    if (response != null && RemotingProtos.ResponseCode.SUCCESS.code() == response.getCode() && CollectionUtils.isNotEmpty(deadJobIds = (responseBody = (JobAskResponse)response.getBody()).getJobIds())) {
                        QuietUtils.sleep(ExecutingDeadJobChecker.this.appContext.getConfig().getParameter("jobtracker.fix.executing.job.waiting.mills", 1000L));
                        for (JobPo jobPo : jobPos) {
                            if (!deadJobIds.contains(jobPo.getJobId())) continue;
                            ExecutingDeadJobChecker.this.fixDeadJob(jobPo);
                        }
                    }
                }
            });
        }
        catch (RemotingSendException e) {
            LOGGER.error("Ask timeout Job error, ", e);
        }
    }

    private void fixDeadJob(List<JobPo> jobPos) {
        for (JobPo jobPo : jobPos) {
            this.fixDeadJob(jobPo);
        }
    }

    private void fixDeadJob(JobPo jobPo) {
        try {
            if (this.appContext.getExecutingJobQueue().getJob(jobPo.getJobId()) == null) {
                return;
            }
            jobPo.setGmtModified(SystemClock.now());
            jobPo.setTaskTrackerIdentity(null);
            jobPo.setIsRunning(false);
            try {
                this.appContext.getExecutableJobQueue().add(jobPo);
            }
            catch (DupEntryException e) {
                LOGGER.warn("ExecutableJobQueue already exist:" + JSON.toJSONString(jobPo));
            }
            this.appContext.getExecutingJobQueue().remove(jobPo.getJobId());
            JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
            jobLogPo.setLogTime(SystemClock.now());
            jobLogPo.setSuccess(true);
            jobLogPo.setLevel(Level.WARN);
            jobLogPo.setLogType(LogType.FIXED_DEAD);
            this.appContext.getJobLogger().log(jobLogPo);
            this.stat.incFixExecutingJobNum();
        }
        catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
        }
        LOGGER.info("checkAndFix dead job ! {}", JSON.toJSONString(jobPo));
    }

    public void stop() {
        try {
            if (this.start.compareAndSet(true, false)) {
                this.scheduledFuture.cancel(true);
                this.FIXED_EXECUTOR_SERVICE.shutdown();
            }
            LOGGER.info("Executing dead job checker stopped!");
        }
        catch (Throwable t) {
            LOGGER.error("Executing dead job checker stop failed!", t);
        }
    }
}

