package com.selectdb.flink.sink.writer;

import com.selectdb.flink.cfg.SelectdbExecutionOptions;
import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.sink.CopySQLBuilder;
import com.selectdb.flink.sink.SelectdbCommittable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/selectdb/flink/sink/writer/SelectdbWriter.class */
/**
 * Sink writer that serializes incoming records, hands the resulting bytes to a
 * {@link SelectdbStageLoad} and, when a commit is prepared, emits a single
 * {@link SelectdbCommittable} carrying the statement built by {@link CopySQLBuilder}.
 *
 * <p>The constructor does not create the stage load. {@link #initializeLoad(List)} (or
 * {@link #setSelectdbStageLoad(SelectdbStageLoad)} in tests) must be called before records are
 * written or a commit is prepared.
 *
 * @param <IN> type of the records accepted by this writer
 */
public class SelectdbWriter<IN> implements SinkWriter<IN, SelectdbCommittable, SelectdbWriterState> {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbWriter.class);

    /** Restored checkpoint id reported by the init context, or 0 when none was restored. */
    private final long lastCheckpointId;
    /** Created by {@link #initializeLoad(List)}; {@code null} until then. */
    private SelectdbStageLoad selectdbStageLoad;
    /** Initialised to {@code false}; not read anywhere inside this class. */
    volatile boolean loading;
    /** {@code true} once a record has been written since the last {@link #snapshotState(long)}. */
    volatile boolean needCommit;
    private final SelectdbOptions selectdbOptions;
    private final SelectdbExecutionOptions executionOptions;
    /** Configured label prefix followed by {@code "_"} and the subtask id. */
    private final String labelPrefix;
    /** UTF-8 bytes of the configured line delimiter (default {@code "\n"}). */
    private final byte[] lineDelimiter;
    private final LabelGenerator labelGenerator;
    private final SelectdbWriterState selectdbWriterState;
    private final SelectdbRecordSerializer<IN> serializer;

    /**
     * Creates the writer and derives its label prefix and checkpoint bookkeeping.
     *
     * @param initContext source of the restored checkpoint id and the subtask id
     * @param list recovered writer states; currently not used by the constructor
     * @param selectdbRecordSerializer serializer that turns records into bytes
     * @param selectdbOptions connection-level options
     * @param selectdbExecutionOptions execution options (label prefix, load properties)
     */
    public SelectdbWriter(Sink.InitContext initContext, List<SelectdbWriterState> list, SelectdbRecordSerializer<IN> selectdbRecordSerializer, SelectdbOptions selectdbOptions, SelectdbExecutionOptions selectdbExecutionOptions) {
        this.lastCheckpointId = initContext.getRestoredCheckpointId().orElse(0L);
        LOG.info("restore checkpointId {}", this.lastCheckpointId);
        String configuredPrefix = selectdbExecutionOptions.getLabelPrefix();
        LOG.info("labelPrefix is {}", configuredPrefix);
        this.selectdbWriterState = new SelectdbWriterState(configuredPrefix);
        this.labelPrefix = configuredPrefix + "_" + initContext.getSubtaskId();
        // Use an explicit charset so the delimiter bytes do not depend on the JVM's default encoding.
        this.lineDelimiter = selectdbExecutionOptions.getLoadProps()
                .getProperty(LoadConstants.LINE_DELIMITER_KEY, "\n")
                .getBytes(StandardCharsets.UTF_8);
        this.labelGenerator = new LabelGenerator(this.labelPrefix);
        this.serializer = selectdbRecordSerializer;
        this.selectdbOptions = selectdbOptions;
        this.executionOptions = selectdbExecutionOptions;
        this.loading = false;
        this.needCommit = false;
    }

    /**
     * Creates the stage load, positions it on the checkpoint following the restored one and
     * opens the serializer.
     *
     * @param list recovered writer states; currently not used
     * @throws IOException declared for the stage-load setup and {@code serializer.open()}
     */
    public void initializeLoad(List<SelectdbWriterState> list) throws IOException {
        this.selectdbStageLoad = new SelectdbStageLoad(this.selectdbOptions, this.executionOptions, this.labelGenerator);
        this.selectdbStageLoad.setCurrentCheckpointID(this.lastCheckpointId + 1);
        this.serializer.open();
    }

    /**
     * Serializes one record and forwards the bytes to the stage load. Records for which the
     * serializer returns {@code null} are skipped and do not mark the writer as needing a commit.
     *
     * @param in record to write
     * @param context sink writer context; not used
     * @throws IllegalStateException if the stage load has not been initialised
     */
    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        byte[] serialized = this.serializer.serialize(in);
        if (serialized == null) {
            return;
        }
        // Fail with a clear message instead of a bare NullPointerException, matching the
        // checks performed by prepareCommit and snapshotState.
        Preconditions.checkState(this.selectdbStageLoad != null, "initializeLoad must be called before write");
        // Guarded so the volatile field is written only for the first record after a snapshot.
        if (!this.needCommit) {
            this.needCommit = true;
        }
        this.selectdbStageLoad.writeRecord(serialized);
    }

    /**
     * Flushes the stage load and returns one committable describing the pending files.
     *
     * @param z flush flag supplied by the framework; not used
     * @return an empty list when no record was written since the last snapshot, otherwise a
     *     single-element list
     * @throws IllegalStateException if the stage load has not been initialised
     */
    public List<SelectdbCommittable> prepareCommit(boolean z) throws IOException, InterruptedException {
        Preconditions.checkState(this.selectdbStageLoad != null);
        if (!this.needCommit) {
            return Collections.emptyList();
        }
        LOG.info("checkpoint arrived, upload buffer to storage");
        this.selectdbStageLoad.flush(true);
        // The file list is read only after the flush above has completed.
        return Collections.singletonList(
                new SelectdbCommittable(
                        this.selectdbStageLoad.getHostPort(),
                        this.selectdbOptions.getClusterName(),
                        new CopySQLBuilder(this.selectdbOptions, this.executionOptions, this.selectdbStageLoad.getFileList()).buildCopySQL()));
    }

    /**
     * Clears the stage load's file list, moves it to checkpoint {@code j + 1} and resets the
     * needs-commit flag.
     *
     * @param j id of the checkpoint being taken
     * @return a single-element list holding this writer's state object
     * @throws IllegalStateException if the stage load has not been initialised
     */
    public List<SelectdbWriterState> snapshotState(long j) throws IOException {
        Preconditions.checkState(this.selectdbStageLoad != null);
        LOG.info("clear the file list {}", this.selectdbStageLoad.getFileList());
        this.selectdbStageLoad.clearFileList();
        this.selectdbStageLoad.setCurrentCheckpointID(j + 1);
        this.needCommit = false;
        return Collections.singletonList(this.selectdbWriterState);
    }

    /**
     * Closes the stage load (when present) and the serializer. The serializer is closed even if
     * closing the stage load fails; when both fail, the first exception is thrown with the second
     * attached as suppressed.
     *
     * @throws Exception the first failure raised while closing either resource
     */
    public void close() throws Exception {
        LOG.info("Closing Sink Writer.");
        Exception failure = null;
        try {
            if (this.selectdbStageLoad != null) {
                this.selectdbStageLoad.close();
            }
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.serializer.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Replaces the stage load; intended for tests that inject a stub. */
    @VisibleForTesting
    public void setSelectdbStageLoad(SelectdbStageLoad selectdbStageLoad) {
        this.selectdbStageLoad = selectdbStageLoad;
    }
}
