package com.alibaba.dts.client.executor.longtime.unit;

import com.alibaba.dts.client.executor.grid.unit.FlexibleThreadPoolExecutor;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.longtime.LongTimePool;
import com.alibaba.dts.client.executor.longtime.processor.LongTimeTaskProcessor;
import com.alibaba.dts.client.executor.longtime.processor.PullProcessor;
import com.alibaba.dts.client.executor.longtime.processor.ReFillingProcessor;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/dts/client/executor/longtime/unit/ExecutorUnit.class */
public class ExecutorUnit implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) ExecutorUnit.class);
    private final ClientContextImpl clientContext;
    private ExecutableTask executableTask;
    private BlockingQueue<TaskSnapshot> releaseQueue;
    private final LongTimePool longTimePool;
    private volatile boolean releaseTaskFlag = false;
    private volatile boolean pullTaskFlag = false;
    private PullProcessor pullProcessor = null;
    private ReFillingProcessor reFillingProcessor = null;
    private BlockingQueue<TaskSnapshot> queue = null;
    private BlockingQueue<TaskSnapshot> completedqueue = null;
    private ExecutorService longTimeTaskProcessors = null;
    private ConcurrentHashMap<Long, TaskRunStatistic> taskRunStatisticMap = new ConcurrentHashMap<>();
    private final AtomicInteger threadCounter = new AtomicInteger();
    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: com.alibaba.dts.client.executor.longtime.unit.ExecutorUnit.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "DTS-LongTimeTaskStates-report-thread-" + ExecutorUnit.this.executableTask.getJob().getId() + "-" + ExecutorUnit.this.executableTask.getJob().getJobProcessor() + "-" + ExecutorUnit.this.executableTask.getJobInstanceSnapshot().getId());
        }
    });

    public ConcurrentHashMap<Long, TaskRunStatistic> getTaskRunStatisticMap() {
        return this.taskRunStatisticMap;
    }

    public ExecutorUnit(ClientContextImpl clientContextImpl, LongTimePool longTimePool, ExecutableTask executableTask) {
        this.releaseQueue = new LinkedBlockingQueue();
        this.clientContext = clientContextImpl;
        this.longTimePool = longTimePool;
        this.executableTask = executableTask;
        clientContextImpl.getClientConfig().getPageSize();
        Map<String, Integer> pageSizeMap = clientContextImpl.getClientConfig().getPageSizeMap();
        if (!CollectionUtils.isEmpty(pageSizeMap) && pageSizeMap.get(executableTask.getJob().getJobProcessor()) != null) {
            clientContextImpl.getClientConfig().checkPageSize(pageSizeMap.get(executableTask.getJob().getJobProcessor()).intValue());
        }
        this.executableTask.setLength(1);
        this.releaseQueue = new LinkedBlockingQueue(10000);
    }

    public boolean isPullTaskFlag() {
        return this.pullTaskFlag;
    }

    public void setPullTaskFlag(boolean z) {
        this.pullTaskFlag = z;
    }

    public boolean isReleaseTaskFlag() {
        return this.releaseTaskFlag;
    }

    public void setReleaseTaskFlag(boolean z) {
        this.releaseTaskFlag = z;
    }

    public boolean isExistsInTaskRunStatisticMap(Long l) {
        return this.taskRunStatisticMap.containsKey(l);
    }

    public void addTaskRunStatisticMap(Long l) {
        this.taskRunStatisticMap.put(l, new TaskRunStatistic(l.longValue()));
    }

    public void addTaskRunStatisticMap(Long l, int i) {
        this.taskRunStatisticMap.put(l, new TaskRunStatistic(l.longValue(), i));
    }

    public void updateTaskRunStatisticMap(Long l, Long l2) {
        try {
            TaskRunStatistic taskRunStatistic = this.taskRunStatisticMap.get(l);
            taskRunStatistic.setRuntimes(taskRunStatistic.getRuntimes() + 1);
            taskRunStatistic.setLastTimeConsuming(l2.longValue());
            taskRunStatistic.setLastRunTime(new Date());
            this.taskRunStatisticMap.put(l, taskRunStatistic);
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: updateTaskRunStatisticMap error, taskid:" + l + ", runtime:" + l2, th);
        }
    }

    public void updateTaskRunStatisticMap(Long l, Long l2, int i) {
        try {
            TaskRunStatistic taskRunStatistic = this.taskRunStatisticMap.get(l);
            taskRunStatistic.setRuntimes(taskRunStatistic.getRuntimes() + 1);
            taskRunStatistic.setLastTimeConsuming(l2.longValue());
            taskRunStatistic.setLastRunTime(new Date());
            taskRunStatistic.setProcessResult(i);
            this.taskRunStatisticMap.put(l, taskRunStatistic);
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: updateTaskRunStatisticMap error, taskid:" + l + ", runtime:" + l2 + ", lastProcessResult:" + i, th);
        }
    }

    public void deleteTaskRunStatisticMap(Long l) {
        try {
            this.taskRunStatisticMap.remove(l);
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: deleteTaskRunStatisticMap error, taskid:" + l, th);
        }
    }

    public String getTaskRunStatisticMapStr() {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append("[");
        Iterator<TaskRunStatistic> it = this.taskRunStatisticMap.values().iterator();
        while (it.hasNext()) {
            stringBuffer.append("," + it.next().toString());
        }
        stringBuffer.append("]");
        return stringBuffer.toString();
    }

    public void init() throws InitException {
        this.pullProcessor = new PullProcessor(this.clientContext, this);
        this.reFillingProcessor = new ReFillingProcessor(this.clientContext, this);
        this.queue = new LinkedBlockingQueue(10000);
        this.completedqueue = new LinkedBlockingQueue(10000);
        this.pullProcessor.start();
        this.reFillingProcessor.start();
        initTaskProcessors();
        initStatesReportTimer();
    }

    public void initStatesReportTimer() throws InitException {
        try {
            this.executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() { // from class: com.alibaba.dts.client.executor.longtime.unit.ExecutorUnit.2
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, "DTS-LongTimeTaskStates-report-thread-" + ExecutorUnit.this.executableTask.getJob().getId() + "-" + ExecutorUnit.this.executableTask.getJob().getJobProcessor() + "-" + ExecutorUnit.this.executableTask.getJobInstanceSnapshot().getId());
                }
            });
            this.executorService.scheduleAtFixedRate(new StatesReportTimer(this, this.clientContext), 60000L, 120000L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            throw new InitException("[ExecutorUnit]: initStatesReportTimer error", th);
        }
    }

    @Deprecated
    public void activeInit() throws InitException {
    }

    private void initTaskProcessors() {
        try {
            int consumerThreads = this.clientContext.getClientConfig().getConsumerThreads();
            Map<String, Integer> consumerThreadsMap = this.clientContext.getClientConfig().getConsumerThreadsMap();
            if (!CollectionUtils.isEmpty(consumerThreadsMap) && consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()) != null) {
                consumerThreads = this.clientContext.getClientConfig().checkConsumerThreads(consumerThreadsMap.get(this.executableTask.getJob().getJobProcessor()).intValue());
            }
            if (this.executableTask.getRunThreads() > 0) {
                consumerThreads = this.executableTask.getRunThreads();
            }
            this.longTimeTaskProcessors = new FlexibleThreadPoolExecutor(consumerThreads, consumerThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque(), new NamedThreadFactory(getThreadName(this.executableTask)));
            for (int i = 0; i < consumerThreads; i++) {
                this.longTimeTaskProcessors.submit(new LongTimeTaskProcessor(this.clientContext, this, i, this.threadCounter));
            }
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: initTaskProcessors error", th);
        }
    }

    private String getThreadName(ExecutableTask executableTask) {
        return "DTSLongTimeTaskProcessor-" + executableTask.getJob().getId() + "-" + executableTask.getJob().getJobProcessor() + "-" + executableTask.getJobInstanceSnapshot().getId() + "-" + executableTask.getJobInstanceSnapshot().getFireTime() + "-" + new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + "-";
    }

    public void restartPull() {
        try {
            if (this.longTimeTaskProcessors == null || this.longTimeTaskProcessors.isShutdown() || this.longTimeTaskProcessors.isTerminated()) {
                initTaskProcessors();
            }
            if (this.pullProcessor == null || this.pullProcessor.isStop() || !this.pullProcessor.isAlive()) {
                this.pullProcessor = new PullProcessor(this.clientContext, this);
                this.pullProcessor.start();
            }
            logger.info("[LExecutorUnit]: restartPull start!");
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: restartPull error", th);
        }
    }

    public void releaseCompleteTask() {
        if (isPullTaskFlag()) {
            return;
        }
        try {
            try {
                setReleaseTaskFlag(true);
                BlockingQueue<TaskSnapshot> queue = getQueue();
                BlockingQueue<TaskSnapshot> completedqueue = getCompletedqueue();
                int size = queue.size();
                int size2 = completedqueue.size();
                int i = 0;
                while (!queue.isEmpty()) {
                    try {
                        TaskSnapshot poll = queue.poll();
                        if (poll != null) {
                            this.releaseQueue.put(poll);
                        }
                    } catch (Throwable th) {
                        logger.error("[LReleaseProcessor]: pullQueue error, instanceId:" + getExecutableTask().getJobInstanceSnapshot().getId(), th);
                    }
                }
                while (!completedqueue.isEmpty()) {
                    try {
                        TaskSnapshot poll2 = completedqueue.poll();
                        if (poll2 != null) {
                            this.releaseQueue.put(poll2);
                        }
                    } catch (Throwable th2) {
                        logger.error("[LReleaseProcessor]: pullAndPut error, instanceId:" + getExecutableTask().getJobInstanceSnapshot().getId(), th2);
                    }
                }
                int size3 = this.releaseQueue.size();
                while (!this.releaseQueue.isEmpty()) {
                    try {
                        Result<Boolean> result = null;
                        TaskSnapshot poll3 = this.releaseQueue.poll();
                        if (poll3 != null) {
                            result = this.clientContext.getExecutor().acknowledgeRes(poll3, 7, 0);
                        }
                        if (result != null && result.getData().booleanValue()) {
                            deleteTaskRunStatisticMap(Long.valueOf(poll3.getId()));
                            i++;
                            logger.info("[LReleaseProcessor] release task, instanceid:" + poll3.getJobInstanceId() + ",taskid(db):" + poll3.getId());
                        } else if (poll3 != null) {
                            queue.put(poll3);
                            logger.warn("[LReleaseProcessor] release task failur, reenter queue, instanceid:" + poll3.getJobInstanceId() + ",taskid(db):" + poll3.getId() + ",ackResult:" + result.toString());
                        }
                    } catch (Throwable th3) {
                        logger.error("[LReleaseProcessor]: pullCompleteQueue error, instanceId:" + getExecutableTask().getJobInstanceSnapshot().getId(), th3);
                    }
                }
                logger.info("[LReleaseProcessor]: releaseQueue end, queueTotal:" + size + ", completeQueueTotal:" + size2 + ", releaseQueueTotal:" + size3 + ", releaseQueueSuccess:" + i);
                setReleaseTaskFlag(false);
            } catch (Throwable th4) {
                logger.error("[LReleaseProcessor]: run error, instanceId:" + getExecutableTask().getJobInstanceSnapshot().getId(), th4);
                setReleaseTaskFlag(false);
            }
        } catch (Throwable th5) {
            setReleaseTaskFlag(false);
            throw th5;
        }
    }

    public void clear() {
        try {
            int size = this.queue.size();
            int size2 = this.completedqueue.size();
            int size3 = this.releaseQueue.size();
            int size4 = this.taskRunStatisticMap.size();
            this.queue.clear();
            this.completedqueue.clear();
            this.releaseQueue.clear();
            this.taskRunStatisticMap.clear();
            logger.info("[LExecutorUnit]: clear success, queueTotal:" + size + ", completeQueueTotal:" + size2 + ", releaseQueueTotal:" + size3 + ", taskRunStatisticMapTotal:" + size4);
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: clear error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), th);
        }
    }

    public void stopTask() {
        try {
            this.executorService.shutdownNow();
            logger.warn("executorService  shutdown now.");
            this.pullProcessor.setStop(true);
            this.reFillingProcessor.setStop(true);
            this.longTimeTaskProcessors.shutdownNow();
            clear();
            logger.info("[LExecutorUnit]: stopTask end");
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: stopTask error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), th);
        }
    }

    public void forceStopTask() {
        try {
            try {
                this.pullProcessor.stop();
            } catch (Throwable th) {
                logger.error("[LExecutorUnit]: forceStopTask pullProcessor error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), th);
            }
            try {
                this.reFillingProcessor.stop();
            } catch (Throwable th2) {
                logger.error("[LExecutorUnit]: forceStopTask pullProcessor error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), th2);
            }
            clear();
            this.executorService.shutdownNow();
            this.longTimeTaskProcessors.shutdownNow();
            logger.info("[LExecutorUnit]: stopTask end");
        } catch (Throwable th3) {
            logger.error("[LExecutorUnit]: forceStopTask error, instanceId:" + this.executableTask.getJobInstanceSnapshot().getId(), th3);
        }
    }

    public boolean isExecutorStop() {
        return this.queue.isEmpty() && this.threadCounter.get() == 0;
    }

    public boolean offer(TaskSnapshot taskSnapshot) {
        boolean z = false;
        try {
            z = this.queue.offer(taskSnapshot, 5000L, TimeUnit.MILLISECONDS);
            logger.info("[LExecutorUnit]: offer task,instanceId:" + taskSnapshot.getJobInstanceId() + ",taskid:" + taskSnapshot.getId() + ",result:" + z);
        } catch (Throwable th) {
            logger.error("[LExecutorUnit]: offer error, jobInstanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), th);
        }
        return z;
    }

    public void taskPostProcess(TaskSnapshot taskSnapshot) {
        try {
            this.completedqueue.add(taskSnapshot);
            logger.info("[LExecutorUnit]: taskPostProcess,taskid:" + taskSnapshot.getId());
        } catch (Throwable th) {
            logger.error("[LPullProcessor]: put error, instanceId:" + taskSnapshot.getJobInstanceId() + ", id:" + taskSnapshot.getId(), th);
        }
    }

    public BlockingQueue<TaskSnapshot> getCompletedqueue() {
        return this.completedqueue;
    }

    public ExecutableTask getExecutableTask() {
        return this.executableTask;
    }

    public BlockingQueue<TaskSnapshot> getQueue() {
        return this.queue;
    }

    public AtomicInteger getThreadCounter() {
        return this.threadCounter;
    }

    public LongTimePool getLongTimePool() {
        return this.longTimePool;
    }

    public String toString() {
        return "ExecutorUnit [executableTask=" + this.executableTask + "]";
    }

    public ReFillingProcessor getReFillingProcessor() {
        return this.reFillingProcessor;
    }
}
