/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.logcollector;

import com.alibaba.schedulerx.common.domain.StreamType;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Callback;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.LogProducer;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Producer;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.ProjectConfigs;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.aliyun.log.producer.Result;
import com.alibaba.schedulerx.shade.com.aliyun.openservices.log.common.LogItem;
import com.alibaba.schedulerx.shade.org.apache.commons.configuration.Configuration;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link LogCollector} that forwards collected log lines to an SLS log store
 * through the (shaded) Aliyun log {@link Producer}.
 *
 * <p>Connection settings are read from the worker configuration keys
 * {@code sls.project}, {@code sls.log.store}, {@code sls.endpoint},
 * {@code sls.ak} and {@code sls.sk}. The number of sender threads comes from
 * {@code sls.send.threads} (default 4).
 *
 * <p>Each call to {@link #collect} is handed to a fixed-size thread pool backed
 * by a bounded queue. When that queue is full the task is dropped silently
 * ({@link ThreadPoolExecutor.DiscardPolicy}), so delivery is best-effort.
 *
 * <p>NOTE(review): neither the executor nor the producer is ever shut down or
 * closed by this class, and the sender threads are non-daemon — confirm that
 * this matches the intended worker lifecycle.
 */
public class SlsLogCollector
extends LogCollector {
    private static final Logger LOGGER = LogFactory.getLogger(SlsLogCollector.class);
    /** Name prefix of the threads that hand log items to the producer. */
    private static final String SEND_THREAD_NAME_PREFIX = "Schedulerx-SLS-Send-Thread-";
    /** Maximum number of pending send tasks; further tasks are discarded. */
    private static final int SEND_QUEUE_CAPACITY = 10240;

    private final String project;
    private final String logStore;
    private final String endpoint;
    private final String accessKeyId;
    private final String accessKeySecret;
    private final ExecutorService es;
    private final Producer producer;

    /**
     * Reads the SLS settings from the worker configuration, then creates the
     * sender thread pool and the log producer for the configured project.
     */
    public SlsLogCollector() {
        Configuration conf = ConfigUtil.getWorkerConfig();
        this.project = conf.getString("sls.project");
        this.logStore = conf.getString("sls.log.store");
        this.endpoint = conf.getString("sls.endpoint");
        this.accessKeyId = conf.getString("sls.ak");
        this.accessKeySecret = conf.getString("sls.sk");
        int sendThreads = conf.getInt("sls.send.threads", 4);
        this.es = new ThreadPoolExecutor(sendThreads, sendThreads, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(SEND_QUEUE_CAPACITY), new ThreadFactory(){
            private final AtomicInteger nextId = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, SEND_THREAD_NAME_PREFIX + this.nextId.getAndIncrement());
            }
        }, new ThreadPoolExecutor.DiscardPolicy());
        ProjectConfigs projectConfigs = new ProjectConfigs();
        projectConfigs.put(new ProjectConfig(this.project, this.endpoint, this.accessKeyId, this.accessKeySecret));
        ProducerConfig producerConfig = new ProducerConfig(projectConfigs);
        this.producer = new LogProducer(producerConfig);
    }

    /**
     * Asynchronously sends one log line to the configured log store.
     *
     * <p>The work is queued on the sender pool; this method does not wait for
     * the send to complete. Failures (both exceptions thrown by the producer
     * and unsuccessful results reported through its callback) are logged and
     * otherwise ignored.
     *
     * @param key        passed to the producer as the third argument of {@code send}
     * @param line       log text, stored under the {@code message} field
     * @param t          optional throwable; when non-null its trace is stored
     *                   under the {@code trace} field
     * @param streamType not used by this implementation
     * @param isEnd      not used by this implementation
     */
    @Override
    public void collect(final String key, final String line, final Throwable t, StreamType streamType, boolean isEnd) {
        // execute() rather than submit(): the Future was never inspected, and
        // every Exception is already handled inside the task itself.
        this.es.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    SlsLogCollector.this.producer.send(SlsLogCollector.this.project, SlsLogCollector.this.logStore, key, SchedulerxWorker.WORKER_ADDR, SlsLogCollector.generateLogItem(line, t), new Callback(){

                        @Override
                        public void onCompletion(Result result) {
                            // Only failures are worth reporting; previously they were dropped
                            // while every success produced an empty debug line.
                            if (!result.isSuccessful()) {
                                LOGGER.error("failed to send log to SLS, project=" + SlsLogCollector.this.project
                                        + ", logStore=" + SlsLogCollector.this.logStore
                                        + ", key=" + key
                                        + ", result=" + result);
                            }
                        }
                    });
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt status for the pool thread.
                        Thread.currentThread().interrupt();
                    }
                    LOGGER.error("failed to submit log to SLS producer, project=" + SlsLogCollector.this.project
                            + ", logStore=" + SlsLogCollector.this.logStore
                            + ", key=" + key, e);
                }
            }
        });
    }

    /**
     * Builds the log item for one line.
     *
     * @param line log text, stored under {@code message}
     * @param t    optional throwable; when non-null its trace (as produced by
     *             {@link ExceptionUtil#getTrace}) is stored under {@code trace}
     * @return the populated log item
     */
    private static LogItem generateLogItem(String line, Throwable t) {
        LogItem logItem = new LogItem();
        logItem.PushBack("message", line);
        if (t != null) {
            logItem.PushBack("trace", ExceptionUtil.getTrace(t));
        }
        return logItem;
    }
}

