/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.route;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.logging.ILog;
import org.jumpmind.symmetric.common.logging.LogFactory;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataRef;
import org.jumpmind.symmetric.route.RouterContext;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IService;
import org.jumpmind.symmetric.util.AppUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.JdbcUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Reads the data selected for routing on one channel and hands it to a consumer
 * through a bounded {@link BlockingQueue}.
 * <p>
 * {@link #run()} executes the "selectDataToBatchSql" query, buffers rows in a
 * peek-ahead list and copies them to the queue. Unless the channel's batch algorithm
 * is "nontransactional", rows in the peek-ahead list that share the transaction id of
 * the row just enqueued are enqueued next, ahead of other buffered rows. When reading
 * ends (normally or with an error) an {@link EOD} marker is enqueued;
 * {@link #take()} reports that marker to the consumer as {@code null}.
 * <p>
 * The {@code reading} flag is written via {@link #setReading(boolean)} and polled by
 * the reading code, so it is declared {@code volatile}.
 */
public class DataToRouteReader implements Runnable {

    static final ILog log = LogFactory.getLog(DataToRouteReader.class);

    /** Query timeout, in seconds, used by the constructor that does not take one. */
    private static final int DEFAULT_QUERY_TIMEOUT = 300;

    /** Seconds {@link #take()} waits for data when the query timeout is 0. */
    private static final long DEFAULT_POLL_TIMEOUT_SECONDS = 600L;

    /** Query execution times above this many milliseconds are logged as a warning. */
    private static final long SLOW_QUERY_THRESHOLD_MS = 30000L;

    /** Pause, in milliseconds, between attempts to offer to a full queue. */
    private static final long QUEUE_RETRY_SLEEP_MS = 50L;

    /**
     * Result set column inspected by {@link #process(ResultSet)}.
     * NOTE(review): presumably marks rows that were already handled (see the
     * "already.read.data.ms" statistic) - confirm against selectDataToBatchSql.
     */
    private static final int PROCESS_FILTER_COLUMN_INDEX = 13;

    private final int fetchSize;
    protected BlockingQueue<Data> dataQueue;
    private final DataSource dataSource;
    private final RouterContext context;
    private final DataRef dataRef;
    private final IDataService dataService;
    private final IService sql;
    /* Polled by the reader's retry loops and set through setReading(). */
    private volatile boolean reading = true;
    private final int peekAheadCount;
    private final int queryTimeout;

    /**
     * Creates a reader that uses the default query timeout of
     * {@value #DEFAULT_QUERY_TIMEOUT} seconds.
     *
     * @param maxQueueSize used both as the capacity of the data queue and as the
     *                     size of the peek-ahead list
     */
    public DataToRouteReader(DataSource dataSource, int maxQueueSize, IService sql, int fetchSize, RouterContext context, DataRef dataRef, IDataService dataService) {
        this(dataSource, maxQueueSize, sql, fetchSize, context, dataRef, dataService, DEFAULT_QUERY_TIMEOUT);
    }

    /**
     * @param dataSource     source of the connection the select is run on
     * @param peekAheadCount capacity of the data queue and size of the peek-ahead list
     * @param sql            provider of the "selectDataToBatchSql" statement
     * @param fetchSize      JDBC fetch size applied to the statement
     * @param context        router context supplying the channel and collecting statistics
     * @param dataRef        supplies the reference data id bound as the second query parameter
     * @param dataService    converts result set rows to {@link Data}
     * @param queryTimeout   JDBC query timeout in seconds; also the time {@link #take()}
     *                       waits, with 0 meaning {@value #DEFAULT_POLL_TIMEOUT_SECONDS} seconds there
     */
    public DataToRouteReader(DataSource dataSource, int peekAheadCount, IService sql, int fetchSize, RouterContext context, DataRef dataRef, IDataService dataService, int queryTimeout) {
        this.peekAheadCount = peekAheadCount;
        this.dataSource = dataSource;
        this.dataQueue = new LinkedBlockingQueue<Data>(peekAheadCount);
        this.sql = sql;
        this.context = context;
        this.fetchSize = fetchSize;
        this.dataRef = dataRef;
        this.dataService = dataService;
        this.queryTimeout = queryTimeout;
    }

    /**
     * Waits for the next piece of data from the queue.
     *
     * @return the next data, or {@code null} when the end-of-data marker was received,
     *         the wait timed out, or the calling thread was interrupted (in which case
     *         the thread's interrupt status is restored)
     */
    public Data take() {
        Data data = null;
        try {
            long timeoutSeconds = this.queryTimeout == 0 ? DEFAULT_POLL_TIMEOUT_SECONDS : (long) this.queryTimeout;
            data = this.dataQueue.poll(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn(e);
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        if (data instanceof EOD) {
            return null;
        }
        return data;
    }

    /**
     * Builds the select statement for the channel, replacing the old/row/pk data
     * columns with empty string literals when the channel does not use them for routing.
     */
    protected String getSql(Channel channel) {
        String select = this.sql.getSql("selectDataToBatchSql");
        if (!channel.isUseOldDataToRoute()) {
            select = select.replace("d.old_data", "''");
        }
        if (!channel.isUseRowDataToRoute()) {
            select = select.replace("d.row_data", "''");
        }
        if (!channel.isUsePkDataToRoute()) {
            select = select.replace("d.pk_data", "''");
        }
        return select;
    }

    /**
     * Runs the select and feeds the queue until the data is exhausted, the channel's
     * maximum is reached, or reading is stopped. Any failure is logged, not propagated.
     */
    @Override
    public void run() {
        try {
            JdbcTemplate jdbcTemplate = new JdbcTemplate(this.dataSource);
            jdbcTemplate.execute(new ConnectionCallback<Integer>() {
                public Integer doInConnection(Connection c) throws SQLException, DataAccessException {
                    return Integer.valueOf(readAndEnqueue(c));
                }
            });
        } catch (Throwable ex) {
            log.error(ex);
        } finally {
            // readAndEnqueue() clears the flag itself. It is still set here only when the
            // callback never ran (e.g. no connection could be obtained); signal the
            // consumer so it does not wait for the full poll timeout.
            if (this.reading) {
                enqueueEndOfData();
                this.reading = false;
            }
        }
    }

    /**
     * Executes the select on the given connection and copies the selected data to the
     * queue. Always closes the statement and result set, enqueues the end-of-data
     * marker and clears the reading flag before returning or throwing.
     *
     * @return the number of data rows copied to the queue
     */
    private int readAndEnqueue(Connection c) throws SQLException {
        int dataCount = 0;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            String channelId = this.context.getChannel().getChannelId();
            ps = c.prepareStatement(getSql(this.context.getChannel().getChannel()),
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            ps.setQueryTimeout(this.queryTimeout);
            ps.setFetchSize(this.fetchSize);
            ps.setString(1, channelId);
            ps.setLong(2, this.dataRef.getRefDataId());

            long executeTimeInMs = System.currentTimeMillis();
            rs = ps.executeQuery();
            executeTimeInMs = System.currentTimeMillis() - executeTimeInMs;
            if (executeTimeInMs > SLOW_QUERY_THRESHOLD_MS) {
                log.warn("RoutedDataSelectedInTime", executeTimeInMs, channelId);
            }

            long maxDataToRoute = this.context.getChannel().getMaxDataToRoute();
            String lastTransactionId = null;
            List<Data> peekAheadQueue = new ArrayList<Data>(this.peekAheadCount);
            boolean nontransactional = "nontransactional".equals(this.context.getChannel().getBatchAlgorithm());
            boolean moreData = true;

            // Keep going past the maximum while a transaction is still being completed.
            while ((long) dataCount <= maxDataToRoute || lastTransactionId != null) {
                if (moreData) {
                    moreData = fillPeekAheadQueue(peekAheadQueue, this.peekAheadCount, rs);
                }

                if ((lastTransactionId == null || nontransactional) && peekAheadQueue.size() > 0) {
                    // Start a new transaction (or, when nontransactional, just take the next row).
                    Data data = peekAheadQueue.remove(0);
                    copyToQueue(data);
                    ++dataCount;
                    lastTransactionId = data.getTransactionId();
                } else if (lastTransactionId != null && peekAheadQueue.size() > 0) {
                    // Pull every buffered row that belongs to the current transaction.
                    int dataWithSameTransactionIdCount = 0;
                    Iterator<Data> datas = peekAheadQueue.iterator();
                    while (datas.hasNext()) {
                        Data data = datas.next();
                        if (lastTransactionId.equals(data.getTransactionId())) {
                            ++dataWithSameTransactionIdCount;
                            datas.remove();
                            copyToQueue(data);
                            ++dataCount;
                        }
                    }
                    if (dataWithSameTransactionIdCount == 0) {
                        // No more rows of this transaction in the peek-ahead window.
                        lastTransactionId = null;
                    }
                } else if (peekAheadQueue.size() == 0) {
                    // Nothing buffered and nothing more could be read: stop instead of
                    // spinning until the loop condition happens to become false.
                    break;
                }
            }
            return dataCount;
        } finally {
            JdbcUtils.closeResultSet(rs);
            JdbcUtils.closeStatement(ps);
            enqueueEndOfData();
            this.reading = false;
        }
    }

    /**
     * Offers the end-of-data marker to the queue, retrying while the queue is full and
     * reading has not been stopped, and records the elapsed time as a statistic.
     */
    private void enqueueEndOfData() {
        long ts = System.currentTimeMillis();
        while (!this.dataQueue.offer(new EOD()) && this.reading) {
            AppUtils.sleep(QUEUE_RETRY_SLEEP_MS);
        }
        this.context.incrementStat(System.currentTimeMillis() - ts, "enqueue.eod.data.ms");
    }

    /**
     * Reads rows from the result set into the peek-ahead list until it holds
     * {@code peekAheadCount} entries, the result set is exhausted, or reading is stopped.
     * Rows rejected by {@link #process(ResultSet)} are skipped.
     *
     * @return {@code false} once the result set has no more rows, {@code true} otherwise
     */
    protected boolean fillPeekAheadQueue(List<Data> peekAheadQueue, int peekAheadCount, ResultSet rs) throws SQLException {
        int toRead = peekAheadCount - peekAheadQueue.size();
        int dataCount = 0;
        long ts = System.currentTimeMillis();
        while (this.reading && dataCount < toRead) {
            if (!rs.next()) {
                return false;
            }
            if (process(rs)) {
                Data data = this.dataService.readData(rs);
                this.context.setLastDataIdForTransactionId(data);
                peekAheadQueue.add(data);
                ++dataCount;
                this.context.incrementStat(System.currentTimeMillis() - ts, "read.data.ms");
            } else {
                this.context.incrementStat(System.currentTimeMillis() - ts, "already.read.data.ms");
            }
            ts = System.currentTimeMillis();
        }
        return true;
    }

    /**
     * Decides whether the current row should be read.
     *
     * @return {@code true} when column {@value #PROCESS_FILTER_COLUMN_INDEX} of the
     *         current row is blank
     */
    protected boolean process(ResultSet rs) throws SQLException {
        return StringUtils.isBlank(rs.getString(PROCESS_FILTER_COLUMN_INDEX));
    }

    /**
     * Offers the data to the queue, retrying while the queue is full and reading has
     * not been stopped. If reading is stopped while the queue is full the data is not
     * enqueued. The elapsed time is recorded as a statistic.
     */
    protected void copyToQueue(Data data) {
        long ts = System.currentTimeMillis();
        while (!this.dataQueue.offer(data) && this.reading) {
            AppUtils.sleep(QUEUE_RETRY_SLEEP_MS);
        }
        this.context.incrementStat(System.currentTimeMillis() - ts, "enqueue.data.ms");
    }

    public boolean isReading() {
        return this.reading;
    }

    /** Passing {@code false} makes the reader stop reading rows and stop retrying on a full queue. */
    public void setReading(boolean reading) {
        this.reading = reading;
    }

    public BlockingQueue<Data> getDataQueue() {
        return this.dataQueue;
    }

    /** Marker placed on the queue to signal the end of the data; {@link #take()} maps it to {@code null}. */
    class EOD
    extends Data {
        EOD() {
        }
    }
}

