/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.flink.sink.writer;

import com.selectdb.flink.cfg.SelectdbExecutionOptions;
import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.connection.JdbcConnectionProvider;
import com.selectdb.flink.connection.SimpleJdbcConnectionProvider;
import com.selectdb.flink.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import com.selectdb.flink.shaded.com.fasterxml.jackson.core.type.TypeReference;
import com.selectdb.flink.shaded.com.fasterxml.jackson.databind.JsonNode;
import com.selectdb.flink.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.flink.sink.writer.SelectdbRecordSerializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonDebeziumSchemaSerializer
implements SelectdbRecordSerializer<String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
    private static final String OP_READ = "r";
    private static final String OP_CREATE = "c";
    private static final String OP_UPDATE = "u";
    private static final String OP_DELETE = "d";
    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
    public static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
    private final Pattern addDropDDLPattern;
    private SelectdbOptions selectdbOptions;
    private ObjectMapper objectMapper = new ObjectMapper();
    private JdbcConnectionProvider connectionProvider;
    private Connection connection;
    private String lineDelimiter = "\n";
    private Boolean ignoreUpdateBefore = true;

    public JsonDebeziumSchemaSerializer(SelectdbOptions selectdbOptions, Pattern pattern) {
        this.selectdbOptions = selectdbOptions;
        this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, 2) : pattern;
        this.connectionProvider = new SimpleJdbcConnectionProvider(selectdbOptions);
    }

    public JsonDebeziumSchemaSerializer(SelectdbOptions selectdbOptions, Pattern pattern, SelectdbExecutionOptions executionOptions) {
        this(selectdbOptions, pattern);
        if (executionOptions != null) {
            this.lineDelimiter = executionOptions.getLoadProps().getProperty("file.line_delimiter", "\n");
            this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
        }
    }

    @Override
    public void open() throws IOException {
        try {
            Connection connection = this.connectionProvider.getOrEstablishConnection();
            Throwable throwable = null;
            if (connection != null) {
                if (throwable != null) {
                    try {
                        connection.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    connection.close();
                }
            }
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public byte[] serialize(String record) throws IOException {
        Map<String, String> valueMap;
        LOG.debug("received debezium json data {} :", (Object)record);
        JsonNode recordRoot = this.objectMapper.readTree(record);
        String op = this.extractJsonNode(recordRoot, "op");
        if (Objects.isNull(op)) {
            this.schemaChange(recordRoot);
            return null;
        }
        if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
            valueMap = this.extractAfterRow(recordRoot);
            this.addDeleteSign(valueMap, false);
        } else {
            if (OP_UPDATE.equals(op)) {
                return this.extractUpdate(recordRoot);
            }
            if (OP_DELETE.equals(op)) {
                valueMap = this.extractBeforeRow(recordRoot);
                this.addDeleteSign(valueMap, true);
            } else {
                LOG.error("parse record fail, unknown op {} in {}", (Object)op, (Object)record);
                return null;
            }
        }
        return this.objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
    }

    private byte[] extractUpdate(JsonNode recordRoot) throws JsonProcessingException {
        StringBuilder updateRow = new StringBuilder();
        if (!this.ignoreUpdateBefore.booleanValue()) {
            Map<String, String> beforeRow = this.extractBeforeRow(recordRoot);
            this.addDeleteSign(beforeRow, true);
            updateRow.append(this.objectMapper.writeValueAsString(beforeRow)).append(this.lineDelimiter);
        }
        Map<String, String> afterRow = this.extractAfterRow(recordRoot);
        this.addDeleteSign(afterRow, false);
        updateRow.append(this.objectMapper.writeValueAsString(afterRow));
        return updateRow.toString().getBytes(StandardCharsets.UTF_8);
    }

    @VisibleForTesting
    public boolean schemaChange(JsonNode recordRoot) {
        boolean status = false;
        try {
            String ddl = this.extractDDL(recordRoot);
            if (StringUtils.isNullOrWhitespaceOnly((String)ddl)) {
                LOG.info("ddl can not do schema change:{}", (Object)recordRoot);
                return false;
            }
            boolean doSchemaChange = this.checkLightSchemaChange(ddl);
            status = doSchemaChange && this.execSchemaChange(ddl);
            LOG.info("schema change status:{}", (Object)status);
        }
        catch (Exception ex) {
            LOG.warn("schema change error :", (Throwable)ex);
        }
        return status;
    }

    private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
        if (delete) {
            valueMap.put("__DORIS_DELETE_SIGN__", "1");
        } else {
            valueMap.put("__DORIS_DELETE_SIGN__", "0");
        }
    }

    private boolean checkLightSchemaChange(String ddl) throws Exception {
        Map<String, String> columnMap = this.parseAddDropColumn(ddl);
        if (columnMap.isEmpty()) {
            return false;
        }
        if (columnMap.containsKey("DROP")) {
            String dropColumn = columnMap.get("DROP");
            try (Connection connection = this.connectionProvider.getOrEstablishConnection();
                 PreparedStatement ps = connection.prepareStatement(String.format("DESC %s ALL", this.selectdbOptions.getTableIdentifier()));
                 ResultSet rs = ps.executeQuery();){
                while (true) {
                    if (rs.next()) {
                        String field = rs.getString("Field");
                        String key = rs.getString("Key");
                        if (!dropColumn.equals(field) || !"TRUE".equalsIgnoreCase(key)) continue;
                        LOG.warn("light schema change can not do table {}", (Object)this.selectdbOptions.getTableIdentifier());
                        boolean bl = false;
                        return bl;
                        continue;
                    }
                    break;
                }
            }
        }
        return true;
    }

    private Map<String, String> parseAddDropColumn(String ddl) {
        Matcher matcher;
        HashMap<String, String> columnMap = new HashMap<String, String>();
        if (ddl != null && (matcher = this.addDropDDLPattern.matcher(ddl)).find()) {
            String op = matcher.group(1);
            String col = matcher.group(3);
            columnMap.put(op.toUpperCase(), col);
        }
        return columnMap;
    }

    private boolean execSchemaChange(String ddl) throws Exception {
        try (Connection connection = this.connectionProvider.getOrEstablishConnection();
             PreparedStatement ps = connection.prepareStatement(ddl);){
            ps.execute();
        }
        return true;
    }

    private String extractJsonNode(JsonNode record, String key) {
        return record != null && record.get(key) != null ? record.get(key).asText() : null;
    }

    private Map<String, String> extractBeforeRow(JsonNode record) {
        return this.extractRow(record.get("before"));
    }

    private Map<String, String> extractAfterRow(JsonNode record) {
        return this.extractRow(record.get("after"));
    }

    private Map<String, String> extractRow(JsonNode recordRow) {
        HashMap recordMap = this.objectMapper.convertValue((Object)recordRow, new TypeReference<Map<String, String>>(){});
        return recordMap != null ? recordMap : new HashMap();
    }

    @VisibleForTesting
    public String extractDDL(JsonNode record) throws JsonProcessingException {
        Matcher matcher;
        String historyRecord = this.extractJsonNode(record, "historyRecord");
        if (Objects.isNull(historyRecord)) {
            return null;
        }
        String ddl = this.extractJsonNode(this.objectMapper.readTree(historyRecord), "ddl");
        LOG.debug("received debezium ddl :{}", (Object)ddl);
        if (!Objects.isNull(ddl) && (matcher = this.addDropDDLPattern.matcher(ddl)).find()) {
            String op = matcher.group(1);
            String col = matcher.group(3);
            String type = matcher.group(5);
            type = this.handleType(type);
            ddl = String.format(EXECUTE_DDL, this.selectdbOptions.getTableIdentifier(), op, col, type);
            LOG.info("parse ddl:{}", (Object)ddl);
            return ddl;
        }
        return null;
    }

    private String handleType(String type) {
        if (type == null || "".equals(type)) {
            return "";
        }
        Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", 2);
        Matcher matcher = pattern.matcher(type);
        if (matcher.find()) {
            String len = matcher.group(1);
            return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
        }
        return type;
    }

    @Override
    public void close() throws IOException {
        this.connectionProvider.closeConnection();
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private SelectdbOptions selectdbOptions;
        private Pattern addDropDDLPattern;
        private SelectdbExecutionOptions executionOptions;

        public Builder setSelectdbOptions(SelectdbOptions selectdbOptions) {
            this.selectdbOptions = selectdbOptions;
            return this;
        }

        public Builder setPattern(Pattern addDropDDLPattern) {
            this.addDropDDLPattern = addDropDDLPattern;
            return this;
        }

        public Builder setExecutionOptions(SelectdbExecutionOptions executionOptions) {
            this.executionOptions = executionOptions;
            return this;
        }

        public JsonDebeziumSchemaSerializer build() {
            return new JsonDebeziumSchemaSerializer(this.selectdbOptions, this.addDropDDLPattern, this.executionOptions);
        }
    }
}

