/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dts.client.remoting;

import com.alibaba.dts.client.executor.grid.ClientNodeRemotingServer;
import com.alibaba.dts.client.executor.grid.ClientNodeSystemRemotingServer;
import com.alibaba.dts.client.executor.job.context.ClientContextImpl;
import com.alibaba.dts.client.executor.job.context.JobContextImpl;
import com.alibaba.dts.client.remoting.listener.ServerChannelEventListener;
import com.alibaba.dts.client.remoting.processor.NodeClientRequestProcessor;
import com.alibaba.dts.client.remoting.processor.NodeServerRequestProcessor;
import com.alibaba.dts.client.remoting.proxy.NodeClientInvocationHandler;
import com.alibaba.dts.client.remoting.proxy.NodeClientSystemInvocationHandler;
import com.alibaba.dts.client.remoting.proxy.NodeServerInvocationHandler;
import com.alibaba.dts.client.remoting.timer.NodeHeartBeatTimer;
import com.alibaba.dts.client.remoting.timer.NodeSnifferTimer;
import com.alibaba.dts.common.constants.Constants;
import com.alibaba.dts.common.context.InvocationContext;
import com.alibaba.dts.common.domain.ExecutableTask;
import com.alibaba.dts.common.domain.remoting.RemoteMachine;
import com.alibaba.dts.common.domain.result.Result;
import com.alibaba.dts.common.domain.result.ResultCode;
import com.alibaba.dts.common.domain.store.ExecutionCounter;
import com.alibaba.dts.common.domain.store.Job;
import com.alibaba.dts.common.domain.store.JobInstanceSnapshot;
import com.alibaba.dts.common.domain.store.TaskSnapshot;
import com.alibaba.dts.common.exception.AccessException;
import com.alibaba.dts.common.exception.InitException;
import com.alibaba.dts.common.exception.RemotingConnectException;
import com.alibaba.dts.common.exception.RemotingSendRequestException;
import com.alibaba.dts.common.exception.RemotingTimeoutException;
import com.alibaba.dts.common.logger.SchedulerXLoggerFactory;
import com.alibaba.dts.common.logger.innerlog.Logger;
import com.alibaba.dts.common.remoting.netty.NettyClientConfig;
import com.alibaba.dts.common.remoting.netty.NettyRemotingClient;
import com.alibaba.dts.common.remoting.netty.NettyRemotingServer;
import com.alibaba.dts.common.remoting.netty.NettyServerConfig;
import com.alibaba.dts.common.remoting.protocol.RemotingCommand;
import com.alibaba.dts.common.service.NodeClientService;
import com.alibaba.dts.common.service.NodeServerService;
import com.alibaba.dts.common.util.NamedThreadFactory;
import com.alibaba.dts.shade.io.netty.channel.Channel;
import java.lang.reflect.InvocationHandler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class NodeRemoting
implements Constants {
    private static final Logger logger = SchedulerXLoggerFactory.getLogger(NodeRemoting.class);
    public static volatile boolean isGetNodeIpsAvailable = true;
    private NettyRemotingClient client;
    private NettyRemotingServer server;
    private NettyRemotingServer systemServer;
    private NodeClientService nodeClientService;
    private NodeServerService nodeServerService;
    private NodeServerService nodeServerSystemService;
    private volatile List<String> serverListCache;
    private volatile List<String> nodeListCache;
    private ClientContextImpl clientContext;
    private InvocationHandler nodeServerInvocationHandler;
    private InvocationHandler nodeClientSystemInvocationHandler;
    private InvocationHandler nodeClientInvocationHandler;
    private ThreadPoolExecutor bizExecutors = null;
    private ThreadPoolExecutor systemExecutors = null;
    private LinkedBlockingQueue<Runnable> requestQueue = new LinkedBlockingQueue();
    private LinkedBlockingQueue<Runnable> systemRequestQueue = new LinkedBlockingQueue();
    private ConcurrentHashMap<String, Integer> clientNodes = new ConcurrentHashMap();
    private ConcurrentHashMap<String, AtomicInteger> snifferedClientNodes = new ConcurrentHashMap();
    private ScheduledExecutorService nodeSnifferExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactory(){
        int index = 0;

        @Override
        public Thread newThread(Runnable runnable) {
            ++this.index;
            return new Thread(runnable, "Schedulerx-Server-Sniffer-Thread-" + this.index);
        }
    });
    private ScheduledExecutorService timeExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactory(){
        int index = 0;

        @Override
        public Thread newThread(Runnable runnable) {
            ++this.index;
            return new Thread(runnable, "DTS-heart-beat-thread-" + this.index);
        }
    });
    private ExecutorService taskReSendExecutorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Task-ReSend-Thread-"));
    private ExecutorService taskDealFailExecutorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Task-DealFail-Thread-"));
    private ExecutorService nodeAliveCheckExecutorService = Executors.newFixedThreadPool(10, new NamedThreadFactory("SchedulerX-Node-Alive-Check-Thread-"));
    public static volatile AtomicBoolean isNodeHeartBeatRunning = new AtomicBoolean(false);

    public NodeRemoting(ClientContextImpl clientContext) {
        this.clientContext = clientContext;
        this.nodeClientInvocationHandler = new NodeClientInvocationHandler(clientContext);
        this.nodeServerInvocationHandler = new NodeServerInvocationHandler(clientContext);
        this.nodeClientSystemInvocationHandler = new NodeClientSystemInvocationHandler(clientContext);
        this.nodeServerService = this.proxyServerInterface();
        this.nodeServerSystemService = this.proxyServerSystemInterface();
        this.nodeClientService = this.proxyClientInterface();
    }

    public void init() throws InitException {
        this.initNodeServer();
        this.initNodeSystemServer();
        this.initNodeClient();
        this.initNodeHeartbeatTimer();
    }

    private void initNodeSystemServer() throws InitException {
        NettyServerConfig config = new NettyServerConfig();
        config.setListenPort(this.clientContext.getNodeConfig().getSystemListenPort());
        logger.info("local system listen port is {}", (Object)this.clientContext.getNodeConfig().getSystemListenPort());
        NodeServerRequestProcessor systemProcessor = new NodeServerRequestProcessor(this.systemRequestQueue, this.clientContext);
        systemProcessor.init();
        ServerChannelEventListener listener = new ServerChannelEventListener(this.clientContext);
        this.systemServer = new ClientNodeSystemRemotingServer(config, listener, this.clientContext);
        this.systemExecutors = new ThreadPoolExecutor(this.clientContext.getNodeConfig().getRemotingThreads(), this.clientContext.getNodeConfig().getRemotingThreads(), 0L, TimeUnit.MILLISECONDS, this.systemRequestQueue, new ThreadFactory(){
            int index = 0;

            @Override
            public Thread newThread(Runnable runnable) {
                ++this.index;
                return new Thread(runnable, "SchedulerX-node-system-server-remoting-thread-" + this.index);
            }
        });
        this.systemServer.registerProcessor(0, systemProcessor, this.systemExecutors);
        try {
            this.systemServer.start();
        }
        catch (Throwable e) {
            throw new InitException("[ServerRemoting]: init error", e);
        }
    }

    private void initNodeServer() throws InitException {
        NettyServerConfig config = new NettyServerConfig();
        config.setListenPort(this.clientContext.getNodeConfig().getListenPort());
        logger.info("local listen port is {}", (Object)this.clientContext.getNodeConfig().getListenPort());
        NodeServerRequestProcessor bizProcessor = new NodeServerRequestProcessor(this.requestQueue, this.clientContext);
        bizProcessor.init();
        ServerChannelEventListener listener = new ServerChannelEventListener(this.clientContext);
        this.server = new ClientNodeRemotingServer(config, listener, this.clientContext);
        this.bizExecutors = new ThreadPoolExecutor(this.clientContext.getNodeConfig().getRemotingThreads(), this.clientContext.getNodeConfig().getRemotingThreads(), 0L, TimeUnit.MILLISECONDS, this.requestQueue, new ThreadFactory(){
            int index = 0;

            @Override
            public Thread newThread(Runnable runnable) {
                ++this.index;
                return new Thread(runnable, "SchedulerX-node-server-remoting-thread-" + this.index);
            }
        });
        this.server.registerProcessor(0, bizProcessor, this.bizExecutors);
        try {
            this.server.start();
        }
        catch (Throwable e) {
            throw new InitException("[ServerRemoting]: init error", e);
        }
    }

    private void initNodeClient() throws InitException {
        NettyClientConfig config = new NettyClientConfig();
        NodeClientRequestProcessor processor = new NodeClientRequestProcessor(this.clientContext);
        this.client = new NettyRemotingClient(config);
        this.client.registerProcessor(0, processor, Executors.newFixedThreadPool(this.clientContext.getNodeConfig().getRemotingThreads(), new ThreadFactory(){
            int index = 0;

            @Override
            public Thread newThread(Runnable runnable) {
                ++this.index;
                Thread thread = new Thread(runnable, "SchedulerX-node-client-remoting-thread-" + this.index);
                thread.setPriority(10);
                return thread;
            }
        }));
        try {
            this.client.start();
        }
        catch (Throwable e) {
            throw new InitException("[ClientRemoting]: initRemotingClient error", e);
        }
    }

    private void initNodeHeartbeatTimer() throws InitException {
        try {
            this.nodeSnifferExecutorService.scheduleAtFixedRate(new NodeSnifferTimer(this.clientContext), 0L, this.clientContext.getNodeConfig().getHeartbeatInterval() * 3L, TimeUnit.MILLISECONDS);
            this.timeExecutorService.scheduleAtFixedRate(new NodeHeartBeatTimer(this.clientContext), 0L, this.clientContext.getNodeConfig().getHeartbeatInterval(), TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            throw new InitException("[ClientRemoting]: initHeartBeatTimer error, heartBeatIntervalTime:" + this.clientContext.getClientConfig().getHeartBeatIntervalTime(), e);
        }
        logger.warn("[ClientRemoting]: initHeartBeatTimer success, heartBeatIntervalTime:" + this.clientContext.getClientConfig().getHeartBeatIntervalTime());
    }

    public void connectNodes(List<RemoteMachine> remoteMachines) throws InitException, InterruptedException {
        if (remoteMachines == null || remoteMachines.isEmpty()) {
            return;
        }
        final CountDownLatch latch = new CountDownLatch(remoteMachines.size());
        for (final RemoteMachine remoteMachine : remoteMachines) {
            this.nodeAliveCheckExecutorService.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        NodeRemoting.this.connectNode(remoteMachine);
                    }
                    catch (Throwable throwable) {
                        logger.warn("failed to connect to node " + remoteMachine, throwable);
                    }
                    finally {
                        latch.countDown();
                    }
                }
            });
        }
        latch.await();
    }

    private void connectNode(RemoteMachine remoteMachine) {
        try {
            String key = remoteMachine.getRemoteAddress() + ":" + remoteMachine.getNodeSystemListenPort();
            String systemRemoteAddress = remoteMachine.getSystemRemoteAddress();
            if (!this.clientNodes.containsKey(key)) {
                this.clientNodes.put(key, 0);
                this.snifferedClientNodes.put(key, new AtomicInteger(0));
            }
            long connectTime = System.currentTimeMillis();
            this.clientContext.getNodeConfig().setConnectTime(connectTime);
            InvocationContext.setRemoteMachine(remoteMachine);
            remoteMachine.setTimeout(10000L);
            Result<Boolean> connectResult = this.nodeServerSystemService.connect();
            if (null == connectResult) {
                this.dealConnectFailed(key, remoteMachine.getRemoteAddress());
                return;
            }
            if (!connectResult.getData().booleanValue()) {
                this.dealConnectFailed(key, remoteMachine.getRemoteAddress());
                throw new InitException("[ClientRemoting]: connectServer error," + connectResult.getResultCode().getInformation());
            }
            this.clientNodes.put(key, 0);
            this.snifferedClientNodes.put(key, new AtomicInteger(0));
        }
        catch (Throwable throwable) {
            logger.warn("failed to connect to node " + remoteMachine.getRemoteAddress(), throwable);
        }
    }

    private void dealConnectFailed(String key, String remoteAddress) {
        int missingCount = this.clientNodes.get(key) + 1;
        if (missingCount >= 4) {
            this.clientNodes.remove(key);
            this.snifferedClientNodes.remove(key);
            logger.info("remove " + key + " from clientNodes");
            logger.warn("failed to connect to node , connectResult is null, clientGroup: " + this.clientContext.getNodeConfig().getGroupId() + ", remoteMachine: " + key + ", failed count: " + missingCount);
            if (this.clientContext.getNodeConfig().isEnableRedispatch()) {
                this.reSendTasks(remoteAddress);
            } else {
                this.setTasksFail(remoteAddress);
            }
        } else {
            this.clientNodes.put(key, missingCount);
            this.snifferedClientNodes.put(key, new AtomicInteger(missingCount));
        }
    }

    private void setTasksFail(final String remoteAddress) {
        this.taskDealFailExecutorService.submit(new Runnable(){
            private List<Job> jobs = new ArrayList<Job>();
            private List<JobInstanceSnapshot> jobInstanceSnapshots = new ArrayList<JobInstanceSnapshot>();

            @Override
            public void run() {
                try {
                    for (ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> executionCounterMapByReceiveNode : NodeRemoting.this.clientContext.getExecutionCounterTable().values()) {
                        for (ConcurrentHashMap<String, ExecutionCounter> executionCounterMapByTaskName : executionCounterMapByReceiveNode.values()) {
                            for (ExecutionCounter executionCounter : executionCounterMapByTaskName.values()) {
                                if (!remoteAddress.equals(executionCounter.getReceiveNode())) continue;
                                executionCounter.getFailCounter().set(executionCounter.getFailCounter().get() + executionCounter.getQueuedCounter().get());
                                executionCounter.getQueuedCounter().set(0L);
                            }
                        }
                    }
                    long count = NodeRemoting.this.clientContext.getStore().getTaskSnapshotDao().deleteByReceiveNodeAddressAndStatus(remoteAddress, 1);
                    while (count > 0L) {
                        count = NodeRemoting.this.clientContext.getStore().getTaskSnapshotDao().deleteByReceiveNodeAddressAndStatus(remoteAddress, 1);
                    }
                }
                catch (Throwable throwable) {
                    logger.error("failed to reSend tasks, receiveNodeAddress=" + remoteAddress, throwable);
                }
            }
        });
    }

    private void reSendTasks(final String remoteAddress) {
        this.taskReSendExecutorService.submit(new Runnable(){
            private List<Job> jobs = new ArrayList<Job>();
            private List<JobInstanceSnapshot> jobInstanceSnapshots = new ArrayList<JobInstanceSnapshot>();

            @Override
            public void run() {
                try {
                    for (ConcurrentHashMap<String, ConcurrentHashMap<String, ExecutionCounter>> executionCounterMapByReceiveNode : NodeRemoting.this.clientContext.getExecutionCounterTable().values()) {
                        for (ConcurrentHashMap<String, ExecutionCounter> executionCounterMapByTaskName : executionCounterMapByReceiveNode.values()) {
                            for (ExecutionCounter executionCounter : executionCounterMapByTaskName.values()) {
                                if (!remoteAddress.equals(executionCounter.getReceiveNode())) continue;
                                executionCounter.getTotalCounter().set(executionCounter.getSuccessCounter().get() + executionCounter.getFailCounter().get());
                                executionCounter.getQueuedCounter().set(0L);
                            }
                        }
                    }
                    List<TaskSnapshot> taskSnapshots = this.listReSendTasks(0L);
                    while (taskSnapshots != null && !taskSnapshots.isEmpty()) {
                        ArrayList<ExecutableTask> executableTasks = new ArrayList<ExecutableTask>();
                        for (TaskSnapshot taskSnapshot : taskSnapshots) {
                            ExecutableTask executableTask;
                            Job job;
                            taskSnapshot.setCompensation(true);
                            long jobInstanceId = taskSnapshot.getJobInstanceId();
                            JobInstanceSnapshot jobInstanceSnapshot = this.getJobInstanceExisted(jobInstanceId);
                            if (jobInstanceSnapshot == null) {
                                Result<JobInstanceSnapshot> result = this.getJobInstance(jobInstanceId);
                                if (result == null || result.getResultCode() != ResultCode.SUCCESS) {
                                    return;
                                }
                                jobInstanceSnapshot = result.getData();
                                this.jobInstanceSnapshots.add(jobInstanceSnapshot);
                            }
                            if ((job = this.getJobExisted(jobInstanceSnapshot.getJobId())) == null) {
                                Result<Job> result = this.getJob(jobInstanceSnapshot.getJobId());
                                if (result == null || result.getResultCode() != ResultCode.SUCCESS) {
                                    return;
                                }
                                job = result.getData();
                                this.jobs.add(job);
                            }
                            if ((executableTask = this.getExecutableTask(executableTasks, jobInstanceId)) == null) {
                                executableTask = new ExecutableTask();
                                executableTask.setJobInstanceSnapshot(jobInstanceSnapshot);
                                executableTask.setJob(job);
                                executableTasks.add(executableTask);
                            }
                            executableTask.getTaskSnapshotList().add(taskSnapshot);
                        }
                        for (ExecutableTask executableTask : executableTasks) {
                            JobContextImpl jobContext = new JobContextImpl();
                            jobContext.setJob(executableTask.getJob());
                            jobContext.setJobInstanceSnapshot(executableTask.getJobInstanceSnapshot());
                            Result<Boolean> result = NodeRemoting.this.clientContext.getGridTaskSender().dispatchCompensateTaskList(executableTask.getTaskSnapshotList(), jobContext);
                            if (result != null && !result.getData().booleanValue()) continue;
                        }
                        long start = taskSnapshots.get(taskSnapshots.size() - 1).getId();
                        taskSnapshots = this.listReSendTasks(start);
                    }
                }
                catch (Throwable throwable) {
                    logger.error("failed to reSend tasks, receiveNodeAddress=" + remoteAddress, throwable);
                }
            }

            private List<TaskSnapshot> listReSendTasks(long startId) {
                List<TaskSnapshot> taskSnapshots = null;
                try {
                    taskSnapshots = NodeRemoting.this.clientContext.getStore().getTaskSnapshotDao().listByIdAndReceiveNodeAndStatus(startId, remoteAddress, 1);
                }
                catch (AccessException e) {
                    logger.error("", e);
                }
                return taskSnapshots;
            }

            private Job getJobExisted(long jobId) {
                for (Job job : this.jobs) {
                    if (job.getId() != jobId) continue;
                    return job;
                }
                return null;
            }

            private JobInstanceSnapshot getJobInstanceExisted(long jobInstanceId) {
                for (JobInstanceSnapshot jobInstanceSnapshot : this.jobInstanceSnapshots) {
                    if (jobInstanceSnapshot.getId() != jobInstanceId) continue;
                    return jobInstanceSnapshot;
                }
                return null;
            }

            private Result<JobInstanceSnapshot> getJobInstance(long jobInstanceId) {
                List<String> serverList = NodeRemoting.this.clientContext.getClientRemoting().getServerList();
                Result<JobInstanceSnapshot> result = null;
                for (String server : serverList) {
                    InvocationContext.setRemoteMachine(new RemoteMachine(server));
                    result = NodeRemoting.this.clientContext.getServerService().getJobInstanceById(jobInstanceId);
                    if (result != null) break;
                    logger.error("clientContext getServerService getJobInstanceById error from server {} with job instance id {}", (Object)server, (Object)jobInstanceId);
                }
                return result;
            }

            private Result<Job> getJob(long jobId) {
                List<String> serverList = NodeRemoting.this.clientContext.getClientRemoting().getServerList();
                Result<Job> result = null;
                for (String server : serverList) {
                    InvocationContext.setRemoteMachine(new RemoteMachine(server));
                    result = NodeRemoting.this.clientContext.getServerService().getJobById(jobId);
                    if (result != null) break;
                    logger.error("clientContext getServerService getJobById error from server {} with job id {}", (Object)server, (Object)jobId);
                }
                return result;
            }

            private ExecutableTask getExecutableTask(List<ExecutableTask> executableTasks, long jobInstanceId) {
                for (ExecutableTask executableTask : executableTasks) {
                    if (executableTask.getJobInstanceSnapshot().getId() != jobInstanceId) continue;
                    return executableTask;
                }
                return null;
            }
        });
    }

    public NodeClientService proxyClientInterface() {
        return this.clientContext.getProxyService().proxyInterface(NodeClientService.class, this.nodeServerInvocationHandler);
    }

    public NodeServerService proxyServerInterface() {
        return this.clientContext.getProxyService().proxyInterface(NodeServerService.class, this.nodeClientInvocationHandler);
    }

    private NodeServerService proxyServerSystemInterface() {
        return this.clientContext.getProxyService().proxyInterface(NodeServerService.class, this.nodeClientSystemInvocationHandler);
    }

    public RemotingCommand invokeSync(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        return this.server.invokeSync(channel, request, timeoutMillis);
    }

    public RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        return this.client.invokeSync(addr, request, timeoutMillis);
    }

    public void deleteConnection(String remoteAddress) {
    }

    public Channel getAndCreateChannel(String addr) throws InterruptedException {
        return this.client.getAndCreateChannel(addr);
    }

    public List<RemoteMachine> getNodes(String groupId, long jobId, int jobType) {
        List<RemoteMachine> remoteMachines = new ArrayList<RemoteMachine>();
        try {
            List<String> serverList = this.clientContext.getClientRemoting().getServerList();
            if (serverList == null || serverList.isEmpty()) {
                return remoteMachines;
            }
            Collections.shuffle(serverList);
            String localAddressWithListenPort = this.clientContext.getNodeConfig().getLocalAddress();
            List<Object> remoteNodes = new ArrayList();
            for (String serverAddress : serverList) {
                try {
                    InvocationContext.setRemoteMachine(new RemoteMachine(serverAddress));
                    if (isGetNodeIpsAvailable) {
                        remoteNodes = this.clientContext.getClientRemoting().getServerSystemService().getNodeIps(groupId, jobId, jobType);
                        if (remoteNodes == null || remoteNodes.size() == 0) continue;
                        for (String string : remoteNodes) {
                            RemoteMachine remoteMachine = new RemoteMachine();
                            String[] ipPorts = string.split(":");
                            String remoteIp = ipPorts[0];
                            int nodeListenPort = Integer.valueOf(ipPorts[1]);
                            int systemNodeListenPort = Integer.valueOf(ipPorts[2]);
                            String remoteAddress = remoteIp + ":" + nodeListenPort;
                            if (remoteNodes.size() > 1 && localAddressWithListenPort.equals(remoteAddress) && this.clientContext.getNodeConfig().isDispatchOnly()) continue;
                            String systemRemoteAddress = remoteIp + ":" + systemNodeListenPort;
                            remoteMachine.setRemoteAddress(remoteAddress);
                            remoteMachine.setSystemRemoteAddress(systemRemoteAddress);
                            remoteMachines.add(remoteMachine);
                        }
                        return remoteMachines;
                    }
                    remoteMachines = this.clientContext.getServerService().getRemoteMachines(groupId, jobId);
                    Iterator<RemoteMachine> remoteMachineIterator = remoteMachines.iterator();
                    while (remoteMachineIterator.hasNext()) {
                        RemoteMachine remoteMachine = remoteMachineIterator.next();
                        String remoteIp = remoteMachine.getRemoteAddress().substring(0, remoteMachine.getRemoteAddress().indexOf(":"));
                        String remoteAddress = remoteIp + ":" + remoteMachine.getNodeListenPort();
                        String systemRemoteAddress = remoteIp + ":" + remoteMachine.getNodeSystemListenPort();
                        if (remoteMachines.size() > 1 && localAddressWithListenPort.equals(remoteAddress) && this.clientContext.getNodeConfig().isDispatchOnly()) {
                            remoteMachineIterator.remove();
                            continue;
                        }
                        remoteMachine.setRemoteAddress(remoteAddress);
                        remoteMachine.setSystemRemoteAddress(systemRemoteAddress);
                    }
                    if (remoteMachines.size() <= 0) continue;
                    return remoteMachines;
                }
                catch (Throwable e) {
                    logger.error("getNodes error,serverAddress:" + serverAddress, e);
                }
            }
        }
        catch (Throwable e) {
            logger.error("getServerList error", e);
        }
        return remoteMachines;
    }

    public ConcurrentHashMap<String, Integer> getClientNodes() {
        return this.clientNodes;
    }

    public void sniffer(RemoteMachine remoteMachine) {
        InvocationContext.setRemoteMachine(remoteMachine);
        remoteMachine.setTimeout(10000L);
        Result<Boolean> connectResult = this.nodeServerSystemService.connect();
        StringBuilder sb = new StringBuilder();
        sb.append(remoteMachine.getRemoteAddress()).append(":").append(remoteMachine.getNodeSystemListenPort());
        String key = sb.toString();
        if (null == connectResult || !connectResult.getData().booleanValue()) {
            AtomicInteger failedCount = this.snifferedClientNodes.get(key);
            if (failedCount != null) {
                failedCount.getAndIncrement();
                if (failedCount.get() >= 4 && failedCount.get() % 4 == 0) {
                    logger.warn("failed to sniffer node , connectResult is null, machineGroup:" + this.clientContext.getNodeConfig().getGroupId() + ", remoteMachine: " + key + ", failedCount: " + failedCount.get());
                }
            } else {
                failedCount = new AtomicInteger(1);
                AtomicInteger failedCountExisted = this.snifferedClientNodes.putIfAbsent(key, failedCount);
                if (failedCountExisted != null) {
                    failedCount = failedCountExisted;
                    failedCountExisted.getAndIncrement();
                }
            }
            if (failedCount.get() >= 16) {
                this.snifferedClientNodes.remove(key);
                logger.info("remove " + key + " from snifferedClientNodes");
            }
            return;
        }
        this.clientNodes.put(key, 0);
        this.snifferedClientNodes.put(key, new AtomicInteger(0));
    }

    public void stopService() {
        this.nodeSnifferExecutorService.shutdownNow();
        this.timeExecutorService.shutdownNow();
    }

    public NodeServerService getNodeServerService() {
        return this.nodeServerService;
    }

    public NodeServerService getNodeServerSystemService() {
        return this.nodeServerSystemService;
    }

    public NodeClientService getNodeClientService() {
        return this.nodeClientService;
    }
}

