/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.flink.sink.writer;

import com.selectdb.flink.cfg.SelectdbExecutionOptions;
import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.sink.CopySQLBuilder;
import com.selectdb.flink.sink.SelectdbCommittable;
import com.selectdb.flink.sink.writer.LabelGenerator;
import com.selectdb.flink.sink.writer.SelectdbRecordSerializer;
import com.selectdb.flink.sink.writer.SelectdbStageLoad;
import com.selectdb.flink.sink.writer.SelectdbWriterState;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectdbWriter<IN>
implements SinkWriter<IN, SelectdbCommittable, SelectdbWriterState> {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbWriter.class);
    private final long lastCheckpointId;
    private SelectdbStageLoad selectdbStageLoad;
    volatile boolean loading;
    volatile boolean needCommit;
    private final SelectdbOptions selectdbOptions;
    private final SelectdbExecutionOptions executionOptions;
    private final String labelPrefix;
    private final byte[] lineDelimiter;
    private final LabelGenerator labelGenerator;
    private final SelectdbWriterState selectdbWriterState;
    private final SelectdbRecordSerializer<IN> serializer;

    public SelectdbWriter(Sink.InitContext initContext, List<SelectdbWriterState> state, SelectdbRecordSerializer<IN> serializer, SelectdbOptions selectdbOptions, SelectdbExecutionOptions executionOptions) {
        this.lastCheckpointId = initContext.getRestoredCheckpointId().orElse(0L);
        LOG.info("restore checkpointId {}", (Object)this.lastCheckpointId);
        LOG.info("labelPrefix is {}", (Object)executionOptions.getLabelPrefix());
        this.selectdbWriterState = new SelectdbWriterState(executionOptions.getLabelPrefix());
        this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
        this.lineDelimiter = executionOptions.getLoadProps().getProperty("file.line_delimiter", "\n").getBytes();
        this.labelGenerator = new LabelGenerator(this.labelPrefix);
        this.serializer = serializer;
        this.selectdbOptions = selectdbOptions;
        this.executionOptions = executionOptions;
        this.loading = false;
        this.needCommit = false;
    }

    public void initializeLoad(List<SelectdbWriterState> state) throws IOException {
        this.selectdbStageLoad = new SelectdbStageLoad(this.selectdbOptions, this.executionOptions, this.labelGenerator);
        this.selectdbStageLoad.setCurrentCheckpointID(this.lastCheckpointId + 1L);
        this.serializer.open();
    }

    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        byte[] serialize = this.serializer.serialize(in);
        if (Objects.isNull(serialize)) {
            return;
        }
        if (!this.needCommit) {
            this.needCommit = true;
        }
        this.selectdbStageLoad.writeRecord(serialize);
    }

    public List<SelectdbCommittable> prepareCommit(boolean flush) throws IOException, InterruptedException {
        Preconditions.checkState((this.selectdbStageLoad != null ? 1 : 0) != 0);
        if (!this.needCommit) {
            return Collections.emptyList();
        }
        LOG.info("checkpoint arrived, upload buffer to storage");
        this.selectdbStageLoad.flush(true);
        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(this.selectdbOptions, this.executionOptions, this.selectdbStageLoad.getFileList());
        String copySql = copySQLBuilder.buildCopySQL();
        return ImmutableList.of((Object)new SelectdbCommittable(this.selectdbStageLoad.getHostPort(), this.selectdbOptions.getClusterName(), copySql));
    }

    public List<SelectdbWriterState> snapshotState(long checkpointId) throws IOException {
        Preconditions.checkState((this.selectdbStageLoad != null ? 1 : 0) != 0);
        LOG.info("clear the file list {}", this.selectdbStageLoad.getFileList());
        this.selectdbStageLoad.clearFileList();
        this.selectdbStageLoad.setCurrentCheckpointID(checkpointId + 1L);
        this.needCommit = false;
        return Collections.singletonList(this.selectdbWriterState);
    }

    public void close() throws Exception {
        LOG.info("Closing Sink Writer.");
        if (this.selectdbStageLoad != null) {
            this.selectdbStageLoad.close();
        }
        this.serializer.close();
    }

    @VisibleForTesting
    public void setSelectdbStageLoad(SelectdbStageLoad selectdbStageLoad) {
        this.selectdbStageLoad = selectdbStageLoad;
    }
}

