/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.processor;

import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public abstract class MapJobProcessor
extends JavaProcessor {
    private LogCollector logCollector = LogCollectorFactory.get();
    private static final Logger LOGGER = LogFactory.getLogger(MapJobProcessor.class);

    public ProcessResult map(List<? extends Object> taskList, String taskName) {
        ProcessResult result = new ProcessResult(false);
        JobContext context = ContainerFactory.getContainerPool().getContext();
        ActorSelection masterAkkaSelection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (masterAkkaSelection == null) {
            String errMsg = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(errMsg);
            result.setResult(errMsg);
            return result;
        }
        if (CollectionUtils.isEmpty(taskList)) {
            result.setResult("task list is empty");
            return result;
        }
        int batchSize = 3000;
        int size = taskList.size();
        LOGGER.info("map task list, jobInstanceId={}, taskName={}, size={}, batchSize={}", context.getJobInstanceId(), taskName, size, batchSize);
        int quotient = size / batchSize;
        int remainder = size % batchSize;
        int batchNumber = remainder > 0 ? quotient + 1 : quotient;
        ArrayList<Worker.WorkerMapTaskRequest.Builder> builders = Lists.newArrayList();
        for (int i = 0; i < batchNumber; ++i) {
            builders.add(Worker.WorkerMapTaskRequest.newBuilder());
        }
        int position = 0;
        int maxTaskBodySize = ConfigUtil.getWorkerConfig().getInt("task.body.size.max", 65536);
        try {
            for (Object object : taskList) {
                int batchIdx = position++ / batchSize;
                byte[] taskBody = HessianUtil.toBytes(object);
                if (taskBody.length > maxTaskBodySize) {
                    throw new IOException("taskBody size more than " + maxTaskBodySize + "B!");
                }
                ((Worker.WorkerMapTaskRequest.Builder)builders.get(batchIdx)).addTaskBody(ByteString.copyFrom((byte[])taskBody));
            }
            for (Worker.WorkerMapTaskRequest.Builder builder : builders) {
                builder.setJobId(context.getJobId());
                builder.setJobInstanceId(context.getJobInstanceId());
                builder.setTaskId(context.getTaskId());
                builder.setTaskName(taskName);
                Worker.WorkerMapTaskResponse response = (Worker.WorkerMapTaskResponse)FutureUtils.awaitResult((ActorSelection)masterAkkaSelection, (Object)builder.build(), (long)30L);
                if (!response.getSuccess()) {
                    LOGGER.error(response.getMessage());
                    this.logCollector.collect(context.getUniqueId(), response.getMessage());
                    result.setResult(response.getMessage());
                    return result;
                }
                if (!response.hasOverload() || !response.getOverload()) continue;
                LOGGER.warn("Task Master is busy, sleeping a while {}s...", 10);
                Thread.sleep(10000L);
            }
            result.setStatus(true);
        }
        catch (Throwable e) {
            LOGGER.error("", e);
            this.logCollector.collect(context.getUniqueId(), ExceptionUtil.getTrace(e));
            result.setResult(ExceptionUtil.getMessage(e));
        }
        return result;
    }

    protected boolean isRootTask(JobContext context) {
        return context.getTaskName().equals("MAP_TASK_ROOT");
    }
}

