package com.alibaba.dts.client.executor.grid.queue.send;

import com.alibaba.dts.client.executor.grid.queue.TaskEvent;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContextImpl;
import com.alibaba.dts.client.route.RouteRule;
import com.alibaba.dts.client.route.impl.RoundRobinRule;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/dts/client/executor/grid/queue/send/TaskRouter.class */
/**
 * Worker that moves {@link TaskEvent}s from the {@link SendManager}'s route queue to its
 * merge queue, stamping each event with the target machine chosen by a {@link RouteRule}.
 *
 * <p>The rule is looked up per task in the route map by the job's processor; when there is
 * no route map or no entry for that processor, a round-robin rule is used. The choice made
 * for one task never influences the rule used for the next task.
 *
 * <p>{@link #run()} loops until the executing thread is interrupted.
 */
public class TaskRouter implements Runnable {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger((Class<?>) TaskRouter.class);

    private final ClientContextImpl clientContext;
    private final SendManager sendManager;
    /** Route rules taken from the node config, keyed by job processor; may be null or empty. */
    private final Map<String, RouteRule> routeMap;
    /** Rule used when the route map has no entry for a job's processor. */
    private final RouteRule defaultRouteRule = new RoundRobinRule();

    /**
     * @param clientContextImpl client context; supplies the node config's route map and the
     *                          grid job manager used to detect interrupted job instances
     * @param sendManager       owner of the route queue, the merge queue and the per-job machine lists
     */
    public TaskRouter(ClientContextImpl clientContextImpl, SendManager sendManager) {
        this.clientContext = clientContextImpl;
        this.sendManager = sendManager;
        this.routeMap = clientContextImpl.getNodeConfig().getRouteMap();
    }

    /**
     * Takes events from the route queue one at a time and routes them. Any failure for a
     * single event is logged and the loop continues; an interrupt restores the thread's
     * interrupt flag and ends the loop.
     */
    @Override // java.lang.Runnable
    public void run() {
        final BlockingQueue<TaskEvent> routeQueue = this.sendManager.getRouteQueue();
        final BlockingQueue<TaskEvent> mergeQueue = this.sendManager.getMergeQueue();
        while (true) {
            TaskEvent taskEvent = null;
            try {
                taskEvent = routeQueue.take();
                route(taskEvent, mergeQueue);
            } catch (InterruptedException interrupted) {
                // Swallowing the interrupt would make this worker impossible to stop:
                // re-assert the flag for the owner of the thread and leave the loop.
                Thread.currentThread().interrupt();
                logger.error("[TaskRouter] interrupted, stop routing, task={}", taskEvent, interrupted);
                return;
            } catch (Throwable th) {
                logger.error("failed to route task, task={}", taskEvent, th);
            }
        }
    }

    /**
     * Routes one event: skips it when its job instance is marked interrupted, otherwise picks
     * a target machine and puts the event onto the merge queue.
     *
     * @throws InterruptedException if the thread is interrupted while putting onto the merge queue
     */
    private void route(TaskEvent taskEvent, BlockingQueue<TaskEvent> mergeQueue) throws InterruptedException {
        final long jobId = taskEvent.getExecutableTask().getJob().getId();
        final long jobInstanceId = taskEvent.getExecutableTask().getJobInstanceSnapshot().getId();

        if (this.clientContext.getGridJobManager().containsInterruptedJobInstance(jobInstanceId)) {
            logger.debug("job instance interrupted, jobId={}, jobInstanceId={}, taskId={}",
                    jobId, jobInstanceId, taskEvent.getExecutableTask().getTaskSnapshot().getId());
            return;
        }

        try {
            List<RemoteMachine> candidates = this.sendManager.getMachinesByJob().get(jobId);
            RouteRule rule = selectRouteRule(taskEvent, jobId, jobInstanceId);
            RemoteMachine targetMachine = rule.rule(buildJobContext(taskEvent), candidates);
            if (targetMachine == null) {
                logger.error("[TaskRouter] RoutePreProcess error,targetMachine is null! jobId={}, jobInstanceId={}",
                        jobId, jobInstanceId);
            }
            // The event is forwarded even when no machine was chosen, as before.
            // NOTE(review): presumably a later stage deals with a null target - confirm.
            taskEvent.setTargetMachine(targetMachine);
            mergeQueue.put(taskEvent);
            logger.debug("[TaskRouter] RoutePreProcess,remoteMachine={}, jobId={}, jobInstanceId={}",
                    targetMachine, jobId, jobInstanceId);
        } catch (InterruptedException interrupted) {
            // Let run() handle shutdown instead of treating the interrupt as a routing error.
            throw interrupted;
        } catch (Throwable th) {
            logger.error("[TaskRouter] RoutePreProcess error, jobId={}, jobInstanceId={}", jobId, jobInstanceId, th);
        }
    }

    /**
     * Returns the rule registered for the event's job processor, or the default rule when the
     * route map is null/empty or has no entry for that processor.
     */
    private RouteRule selectRouteRule(TaskEvent taskEvent, long jobId, long jobInstanceId) {
        // CollectionUtils.isEmpty(Map) is true for a null map as well as an empty one.
        if (CollectionUtils.isEmpty(this.routeMap)) {
            logger.debug("[TaskRouter] routeMap is null! jobId={}, jobInstanceId={}", jobId, jobInstanceId);
            return this.defaultRouteRule;
        }
        RouteRule configured = this.routeMap.get(taskEvent.getExecutableTask().getJob().getJobProcessor());
        // Chosen per task: a rule found for one job must not stick for jobs that have none.
        return configured != null ? configured : this.defaultRouteRule;
    }

    /** Builds the job context handed to the route rule from the event's job, instance, task and task name. */
    private JobContextImpl buildJobContext(TaskEvent taskEvent) {
        JobContextImpl jobContext = new JobContextImpl();
        jobContext.setJob(taskEvent.getExecutableTask().getJob());
        jobContext.setJobInstanceSnapshot(taskEvent.getExecutableTask().getJobInstanceSnapshot());
        jobContext.setRouteTask(taskEvent.getTask());
        jobContext.setTaskName(taskEvent.getExecutableTask().getTaskSnapshot().getTaskName());
        return jobContext;
    }
}
