/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.flink.sink.writer;

import com.selectdb.flink.cfg.SelectdbExecutionOptions;
import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.exception.CopyLoadException;
import com.selectdb.flink.exception.SelectdbRuntimeException;
import com.selectdb.flink.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.flink.sink.HttpPutBuilder;
import com.selectdb.flink.sink.HttpUtil;
import com.selectdb.flink.sink.writer.LabelGenerator;
import com.selectdb.flink.sink.writer.RecordBuffer;
import com.selectdb.flink.utils.BackoffAndRetryUtils;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers serialized records and uploads them as files to the SelectDB internal stage.
 *
 * <p>Records are appended to a {@link RecordBuffer} on the caller's thread. A filled buffer is
 * handed to a single background worker through {@code readQueue}; the worker uploads it over
 * HTTP, records the file name in the file list and recycles the buffer through
 * {@code writeQueue}. The first failure seen by the worker is remembered and rethrown, wrapped
 * in a {@link SelectdbRuntimeException}, by subsequent {@link #writeRecord(byte[])} and
 * {@link #flush(boolean)} calls.
 */
public class SelectdbStageLoad implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbStageLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
    /** Fraction of the configured buffer size at which the current buffer is flushed. */
    private static final double BUFFER_FLUSH_RATIO = 0.8;
    /** How long the worker waits on an empty read queue before re-checking the stop flag. */
    private static final long WORKER_POLL_TIMEOUT_MS = 2000L;
    /** Matches response header names such as {@code x-<vendor>-request-id}, ignoring case. */
    private static final Pattern REQUEST_ID_HEADER_PATTERN = Pattern.compile("x-\\S+-request-id", Pattern.CASE_INSENSITIVE);

    private final LabelGenerator labelGenerator;
    private final byte[] lineDelimiter;
    private String uploadUrl;
    private String hostPort;
    private final String username;
    private final String password;
    private final String db;
    private final String table;
    private final Properties loadProps;
    /** Names of the files uploaded successfully since the last {@link #clearFileList()}. */
    private List<String> fileList = new CopyOnWriteArrayList<String>();
    /** Buffer currently being filled by {@link #writeRecord(byte[])}; {@code null} when none is held. */
    private RecordBuffer buffer;
    private long currentCheckpointID;
    private AtomicInteger fileNum;
    private SelectdbExecutionOptions executionOptions;
    private ExecutorService loadExecutorService;
    private StageLoadAsyncExecutor loadAsyncExecutor;
    /** Empty buffers available to the writing thread. */
    private BlockingQueue<RecordBuffer> writeQueue;
    /** Buffers handed to the upload worker. */
    private BlockingQueue<RecordBuffer> readQueue;
    private final AtomicBoolean started;
    /** First failure raised by the upload worker, or {@code null} if none occurred. */
    private final AtomicReference<Throwable> exception = new AtomicReference<>();
    private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilder();

    /**
     * Creates the loader, pre-allocates the record buffers and starts the upload worker.
     *
     * @param selectdbOptions connection options; the table identifier must contain a
     *     {@code db.table} pair
     * @param executionOptions buffer sizing, queue size and load properties
     * @param labelGenerator generator for the uploaded file names
     * @throws IllegalArgumentException if the table identifier has no {@code .} separated
     *     database and table part
     */
    public SelectdbStageLoad(SelectdbOptions selectdbOptions, SelectdbExecutionOptions executionOptions, LabelGenerator labelGenerator) {
        this.hostPort = selectdbOptions.getLoadUrl();
        String tableIdentifier = selectdbOptions.getTableIdentifier();
        String[] tableInfo = tableIdentifier.split("\\.");
        // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
        Preconditions.checkArgument(tableInfo.length >= 2, "tableIdentifier must have the form db.table, but was: " + tableIdentifier);
        this.db = tableInfo[0];
        this.table = tableInfo[1];
        this.username = selectdbOptions.getUsername();
        this.password = selectdbOptions.getPassword();
        this.labelGenerator = labelGenerator;
        this.uploadUrl = String.format(UPLOAD_URL_PATTERN, this.hostPort);
        this.loadProps = executionOptions.getLoadProps();
        this.lineDelimiter = this.loadProps.getProperty("file.line_delimiter", "\n").getBytes(StandardCharsets.UTF_8);
        this.fileNum = new AtomicInteger();
        this.executionOptions = executionOptions;
        this.writeQueue = new ArrayBlockingQueue<RecordBuffer>(executionOptions.getFlushQueueSize());
        LOG.info("init RecordBuffer capacity {}, count {}", executionOptions.getBufferSize(), executionOptions.getFlushQueueSize());
        for (int index = 0; index < executionOptions.getFlushQueueSize(); ++index) {
            this.writeQueue.add(new RecordBuffer(this.lineDelimiter, executionOptions.getBufferSize()));
        }
        this.readQueue = new LinkedBlockingDeque<RecordBuffer>();
        this.loadAsyncExecutor = new StageLoadAsyncExecutor();
        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1), new DefaultThreadFactory("upload-executor"), new ThreadPoolExecutor.AbortPolicy());
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(this.loadAsyncExecutor);
    }

    /** Returns the load URL taken from the options at construction time. */
    public String getHostPort() {
        return this.hostPort;
    }

    /** Returns the live list of file names uploaded since the last {@link #clearFileList()}. */
    public List<String> getFileList() {
        return this.fileList;
    }

    /** Resets the file counter to zero and empties the list of uploaded file names. */
    public void clearFileList() {
        this.fileNum.set(0);
        this.fileList.clear();
    }

    /**
     * Appends a record to the current buffer and flushes asynchronously once the buffer reaches
     * 80% of the configured size or, when a record limit is configured (non-zero), that limit.
     *
     * @param record serialized record bytes
     * @throws InterruptedException if interrupted while handing the buffer to the worker
     * @throws SelectdbRuntimeException if the upload worker has failed
     */
    public void writeRecord(byte[] record) throws InterruptedException {
        this.checkFlushException();
        if (this.buffer == null) {
            this.buffer = this.takeRecordFromWriteQueue();
        }
        this.buffer.insert(record);
        boolean sizeReached = (double)this.buffer.getBufferSizeBytes() >= (double)this.executionOptions.getBufferSize() * BUFFER_FLUSH_RATIO;
        boolean countReached = this.executionOptions.getBufferCount() != 0 && this.buffer.getNumOfRecords() >= this.executionOptions.getBufferCount();
        if (sizeReached || countReached) {
            this.flush(false);
        }
    }

    /**
     * Hands the current buffer, if any, to the upload worker.
     *
     * <p>With {@code waitUtilDone} set, the call also blocks until every buffer queued so far has
     * been processed by the worker. The wait is performed even when no buffer is currently held,
     * so that buffers queued by earlier asynchronous flushes are accounted for.
     *
     * @param waitUtilDone whether to block until all queued buffers have been processed
     * @throws InterruptedException if interrupted while queueing or waiting
     * @throws SelectdbRuntimeException if the upload worker has failed
     */
    public void flush(boolean waitUtilDone) throws InterruptedException {
        this.checkFlushException();
        if (this.buffer != null) {
            String fileName = this.labelGenerator.generateLabel(this.currentCheckpointID, this.fileNum.getAndIncrement());
            RecordBuffer pending = this.buffer;
            pending.setFileName(fileName);
            this.readQueue.put(pending);
            // The worker owns the buffer from here on; drop our reference before anything can throw.
            this.buffer = null;
        }
        if (waitUtilDone) {
            this.waitAsyncLoadFinish();
        }
    }

    /** Returns a buffer to the pool of writable buffers. */
    private void putRecordToWriteQueue(RecordBuffer buffer) {
        try {
            this.writeQueue.put(buffer);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and keep the cause in the stack trace.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to recycle a buffer to queue", e);
        }
    }

    /** Blocks until a writable buffer is available and returns it. */
    private RecordBuffer takeRecordFromWriteQueue() {
        this.checkFlushException();
        try {
            return this.writeQueue.take();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and keep the cause in the stack trace.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to take a buffer from queue", e);
        }
    }

    /** Rethrows a failure recorded by the upload worker, if there is one. */
    private void checkFlushException() {
        Throwable failure = this.exception.get();
        if (failure != null) {
            throw new SelectdbRuntimeException(failure);
        }
    }

    /**
     * Blocks until the worker has processed everything queued before this call.
     *
     * <p>Only {@code flushQueueSize} buffers exist and the single worker drains the read queue
     * in order, so obtaining {@code flushQueueSize + 1} buffers from the write queue means at
     * least one buffer passed through the worker after all previously queued ones. The buffers
     * cycled here carry no file name, so the worker recycles them without uploading.
     */
    private void waitAsyncLoadFinish() throws InterruptedException {
        for (int i = 0; i < this.executionOptions.getFlushQueueSize() + 1; ++i) {
            RecordBuffer empty = this.takeRecordFromWriteQueue();
            this.readQueue.put(empty);
        }
    }

    /** Stops accepting upload work, signals the worker to stop and drops all queued buffers. */
    public void close() {
        this.loadExecutorService.shutdown();
        this.started.set(false);
        this.writeQueue.clear();
        this.readQueue.clear();
    }

    /** Sets the checkpoint id that is passed to the label generator for subsequent flushes. */
    public void setCurrentCheckpointID(long currentCheckpointID) {
        this.currentCheckpointID = currentCheckpointID;
    }

    /** Replaces the HTTP client builder used for all requests. */
    @VisibleForTesting
    public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
        this.httpClientBuilder = httpClientBuilder;
    }

    /** Creates non-daemon threads named {@code pool-<n>-<name>-<m>}. */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String name) {
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, this.namePrefix + this.threadNumber.getAndIncrement());
            t.setDaemon(false);
            return t;
        }
    }

    /**
     * Background worker that takes buffers from the read queue, uploads those that carry a file
     * name and returns every buffer to the write queue.
     */
    class StageLoadAsyncExecutor implements Runnable {
        StageLoadAsyncExecutor() {
        }

        /**
         * Runs until the enclosing loader is closed or an upload fails. A failure is stored so the
         * writing thread can rethrow it, and ends the loop.
         */
        @Override
        public void run() {
            LOG.info("StageLoadAsyncExecutor start");
            while (SelectdbStageLoad.this.started.get()) {
                RecordBuffer buffer = null;
                try {
                    buffer = SelectdbStageLoad.this.readQueue.poll(WORKER_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    // Buffers without a file name are placeholders: nothing to upload.
                    if (buffer != null && buffer.getFileName() != null) {
                        this.uploadToStorage(buffer.getFileName(), buffer);
                        SelectdbStageLoad.this.fileList.add(buffer.getFileName());
                    }
                }
                catch (InterruptedException e) {
                    // Only poll() can raise this, so no buffer is held and nothing needs recycling.
                    Thread.currentThread().interrupt();
                    LOG.error("worker running error", e);
                    SelectdbStageLoad.this.exception.set(e);
                    break;
                }
                catch (Exception e) {
                    LOG.error("worker running error", e);
                    SelectdbStageLoad.this.exception.set(e);
                    break;
                }
                finally {
                    // No jump statement in here: a continue inside finally would cancel the break above.
                    if (buffer != null) {
                        buffer.clear();
                        SelectdbStageLoad.this.putRecordToWriteQueue(buffer);
                    }
                }
            }
            LOG.info("StageLoadAsyncExecutor stop");
        }

        /**
         * Resolves the upload address for {@code fileName} and uploads the buffer content to it.
         *
         * @param fileName name under which the data is uploaded
         * @param buffer buffer holding the data
         * @throws IOException if resolving the address or uploading fails
         */
        public void uploadToStorage(String fileName, RecordBuffer buffer) throws IOException {
            long start = System.currentTimeMillis();
            LOG.info("file write started for {}", fileName);
            String address = this.getUploadAddress(fileName);
            long addressTs = System.currentTimeMillis();
            LOG.info("redirect to internalStage address:{}, cost {} ms", address, addressTs - start);
            String requestId = this.uploadToInternalStage(address, buffer.getData());
            LOG.info("upload file {} finished, record {} size {}, cost {}ms, with requestId {}", fileName, buffer.getNumOfRecords(), buffer.getBufferSizeBytes(), System.currentTimeMillis() - addressTs, requestId);
        }

        /**
         * Sends {@code data} to {@code address} with an HTTP PUT, retrying through
         * {@link BackoffAndRetryUtils}.
         *
         * <p>An attempt succeeds only when the response status is 200, an entity is present and
         * its body is empty; any other response is turned into a {@link CopyLoadException}.
         *
         * @param address upload URL
         * @param data array-backed buffer; bytes from its array offset up to its limit are sent
         * @return the request-id header of the successful response as text, {@code "null"} if
         *     the response carried none
         * @throws CopyLoadException if the upload ultimately fails
         */
        public String uploadToInternalStage(String address, ByteBuffer data) throws CopyLoadException {
            ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
            HttpPut httpPut = putBuilder.build();
            try {
                Object result = BackoffAndRetryUtils.backoffAndRetry(BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE, () -> {
                    // NOTE(review): a new client is built for every attempt and is never closed; only
                    // the response is. Consider reusing or closing the client.
                    try (CloseableHttpResponse response = SelectdbStageLoad.this.httpClientBuilder.build().execute(httpPut)) {
                        int statusCode = response.getStatusLine().getStatusCode();
                        String requestId = this.getRequestId(response.getAllHeaders());
                        if (statusCode != 200 || response.getEntity() == null) {
                            throw new CopyLoadException("upload file error: " + response.getStatusLine().toString() + ", with requestId " + requestId);
                        }
                        String loadResult = EntityUtils.toString(response.getEntity());
                        if (loadResult == null || loadResult.isEmpty()) {
                            return requestId;
                        }
                        LOG.error("upload file failed, requestId is {}, response result: {}", requestId, loadResult);
                        throw new CopyLoadException("upload file failed: " + response.getStatusLine().toString() + ", with requestId " + requestId);
                    }
                });
                return String.valueOf(result);
            }
            catch (Exception ex) {
                LOG.error("Failed to upload data to internal stage ", ex);
                throw new CopyLoadException("Failed to upload data to internal stage, " + ex.getMessage());
            }
        }

        /**
         * Finds the first header whose name matches {@code x-<something>-request-id}, ignoring
         * case.
         *
         * @param headers response headers, may be {@code null} or empty
         * @return {@code name:value} of the matching header, or {@code null} if there is none
         */
        public String getRequestId(Header[] headers) {
            if (headers == null) {
                return null;
            }
            for (Header header : headers) {
                String name = header.getName();
                // Case-insensitive pattern instead of toLowerCase(): independent of the default locale.
                if (name != null && REQUEST_ID_HEADER_PATTERN.matcher(name).matches()) {
                    return name + ":" + header.getValue();
                }
            }
            return null;
        }

        /**
         * Asks the upload endpoint where {@code fileName} must be sent, retrying through
         * {@link BackoffAndRetryUtils}. The address is read from the {@code location} header of a
         * 307 response; any other status is treated as a failure.
         *
         * @param fileName name of the file to upload
         * @return the redirect address
         * @throws CopyLoadException if no address could be obtained
         */
        public String getUploadAddress(String fileName) throws CopyLoadException {
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            putBuilder.setUrl(SelectdbStageLoad.this.uploadUrl).addFileName(fileName).addCommonHeader().setEmptyEntity().baseAuth(SelectdbStageLoad.this.username, SelectdbStageLoad.this.password);
            try {
                Object address = BackoffAndRetryUtils.backoffAndRetry(BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS, () -> {
                    // NOTE(review): a new client is built for every attempt and is never closed; only
                    // the response is. Consider reusing or closing the client.
                    try (CloseableHttpResponse execute = SelectdbStageLoad.this.httpClientBuilder.build().execute(putBuilder.build())) {
                        int statusCode = execute.getStatusLine().getStatusCode();
                        String reason = execute.getStatusLine().getReasonPhrase();
                        if (statusCode == 307) {
                            Header location = execute.getFirstHeader("location");
                            if (location == null) {
                                // Report a redirect without target explicitly instead of a NullPointerException.
                                throw new CopyLoadException("Failed get internalStage address, redirect response has no location header");
                            }
                            return location.getValue();
                        }
                        HttpEntity entity = execute.getEntity();
                        String result = entity == null ? null : EntityUtils.toString(entity);
                        LOG.error("Failed to get internalStage address, status {}, reason {}, response {}", statusCode, reason, result);
                        throw new CopyLoadException("Failed get internalStage address");
                    }
                });
                Preconditions.checkNotNull(address, "internalStage address is null");
                return address.toString();
            }
            catch (Exception e) {
                LOG.error("Get internalStage address error,", e);
                throw new CopyLoadException("Get internalStage address error, " + e.getMessage());
            }
        }
    }
}

