package com.alibaba.dts.client.service;

import com.alibaba.dts.client.executor.grid.queue.receive.TaskReceiveHandler;
import com.alibaba.dts.client.executor.grid.unit.FlexibleThreadPoolExecutor;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.common.context.InvocationContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.ExecutionCounter;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.AccessException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.service.NodeServerService;
import com.alibaba.dts.common.util.NamedThreadFactory;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/dts/client/service/NodeServerServiceImpl.class */
/**
 * Client-side implementation of {@link NodeServerService}.
 *
 * <p>Incoming {@link ExecutableTask}s are placed into a bounded receive buffer that is handed to a
 * {@link TaskReceiveHandler}, and one executor per job instance is registered in
 * {@link #getExecutorServiceMap()}. Task acknowledgements update the task snapshot store and the
 * per-instance / per-node / per-task {@link ExecutionCounter}s held by the client context.
 */
public class NodeServerServiceImpl implements NodeServerService {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) NodeServerServiceImpl.class);

    /** Seconds the cleanup thread waits before each attempt to delete task snapshots of a stopped instance. */
    private static final long CLEANUP_DELAY_SECONDS = 10L;

    private final ClientContextImpl clientContext;
    /** Bounded buffer of received tasks; created in {@link #init()}. */
    private BlockingQueue<ExecutableTask> taskReceiveBuffer;
    TaskReceiveHandler taskReceiveHandler;
    private int bufferSize;
    // The three maps below are not referenced anywhere in this class; kept as declared.
    private final ConcurrentHashMap<String, AtomicInteger> logTable = new ConcurrentHashMap<>();
    /** Executors keyed by job instance id; the same map instance is passed to the receive handler. */
    private final ConcurrentHashMap<Long, ExecutorService> executorServiceMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, RemoteMachine> connectToNodes = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, RemoteMachine> connectFromNodes = new ConcurrentHashMap<>();

    /**
     * @param clientContextImpl client context used for configuration, storage and job management
     */
    public NodeServerServiceImpl(ClientContextImpl clientContextImpl) {
        this.clientContext = clientContextImpl;
    }

    /**
     * Creates the receive buffer, sized from the node configuration, and hands it to a new
     * {@link TaskReceiveHandler} via {@code listen}. Must be called before {@link #receiveTasks}.
     */
    public void init() {
        this.bufferSize = this.clientContext.nodeConfig.getReceiveBufferSize();
        this.taskReceiveBuffer = new ArrayBlockingQueue<>(this.bufferSize);
        this.taskReceiveHandler = new TaskReceiveHandler(this.clientContext, this.executorServiceMap);
        this.taskReceiveHandler.listen(this.taskReceiveBuffer);
    }

    /**
     * @return always a {@code true} result with {@link ResultCode#SUCCESS}
     */
    @Override // com.alibaba.dts.common.service.NodeServerService
    public Result<Boolean> connect() {
        return new Result<>(true, ResultCode.SUCCESS);
    }

    /**
     * Accepts a batch of tasks for a job instance.
     *
     * <p>If the instance has been marked as interrupted the batch is dropped and a result with data
     * {@code true} and {@link ResultCode#FAILURE} is returned. Otherwise an executor is registered for
     * the instance (if none exists yet) and the batch is offered to the receive buffer without blocking.
     *
     * @param executableTask the task batch to enqueue
     * @return {@code true}/SUCCESS when enqueued; {@code false}/NODE_RECEIVE_QUEUE_NOT_AVAILABLE when the
     *         buffer is full; a plain {@code false} result when any exception occurs
     */
    @Override // com.alibaba.dts.common.service.NodeServerService
    public Result<Boolean> receiveTasks(ExecutableTask executableTask) {
        try {
            Long jobInstanceId = Long.valueOf(executableTask.getJobInstanceSnapshot().getId());
            if (this.clientContext.getGridJobManager().containsInterruptedJobInstance(jobInstanceId.longValue())) {
                logger.warn("[NodeServerService]: receiveTasks force interrupt:,jobId:" + executableTask.getJob().getId() + ",jobInstanceId:" + executableTask.getJobInstanceSnapshot().getId() + ",total tasks:" + executableTask.getTaskSnapshotList().size());
                return new Result<>(true, ResultCode.FAILURE);
            }
            ensureExecutor(executableTask, jobInstanceId);
            if (this.taskReceiveBuffer.offer(executableTask)) {
                return new Result<>(true, ResultCode.SUCCESS);
            }
            return new Result<>(false, ResultCode.NODE_RECEIVE_QUEUE_NOT_AVAILABLE);
        } catch (Exception e) {
            logger.error("Job接收错误, {}", executableTask, e);
            return new Result<>(false);
        }
    }

    /**
     * Registers an executor for the given job instance unless one is already present.
     *
     * <p>The pool size is the client-wide consumer thread count, overridden by the per-processor entry
     * of the consumer-threads map (passed through {@code checkConsumerThreads}) when one exists.
     */
    private void ensureExecutor(ExecutableTask executableTask, Long jobInstanceId) {
        if (this.executorServiceMap.get(jobInstanceId) != null) {
            return;
        }
        int consumerThreads = this.clientContext.getClientConfig().getConsumerThreads();
        Map<String, Integer> consumerThreadsMap = this.clientContext.getClientConfig().getConsumerThreadsMap();
        if (!CollectionUtils.isEmpty(consumerThreadsMap) && consumerThreadsMap.get(executableTask.getJob().getJobProcessor()) != null) {
            consumerThreads = this.clientContext.getClientConfig().checkConsumerThreads(consumerThreadsMap.get(executableTask.getJob().getJobProcessor()).intValue());
        }
        String threadNamePrefix = "SchedulerX-Grid-Task-Processor_" + executableTask.getJob().getId() + "_" + jobInstanceId + "_" + executableTask.getJobInstanceSnapshot().getFireTime() + "#";
        FlexibleThreadPoolExecutor candidate = new FlexibleThreadPoolExecutor(consumerThreads, consumerThreads, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadNamePrefix));
        if (this.executorServiceMap.putIfAbsent(jobInstanceId, candidate) != null) {
            // Another caller registered an executor first; discard ours so its threads are not leaked.
            candidate.shutdownNow();
        }
    }

    /**
     * Records the acknowledgement of a task and updates its execution counter.
     *
     * <p>A snapshot with status 3, or status 4 with no retries left, is deleted from the store; any
     * other snapshot is updated through {@code taskSnapshotAck}. When the store reports no affected
     * rows the method returns {@code true} without touching the counters. Otherwise the counter for
     * (job instance, acknowledging node, task name) is adjusted: status 3 increments the success
     * counter, every other status increments the fail counter, and the queued counter is decremented
     * in both cases.
     *
     * @param taskSnapshot the acknowledged task snapshot
     * @return a {@code true} result on completion, or a {@code false} result if anything throws
     */
    @Override // com.alibaba.dts.common.service.NodeServerService
    public Result<Boolean> acknowledge(TaskSnapshot taskSnapshot) {
        try {
            RemoteMachine remoteMachine = InvocationContext.acquireRemoteMachine();
            // NOTE(review): 3 and 4 look like "success" and "failure" status codes — confirm against the status constants.
            boolean removeSnapshot = taskSnapshot.getStatus() == 3 || (taskSnapshot.getStatus() == 4 && taskSnapshot.getRetryCount() <= 0);
            if (removeSnapshot) {
                if (this.clientContext.getStore().getTaskSnapshotDao().delete(taskSnapshot) <= 0) {
                    return new Result<>(true);
                }
            } else if (this.clientContext.getStore().getTaskSnapshotDao().taskSnapshotAck(taskSnapshot) <= 0) {
                return new Result<>(true);
            }
            int status = taskSnapshot.getStatus();
            Long jobInstanceId = Long.valueOf(taskSnapshot.getJobInstanceId());
            // Node key: the remote address up to and including ':' followed by the node's listen port.
            String remoteAddress = remoteMachine.getRemoteAddress();
            String receiveNode = remoteAddress.substring(0, remoteAddress.indexOf(':') + 1) + remoteMachine.getNodeListenPort();
            ExecutionCounter counter = resolveExecutionCounter(jobInstanceId, receiveNode, taskSnapshot.getTaskName());
            if (status == 3) {
                counter.getSuccessCounter().getAndIncrement();
            } else {
                counter.getFailCounter().getAndIncrement();
            }
            counter.getQueuedCounter().decrementAndGet();
            return new Result<>(true);
        } catch (Throwable th) {
            logger.error("Task snapshot ack failed, {}", taskSnapshot, th);
            return new Result<>(false);
        }
    }

    /**
     * Looks up, creating each missing level with {@code putIfAbsent}, the counter stored under
     * job instance id -> receive node -> task name in the client context's execution counter table.
     */
    private ExecutionCounter resolveExecutionCounter(Long jobInstanceId, String receiveNode, String taskName) {
        ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> countersByNode = this.clientContext.getExecutionCounterTable().get(jobInstanceId);
        if (countersByNode == null) {
            countersByNode = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> existingByNode = this.clientContext.getExecutionCounterTable().putIfAbsent(jobInstanceId, countersByNode);
            if (existingByNode != null) {
                countersByNode = existingByNode;
            }
        }
        ConcurrentHashMap<String, ExecutionCounter> countersByTask = countersByNode.get(receiveNode);
        if (countersByTask == null) {
            countersByTask = new ConcurrentHashMap<>();
            ConcurrentHashMap<String, ExecutionCounter> existingByTask = countersByNode.putIfAbsent(receiveNode, countersByTask);
            if (existingByTask != null) {
                countersByTask = existingByTask;
            }
        }
        ExecutionCounter counter = countersByTask.get(taskName);
        if (counter == null) {
            counter = new ExecutionCounter();
            counter.setReceiveNode(receiveNode);
            counter.setTaskName(taskName);
            ExecutionCounter existingCounter = countersByTask.putIfAbsent(taskName, counter);
            if (existingCounter != null) {
                counter = existingCounter;
            }
        }
        return counter;
    }

    /**
     * Stops a job instance: marks it interrupted, clears its pending insert buffer, asks the executor
     * to clean its tasks, and starts a background deletion of its task snapshots.
     *
     * @param jobId         job id, used only for logging
     * @param jobInstanceId id of the job instance to stop
     * @return {@code true} if every step was invoked without throwing, {@code false} otherwise
     */
    public boolean stopTask(long jobId, long jobInstanceId) {
        try {
            this.clientContext.getGridJobManager().addInterruptedJobInstance(jobInstanceId);
            this.clientContext.getGridTaskSender().clearInsertBuffer(jobInstanceId);
            this.clientContext.getExecutor().doGridJobCleanTaskForStop(jobInstanceId);
            fixDispatchedTasksStatus(jobInstanceId);
            return true;
        } catch (Throwable th) {
            logger.error("failed to stop job, id = {}, jonInstanceId = {}", Long.valueOf(jobId), Long.valueOf(jobInstanceId), th);
            return false;
        }
    }

    /**
     * Starts a background thread that deletes all task snapshots of the given job instance.
     *
     * <p>Each attempt waits {@value #CLEANUP_DELAY_SECONDS} seconds and then calls
     * {@code deleteByJobInstanceId} until it reports nothing deleted. A failed attempt is logged and
     * retried. If the thread is interrupted while waiting, the interrupt status is restored and the
     * cleanup ends instead of looping forever.
     *
     * @param jobInstanceId id of the job instance whose snapshots are removed
     */
    private void fixDispatchedTasksStatus(final long jobInstanceId) throws AccessException {
        Thread cleaner = new Thread(new Runnable() { // from class: com.alibaba.dts.client.service.NodeServerServiceImpl.1
            @Override // java.lang.Runnable
            public void run() {
                boolean finished = false;
                while (!finished) {
                    try {
                        TimeUnit.SECONDS.sleep(CLEANUP_DELAY_SECONDS);
                        long deleted;
                        do {
                            deleted = NodeServerServiceImpl.this.clientContext.getStore().getTaskSnapshotDao().deleteByJobInstanceId(jobInstanceId);
                        } while (deleted > 0);
                        finished = true;
                    } catch (InterruptedException interrupted) {
                        // Honour the interrupt: restore the flag and stop rather than retrying indefinitely.
                        Thread.currentThread().interrupt();
                        NodeServerServiceImpl.logger.warn("deleteByJobInstanceId interrupted, jobInstanceId:" + jobInstanceId);
                        return;
                    } catch (Throwable th) {
                        NodeServerServiceImpl.logger.error("deleteByJobInstanceId error", th);
                    }
                }
            }
        }, "SchedulerX-Grid-Task-Cleaner_" + jobInstanceId);
        cleaner.start();
    }

    /**
     * @return the live map of executors keyed by job instance id (not a copy)
     */
    public ConcurrentHashMap<Long, ExecutorService> getExecutorServiceMap() {
        return this.executorServiceMap;
    }
}
