package com.selectdb.flink.sink.writer;

import com.selectdb.flink.cfg.SelectdbExecutionOptions;
import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.exception.CopyLoadException;
import com.selectdb.flink.exception.SelectdbRuntimeException;
import com.selectdb.flink.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.flink.sink.HttpPutBuilder;
import com.selectdb.flink.sink.HttpUtil;
import com.selectdb.flink.utils.BackoffAndRetryUtils;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/selectdb/flink/sink/writer/SelectdbStageLoad.class */
public class SelectdbStageLoad implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbStageLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final LabelGenerator labelGenerator;
    private final byte[] lineDelimiter;
    private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
    private String uploadUrl;
    private String hostPort;
    private final String username;
    private final String password;
    private final String db;
    private final String table;
    private final Properties loadProps;
    private RecordBuffer buffer;
    private long currentCheckpointID;
    private AtomicInteger fileNum;
    private SelectdbExecutionOptions executionOptions;
    private ExecutorService loadExecutorService;
    private StageLoadAsyncExecutor loadAsyncExecutor;
    private BlockingQueue<RecordBuffer> writeQueue;
    private BlockingQueue<RecordBuffer> readQueue;
    private final AtomicBoolean started;
    private List<String> fileList = new CopyOnWriteArrayList();
    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
    private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilder();

    /* loaded from: input_file:com/selectdb/flink/sink/writer/SelectdbStageLoad$DefaultThreadFactory.class */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String str) {
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + str + "-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, this.namePrefix + this.threadNumber.getAndIncrement());
            thread.setDaemon(false);
            return thread;
        }
    }

    /* loaded from: input_file:com/selectdb/flink/sink/writer/SelectdbStageLoad$StageLoadAsyncExecutor.class */
    class StageLoadAsyncExecutor implements Runnable {
        StageLoadAsyncExecutor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            SelectdbStageLoad.LOG.info("StageLoadAsyncExecutor start");
            while (SelectdbStageLoad.this.started.get()) {
                RecordBuffer recordBuffer = null;
                try {
                    try {
                        recordBuffer = (RecordBuffer) SelectdbStageLoad.this.readQueue.poll(2000L, TimeUnit.MILLISECONDS);
                        if (recordBuffer != null) {
                            if (recordBuffer.getFileName() != null) {
                                uploadToStorage(recordBuffer.getFileName(), recordBuffer);
                                SelectdbStageLoad.this.fileList.add(recordBuffer.getFileName());
                            }
                            if (recordBuffer != null) {
                                recordBuffer.clear();
                                SelectdbStageLoad.this.putRecordToWriteQueue(recordBuffer);
                            }
                        } else if (recordBuffer != null) {
                            recordBuffer.clear();
                            SelectdbStageLoad.this.putRecordToWriteQueue(recordBuffer);
                        }
                    } catch (Exception e) {
                        SelectdbStageLoad.LOG.error("worker running error", e);
                        SelectdbStageLoad.this.exception.set(e);
                        if (recordBuffer != null) {
                            recordBuffer.clear();
                            SelectdbStageLoad.this.putRecordToWriteQueue(recordBuffer);
                        }
                    }
                } catch (Throwable th) {
                    if (recordBuffer != null) {
                        recordBuffer.clear();
                        SelectdbStageLoad.this.putRecordToWriteQueue(recordBuffer);
                    }
                    throw th;
                }
            }
            SelectdbStageLoad.LOG.info("StageLoadAsyncExecutor stop");
        }

        public void uploadToStorage(String str, RecordBuffer recordBuffer) throws IOException {
            long currentTimeMillis = System.currentTimeMillis();
            SelectdbStageLoad.LOG.info("file write started for {}", str);
            String uploadAddress = getUploadAddress(str);
            long currentTimeMillis2 = System.currentTimeMillis();
            SelectdbStageLoad.LOG.info("redirect to internalStage address:{}, cost {} ms", uploadAddress, Long.valueOf(currentTimeMillis2 - currentTimeMillis));
            SelectdbStageLoad.LOG.info("upload file {} finished, record {} size {}, cost {}ms, with requestId {}", new Object[]{str, Integer.valueOf(recordBuffer.getNumOfRecords()), Integer.valueOf(recordBuffer.getBufferSizeBytes()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2), uploadToInternalStage(uploadAddress, recordBuffer.getData())});
        }

        public String uploadToInternalStage(String str, ByteBuffer byteBuffer) throws CopyLoadException {
            ByteArrayEntity byteArrayEntity = new ByteArrayEntity(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
            HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
            httpPutBuilder.setUrl(str).addCommonHeader().setEntity(byteArrayEntity);
            HttpPut build = httpPutBuilder.build();
            try {
                return String.valueOf(BackoffAndRetryUtils.backoffAndRetry(BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE, () -> {
                    CloseableHttpResponse execute = SelectdbStageLoad.this.httpClientBuilder.build().execute((HttpUriRequest) build);
                    Throwable th = null;
                    try {
                        int statusCode = execute.getStatusLine().getStatusCode();
                        String requestId = getRequestId(execute.getAllHeaders());
                        if (statusCode != 200 || execute.getEntity() == null) {
                            throw new CopyLoadException("upload file error: " + execute.getStatusLine().toString() + ", with requestId " + requestId);
                        }
                        String entityUtils = EntityUtils.toString(execute.getEntity());
                        if (entityUtils == null || entityUtils.isEmpty()) {
                            return requestId;
                        }
                        SelectdbStageLoad.LOG.error("upload file failed, requestId is {}, response result: {}", requestId, entityUtils);
                        throw new CopyLoadException("upload file failed: " + execute.getStatusLine().toString() + ", with requestId " + requestId);
                    } finally {
                        if (execute != null) {
                            if (0 != 0) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                execute.close();
                            }
                        }
                    }
                }));
            } catch (Exception e) {
                SelectdbStageLoad.LOG.error("Failed to upload data to internal stage ", e);
                throw new CopyLoadException("Failed to upload data to internal stage, " + e.getMessage());
            }
        }

        public String getRequestId(Header[] headerArr) {
            if (headerArr == null || headerArr.length == 0) {
                return null;
            }
            for (Header header : headerArr) {
                String name = header.getName();
                if (name != null && name.toLowerCase().matches("x-\\S+-request-id")) {
                    return name + ":" + header.getValue();
                }
            }
            return null;
        }

        public String getUploadAddress(String str) throws CopyLoadException {
            HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
            httpPutBuilder.setUrl(SelectdbStageLoad.this.uploadUrl).addFileName(str).addCommonHeader().setEmptyEntity().baseAuth(SelectdbStageLoad.this.username, SelectdbStageLoad.this.password);
            try {
                Object backoffAndRetry = BackoffAndRetryUtils.backoffAndRetry(BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS, () -> {
                    CloseableHttpResponse execute = SelectdbStageLoad.this.httpClientBuilder.build().execute((HttpUriRequest) httpPutBuilder.build());
                    Throwable th = null;
                    try {
                        int statusCode = execute.getStatusLine().getStatusCode();
                        String reasonPhrase = execute.getStatusLine().getReasonPhrase();
                        if (statusCode != 307) {
                            HttpEntity entity = execute.getEntity();
                            SelectdbStageLoad.LOG.error("Failed to get internalStage address, status {}, reason {}, response {}", new Object[]{Integer.valueOf(statusCode), reasonPhrase, entity == null ? null : EntityUtils.toString(entity)});
                            throw new CopyLoadException("Failed get internalStage address");
                        }
                        String value = execute.getFirstHeader("location").getValue();
                        if (execute != null) {
                            if (0 != 0) {
                                try {
                                    execute.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                execute.close();
                            }
                        }
                        return value;
                    } catch (Throwable th3) {
                        if (execute != null) {
                            if (0 != 0) {
                                try {
                                    execute.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                execute.close();
                            }
                        }
                        throw th3;
                    }
                });
                Preconditions.checkNotNull(backoffAndRetry, "internalStage address is null");
                return backoffAndRetry.toString();
            } catch (Exception e) {
                SelectdbStageLoad.LOG.error("Get internalStage address error,", e);
                throw new CopyLoadException("Get internalStage address error, " + e.getMessage());
            }
        }
    }

    public SelectdbStageLoad(SelectdbOptions selectdbOptions, SelectdbExecutionOptions selectdbExecutionOptions, LabelGenerator labelGenerator) {
        this.hostPort = selectdbOptions.getLoadUrl();
        String[] split = selectdbOptions.getTableIdentifier().split("\\.");
        this.db = split[0];
        this.table = split[1];
        this.username = selectdbOptions.getUsername();
        this.password = selectdbOptions.getPassword();
        this.labelGenerator = labelGenerator;
        this.uploadUrl = String.format(UPLOAD_URL_PATTERN, this.hostPort);
        this.loadProps = selectdbExecutionOptions.getLoadProps();
        this.lineDelimiter = this.loadProps.getProperty(LoadConstants.LINE_DELIMITER_KEY, "\n").getBytes(StandardCharsets.UTF_8);
        this.fileNum = new AtomicInteger();
        this.executionOptions = selectdbExecutionOptions;
        this.writeQueue = new ArrayBlockingQueue(selectdbExecutionOptions.getFlushQueueSize());
        LOG.info("init RecordBuffer capacity {}, count {}", Integer.valueOf(selectdbExecutionOptions.getBufferSize()), Integer.valueOf(selectdbExecutionOptions.getFlushQueueSize()));
        for (int i = 0; i < selectdbExecutionOptions.getFlushQueueSize(); i++) {
            this.writeQueue.add(new RecordBuffer(this.lineDelimiter, selectdbExecutionOptions.getBufferSize()));
        }
        this.readQueue = new LinkedBlockingDeque();
        this.loadAsyncExecutor = new StageLoadAsyncExecutor();
        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1), new DefaultThreadFactory("upload-executor"), new ThreadPoolExecutor.AbortPolicy());
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(this.loadAsyncExecutor);
    }

    public String getHostPort() {
        return this.hostPort;
    }

    public List<String> getFileList() {
        return this.fileList;
    }

    public void clearFileList() {
        this.fileNum.set(0);
        this.fileList.clear();
    }

    public void writeRecord(byte[] bArr) throws InterruptedException {
        checkFlushException();
        if (this.buffer == null) {
            this.buffer = takeRecordFromWriteQueue();
        }
        this.buffer.insert(bArr);
        if (this.buffer.getBufferSizeBytes() >= this.executionOptions.getBufferSize() * 0.8d || (this.executionOptions.getBufferCount() != 0 && this.buffer.getNumOfRecords() >= this.executionOptions.getBufferCount())) {
            flush(false);
        }
    }

    public void flush(boolean z) throws InterruptedException {
        checkFlushException();
        if (this.buffer == null) {
            return;
        }
        this.buffer.setFileName(this.labelGenerator.generateLabel(this.currentCheckpointID, this.fileNum.getAndIncrement()));
        this.readQueue.put(this.buffer);
        if (z) {
            waitAsyncLoadFinish();
        }
        this.buffer = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void putRecordToWriteQueue(RecordBuffer recordBuffer) {
        try {
            this.writeQueue.put(recordBuffer);
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to recycle a buffer to queue");
        }
    }

    private RecordBuffer takeRecordFromWriteQueue() {
        checkFlushException();
        try {
            return this.writeQueue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to take a buffer from queue");
        }
    }

    private void checkFlushException() {
        if (this.exception.get() != null) {
            throw new SelectdbRuntimeException(this.exception.get());
        }
    }

    private void waitAsyncLoadFinish() throws InterruptedException {
        for (int i = 0; i < this.executionOptions.getFlushQueueSize() + 1; i++) {
            this.readQueue.put(takeRecordFromWriteQueue());
        }
    }

    public void close() {
        this.loadExecutorService.shutdown();
        this.started.set(false);
        this.writeQueue.clear();
        this.readQueue.clear();
    }

    public void setCurrentCheckpointID(long j) {
        this.currentCheckpointID = j;
    }

    @VisibleForTesting
    public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) {
        this.httpClientBuilder = httpClientBuilder;
    }
}
