package com.taobao.txc.resourcemanager;

import com.taobao.txc.common.CommitMode;
import com.taobao.txc.common.LoggerInit;
import com.taobao.txc.common.LoggerWrap;
import com.taobao.txc.common.NetUtil;
import com.taobao.txc.common.TransactionMode;
import com.taobao.txc.common.TxcConstants;
import com.taobao.txc.common.TxcContext;
import com.taobao.txc.common.TxcXID;
import com.taobao.txc.common.config.IConfigCallback;
import com.taobao.txc.common.config.TxcConfigHolder;
import com.taobao.txc.common.exception.TxcErrCode;
import com.taobao.txc.common.exception.TxcException;
import com.taobao.txc.common.message.RegisterMessage;
import com.taobao.txc.common.message.ResultCode;
import com.taobao.txc.common.message.TxcMergeMessage;
import com.taobao.txc.common.message.TxcMergeResultMessage;
import com.taobao.txc.common.message.TxcMessage;
import com.taobao.txc.common.util.string.TxcString;
import com.taobao.txc.resourcemanager.jdbc.TxcAtomDataSourceHelper;
import com.taobao.txc.resourcemanager.mt.MtRmRpcClient;
import com.taobao.txc.rpc.api.ClientMessageListener;
import com.taobao.txc.rpc.api.TxcClientMessageSender;
import com.taobao.txc.rpc.impl.HeartbeatMessage;
import com.taobao.txc.rpc.impl.MessageFuture;
import com.taobao.txc.rpc.impl.RegisterRmMessage;
import com.taobao.txc.rpc.impl.RegisterRmResultMessage;
import com.taobao.txc.rpc.impl.RpcEndpoint;
import com.taobao.txc.rpc.impl.RpcMessage;
import com.taobao.txc.rpc.impl.TxcMessageCodec;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.ConcurrentSet;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@ChannelHandler.Sharable
/* loaded from: input_file:BOOT-INF/lib/txc-client-2.9.1.jar:com/taobao/txc/resourcemanager/RmRpcClient.class */
public class RmRpcClient extends RpcEndpoint implements IRmRpcClient, TxcClientMessageSender {
    private NioEventLoopGroup eventloopGroup;
    private ClientMessageListener clientMessageListener;
    private String appName;
    protected volatile Set<String> serverAddressList;
    private ConcurrentHashMap<String, Object> channelLocks;
    protected ConcurrentHashMap<String, TxcChannel> channels;
    private static RmRpcClient instance;
    private String customerKeys;
    private static final LoggerWrap logger = LoggerInit.logger;
    public static Set<String> tableKeywords = null;

    /* loaded from: input_file:BOOT-INF/lib/txc-client-2.9.1.jar:com/taobao/txc/resourcemanager/RmRpcClient$TxcChannel.class */
    public static final class TxcChannel {
        Channel channel;
        TransactionMode mode;

        public TxcChannel(Channel channel, TransactionMode transactionMode) {
            this.channel = channel;
            this.mode = transactionMode;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/txc-client-2.9.1.jar:com/taobao/txc/resourcemanager/RmRpcClient$Worker.class */
    public class Worker implements Runnable {
        RmRpcClient client;

        public Worker(RmRpcClient rmRpcClient) {
            this.client = rmRpcClient;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                synchronized (RmRpcClient.this.mergeLock) {
                    try {
                        RmRpcClient.this.mergeLock.wait(1L);
                    } catch (InterruptedException e) {
                    }
                }
                RmRpcClient.this.isSending = true;
                Iterator it = RmRpcClient.this.rmBasketMap.keySet().iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    BlockingQueue blockingQueue = (BlockingQueue) RmRpcClient.this.rmBasketMap.get(str);
                    if (!blockingQueue.isEmpty()) {
                        TxcMergeMessage txcMergeMessage = new TxcMergeMessage();
                        int i = 0;
                        while (!blockingQueue.isEmpty()) {
                            RpcMessage rpcMessage = (RpcMessage) blockingQueue.poll();
                            if (RmRpcClient.logger.isDebugEnabled() && (rpcMessage.getBody() instanceof RegisterMessage)) {
                                RmRpcClient.logger.debug("poll msg:" + rpcMessage.getBody());
                            }
                            txcMergeMessage.msgs.add((TxcMessage) rpcMessage.getBody());
                            txcMergeMessage.msgIds.add(Long.valueOf(rpcMessage.getId()));
                            i++;
                        }
                        if (i > 1 && RmRpcClient.logger.isDebugEnabled()) {
                            RmRpcClient.logger.debug("msgs:" + i);
                            Iterator<TxcMessage> it2 = txcMergeMessage.msgs.iterator();
                            while (it2.hasNext()) {
                                RmRpcClient.logger.debug(it2.next().toString());
                            }
                            StringBuffer stringBuffer = new StringBuffer();
                            Iterator<Long> it3 = txcMergeMessage.msgIds.iterator();
                            while (it3.hasNext()) {
                                stringBuffer.append("msgid:").append(it3.next().longValue()).append(";");
                            }
                            stringBuffer.append("\n");
                            Iterator it4 = RmRpcClient.this.futures.keySet().iterator();
                            while (it4.hasNext()) {
                                stringBuffer.append("futures:").append(((Long) it4.next()).longValue()).append(";");
                            }
                            RmRpcClient.logger.debug(stringBuffer.toString());
                        }
                        try {
                            RmRpcClient.this.sendRequest(this.client.connect(str), txcMergeMessage);
                        } catch (Exception e2) {
                            RmRpcClient.logger.error("", "txc merge call failed", e2);
                        }
                    }
                }
                RmRpcClient.this.isSending = false;
            }
        }
    }

    public String getAppName() {
        return this.appName;
    }

    public RmRpcClient(ThreadPoolExecutor threadPoolExecutor) {
        super(threadPoolExecutor);
        this.eventloopGroup = new NioEventLoopGroup(1);
        this.serverAddressList = null;
        this.channelLocks = new ConcurrentHashMap<>();
        this.channels = new ConcurrentHashMap<>();
    }

    public static RmRpcClient getInstance() {
        return instance;
    }

    public static RmRpcClient getInstance(ThreadPoolExecutor threadPoolExecutor) {
        if (instance == null) {
            instance = new RmRpcClient(threadPoolExecutor);
        }
        TxcConfigHolder.getInstance().getTableKeywordsDynamic(new IConfigCallback() { // from class: com.taobao.txc.resourcemanager.RmRpcClient.1
            @Override // com.taobao.txc.common.config.IConfigCallback
            public void callback(String str) {
                RmRpcClient.logger.info(String.format("table key words: %s", str));
                if (str == null) {
                    return;
                }
                String trim = str.trim();
                if (trim.length() == 0) {
                    return;
                }
                try {
                    if (RmRpcClient.tableKeywords == null) {
                        RmRpcClient.tableKeywords = new ConcurrentSet();
                    }
                    RmRpcClient.tableKeywords.clear();
                    for (String str2 : trim.split(",")) {
                        RmRpcClient.tableKeywords.add(str2.toUpperCase());
                    }
                } catch (Exception e) {
                }
            }
        });
        return instance;
    }

    @Override // com.taobao.txc.rpc.impl.RpcEndpoint, com.taobao.txc.resourcemanager.IRmRpcClient
    public void init() {
        this.skipVip = TxcConfigHolder.getSkipVip(false);
        logger.info("RmRpcClient skip vip " + this.skipVip);
        this.timerExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.taobao.txc.resourcemanager.RmRpcClient.2
            @Override // java.lang.Runnable
            public void run() {
                RmRpcClient.this.reconnect();
            }
        }, 30L, 5L, TimeUnit.SECONDS);
        fetchServerAddressList();
        new Thread(new Worker(this)).start();
        super.init();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void fetchVipServerAddressIfAbsentOnce(String str) {
        if (TxcConfigHolder.getInstance().lookupVIP(str) == null) {
            String serverAddressVipDataId = TxcString.toServerAddressVipDataId(str);
            try {
                String config = TxcConfigHolder.getInstance().getConfig(serverAddressVipDataId, "TXC_GROUP", 1000L);
                logger.info(serverAddressVipDataId + "=" + config);
                String serverAddressVipVPCDataId = TxcString.toServerAddressVipVPCDataId(str);
                String config2 = TxcConfigHolder.getInstance().getConfig(serverAddressVipVPCDataId, "TXC_GROUP", 1000L);
                logger.info(serverAddressVipVPCDataId + "=" + config2);
                if (config2 != null) {
                    config = config2;
                }
                TxcConfigHolder.getInstance().updateVIPMapping(str, config);
            } catch (IOException e) {
                logger.error(TxcErrCode.DiamondGetConfig.errCode, String.format("get vip failed:%s", str));
            }
        }
    }

    protected void fetchServerAddressList() {
        this.serverAddressList = TxcAtomDataSourceHelper.getServerAddrs();
        Iterator<String> it = this.serverAddressList.iterator();
        while (it.hasNext()) {
            fetchVipServerAddressIfAbsentOnce(it.next());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconnect() {
        if (this.serverAddressList != null) {
            for (String str : this.serverAddressList) {
                if (str != null) {
                    try {
                        if (str.length() > 0) {
                            fetchVipServerAddressIfAbsentOnce(str);
                            connect(str);
                        }
                    } catch (Exception e) {
                        logger.error(TxcErrCode.NetConnect.errCode, "can not connect to " + str + " cause:" + e.getMessage(), e);
                    }
                }
            }
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.messageExecutor.isShutdown()) {
            return;
        }
        logger.info("channel inactive:" + channelHandlerContext.channel());
        releaseChannel(channelHandlerContext.channel(), NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()));
        super.channelInactive(channelHandlerContext);
    }

    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void destroy() {
        super.destroy();
        this.eventloopGroup.shutdownGracefully();
    }

    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void dispatch(long j, ChannelHandlerContext channelHandlerContext, Object obj) {
        String stringAddress = NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress());
        String lookupRIP = TxcConfigHolder.getInstance().lookupRIP(stringAddress);
        if (lookupRIP != null) {
            stringAddress = lookupRIP;
        }
        this.clientMessageListener.onMessage(j, stringAddress, obj);
    }

    @Override // com.taobao.txc.rpc.api.TxcClientMessageSender
    public Object invoke(Object obj, long j) throws IOException, TimeoutException {
        return super.invoke(TxcXID.getServerAddress(TxcContext.getCurrentXid()), getTargetServerChannel(), obj, j);
    }

    @Override // com.taobao.txc.rpc.api.TxcClientMessageSender
    public Object invoke(String str, Object obj, long j) throws IOException, TimeoutException {
        return super.invoke(str, connect(str), obj, j);
    }

    private Channel getTargetServerChannel() {
        if (!TxcContext.inTxcTransaction()) {
            return null;
        }
        String serverAddress = TxcXID.getServerAddress(TxcContext.getCurrentXid());
        fetchVipServerAddressIfAbsentOnce(serverAddress);
        return connect(serverAddress);
    }

    @Override // com.taobao.txc.rpc.api.TxcClientMessageSender
    public Object invoke(Object obj) throws IOException, TimeoutException {
        return invoke(obj, 30000L);
    }

    public short getRmType() {
        return (short) CommitMode.COMMIT_IN_PHASE1.getValue();
    }

    @Override // com.taobao.txc.rpc.impl.RpcEndpoint, io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof RpcMessage) && ((RpcMessage) obj).getBody() == HeartbeatMessage.PONG) {
            if (logger.isDebugEnabled()) {
                logger.debug("received PONG from " + channelHandlerContext.channel().remoteAddress());
                return;
            }
            return;
        }
        if (!(((RpcMessage) obj).getBody() instanceof TxcMergeResultMessage)) {
            super.channelRead(channelHandlerContext, obj);
            return;
        }
        TxcMergeResultMessage txcMergeResultMessage = (TxcMergeResultMessage) ((RpcMessage) obj).getBody();
        TxcMergeMessage txcMergeMessage = (TxcMergeMessage) this.mergeMsgMap.remove(Long.valueOf(((RpcMessage) obj).getId()));
        logger.info("received merge msg:" + ((RpcMessage) obj).getId() + ", exist in map:" + txcMergeMessage + ",origin rpc msg:" + ((RpcMessage) obj).getBody());
        int size = txcMergeMessage.msgs.size();
        for (int i = 0; i < size; i++) {
            long longValue = txcMergeMessage.msgIds.get(i).longValue();
            MessageFuture remove = this.futures.remove(Long.valueOf(longValue));
            if (remove == null) {
                logger.info("msg:" + longValue + " is not found in futures.");
            } else {
                remove.setResultMessage(txcMergeResultMessage.getMsgs()[i]);
            }
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) obj;
            if (idleStateEvent == IdleStateEvent.READER_IDLE_STATE_EVENT) {
                if (this instanceof MtRmRpcClient) {
                    logger.info("MtRmRpcClient channel" + channelHandlerContext.channel() + " idle.");
                } else {
                    logger.info("RmRpcClient channel" + channelHandlerContext.channel() + " idle.");
                }
                try {
                    channelHandlerContext.disconnect();
                    channelHandlerContext.close();
                    TxcConstants.removeChannelVersion(channelHandlerContext.channel());
                } catch (Exception e) {
                }
            }
            if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
                try {
                    sendRequest(channelHandlerContext.channel(), HeartbeatMessage.PING);
                } catch (Throwable th) {
                    logger.error("", "send request error", th);
                }
            }
        }
    }

    private void releaseChannel(Channel channel, String str) {
        try {
            Object obj = this.channelLocks.get(str);
            if (obj == null) {
                this.channelLocks.putIfAbsent(str, new Object());
                obj = this.channelLocks.get(str);
            }
            synchronized (obj) {
                TxcChannel txcChannel = this.channels.get(str);
                if (txcChannel != null && txcChannel.channel.compareTo(channel) == 0) {
                    this.channels.remove(str);
                    try {
                        logger.info("release channel:" + channel);
                        channel.disconnect();
                        channel.close();
                        TxcConstants.removeChannelVersion(channel);
                    } catch (Throwable th) {
                        logger.error("", "channel close error", th);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("", "close not active channel", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Channel connect(String str) {
        InetSocketAddress inetSocketAddress;
        String dbKeysFromSet;
        TxcChannel txcChannel = this.channels.get(str);
        if (txcChannel != null) {
            if (txcChannel.channel.isActive()) {
                return txcChannel.channel;
            }
            int i = 0;
            while (i < 1000) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
                txcChannel = this.channels.get(str);
                if (txcChannel == null) {
                    break;
                }
                if (txcChannel.channel.isActive()) {
                    return txcChannel.channel;
                }
                i++;
            }
            if (i == 1000) {
                logger.warn("channel " + txcChannel.channel + " is not active after long wait, close it.");
                releaseChannel(txcChannel.channel, str);
            }
        }
        logger.info("will connect to " + str);
        Object obj = this.channelLocks.get(str);
        if (obj == null) {
            this.channelLocks.putIfAbsent(str, new Object());
            obj = this.channelLocks.get(str);
        }
        synchronized (obj) {
            TxcChannel txcChannel2 = this.channels.get(str);
            if (txcChannel2 != null && txcChannel2.channel.isActive()) {
                return txcChannel2.channel;
            }
            String findVip = findVip(str);
            if (findVip == null || findVip.isEmpty()) {
                inetSocketAddress = NetUtil.toInetSocketAddress(str);
            } else {
                inetSocketAddress = NetUtil.toInetSocketAddress(findVip);
                logger.info(String.format("vip: %s ==> %s", str, findVip));
            }
            logger.info("connect to " + str + " via VIP " + findVip);
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(this.eventloopGroup).channel(NioSocketChannel.class).remoteAddress(inetSocketAddress).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_REUSEADDR, true).handler(new ChannelInitializer<SocketChannel>() { // from class: com.taobao.txc.resourcemanager.RmRpcClient.3
                @Override // io.netty.channel.ChannelInitializer
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new IdleStateHandler(15, 5, 0), new TxcMessageCodec(), RmRpcClient.this);
                }
            });
            ChannelFuture connect = bootstrap.connect();
            try {
                connect.await(15L, TimeUnit.SECONDS);
                if (connect.isCancelled()) {
                    howAboutTryingDirectConnect(findVip);
                    throw new TxcException("connect concelled, can not connect to txc server.");
                }
                if (!connect.isSuccess()) {
                    howAboutTryingDirectConnect(findVip);
                    throw new TxcException("connect failed, can not connect to txc server.");
                }
                Channel channel = connect.channel();
                try {
                    String dbKeysFromSet2 = this.customerKeys == null ? TxcAtomDataSourceHelper.getDbKeysFromSet() : this.customerKeys;
                    logger.info("RM will register dbkey:" + dbKeysFromSet2);
                    RegisterRmMessage registerRmMessage = new RegisterRmMessage(dbKeysFromSet2);
                    registerRmMessage.setAppName(this.appName);
                    registerRmMessage.setType(getRmType());
                    Object invoke = super.invoke(null, channel, registerRmMessage, 30000L);
                    if (invoke == null || !(invoke instanceof RegisterRmResultMessage)) {
                        if (channel != null) {
                            channel.close();
                        }
                        throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "can not register RM.");
                    }
                    if (!((RegisterRmResultMessage) invoke).isResult()) {
                        logger.info("register RM failed. server version:" + ((RegisterRmResultMessage) invoke).getVersion());
                        if (channel != null) {
                            channel.close();
                        }
                        throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "register RM failed.");
                    }
                    logger.info("register RM sucesss. server version:" + ((RegisterRmResultMessage) invoke).getVersion() + ",channel:" + channel);
                    if (this.customerKeys == null) {
                        synchronized (TxcAtomDataSourceHelper.class) {
                            this.channels.put(str, new TxcChannel(channel, TransactionMode.TXC_AT));
                            dbKeysFromSet = TxcAtomDataSourceHelper.getDbKeysFromSet();
                        }
                        if (!dbKeysFromSet2.equals(dbKeysFromSet)) {
                            sendRegisterMessage(str, channel, dbKeysFromSet);
                        }
                    } else {
                        this.channels.put(str, new TxcChannel(channel, TransactionMode.TXC_MT));
                    }
                    return channel;
                } catch (Exception e2) {
                    logger.error(TxcErrCode.RegistRM.errCode, "register RM failed.", e2);
                    if (channel != null) {
                        channel.close();
                    }
                    throw new TxcException(ResultCode.SYSTEMERROR.getValue(), "can not register RM.");
                }
            } catch (Exception e3) {
                howAboutTryingDirectConnect(findVip);
                throw new TxcException(e3, TxcConstants.CAN_NOT_CONNECT_TO_TXC_SERVER);
            }
        }
    }

    @Override // com.taobao.txc.rpc.impl.RpcEndpoint, io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        logger.error(TxcErrCode.ExceptionCaught.errCode, NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress()) + "connect exception. " + th.getMessage(), th);
        Iterator<Map.Entry<String, TxcChannel>> it = this.channels.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().channel.compareTo(channelHandlerContext.channel()) == 0) {
                it.remove();
                logger.info("remove channel:" + channelHandlerContext.channel());
            }
        }
        super.exceptionCaught(channelHandlerContext, th);
    }

    private void sendRegisterMessage(String str, Channel channel, String str2) {
        RegisterRmMessage registerRmMessage = new RegisterRmMessage(str2);
        registerRmMessage.setAppName(this.appName);
        registerRmMessage.setType(getRmType());
        try {
            super.invoke(null, channel, registerRmMessage, 0L, false);
        } catch (TxcException e) {
            if (e.getMessage() == null || !e.getMessage().contains(TxcConstants.CHANNEL_NOT_WRITABLE) || str == null) {
                logger.error("", "register failed", e);
            } else {
                this.channels.remove(str);
                logger.info("remove channel:" + channel);
            }
        } catch (IOException e2) {
        } catch (TimeoutException e3) {
        }
    }

    public void registerNewDbKey(String str) {
        logger.info("registerNewDbKey dbKey:" + str);
        synchronized (TxcAtomDataSourceHelper.class) {
        }
        for (Map.Entry<String, TxcChannel> entry : this.channels.entrySet()) {
            String key = entry.getKey();
            TxcChannel value = entry.getValue();
            if (value.mode.getValue() == TransactionMode.TXC_AT.getValue()) {
                logger.info("registerNewDbKey dbKey:" + str);
                sendRegisterMessage(key, value.channel, str);
            }
        }
    }

    public ClientMessageListener getClientMessageListener() {
        return this.clientMessageListener;
    }

    @Override // com.taobao.txc.resourcemanager.IRmRpcClient
    public void setClientMessageListener(ClientMessageListener clientMessageListener) {
        this.clientMessageListener = clientMessageListener;
    }

    @Override // com.taobao.txc.resourcemanager.IRmRpcClient, com.taobao.txc.rpc.api.TxcClientMessageSender
    public void sendResponse(long j, String str, Object obj) {
        logger.info("RmRpcClient sendResponse " + obj);
        super.sendResponse(j, connect(str), obj);
    }

    public void setAppName(String str) {
        this.appName = str;
    }

    public String getCustomerKeys() {
        return this.customerKeys;
    }

    public void setCustomerKeys(String str) {
        this.customerKeys = str;
    }
}
