/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.producer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.Exception.ClientExceptionCode;
import com.jcloud.jcq.client.common.AbstractClient;
import com.jcloud.jcq.client.common.ClientConfig;
import com.jcloud.jcq.client.producer.AsyncSendCallback;
import com.jcloud.jcq.client.producer.Producer;
import com.jcloud.jcq.client.producer.ProducerConfig;
import com.jcloud.jcq.client.trace.DefaultTraceDispatcherImpl;
import com.jcloud.jcq.common.msg.attribute.CompressType;
import com.jcloud.jcq.common.queue.QueueRouteInfo;
import com.jcloud.jcq.common.service.ServiceState;
import com.jcloud.jcq.common.trace.TracePoint;
import com.jcloud.jcq.common.trace.TraceType;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.common.utils.SystemClock;
import com.jcloud.jcq.common.utils.ZipUtils;
import com.jcloud.jcq.communication.protocol.CommunicationType;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.MessageUtils;
import com.jcloud.jcq.protocol.Response;
import com.jcloud.jcq.protocol.client.SendMessageRequest;
import com.jcloud.jcq.protocol.client.SendMessageResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

/**
 * Default {@link Producer} implementation.
 *
 * <p>Every public {@code sendMessage*} overload funnels into
 * {@link #commonSend(List, CommunicationType, AsyncSendCallback, long, int)}, which validates the
 * batch, picks a queue, optionally records trace points and compresses message bodies, and then
 * dispatches the request through the remoting wrapper inherited from {@link AbstractClient}.
 */
public class DefaultProducerImpl
extends AbstractClient
implements Producer {
    /** Property key under which a message carries its business id. */
    private static final String PROPERTY_BUSINESS_ID = "BUSINESS_ID";
    /** Property key under which a message carries its delay time. */
    private static final String PROPERTY_DELAY_TIME = "DELAY_TIME";
    /** Maximum accepted length of the business id property. */
    private static final int MAX_BUSINESS_ID_LENGTH = 128;
    /** Queue id passed to {@code commonSend} when the caller did not choose a queue. */
    private static final int NO_QUEUE_SPECIFIED = -1;

    private final String producerId;
    private final ProducerConfig producerConfig;

    /**
     * Creates a producer and, when message tracing is enabled in the config, a trace dispatcher.
     *
     * @param accessKey      access key stored on the client
     * @param secretKey      secret key stored on the client
     * @param producerId     identifier of this producer (also returned as the instance id)
     * @param producerConfig producer configuration; must not be {@code null}
     * @throws NullPointerException if {@code producerConfig} is {@code null}
     */
    public DefaultProducerImpl(String accessKey, String secretKey, String producerId, ProducerConfig producerConfig) {
        if (producerConfig == null) {
            throw new NullPointerException("producerConfig must not be null");
        }
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.producerId = producerId;
        this.producerConfig = producerConfig;
        if (producerConfig.isMessageTraceOn()) {
            this.traceDispatcher = new DefaultTraceDispatcherImpl(this);
        }
    }

    /** @return the producer id given at construction */
    @Override
    public String getProducerId() {
        return this.producerId;
    }

    /** @return the producer id, which doubles as the client instance id */
    @Override
    public String getInstanceId() {
        return this.producerId;
    }

    /** @return the producer configuration given at construction */
    @Override
    public ProducerConfig getProducerConfig() {
        return this.producerConfig;
    }

    /** @return the producer configuration, viewed as the generic client configuration */
    @Override
    public ClientConfig getClientConfig() {
        return this.producerConfig;
    }

    /**
     * Sends one message synchronously with the configured send timeout; the queue is selected
     * automatically.
     *
     * @param message message to send
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(Message message) throws ClientException {
        return this.commonSend(Arrays.asList(message), CommunicationType.SYNC_REQUEST, null, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends one message synchronously; the queue is selected automatically.
     *
     * @param message message to send
     * @param timeout send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(Message message, long timeout) throws ClientException {
        return this.commonSend(Arrays.asList(message), CommunicationType.SYNC_REQUEST, null, timeout, NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends one message synchronously to a specific queue.
     *
     * @param message message to send
     * @param timeout send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @param queueId target queue id; a negative value means "select a queue automatically"
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(Message message, long timeout, int queueId) throws ClientException {
        return this.commonSend(Arrays.asList(message), CommunicationType.SYNC_REQUEST, null, timeout, queueId);
    }

    /**
     * Sends one message asynchronously with the configured send timeout.
     *
     * @param message  message to send
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(Message message, AsyncSendCallback callback) throws ClientException {
        this.commonSend(Arrays.asList(message), CommunicationType.ASYNC_REQUEST, callback, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends one message asynchronously.
     *
     * @param message  message to send
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(Message message, AsyncSendCallback callback, long timeout) throws ClientException {
        this.commonSend(Arrays.asList(message), CommunicationType.ASYNC_REQUEST, callback, timeout, NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends one message asynchronously to a specific queue.
     *
     * @param message  message to send
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @param queueId  target queue id; a negative value means "select a queue automatically"
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(Message message, AsyncSendCallback callback, long timeout, int queueId) throws ClientException {
        this.commonSend(Arrays.asList(message), CommunicationType.ASYNC_REQUEST, callback, timeout, queueId);
    }

    /**
     * Sends one message one-way (no response is awaited or returned).
     *
     * @param message message to send
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageOneway(Message message) throws ClientException {
        this.commonSend(Arrays.asList(message), CommunicationType.ONE_WAY_REQUEST, null, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends a batch synchronously with the configured send timeout; the queue is selected
     * automatically.
     *
     * @param messages batch to send; all messages must share one topic
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(List<Message> messages) throws ClientException {
        return this.commonSend(messages, CommunicationType.SYNC_REQUEST, null, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends a batch synchronously; the queue is selected automatically.
     *
     * @param messages batch to send; all messages must share one topic
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(List<Message> messages, long timeout) throws ClientException {
        return this.commonSend(messages, CommunicationType.SYNC_REQUEST, null, timeout, NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends a batch synchronously to a specific queue.
     *
     * @param messages batch to send; all messages must share one topic
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @param queueId  target queue id; a negative value means "select a queue automatically"
     * @return the response returned by the synchronous remoting call
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public SendMessageResponse sendMessage(List<Message> messages, long timeout, int queueId) throws ClientException {
        return this.commonSend(messages, CommunicationType.SYNC_REQUEST, null, timeout, queueId);
    }

    /**
     * Sends a batch asynchronously with the configured send timeout.
     *
     * @param messages batch to send; all messages must share one topic
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(List<Message> messages, AsyncSendCallback callback) throws ClientException {
        this.commonSend(messages, CommunicationType.ASYNC_REQUEST, callback, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends a batch asynchronously.
     *
     * @param messages batch to send; all messages must share one topic
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(List<Message> messages, AsyncSendCallback callback, long timeout) throws ClientException {
        this.commonSend(messages, CommunicationType.ASYNC_REQUEST, callback, timeout, NO_QUEUE_SPECIFIED);
    }

    /**
     * Sends a batch asynchronously to a specific queue.
     *
     * @param messages batch to send; all messages must share one topic
     * @param callback receives the response or the failure; a {@code null} callback is tolerated
     * @param timeout  send timeout; a value {@code <= 0} falls back to the configured send timeout
     * @param queueId  target queue id; a negative value means "select a queue automatically"
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageAsync(List<Message> messages, AsyncSendCallback callback, long timeout, int queueId) throws ClientException {
        this.commonSend(messages, CommunicationType.ASYNC_REQUEST, callback, timeout, queueId);
    }

    /**
     * Sends a batch one-way (no response is awaited or returned).
     *
     * @param messages batch to send; all messages must share one topic
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    @Override
    public void sendMessageOneway(List<Message> messages) throws ClientException {
        this.commonSend(messages, CommunicationType.ONE_WAY_REQUEST, null, this.producerConfig.getSendTimeout(), NO_QUEUE_SPECIFIED);
    }

    /** No producer-specific work is done before start. */
    @Override
    protected void doBeforeStart() throws ClientException {
    }

    /** No producer-specific work is done before shutdown. */
    @Override
    protected void doBeforeShutdown() {
    }

    /**
     * Validates a batch before sending and assigns a random UUID as message id to every message
     * that does not have one yet.
     *
     * <p>Checks, in order: batch non-empty, batch size within the configured limit, and per
     * message: non-null, non-empty topic, same topic as the rest of the batch, non-empty body,
     * body size within the configured limit, business id no longer than
     * {@value #MAX_BUSINESS_ID_LENGTH} characters.
     *
     * @param messages batch to validate
     * @throws ClientException with the matching {@link ClientExceptionCode} on the first violation
     */
    private void checkMessages(List<Message> messages) throws ClientException {
        if (messages == null || messages.isEmpty()) {
            throw new ClientException("messages is empty", ClientExceptionCode.MESSAGE_EMPTY.getCode());
        }
        if (messages.size() > this.producerConfig.getMaxMsgNumsPerBatch()) {
            throw new ClientException(String.format("messages size [%d] is bigger than batch message threshold [%d]", messages.size(), this.producerConfig.getMaxMsgNumsPerBatch()), ClientExceptionCode.MESSAGE_BATCH_SIZE_EXCEEDED.getCode());
        }
        // Empty string means "no topic seen yet"; safe as a sentinel because empty topics are rejected.
        String uniqueTopic = "";
        for (Message message : messages) {
            if (message == null) {
                throw new ClientException("message is null", ClientExceptionCode.MESSAGE_EMPTY.getCode());
            }
            if (StringUtils.isEmpty(message.getTopic())) {
                throw new ClientException("message topic is empty", ClientExceptionCode.MESSAGE_TOPIC_EMPTY.getCode());
            }
            if (!uniqueTopic.equals(message.getTopic())) {
                if (!uniqueTopic.isEmpty()) {
                    throw new ClientException("topic is not unique in this batch send", ClientExceptionCode.MESSAGE_TOPIC_NOT_UNIQUE_IN_BATCH.getCode());
                }
                uniqueTopic = message.getTopic();
            }
            if (message.getBody() == null || message.getBody().length == 0) {
                throw new ClientException("message body is empty", ClientExceptionCode.MESSAGE_BODY_EMPTY.getCode());
            }
            if (message.getBody().length > this.producerConfig.getMaxMsgSize()) {
                throw new ClientException(String.format("message size [%d] is bigger than message size threshold [%d]", message.getBody().length, this.producerConfig.getMaxMsgSize()), ClientExceptionCode.MESSAGE_SIZE_EXCEEDED.getCode());
            }
            String businessId = message.getProperties().get(PROPERTY_BUSINESS_ID);
            if (!StringUtils.isEmpty(businessId) && businessId.length() > MAX_BUSINESS_ID_LENGTH) {
                throw new ClientException(String.format("message businessId length [%d] too long, should be no more than: [%d].", businessId.length(), MAX_BUSINESS_ID_LENGTH), ClientExceptionCode.OTHER.getCode());
            }
            try {
                if (StringUtils.isEmpty(message.getMessageId())) {
                    MessageUtils.internalSetMessageId(message, UUID.randomUUID().toString());
                }
            } catch (Exception e) {
                // NOTE(review): the cause is dropped here because no ClientException constructor
                // accepting a cause is visible from this file - attach `e` if one exists.
                throw new ClientException("internalSetMessageId error.", ClientExceptionCode.OTHER.getCode());
            }
        }
    }

    /**
     * Shared send path behind every public {@code sendMessage*} overload.
     *
     * <p>Side effects on the passed messages: message ids may be assigned, the queue id is set to
     * the selected queue, and bodies may be replaced by their compressed form.
     *
     * @param messages          batch to send
     * @param communicationType sync, async or one-way dispatch
     * @param asyncSendCallback user callback for async dispatch; may be {@code null}
     * @param timeout           send timeout; a value {@code <= 0} falls back to the configured
     *                          send timeout
     * @param queueId           explicit queue id, or a negative value to select a queue automatically
     * @return the response for a sync send; {@code null} for async and one-way sends
     * @throws ClientException if the producer is not running, validation fails or no queue is found
     */
    private SendMessageResponse commonSend(List<Message> messages, CommunicationType communicationType, final AsyncSendCallback asyncSendCallback, long timeout, int queueId) throws ClientException {
        if (this.state != ServiceState.RUNNING) {
            throw new ClientException(String.format("producer:[%s] is not running, cannot send message", this.producerId), ClientExceptionCode.OPERATION_NOT_SUPPORTED_BY_CURRENT_CLIENT_STATE.getCode());
        }
        this.checkMessages(messages);
        String topic = messages.get(0).getTopic();
        QueueRouteInfo queueRouteInfo;
        if (queueId >= 0) {
            queueRouteInfo = this.queueSelector.getQueue(topic, queueId);
            if (queueRouteInfo == null) {
                throw new ClientException(String.format("send message failed, cannot find queue with queueId:%d for topic:%s", queueId, topic), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
            }
        } else {
            queueRouteInfo = this.selectQueue(topic);
            if (queueRouteInfo == null) {
                throw new ClientException(String.format("send message failed, cannot find queue for topic:%s", topic), ClientExceptionCode.NOT_FOUND_QUEUE.getCode());
            }
        }
        for (Message message : messages) {
            message.setQueueId(queueRouteInfo.getQueueId());
        }
        // Trace points are created before compression and before the request is dispatched.
        final List<TracePoint> tracePoints = this.isMessageTraceOn() ? this.getTracePoints(messages) : new ArrayList<TracePoint>();
        this.tryCompressMessages(messages);
        SendMessageRequest sendMessageRequest = new SendMessageRequest();
        sendMessageRequest.setTopic(topic);
        sendMessageRequest.setMessages(messages);
        sendMessageRequest.setMessageType(this.producerConfig.getMessageType());
        sendMessageRequest.setQueueId(queueRouteInfo.getQueueId());
        // Non-positive timeouts fall back to the configured default.
        long effectiveTimeout = timeout <= 0L ? (long)this.producerConfig.getSendTimeout() : timeout;
        SendMessageResponse sendMessageResponse = null;
        switch (communicationType) {
            case SYNC_REQUEST: {
                sendMessageResponse = this.remotingApiWrapper.sync(this, queueRouteInfo.getAddress(), sendMessageRequest, SendMessageResponse.class, effectiveTimeout);
                break;
            }
            case ASYNC_REQUEST: {
                this.remotingApiWrapper.async(this, queueRouteInfo.getAddress(), sendMessageRequest, new AsyncSendCallback(){

                    @Override
                    public void onResponse(SendMessageResponse response) {
                        try {
                            if (asyncSendCallback != null) {
                                asyncSendCallback.onResponse(response);
                            }
                        } finally {
                            // Record the trace even when the user callback throws.
                            DefaultProducerImpl.this.completeAndAppendTracePoints(tracePoints, response);
                        }
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        if (asyncSendCallback != null) {
                            asyncSendCallback.onException(throwable);
                        }
                    }
                }, SendMessageResponse.class, effectiveTimeout);
                break;
            }
            case ONE_WAY_REQUEST: {
                this.remotingApiWrapper.oneway(this, queueRouteInfo.getAddress(), sendMessageRequest);
                break;
            }
        }
        if (this.producerConfig.getShowInfoLevelSendMsgLog()) {
            this.logger.info("{} send message finished, topic:{}, requestId:{}, messages number:{}", new Object[]{communicationType.name(), topic, sendMessageRequest.getRequestId(), messages.size()});
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("{} send message finished, topic:{}, requestId:{}, messages number:{}", new Object[]{communicationType.name(), topic, sendMessageRequest.getRequestId(), messages.size()});
        }
        // Only the sync path has a response at this point; async traces are completed in the callback.
        if (this.isMessageTraceOn() && sendMessageResponse != null) {
            this.completeAndAppendTracePoints(tracePoints, sendMessageResponse);
        }
        return sendMessageResponse;
    }

    /**
     * Compresses, in place, the body of every message whose body length reaches the configured
     * threshold, and marks it as {@link CompressType#ZIP}. Does nothing when compression is
     * disabled. A message whose compression fails is logged and sent uncompressed.
     *
     * @param messages messages whose bodies may be replaced by their compressed form
     */
    private void tryCompressMessages(List<Message> messages) {
        if (!this.producerConfig.isEnableCompress()) {
            return;
        }
        int threshold = this.producerConfig.getCompressMessageThreshold();
        for (Message message : messages) {
            byte[] body = message.getBody();
            if (body == null || body.length < threshold) {
                continue;
            }
            try {
                message.setBody(ZipUtils.compress(body));
                message.setCompressType(CompressType.ZIP);
            } catch (IOException e) {
                this.logger.warn("got exception:{} when try compressing message body of message:{}", (Object)e, (Object)message.getMessageId());
            }
        }
    }

    /**
     * Builds one PRODUCE trace point per message.
     *
     * <p>The business id and delay time are copied from the message properties when present. A
     * delay time that is not a valid {@code long} is logged and left unset so that tracing cannot
     * make a send fail.
     *
     * @param messages messages to trace
     * @return the trace points; empty when tracing is off or {@code messages} is null or empty
     */
    @Override
    public List<TracePoint> getTracePoints(List<Message> messages) {
        ArrayList<TracePoint> tracePoints = new ArrayList<TracePoint>();
        if (!this.isMessageTraceOn() || messages == null || messages.isEmpty()) {
            return tracePoints;
        }
        for (Message message : messages) {
            TracePoint tracePoint = new TracePoint(message.getMessageId(), message.getTopic(), message.getQueueId());
            tracePoint.setMessageType(this.producerConfig.getMessageType());
            tracePoint.setTraceType(TraceType.PRODUCE);
            if (message.getProperties().containsKey(PROPERTY_BUSINESS_ID)) {
                tracePoint.setBusinessId(message.getProperties().get(PROPERTY_BUSINESS_ID));
            }
            String delayTimeStr = message.getProperties().get(PROPERTY_DELAY_TIME);
            if (!StringUtils.isEmpty(delayTimeStr)) {
                try {
                    tracePoint.setDelayTime(Long.parseLong(delayTimeStr));
                } catch (NumberFormatException e) {
                    this.logger.warn("ignore invalid delay time:{} when building trace point of message:{}", (Object)delayTimeStr, (Object)message.getMessageId());
                }
            }
            tracePoints.add(tracePoint);
        }
        return tracePoints;
    }

    /**
     * Fills request id, cost time and success flag into the trace points and hands them to the
     * trace dispatcher.
     *
     * <p>The cost time is measured from the timestamp of the first trace point and applied to all
     * of them. A response code of {@code 0} is recorded as success. Nothing happens when tracing
     * is off, when there are no trace points, or when the response or the dispatcher is missing.
     *
     * @param tracePoints trace points created by {@link #getTracePoints(List)}
     * @param response    response of the send request
     */
    @Override
    public void completeAndAppendTracePoints(List<TracePoint> tracePoints, Response response) {
        if (!this.isMessageTraceOn() || tracePoints == null || tracePoints.isEmpty()) {
            return;
        }
        // The dispatcher is only created in the constructor when tracing was already on, and the
        // async path forwards whatever response the remoting layer delivers.
        if (response == null || this.traceDispatcher == null) {
            return;
        }
        int costTime = (int)(SystemClock.now() - tracePoints.get(0).getTimeStamp());
        boolean success = response.getResponseCode() == 0;
        for (TracePoint tracePoint : tracePoints) {
            tracePoint.setRequestId(response.getRequestId());
            tracePoint.setCostTime(costTime);
            tracePoint.setSuccess(success);
        }
        this.traceDispatcher.append(tracePoints);
    }

    /** @return whether message tracing is enabled in the producer configuration */
    @Override
    public boolean isMessageTraceOn() {
        return this.producerConfig.isMessageTraceOn();
    }

    /** @return the {@code isForTrace()} flag of the producer configuration */
    @Override
    public boolean isTraceProducer() {
        return this.producerConfig.isForTrace();
    }
}

