/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.MapTaskProgress;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.JavaProcessorProfileUtil;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class BroadcastTaskMaster
extends TaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(BroadcastTaskMaster.class);
    private Map<String, String> worker2uniqueIdMap = Maps.newConcurrentMap();
    private Map<String, WorkerProgressCounter> workerProgressMap = Maps.newConcurrentMap();
    private LogCollector logCollector = LogCollectorFactory.get();

    public BroadcastTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
    }

    @Override
    public void submitInstance(JobInstanceInfo info) {
        if ("java".equalsIgnoreCase(info.getJobType())) {
            try {
                this.preProcess(info);
            }
            catch (Exception e) {
                LOGGER.error("BroadcastTaskMaster.preProcess failed, jobInstanceId={}", info.getJobInstanceId(), e);
                String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), 0L);
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", SchedulerxWorker.WORKER_ADDR, ExceptionUtil.getMessage(e)));
                Worker.ContainerReportTaskStatusRequest faileRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(info.getJobId()).setJobInstanceId(info.getJobInstanceId()).setTaskId(0L).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(WorkerIdGenerator.get()).setWorkerAddr(SchedulerxWorker.WORKER_ADDR).build();
                this.updateTaskStatus(faileRequest);
                return;
            }
        }
        List<String> allWorkers = info.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            Worker.ContainerReportTaskStatusRequest faileRequest;
            String[] workerInfo = workerIdAddr.split("@");
            String workerAddr = workerInfo[1];
            String workerId = workerInfo[0];
            ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
            long taskId = this.aquireTaskId();
            String uniqueId = IdUtil.getUniqueId(info.getJobId(), info.getJobInstanceId(), taskId);
            Worker.MasterStartContainerRequest request = this.convert2StartContainerRequest(info, taskId);
            try {
                this.taskStatusMap.put(uniqueId, TaskStatus.RUNNING);
                if (!this.workerProgressMap.containsKey(workerAddr)) {
                    WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                    this.workerProgressMap.put(workerAddr, workerProgressCounter);
                }
                this.workerProgressMap.get(workerAddr).incrementTotal();
                this.workerProgressMap.get(workerAddr).incrementRunning();
                Worker.MasterStartContainerResponse response = (Worker.MasterStartContainerResponse)FutureUtils.awaitResult((ActorSelection)selection, (Object)request, (long)10L);
                if (response.getSuccess()) {
                    this.worker2uniqueIdMap.put(workerIdAddr, uniqueId);
                    this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init success worker addr is ", workerAddr));
                    continue;
                }
                LOGGER.error("submitTask[{}] to worker error, {}", uniqueId, workerAddr, response.getMessage());
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", workerAddr, response.getMessage()));
                faileRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(info.getJobId()).setJobInstanceId(info.getJobInstanceId()).setTaskId(taskId).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setWorkerAddr(workerAddr).build();
                this.updateTaskStatus(faileRequest);
            }
            catch (Throwable e) {
                LOGGER.error("start container failed, worker:{}, uniqueId:{}", workerAddr, uniqueId, e);
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[BroadcastTaskMaster-submitTask]broadcast task init fail worker addr is ", workerAddr), e);
                faileRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(info.getJobId()).setJobInstanceId(info.getJobInstanceId()).setTaskId(taskId).setStatus(TaskStatus.FAILED.getValue()).setWorkerAddr(workerAddr).setWorkerId(workerId).build();
                this.updateTaskStatus(faileRequest);
            }
        }
        this.init();
    }

    @Override
    public void killInstance(String reason) {
        super.killInstance(reason);
        String uniqueId = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
        this.updateNewInstanceStatus(this.getSerialNum(), this.jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, reason);
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            try {
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                Worker.MasterKillContainerRequest request = Worker.MasterKillContainerRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).build();
                selection.tell((Object)request, null);
            }
            catch (Throwable e) {
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[Master-killInstance]kill instance tell worker fail worker addr is ", workerIdAddr), e);
                LOGGER.error("send kill instance request exception, worker:{}, uninqueId:{}", workerIdAddr, uniqueId);
            }
        }
    }

    @Override
    public void destroyContainerPool() {
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            Worker.MasterDestroyContainerPoolRequest request = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(this.getSerialNum()).build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell((Object)request, null);
        }
    }

    @Override
    public void updateTaskStatus(Worker.ContainerReportTaskStatusRequest request) {
        long jobId = request.getJobId();
        long jobInstanceId = request.getJobInstanceId();
        long taskId = request.getTaskId();
        String workerAddr = request.getWorkerAddr();
        TaskStatus taskStatus = TaskStatus.parseValue(request.getStatus());
        String uniqueId = IdUtil.getUniqueId(jobId, jobInstanceId, taskId);
        this.taskStatusMap.put(uniqueId, taskStatus);
        if (!this.workerProgressMap.containsKey(workerAddr)) {
            WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
            this.workerProgressMap.put(workerAddr, workerProgressCounter);
        }
        if (taskStatus.equals((Object)TaskStatus.RUNNING)) {
            this.workerProgressMap.get(workerAddr).incrementRunning();
        } else if (taskStatus.equals((Object)TaskStatus.SUCCESS)) {
            this.workerProgressMap.get(workerAddr).incrementSuccess();
        } else if (taskStatus.equals((Object)TaskStatus.FAILED)) {
            this.workerProgressMap.get(workerAddr).incrementFailed();
        }
        InstanceStatus newStatus = InstanceStatus.UNKNOWN;
        if (this.taskStatusMap.size() > 0) {
            if (!this.isJobInstanceFinished()) {
                newStatus = InstanceStatus.RUNNING;
            } else {
                newStatus = InstanceStatus.SUCCESS;
                if (!newStatus.equals((Object)InstanceStatus.FAILED)) {
                    for (TaskStatus status : this.taskStatusMap.values()) {
                        if (!status.equals((Object)TaskStatus.FAILED)) continue;
                        newStatus = InstanceStatus.FAILED;
                        break;
                    }
                }
            }
        }
        this.updateNewInstanceStatus(request.getSerialNum(), jobInstanceId, newStatus, request.getResult());
    }

    @Override
    public String getJobInstanceProgress() {
        MapTaskProgress detail = new MapTaskProgress();
        detail.setWorkerProgress(this.workerProgressMap.values());
        return JsonUtil.toJson(detail);
    }

    @Override
    protected void init() {
        if (this.INITED) {
            return;
        }
        super.init();
        new Thread(new Runnable(){

            @Override
            public void run() {
                while (!BroadcastTaskMaster.this.instanceStatus.isFinish()) {
                    BroadcastTaskMaster.this.aliveCheckWorkerSet.addAll(BroadcastTaskMaster.this.jobInstanceInfo.getAllWorkers());
                    for (String workerIdAddr : BroadcastTaskMaster.this.aliveCheckWorkerSet) {
                        try {
                            ActorSelection selection = BroadcastTaskMaster.this.getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr));
                            Worker.MasterCheckWorkerAliveRequest request = Worker.MasterCheckWorkerAliveRequest.newBuilder().setJobInstanceId(BroadcastTaskMaster.this.jobInstanceInfo.getJobInstanceId()).build();
                            FutureUtils.awaitResult((ActorSelection)selection, (Object)request, (long)10L);
                        }
                        catch (TimeoutException e) {
                            String uniqueId = (String)BroadcastTaskMaster.this.worker2uniqueIdMap.get(workerIdAddr);
                            if (uniqueId != null) {
                                String[] workerInfo = workerIdAddr.split("@");
                                String workerAddr = workerInfo[1];
                                String workerId = workerInfo[0];
                                String[] tokens = uniqueId.split("_");
                                Worker.ContainerReportTaskStatusRequest request = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(Long.valueOf(tokens[0]).longValue()).setJobInstanceId(Long.valueOf(tokens[1]).longValue()).setTaskId(Long.valueOf(tokens[2]).longValue()).setStatus(TaskStatus.FAILED.getValue()).setWorkerAddr(workerAddr).setWorkerId(workerId).build();
                                BroadcastTaskMaster.this.updateTaskStatus(request);
                                LOGGER.warn("worker[{}] is down, set {} to failed", workerAddr, uniqueId);
                                continue;
                            }
                            LOGGER.error("can't found workerAddr of uniqueId={}", uniqueId);
                        }
                        catch (Throwable e) {
                            LOGGER.error("", e);
                        }
                    }
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        LOGGER.error("", e);
                        break;
                    }
                }
            }
        }, "Schedulerx-BroadcastTaskMaster-check-worker-alive-thread-" + this.jobInstanceInfo.getJobId() + "_" + this.jobInstanceInfo.getJobInstanceId()).start();
    }

    public Map<String, WorkerProgressCounter> getWorkerProgressMap() {
        return this.workerProgressMap;
    }

    @Override
    protected void checkProcessor() throws Exception {
        JobProcessor processor;
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType()) && (processor = JavaProcessorProfileUtil.getJavaProcessor(this.jobInstanceInfo.getContent())) instanceof MapJobProcessor) {
            throw new IOException(processor.getClass().getName() + " shouldn't extends MapJobProcessor or MapReduceJobProcessor");
        }
    }

    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult postResult = null;
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType())) {
            try {
                JobContext context = JobContext.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceId).setJobType(this.jobInstanceInfo.getJobType()).setContent(this.jobInstanceInfo.getContent()).setScheduleTime(this.jobInstanceInfo.getScheduleTime()).setDataTime(this.jobInstanceInfo.getDataTime()).setJobParameters(this.jobInstanceInfo.getParameters()).setInstanceParameters(this.jobInstanceInfo.getInstanceParameters()).setUser(this.jobInstanceInfo.getUser()).build();
                JobProcessor jobProcessor = JavaProcessorProfileUtil.getJavaProcessor(context.getContent());
                postResult = jobProcessor.postProcess(context);
            }
            catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
        return postResult;
    }

    private void preProcess(JobInstanceInfo jobInstanceInfo) throws Exception {
        JobContext context = JobContext.newBuilder().setJobId(jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceInfo.getJobInstanceId()).setJobType(jobInstanceInfo.getJobType()).setContent(jobInstanceInfo.getContent()).setScheduleTime(jobInstanceInfo.getScheduleTime()).setDataTime(jobInstanceInfo.getDataTime()).setJobParameters(jobInstanceInfo.getParameters()).setInstanceParameters(jobInstanceInfo.getInstanceParameters()).setUser(jobInstanceInfo.getUser()).build();
        JobProcessor jobProcessor = JavaProcessorProfileUtil.getJavaProcessor(context.getContent());
        jobProcessor.preProcess(context);
    }

    @Override
    public void clear() {
        super.clear();
        this.worker2uniqueIdMap.clear();
        this.workerProgressMap.clear();
    }
}

