package com.alibaba.dts.client.executor.grid;

import com.alibaba.dts.client.executor.grid.flowcontrol.FlowControlParameterWatcher;
import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.grid.queue.send.SendManager;
import com.alibaba.dts.client.executor.grid.unit.FlexibleThreadPoolExecutor;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContext;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.AccessException;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.util.BytesUtil;
import com.alibaba.dts.common.util.BytesUtil4Client;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/alibaba/dts/client/executor/grid/GridTaskSender.class */
public class GridTaskSender {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) GridTaskSender.class);
    // Upper bound (in bytes) for one serialized task body; read from the client config in init()
    // and enforced in fillTaskSnapshot(). Stays 0 until init() has run.
    private long maxBodySize;
    // Entry point to the client's collaborators (config, node remoting, store, id worker, job manager).
    private final ClientContextImpl clientContext;
    // Receives task batches / single tasks for routing; initialised with the client context in init().
    private SendManager sendManager = new SendManager();
    // Consumer workers pause while FlowControlParameterWatcher.dbTasksCount is >= this value.
    private long flowControlCountGate = 500000;
    // Buffered task batches waiting to be persisted and routed. The key is passed to persistTasks()
    // as the job instance id by the consumer. Exposed to callers through getTasksForInsertBufferMap().
    private ConcurrentHashMap<Long, BlockingQueue<List<TaskEvent>>> tasksForInsertBufferMap = new ConcurrentHashMap<>();
    // Presence of a key means a consumer worker has already been started for that buffer entry.
    private ConcurrentHashMap<Long, Boolean> tasksForInsertBufferMapFlag = new ConcurrentHashMap<>();
    // Not used inside this class; handed out via getReSendExecutorService().
    // NOTE(review): presumably used by callers to re-send tasks -- confirm against the call sites.
    private ExecutorService reSendExecutorService = new FlexibleThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingDeque(), new NamedThreadFactory("SchedulerX-Tasks-ReSend-Thread-"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/dts/client/executor/grid/GridTaskSender$TaskForInsertConsumer.class */
    /**
     * Background consumer for {@link GridTaskSender#tasksForInsertBufferMap}.
     *
     * <p>A single "boss" thread scans the buffer map once per second and starts one worker for
     * every key that does not have one yet (tracked in {@code tasksForInsertBufferMapFlag}).
     * Each worker polls its queue, waits while the flow-control gate is closed, then persists the
     * batch and hands it to the send manager.
     *
     * <p>Both loops terminate when their thread is interrupted; the interrupt status is restored
     * so the owning executor can observe it.
     */
    public class TaskForInsertConsumer {
        /** Runs the periodic scan that discovers buffer entries without a worker. */
        private final ExecutorService bossThreadPool = Executors.newSingleThreadExecutor(new NamedThreadFactory("SchedulerX-TaskForInsertConsumer-Boss-"));
        /** Runs the per-entry workers; a worker occupies its thread until its queue is unregistered. */
        private final ExecutorService workerThreadPool = Executors.newFixedThreadPool(16, new NamedThreadFactory("SchedulerX-TaskForInsertConsumer-Worker-"));

        TaskForInsertConsumer() {
        }

        /**
         * Starts the boss scan loop.
         *
         * @throws InitException if the scan task cannot be submitted to the boss executor
         */
        public void init() throws InitException {
            try {
                this.bossThreadPool.submit(new Runnable() {
                    @Override
                    public void run() {
                        TaskForInsertConsumer.this.scanUntilInterrupted();
                    }
                });
            } catch (Throwable th) {
                throw new InitException(th);
            }
        }

        /** Boss loop: scan, then always pause one second, even when the scan failed. */
        private void scanUntilInterrupted() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    startMissingWorkers();
                } catch (Throwable th) {
                    GridTaskSender.logger.error("TaskForInsertConsumer error", th);
                }
                try {
                    // Sleeping outside the scan's try block prevents a hot spin when a scan keeps failing.
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Starts a worker for every buffered queue that is not flagged as already being consumed. */
        private void startMissingWorkers() {
            for (Map.Entry<Long, BlockingQueue<List<TaskEvent>>> entry : GridTaskSender.this.tasksForInsertBufferMap.entrySet()) {
                final Long jobInstanceId = entry.getKey();
                final BlockingQueue<List<TaskEvent>> queue = entry.getValue();
                if (GridTaskSender.this.tasksForInsertBufferMapFlag.putIfAbsent(jobInstanceId, Boolean.TRUE) != null) {
                    continue;
                }
                boolean submitted = false;
                try {
                    this.workerThreadPool.submit(new Runnable() {
                        @Override
                        public void run() {
                            TaskForInsertConsumer.this.drainQueue(jobInstanceId, queue);
                        }
                    });
                    submitted = true;
                } finally {
                    if (!submitted) {
                        // Roll the flag back so a later scan retries; otherwise this entry would never get a worker.
                        GridTaskSender.this.tasksForInsertBufferMapFlag.remove(jobInstanceId);
                    }
                }
            }
        }

        /**
         * Worker loop for one buffered queue. Runs while {@code queue} is still the queue registered
         * under {@code jobInstanceId}; a removed or replaced entry ends the loop, so a worker never
         * keeps polling an orphaned queue.
         */
        private void drainQueue(Long jobInstanceId, BlockingQueue<List<TaskEvent>> queue) {
            while (GridTaskSender.this.tasksForInsertBufferMap.get(jobInstanceId) == queue) {
                try {
                    List<TaskEvent> batch = queue.poll(10L, TimeUnit.SECONDS);
                    if (batch == null || batch.isEmpty()) {
                        continue;
                    }
                    // Flow control: hold the batch back while the task count is at or above the gate.
                    while (FlowControlParameterWatcher.dbTasksCount.get() >= GridTaskSender.this.flowControlCountGate) {
                        TimeUnit.SECONDS.sleep(5L);
                    }
                    TaskEvent first = batch.get(0);
                    GridTaskSender.this.persistTasks(batch, jobInstanceId.longValue());
                    GridTaskSender.this.sendManager.putTasksToRouteQueue(batch, jobInstanceId.longValue());
                    GridTaskSender.logger.debug("[TaskForInsertConsumer]: dispatchTaskList, executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, size={}", Long.valueOf(first.getExecutableTask().getJob().getId()), Long.valueOf(first.getExecutableTask().getJobInstanceSnapshot().getId()), Integer.valueOf(batch.size()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Throwable th) {
                    GridTaskSender.logger.error("TaskForInsertConsumer error", th);
                }
            }
        }
    }

    /**
     * Creates a sender bound to the given client context. Nothing is started here; call
     * {@link #init()} before dispatching.
     *
     * @param clientContextImpl the client context; it is not validated here, a {@code null}
     *                          value is only rejected later by the initialisation code
     */
    public GridTaskSender(ClientContextImpl clientContextImpl) {
        this.clientContext = clientContextImpl;
    }

    /**
     * Initialises the send manager with the client context.
     *
     * @throws InitException if the client context is {@code null}, or if the send manager's own
     *                       initialisation fails
     */
    private void initSchedulerXSendQueue() throws InitException {
        if (this.clientContext == null) {
            // Previously this called logger.equals(...), which compared objects and logged nothing.
            logger.error("[GridTaskSender] clientContext is null!");
            throw new InitException("[GridTaskSender] clientContext is null!");
        }
        this.sendManager.init(this.clientContext);
    }

    /**
     * Initialises the sender: sets up the send manager, reads the maximum task body size from
     * the client config and starts the insert-buffer consumer.
     *
     * <p>Failures are logged and then propagated, so a caller never continues with a
     * half-initialised sender (for example one whose {@code maxBodySize} is still 0).
     *
     * @throws InitException if any initialisation step fails; non-{@code InitException} causes
     *                       are wrapped
     */
    public void init() throws InitException {
        try {
            // Runs first so that a null client context is reported by its explicit check
            // instead of surfacing as a NullPointerException from the config lookup below.
            initSchedulerXSendQueue();
            this.maxBodySize = this.clientContext.getClientConfig().getMaxBodySize();
            initTasksForInsertBufferConsumer();
            logger.info("[GridTaskSender]: init over");
        } catch (InitException e) {
            logger.error("[GridTaskSender]: init error:", (Throwable) e);
            throw e;
        } catch (Throwable th) {
            logger.error("[GridTaskSender]: init error:", th);
            throw new InitException(th);
        }
    }

    /**
     * Creates a {@link TaskForInsertConsumer} and starts it. The consumer instance is not kept
     * in a field, so it cannot be reached again from this class after this call.
     *
     * @throws InitException if the consumer fails to start
     */
    private void initTasksForInsertBufferConsumer() throws InitException {
        new TaskForInsertConsumer().init();
    }

    /** Dispatch mode: new tasks that are serialized, persisted in batches and then routed. */
    private static final int DISPATCH_MODE_NEW = 1;
    /** Dispatch mode: existing snapshots re-sent as compensation (flagged as re-sent). */
    private static final int DISPATCH_MODE_COMPENSATE = 2;
    /** Dispatch mode: existing snapshots routed again without extra flags. */
    private static final int DISPATCH_MODE_RETRY = 3;
    /** Maximum number of node lookups before dispatching proceeds without confirmed nodes. */
    private static final int NODE_LOOKUP_MAX_ATTEMPTS = 10;
    /** Pause between two node lookups, in seconds. */
    private static final long NODE_LOOKUP_RETRY_DELAY_SECONDS = 10L;

    /**
     * Dispatches a list of tasks for the job instance described by {@code jobContext}.
     *
     * <p>Steps: return early if the instance is already interrupted; look up working nodes
     * (bounded retries) and reset the send manager's routes; then walk the list, re-checking
     * the interrupt state before every task.
     * <ul>
     *   <li>mode 1: each task is serialized into a new snapshot; events are collected and, every
     *       {@code Constants.MAX_TASKLIST_SIZE} events (plus the remainder at the end), passed
     *       through flow control, persisted and queued for routing;</li>
     *   <li>mode 2: each element is a {@link TaskSnapshot}; it is marked as re-sent, the task is
     *       marked as compensation and routed individually;</li>
     *   <li>mode 3: each element is a {@link TaskSnapshot} routed individually;</li>
     *   <li>any other mode: nothing is queued.</li>
     * </ul>
     *
     * @param list       tasks (mode 1) or {@link TaskSnapshot}s (modes 2 and 3)
     * @param str        task name stored on new snapshots (mode 1); may be {@code null} otherwise
     * @param jobContext job and job-instance the tasks belong to
     * @param i          dispatch mode, see above
     * @return {@code false} if the instance was interrupted (with
     *         {@code ResultCode.TASK_SEND_INTERRUPT} when interrupted mid-dispatch), if no working
     *         node could be found, or if a mode-1 task had to be skipped because it could not be
     *         serialized; {@code true} otherwise
     * @throws RuntimeException if a mode-1 task exceeds the configured maximum body size
     */
    public Result<Boolean> dispatchTaskList(List<? extends Object> list, String str, JobContext jobContext, int i) {
        final long jobId = jobContext.getJob().getId();
        final long jobInstanceId = jobContext.getJobInstanceSnapshot().getId();
        logger.info("{} tasks dispatched, jobId={}, jobInstanceId={}", Integer.valueOf(list.size()), Long.valueOf(jobId), Long.valueOf(jobInstanceId));
        if (this.clientContext.getGridJobManager().containsInterruptedJobInstance(jobInstanceId)) {
            logger.warn("[GridTaskSender]: dispatchTaskList is interrupted forcedly:,jobId=" + jobId + ",jobInstanceId=" + jobInstanceId + ",total tasks=" + list.size());
            return new Result<>(false);
        }

        final List<RemoteMachine> nodes = lookupWorkingNodes(jobContext, jobId);
        // Success reflects the final lookup outcome; a transient error followed by a good lookup is not a failure.
        boolean success = nodes != null && !nodes.isEmpty();
        // Routes are reset even when the lookup failed, as before.
        this.sendManager.resetRoutesMachines(jobId, nodes);

        final List<TaskEvent> batch = new ArrayList<TaskEvent>();
        long queued = 0;
        final long dispatchStart = System.currentTimeMillis();
        for (int index = 0; index < list.size(); index++) {
            if (isInterruptedInstance(jobInstanceId)) {
                logger.warn("[GridTaskSender]: dispatchTaskList is interrupted forcedly:,jobId:" + jobId + ",jobInstanceId:" + jobInstanceId + ",total tasks:" + list.size() + ",put schedulerXSendQueue tasks:" + queued);
                Result<Boolean> interrupted = new Result<>(false);
                interrupted.setResultCode(ResultCode.TASK_SEND_INTERRUPT);
                return interrupted;
            }
            final long taskStart = System.currentTimeMillis();
            final Object task = list.get(index);
            if (i == DISPATCH_MODE_NEW) {
                ExecutableTask executableTask = new ExecutableTask(jobContext.getJob(), jobContext.getJobInstanceSnapshot());
                fillTaskSnapshot(executableTask, task, str);
                if (executableTask.getTaskSnapshot() == null) {
                    // fillTaskSnapshot() already logged why serialization failed. Without a snapshot the
                    // event cannot be persisted (persistTasks dereferences it), so leave it out of the batch.
                    logger.error("[GridTaskSender]: dispatchTaskList skipped a task without snapshot, jobId=" + jobId + ",jobInstanceId=" + jobInstanceId + ",taskName=" + str + ",index=" + index);
                    success = false;
                    continue;
                }
                TaskEvent taskEvent = new TaskEvent();
                taskEvent.setExecutableTask(executableTask);
                taskEvent.setTask(task);
                batch.add(taskEvent);
                // Flushing on batch size keeps batches bounded even when some tasks were skipped.
                if (batch.size() >= Constants.MAX_TASKLIST_SIZE) {
                    flushBatch(batch, jobContext, jobInstanceId);
                    logger.debug("[GridTaskSender]: dispatchTaskList,executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, taskName={}, consumptionTime={}, size={}", Long.valueOf(executableTask.getJob().getId()), Long.valueOf(executableTask.getJobInstanceSnapshot().getId()), str, Long.valueOf(System.currentTimeMillis() - taskStart), Integer.valueOf(batch.size()));
                    batch.clear();
                }
                queued++;
            } else if (i == DISPATCH_MODE_COMPENSATE || i == DISPATCH_MODE_RETRY) {
                TaskSnapshot snapshot = (TaskSnapshot) task;
                ExecutableTask executableTask = new ExecutableTask(jobContext.getJob(), jobContext.getJobInstanceSnapshot());
                executableTask.setTaskSnapshot(snapshot);
                if (i == DISPATCH_MODE_COMPENSATE) {
                    executableTask.getTaskSnapshot().setReSent(true);
                    executableTask.setCompensation(true);
                }
                TaskEvent taskEvent = new TaskEvent();
                taskEvent.setExecutableTask(executableTask);
                taskEvent.setTask(getTaskObject(snapshot));
                this.sendManager.putSingleTaskToRouteQueue(taskEvent);
                queued++;
            }
        }
        if (i == DISPATCH_MODE_NEW && !batch.isEmpty()) {
            flushBatch(batch, jobContext, jobInstanceId);
            batch.clear();
        }
        logger.debug("[GridTaskSender]: dispatchTaskList,executableTask has been put to schedulerXSendQueue, jobId={}, jobInstanceId={}, total={}, put schedulerXSendQueue tasks={}, taskName={}, consumptionTime={}", Long.valueOf(jobId), Long.valueOf(jobInstanceId), Integer.valueOf(list.size()), Long.valueOf(queued), str, Long.valueOf(System.currentTimeMillis() - dispatchStart));
        return new Result<>(Boolean.valueOf(success));
    }

    /** Applies flow control, persists the batch and queues it for routing; does not clear the batch. */
    private void flushBatch(List<TaskEvent> batch, JobContext jobContext, long jobInstanceId) {
        this.clientContext.getFlowControlChain().control(jobContext);
        persistTasks(batch, jobInstanceId);
        this.sendManager.putTasksToRouteQueue(batch, jobInstanceId);
    }

    /**
     * Looks up the working nodes for the job, retrying a bounded number of times with a pause
     * in between. Every attempt counts, whether it returned nothing or threw, so a failing
     * lookup can no longer spin forever.
     *
     * @return the nodes found, or the last (null or empty) result when every attempt failed
     *         or the thread was interrupted while waiting
     */
    private List<RemoteMachine> lookupWorkingNodes(JobContext jobContext, long jobId) {
        List<RemoteMachine> nodes = null;
        int attempts = 0;
        while (attempts < NODE_LOOKUP_MAX_ATTEMPTS) {
            attempts++;
            try {
                nodes = this.clientContext.getNodeRemoting().getNodes(this.clientContext.getClientConfig().getGroupId(), jobId, jobContext.getJob().getType());
                if (nodes != null && !nodes.isEmpty()) {
                    return nodes;
                }
                logger.warn("there is no working nodes existed, groupId={}, jobId={}, so it will retry {} times", this.clientContext.getClientConfig().getGroupId(), Long.valueOf(jobId), Integer.valueOf(NODE_LOOKUP_MAX_ATTEMPTS - attempts + 1));
            } catch (Throwable th) {
                logger.error("[GridTaskSender]: dispatchTaskList error,", th);
                if (th instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (attempts >= NODE_LOOKUP_MAX_ATTEMPTS) {
                break;
            }
            try {
                TimeUnit.SECONDS.sleep(NODE_LOOKUP_RETRY_DELAY_SECONDS);
            } catch (InterruptedException e) {
                // Stop waiting but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
                break;
            }
        }
        Throwable failure = new RuntimeException("there is no working nodes existed, groupId=" + this.clientContext.getClientConfig().getGroupId() + ", jobId=" + jobId + ", and it has retried for " + attempts + " times");
        logger.error("[GridTaskSender]: dispatchTaskList error,", failure);
        return nodes;
    }

    /**
     * Computes how many tasks go into one insert batch.
     *
     * <p>A non-zero {@code taskInsertBatchSize} from the node config wins. Otherwise the task
     * count is divided by twice the number of machines, clamped to the range 1..3000. A
     * {@code null} or empty machine list is treated as a single slot instead of dividing by zero.
     *
     * @param list  tasks to be batched
     * @param list2 machines the tasks are spread over; may be {@code null} or empty
     * @return the batch size; at least 1 unless the configured value says otherwise
     */
    private int calculateBatchSize(List<? extends Object> list, List<RemoteMachine> list2) {
        int configured = this.clientContext.getNodeConfig().getTaskInsertBatchSize();
        if (configured != 0) {
            return configured;
        }
        int machineCount = list2 == null ? 0 : list2.size();
        // Two slots per machine; never less than one slot so the division below is always defined.
        int slots = Math.max(1, machineCount * 2);
        int perSlot = list.size() / slots;
        return Math.min(3000, Math.max(1, perSlot));
    }

    /**
     * Dispatches new tasks: delegates to the four-argument overload with mode {@code 1}, in
     * which every task is serialized into a new snapshot, persisted in batches and queued
     * for routing.
     *
     * @param list       the task objects to dispatch
     * @param str        task name stored on the created snapshots
     * @param jobContext job and job-instance the tasks belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchTaskList(List<? extends Object> list, String str, JobContext jobContext) {
        return dispatchTaskList(list, str, jobContext, 1);
    }

    /**
     * Dispatches existing snapshots as compensation: delegates to the four-argument overload
     * with mode {@code 2} and no task name. In that mode each snapshot is marked as re-sent,
     * its executable task is marked as compensation, and it is routed individually.
     *
     * @param list       snapshots to send again
     * @param jobContext job and job-instance the snapshots belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchCompensateTaskList(List<TaskSnapshot> list, JobContext jobContext) {
        return dispatchTaskList(list, null, jobContext, 2);
    }

    /**
     * Dispatches existing snapshots for retry: delegates to the four-argument overload with
     * mode {@code 3} and no task name. In that mode each snapshot is routed individually
     * without the re-sent / compensation flags that mode {@code 2} sets.
     *
     * @param list       snapshots to route again
     * @param jobContext job and job-instance the snapshots belong to
     * @return the result of the delegated dispatch
     */
    public Result<Boolean> dispatchRetryTaskList(List<TaskSnapshot> list, JobContext jobContext) {
        return dispatchTaskList(list, null, jobContext, 3);
    }

    /**
     * Serializes {@code obj} and attaches a freshly populated {@link TaskSnapshot} to
     * {@code executableTask}.
     *
     * <p>If serialization fails or yields an empty body, the problem is logged and the method
     * returns without setting a snapshot on the task.
     *
     * @param executableTask task that receives the snapshot
     * @param obj            the task payload to serialize
     * @param str            task name stored on the snapshot
     * @throws RuntimeException if the serialized body is larger than the configured
     *                          {@code maxBodySize}
     */
    private void fillTaskSnapshot(ExecutableTask executableTask, Object obj, String str) {
        byte[] body = null;
        try {
            body = BytesUtil4Client.objectToBytes(obj);
        } catch (Throwable th) {
            logger.error("[GridTaskSender]: fillTaskSnapshot objectToBytes error, taskName:" + str + ", task:" + obj, th);
        }
        if (BytesUtil4Client.isEmpty(body)) {
            logger.error("[GridTaskSender]: fillTaskSnapshot objectToBytes body is empty, taskName:" + str + ", task:" + obj);
            return;
        }
        if (body.length > this.maxBodySize) {
            // The limit is configurable, so report the real numbers rather than a fixed "64KB".
            throw new RuntimeException("[GridTaskSender]: single task is too large, size=" + body.length + " bytes, maxBodySize=" + this.maxBodySize + " bytes, taskName:" + str);
        }
        // One clock reading for both timestamps so a new snapshot never has create != modified.
        Date now = new Date();
        TaskSnapshot taskSnapshot = new TaskSnapshot();
        taskSnapshot.setGmtCreate(now);
        // Separate Date instance: Date is mutable and the two fields must not share one object.
        taskSnapshot.setGmtModified(new Date(now.getTime()));
        taskSnapshot.setJobInstanceId(executableTask.getJobInstanceSnapshot().getId());
        taskSnapshot.setJobProcessor(executableTask.getJob().getJobProcessor());
        taskSnapshot.setBody(body);
        taskSnapshot.setStatus(0);
        taskSnapshot.setTaskName(str);
        taskSnapshot.setRetryCount(0);
        executableTask.setTaskSnapshot(taskSnapshot);
    }

    /**
     * Deserializes the body of a snapshot back into its task object.
     *
     * <p>Snapshots named {@code Constants.DEFAULT_ROOT_LEVEL_TASK_NAME} are decoded with
     * {@link BytesUtil}; all others with {@link BytesUtil4Client}.
     *
     * @param taskSnapshot snapshot whose body should be decoded
     * @return the decoded object, or {@code null} when the body is empty or decoding fails
     *         (both cases are logged)
     */
    protected Object getTaskObject(TaskSnapshot taskSnapshot) {
        if (!Constants.DEFAULT_ROOT_LEVEL_TASK_NAME.equals(taskSnapshot.getTaskName())) {
            // Regular task: client-side codec.
            if (BytesUtil4Client.isEmpty(taskSnapshot.getBody())) {
                logger.error("[GridTaskSender]: BytesUtil4Client setTask bytesToObject error, body is empty, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId());
                return null;
            }
            try {
                return BytesUtil4Client.bytesToObject(taskSnapshot.getBody());
            } catch (Throwable decodeFailure) {
                logger.error("[GridTaskSender]: BytesUtil4Client setTask bytesToObject error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), decodeFailure);
                return null;
            }
        }

        // Root-level task: common codec.
        if (BytesUtil.isEmpty(taskSnapshot.getBody())) {
            logger.error("[GridTaskSender]: BytesUtil setTask bytesToObject error, body is empty, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId());
            return null;
        }
        try {
            return BytesUtil.bytesToObject(taskSnapshot.getBody());
        } catch (Throwable decodeFailure) {
            logger.error("[GridTaskSender]: BytesUtil setTask bytesToObject error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), decodeFailure);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assigns a fresh id to the snapshot of every event and inserts all snapshots in one batch.
     *
     * <p>Does nothing when the list is empty or the job instance is marked as interrupted.
     * An {@link AccessException} from the store is logged and not rethrown.
     *
     * @param list events whose snapshots should be stored
     * @param j    id of the job instance the events belong to
     */
    public void persistTasks(List<TaskEvent> list, long j) {
        if (list.isEmpty() || this.clientContext.getGridJobManager().containsInterruptedJobInstance(j)) {
            return;
        }
        List<TaskSnapshot> snapshots = new ArrayList<TaskSnapshot>();
        for (TaskEvent event : list) {
            TaskSnapshot snapshot = event.getExecutableTask().getTaskSnapshot();
            snapshot.setId(this.clientContext.getIdWorker().nextId());
            snapshots.add(snapshot);
        }
        try {
            this.clientContext.getStore().getTaskSnapshotDao().insertBatch(snapshots);
        } catch (AccessException storeFailure) {
            logger.error("failed to insert tasksnapshots batch. jobInstanceId=" + j, (Throwable) storeFailure);
        }
    }

    /**
     * Marks a job instance as interrupted in the grid job manager. Dispatching and persisting
     * in this class check that marker and stop working on the instance once it is set.
     *
     * <p>Never throws: any failure is logged and swallowed.
     *
     * @param j id of the job instance to mark
     */
    public void addInterruptedJobInstance(long j) {
        try {
            this.clientContext.getGridJobManager().addInterruptedJobInstance(j);
            logger.info("[GridTaskSender]: addInterceptInstance instanceId:" + j);
        } catch (Throwable th) {
            logger.error("[GridTaskSender]: addInterceptInstance error,instanceId:" + j, th);
        }
    }

    /**
     * Removes the interrupted marker of a job instance from the grid job manager.
     *
     * <p>Never throws: any failure is logged and swallowed.
     *
     * @param j id of the job instance to unmark
     */
    public void removeInterruptedJobInstance(long j) {
        try {
            this.clientContext.getGridJobManager().removeInterruptedJobInstance(j);
            logger.info("[GridTaskSender]: removeInterceptInstance instanceId:" + j);
        } catch (Throwable th) {
            logger.error("[GridTaskSender]: removeInterceptInstance error,instanceId:" + j, th);
        }
    }

    /**
     * Tells whether the grid job manager currently lists the job instance as interrupted.
     * Unlike the add/remove methods, this call is not wrapped in a try/catch, so an exception
     * from the job manager propagates to the caller.
     *
     * @param j id of the job instance to check
     * @return {@code true} if the instance is marked as interrupted
     */
    public boolean isInterruptedInstance(long j) {
        return this.clientContext.getGridJobManager().containsInterruptedJobInstance(j);
    }

    /**
     * Returns the executor created together with this sender (threads are named
     * {@code SchedulerX-Tasks-ReSend-Thread-}). This class never submits work to it itself.
     *
     * @return the shared re-send executor
     */
    public ExecutorService getReSendExecutorService() {
        return this.reSendExecutorService;
    }

    /**
     * Returns the live insert-buffer map (not a copy). Queues registered in it are discovered
     * by the {@link TaskForInsertConsumer} scan, which starts a worker that persists and routes
     * the batches put into the queue.
     *
     * @return the buffer map; the consumer passes each key to {@code persistTasks} as the
     *         job instance id
     */
    public ConcurrentHashMap<Long, BlockingQueue<List<TaskEvent>>> getTasksForInsertBufferMap() {
        return this.tasksForInsertBufferMap;
    }

    /**
     * Unregisters the insert buffer stored under the given key together with its
     * "worker started" flag. The consumer worker re-checks the buffer map on every loop
     * iteration, so removing the entry lets that worker finish; batches still sitting in the
     * removed queue are not processed afterwards.
     *
     * @param j key of the buffer entry to remove
     */
    public void clearInsertBuffer(long j) {
        this.tasksForInsertBufferMap.remove(Long.valueOf(j));
        this.tasksForInsertBufferMapFlag.remove(Long.valueOf(j));
    }

    /**
     * Returns the send manager this sender routes tasks through. The instance exists from
     * construction on, but is only initialised with the client context during {@link #init()}.
     *
     * @return the send manager
     */
    public SendManager getSendManager() {
        return this.sendManager;
    }
}
