package org.jumpmind.symmetric.route;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.logging.ILog;
import org.jumpmind.symmetric.common.logging.LogFactory;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataRef;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IService;
import org.jumpmind.symmetric.util.AppUtils;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:org/jumpmind/symmetric/route/DataToRouteReader.class */
public class DataToRouteReader implements Runnable {
    static final ILog log = LogFactory.getLog(DataToRouteReader.class);
    private int fetchSize;
    protected BlockingQueue<Data> dataQueue;
    private DataSource dataSource;
    private RouterContext context;
    private DataRef dataRef;
    private IDataService dataService;
    private IService sql;
    private boolean reading;
    private int peekAheadCount;
    private static final int DEFAULT_QUERY_TIMEOUT = 300;
    private int queryTimeout;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jumpmind/symmetric/route/DataToRouteReader$EOD.class */
    public class EOD extends Data {
        EOD() {
        }
    }

    public DataToRouteReader(DataSource dataSource, int i, IService iService, int i2, RouterContext routerContext, DataRef dataRef, IDataService iDataService) {
        this(dataSource, i, iService, i2, routerContext, dataRef, iDataService, DEFAULT_QUERY_TIMEOUT);
    }

    public DataToRouteReader(DataSource dataSource, int i, IService iService, int i2, RouterContext routerContext, DataRef dataRef, IDataService iDataService, int i3) {
        this.reading = true;
        this.queryTimeout = DEFAULT_QUERY_TIMEOUT;
        this.peekAheadCount = i;
        this.dataSource = dataSource;
        this.dataQueue = new LinkedBlockingQueue(i);
        this.sql = iService;
        this.context = routerContext;
        this.fetchSize = i2;
        this.dataRef = dataRef;
        this.dataService = iDataService;
        this.queryTimeout = i3;
    }

    public Data take() {
        Data data = null;
        try {
            data = this.dataQueue.poll(this.queryTimeout == 0 ? 600L : this.queryTimeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn(e);
        }
        if (data instanceof EOD) {
            return null;
        }
        return data;
    }

    protected String getSql(Channel channel) {
        String sql = this.sql.getSql("selectDataToBatchSql");
        if (!channel.isUseOldDataToRoute()) {
            sql = sql.replace("d.old_data", "''");
        }
        if (!channel.isUseRowDataToRoute()) {
            sql = sql.replace("d.row_data", "''");
        }
        if (!channel.isUsePkDataToRoute()) {
            sql = sql.replace("d.pk_data", "''");
        }
        return sql;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            new JdbcTemplate(this.dataSource).execute(new ConnectionCallback<Integer>() { // from class: org.jumpmind.symmetric.route.DataToRouteReader.1
                /* JADX WARN: Code restructure failed: missing block: B:71:0x01f7, code lost:
                
                    if (r0 == false) goto L49;
                 */
                /* JADX WARN: Code restructure failed: missing block: B:75:0x0204, code lost:
                
                    r8.this$0.context.incrementStat(java.lang.System.currentTimeMillis() - r0, org.jumpmind.symmetric.route.RouterContext.STAT_ENQUEUE_EOD_MS);
                    r8.this$0.reading = false;
                 */
                /* JADX WARN: Code restructure failed: missing block: B:77:0x01bd, code lost:
                
                    throw r25;
                 */
                /* renamed from: doInConnection, reason: merged with bridge method [inline-methods] */
                /*
                    Code decompiled incorrectly, please refer to instructions dump.
                    To view partially-correct add '--show-bad-code' argument
                */
                public java.lang.Integer m63doInConnection(java.sql.Connection r9) throws java.sql.SQLException, org.springframework.dao.DataAccessException {
                    /*
                        Method dump skipped, instructions count: 545
                        To view this dump add '--comments-level debug' option
                    */
                    throw new UnsupportedOperationException("Method not decompiled: org.jumpmind.symmetric.route.DataToRouteReader.AnonymousClass1.m63doInConnection(java.sql.Connection):java.lang.Integer");
                }
            });
        } catch (Throwable th) {
            log.error(th);
        }
    }

    protected boolean fillPeekAheadQueue(List<Data> list, int i, ResultSet resultSet) throws SQLException {
        boolean z = true;
        int size = i - list.size();
        int i2 = 0;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            long j = currentTimeMillis;
            if (!this.reading || i2 >= size) {
                break;
            }
            if (!resultSet.next()) {
                z = false;
                break;
            }
            if (process(resultSet)) {
                Data readData = this.dataService.readData(resultSet);
                this.context.setLastDataIdForTransactionId(readData);
                list.add(readData);
                i2++;
                this.context.incrementStat(System.currentTimeMillis() - j, RouterContext.STAT_READ_DATA_MS);
            } else {
                this.context.incrementStat(System.currentTimeMillis() - j, RouterContext.STAT_REREAD_DATA_MS);
            }
            currentTimeMillis = System.currentTimeMillis();
        }
        return z;
    }

    protected boolean process(ResultSet resultSet) throws SQLException {
        return StringUtils.isBlank(resultSet.getString(13));
    }

    protected void copyToQueue(Data data) {
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.dataQueue.offer(data) && this.reading) {
            AppUtils.sleep(50L);
        }
        this.context.incrementStat(System.currentTimeMillis() - currentTimeMillis, RouterContext.STAT_ENQUEUE_DATA_MS);
    }

    public boolean isReading() {
        return this.reading;
    }

    public void setReading(boolean z) {
        this.reading = z;
    }

    public BlockingQueue<Data> getDataQueue() {
        return this.dataQueue;
    }
}
