/*
 * Decompiled with CFR 0.152.
 */
package com.selectdb.flink.sink.committer;

import com.selectdb.flink.cfg.SelectdbOptions;
import com.selectdb.flink.exception.CopyIntoException;
import com.selectdb.flink.models.BaseResponse;
import com.selectdb.flink.models.CopyIntoResp;
import com.selectdb.flink.shaded.com.fasterxml.jackson.core.type.TypeReference;
import com.selectdb.flink.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import com.selectdb.flink.sink.HttpPostBuilder;
import com.selectdb.flink.sink.HttpUtil;
import com.selectdb.flink.sink.ResponseUtil;
import com.selectdb.flink.sink.SelectdbCommittable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.util.CollectionUtil;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectdbCommitter
implements Committer<SelectdbCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(SelectdbCommitter.class);
    private static final String commitPattern = "http://%s/copy/query";
    private ObjectMapper objectMapper = new ObjectMapper();
    private final CloseableHttpClient httpClient;
    private final SelectdbOptions selectdbOptions;
    int maxRetry;

    public SelectdbCommitter(SelectdbOptions selectdbOptions, int maxRetry) {
        this(selectdbOptions, maxRetry, new HttpUtil().getHttpClient());
    }

    public SelectdbCommitter(SelectdbOptions selectdbOptions, int maxRetry, CloseableHttpClient client) {
        this.selectdbOptions = selectdbOptions;
        this.maxRetry = maxRetry;
        this.httpClient = client;
    }

    public List<SelectdbCommittable> commit(List<SelectdbCommittable> committableList) throws IOException, InterruptedException {
        for (SelectdbCommittable committable : committableList) {
            this.commitTransaction(committable);
        }
        return Collections.emptyList();
    }

    private void commitTransaction(SelectdbCommittable committable) throws IOException {
        long start = System.currentTimeMillis();
        String hostPort = committable.getHostPort();
        String clusterName = committable.getClusterName();
        String copySQL = committable.getCopySQL();
        LOG.info("commit to cluster {} with copy sql: {}", (Object)clusterName, (Object)copySQL);
        int statusCode = -1;
        String reasonPhrase = null;
        int retry = 0;
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("cluster", clusterName);
        params.put("sql", copySQL);
        boolean success = false;
        String loadResult = "";
        while (retry++ <= this.maxRetry) {
            HttpPostBuilder postBuilder = new HttpPostBuilder();
            postBuilder.setUrl(String.format(commitPattern, hostPort)).baseAuth(this.selectdbOptions.getUsername(), this.selectdbOptions.getPassword()).setEntity(new StringEntity(this.objectMapper.writeValueAsString(params)));
            try {
                CloseableHttpResponse response = this.httpClient.execute(postBuilder.build());
                Throwable throwable = null;
                try {
                    statusCode = response.getStatusLine().getStatusCode();
                    reasonPhrase = response.getStatusLine().getReasonPhrase();
                    if (statusCode != 200) {
                        LOG.warn("commit failed with status {} {}, reason {}", new Object[]{statusCode, hostPort, reasonPhrase});
                        continue;
                    }
                    if (response.getEntity() == null) continue;
                    loadResult = EntityUtils.toString(response.getEntity());
                    success = this.handleCommitResponse(loadResult);
                    if (success) {
                        LOG.info("commit success cost {}ms, response is {}", (Object)(System.currentTimeMillis() - start), (Object)loadResult);
                        break;
                    }
                    LOG.warn("commit failed, retry again");
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (response == null) continue;
                    if (throwable != null) {
                        try {
                            response.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    response.close();
                }
            }
            catch (IOException e) {
                LOG.error("commit error : ", (Throwable)e);
            }
        }
        if (!success) {
            LOG.error("commit error with status {}, reason {}, response {}", new Object[]{statusCode, reasonPhrase, loadResult});
            String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s", statusCode, reasonPhrase, loadResult, committable.getCopySQL());
            throw new CopyIntoException(copyErrMsg);
        }
    }

    public boolean handleCommitResponse(String loadResult) throws IOException {
        BaseResponse baseResponse = this.objectMapper.readValue(loadResult, new TypeReference<BaseResponse>(){});
        if (baseResponse.getCode() == 0 && baseResponse.getData() instanceof Map) {
            CopyIntoResp dataResp = this.objectMapper.convertValue(baseResponse.getData(), CopyIntoResp.class);
            if ("1".equals(dataResp.getDataCode())) {
                LOG.error("copy into execute failed, reason:{}", (Object)loadResult);
                return false;
            }
            Map<String, String> result = dataResp.getResult();
            if (CollectionUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !ResponseUtil.isCommitted(result.get("msg"))) {
                LOG.error("copy into load failed, reason:{}", (Object)loadResult);
                return false;
            }
            return true;
        }
        LOG.error("commit failed, reason:{}", (Object)loadResult);
        return false;
    }

    public void close() throws Exception {
        if (this.httpClient != null) {
            this.httpClient.close();
        }
    }
}

