/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.service.impl;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.DataRef;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.DataToRouteReader;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;
import org.jumpmind.symmetric.route.IRouterContext;
import org.jumpmind.symmetric.route.RouterContext;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.AbstractService;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class RouterService
extends AbstractService
implements IRouterService {
    // Provides the "ROUTE" lock taken and released around a routing pass.
    private IClusterService clusterService;
    // Source of the data ref, data-event inserts and data-id gap lookups.
    private IDataService dataService;
    // Supplies node channels and node group links.
    private IConfigurationService configurationService;
    // Looks up the trigger routers that apply to captured data.
    private ITriggerRouterService triggerRouterService;
    // Creates and updates outgoing batches.
    private IOutgoingBatchService outgoingBatchService;
    // Resolves this node's identity and the enabled nodes of a target group.
    private INodeService nodeService;
    // Data routers keyed by router type; the "default" key is the fallback.
    private Map<String, IDataRouter> routers;
    // Batch algorithms keyed by the channel's batch algorithm name.
    private Map<String, IBatchAlgorithm> batchAlgorithms;
    // Offset (ms) added to a data row's create time when asking the dialect
    // whether transactions are still pending since that time.
    private long transactionViewClockSyncThresholdInMs = 5000L;
    // Single-thread executor on which the DataToRouteReader is run.
    // NOTE(review): nothing in this class shuts this executor down - confirm lifecycle.
    transient ExecutorService readThread = Executors.newSingleThreadExecutor();

    /**
     * Asks the data router configured for the data's trigger router which nodes
     * the data should go to, and reports whether any of the supplied candidate
     * nodes is among them.
     *
     * @param context      routing context handed through to the router
     * @param dataMetaData the data (and its trigger router) being evaluated
     * @param nodes        candidate target nodes
     * @param initialLoad  passed through to the router unchanged
     * @return {@code true} if the router selected at least one of {@code nodes};
     *         {@code false} otherwise, including when the router returns {@code null}
     */
    @Override
    public boolean shouldDataBeRouted(IRouterContext context, DataMetaData dataMetaData, Set<Node> nodes, boolean initialLoad) {
        TriggerRouter triggerRouter = dataMetaData.getTriggerRouter();
        Collection<String> routedNodeIds = this.getDataRouter(triggerRouter).routeToNodes(context, dataMetaData, nodes, initialLoad);
        for (Node candidate : nodes) {
            if (routedNodeIds != null && routedNodeIds.contains(candidate.getNodeId())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs one complete routing pass, provided the cluster-wide "ROUTE" lock can
     * be acquired; if the lock is not available the call returns without doing
     * anything.
     * <p>
     * The pass updates abandoned routing batches, routes pending data for every
     * eligible channel, and then advances the saved data reference. The lock is
     * always released, even when routing fails with an exception.
     */
    @Override
    public synchronized void routeData() {
        if (!this.clusterService.lock("ROUTE")) {
            return;
        }
        try {
            long databaseTimeAtRoutingStart = this.dbDialect.getDatabaseTime();
            long startTs = System.currentTimeMillis();

            this.updateAbandonedRoutingBatches();
            long abandonedMs = System.currentTimeMillis() - startTs;
            if (abandonedMs > 30000L) {
                this.log.warn("LongRunningOperation", "updating abandoned RT batches", abandonedMs);
            }

            Node sourceNode = this.nodeService.findIdentity();
            DataRef ref = this.dataService.getDataRef();
            int dataCount = this.routeDataForEachChannel(ref, sourceNode);
            this.findAndSaveNextDataId(databaseTimeAtRoutingStart);

            // Total elapsed time for the pass, measured from before the
            // abandoned-batch update.
            long totalMs = System.currentTimeMillis() - startTs;
            if (dataCount > 0 || totalMs > 30000L) {
                this.log.info("RoutedDataInTime", dataCount, totalMs);
            }
        } finally {
            this.clusterService.unlock("ROUTE");
        }
    }

    /**
     * Delegates to the outgoing batch service to update abandoned routing
     * batches. Called at the start of every routing pass.
     */
    protected void updateAbandonedRoutingBatches() {
        this.outgoingBatchService.updateAbandonedRoutingBatches();
    }

    /**
     * Routes pending data for every configured node channel that is enabled and
     * not suspended.
     *
     * @param ref        data reference handed to the per-channel routing
     * @param sourceNode the node the data originates from
     * @return the sum of the per-channel results
     */
    protected int routeDataForEachChannel(DataRef ref, Node sourceNode) {
        int total = 0;
        for (NodeChannel channel : this.configurationService.getNodeChannels()) {
            boolean eligible = !channel.isSuspendEnabled() && channel.isEnabled();
            if (eligible) {
                total += this.routeDataForChannel(ref, channel, sourceNode);
            }
        }
        return total;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    protected int routeDataForChannel(DataRef ref, NodeChannel nodeChannel, Node sourceNode) {
        Object v0;
        RouterContext context = null;
        long ts = System.currentTimeMillis();
        int dataCount = -1;
        context = new RouterContext(sourceNode.getNodeId(), nodeChannel, this.dataSource);
        int n = dataCount = this.selectDataAndRoute(ref, context);
        Object var11_9 = null;
        try {
            try {
                if (dataCount > 0) {
                    long insertTs = System.currentTimeMillis();
                    this.dataService.insertDataEvents(context.getJdbcTemplate(), context.getDataEventList());
                    context.clearDataEventsList();
                    this.completeBatchesAndCommit(context);
                    context.incrementStat(System.currentTimeMillis() - insertTs, "insert.data.events.ms");
                }
                v0 = null;
            }
            catch (Exception e) {
                if (context != null) {
                    context.rollback();
                }
                this.log.error(e);
                v0 = null;
            }
        }
        catch (Throwable throwable) {
            v0 = null;
        }
        Object var15_21 = v0;
        context.logStats(this.log, dataCount, System.currentTimeMillis() - ts);
        context.cleanup();
        return n;
        catch (Exception ex) {
            Object v2;
            int n2;
            try {
                this.log.error("RouterRoutingFailed", ex, nodeChannel.getChannelId());
                if (context != null) {
                    context.rollback();
                }
                n2 = 0;
                Object var11_10 = null;
            }
            catch (Throwable throwable) {
                Object v1;
                Object var11_11 = null;
                try {
                    try {
                        if (dataCount > 0) {
                            long insertTs = System.currentTimeMillis();
                            this.dataService.insertDataEvents(context.getJdbcTemplate(), context.getDataEventList());
                            context.clearDataEventsList();
                            this.completeBatchesAndCommit(context);
                            context.incrementStat(System.currentTimeMillis() - insertTs, "insert.data.events.ms");
                        }
                        v1 = null;
                    }
                    catch (Exception e) {
                        if (context != null) {
                            context.rollback();
                        }
                        this.log.error(e);
                        v1 = null;
                    }
                }
                catch (Throwable throwable2) {
                    v1 = null;
                }
                Object var15_23 = v1;
                context.logStats(this.log, dataCount, System.currentTimeMillis() - ts);
                context.cleanup();
                throw throwable;
            }
            try {
                try {
                    if (dataCount > 0) {
                        long insertTs = System.currentTimeMillis();
                        this.dataService.insertDataEvents(context.getJdbcTemplate(), context.getDataEventList());
                        context.clearDataEventsList();
                        this.completeBatchesAndCommit(context);
                        context.incrementStat(System.currentTimeMillis() - insertTs, "insert.data.events.ms");
                    }
                    v2 = null;
                }
                catch (Exception e) {
                    if (context != null) {
                        context.rollback();
                    }
                    this.log.error(e);
                    v2 = null;
                }
            }
            catch (Throwable throwable) {
                v2 = null;
            }
            Object var15_22 = v2;
            context.logStats(this.log, dataCount, System.currentTimeMillis() - ts);
            context.cleanup();
            return n2;
        }
    }

    /**
     * Commits the context and finalizes every batch that was open in it.
     * <p>
     * For each batch the routing time is recorded, every data router used in this
     * context is notified, the status is set, the batch is saved through the
     * outgoing batch service, and the batch is removed from the context's
     * per-node map. Batches for node id {@code "-1"} (the placeholder used when
     * data was routed to no node) get status {@code OK}; all others get
     * {@code NE}.
     *
     * @param context the routing context whose batches are completed
     * @throws SQLException declared for the context commit
     */
    protected void completeBatchesAndCommit(RouterContext context) throws SQLException {
        // Snapshot before committing: entries are removed from the live map below.
        List<OutgoingBatch> openBatches = new ArrayList<OutgoingBatch>(context.getBatchesByNodes().values());
        context.commit();
        for (OutgoingBatch batch : openBatches) {
            long routerMillis = System.currentTimeMillis() - batch.getCreateTime().getTime();
            batch.setRouterMillis(routerMillis);
            for (IDataRouter usedRouter : context.getUsedDataRouters()) {
                usedRouter.completeBatch(context, batch);
            }
            boolean unrouted = "-1".equals(batch.getNodeId());
            batch.setStatus(unrouted ? OutgoingBatch.Status.OK : OutgoingBatch.Status.NE);
            this.outgoingBatchService.updateOutgoingBatch(batch);
            context.getBatchesByNodes().remove(batch.getNodeId());
        }
        context.setNeedsCommitted(false);
    }

    /**
     * Works out how far the data reference can be advanced and saves the new
     * reference when it changed.
     * <p>
     * The data ids returned by the {@code selectDistinctDataIdFromDataEventSql}
     * query are walked in result order, starting from the current reference id:
     * <ul>
     * <li>an id that equals the last id, or follows it by exactly
     * {@code data.id.increment.by}, simply advances the reference;</li>
     * <li>otherwise there is a gap. If data exists inside the gap the walk stops.
     * If the dialect supports transaction views, the gap is skipped only when no
     * database transactions are pending and the gap is still empty; otherwise the
     * id is left alone and the walk continues. Without transaction views, the gap
     * is skipped once it has expired, and the walk stops if it has not.</li>
     * </ul>
     *
     * @param databaseTimeAtRoutingStart database time captured when routing began
     *        (not used by the current implementation)
     */
    protected void findAndSaveNextDataId(long databaseTimeAtRoutingStart) {
        final DataRef ref = this.dataService.getDataRef();
        long ts = System.currentTimeMillis();
        final int dataIdIncrementBy = this.parameterService.getInt("data.id.increment.by");

        ResultSetExtractor<Long> gapWalker = new ResultSetExtractor<Long>() {

            public Long extractData(ResultSet rs) throws SQLException, DataAccessException {
                long current = ref.getRefDataId();
                while (rs.next()) {
                    long dataId = rs.getLong(1);
                    boolean contiguous = current == -1L
                            || current + (long) dataIdIncrementBy == dataId
                            || current == dataId;
                    if (contiguous) {
                        current = dataId;
                    } else if (RouterService.this.dataService.countDataInRange(current, dataId) != 0) {
                        // Unrouted data sits inside the gap: do not move past it.
                        break;
                    } else if (RouterService.this.dbDialect.supportsTransactionViews()) {
                        boolean pending = RouterService.this.dbDialect.areDatabaseTransactionsPendingSince(
                                RouterService.this.dataService.findCreateTimeOfData(dataId).getTime()
                                        + RouterService.this.transactionViewClockSyncThresholdInMs);
                        // Re-check the gap after the pending test before skipping it.
                        if (!pending && RouterService.this.dataService.countDataInRange(current, dataId) == 0) {
                            RouterService.this.log.info("RouterSkippingDataIdsNoTransactions", current, dataId);
                            current = dataId;
                        }
                    } else if (RouterService.this.isDataGapExpired(dataId)) {
                        RouterService.this.log.info("RouterSkippingDataIdsGapExpired", current, dataId);
                        current = dataId;
                    } else {
                        break;
                    }
                }
                return current;
            }
        };

        long lastDataId = (Long) this.jdbcTemplate.query(
                this.getSql("selectDistinctDataIdFromDataEventSql"),
                new Object[]{ref.getRefDataId()},
                new int[]{java.sql.Types.NUMERIC},
                gapWalker);

        long updateTimeInMs = System.currentTimeMillis() - ts;
        if (updateTimeInMs > 10000L) {
            this.log.info("RoutedDataRefUpdateTime", updateTimeInMs);
        }
        if (ref.getRefDataId() != lastDataId) {
            this.dataService.saveDataRef(new DataRef(lastDataId, new Date()));
        }
    }

    /**
     * Tells whether the event for the given data id is older than the configured
     * stale-gap timeout ({@code routing.stale.dataid.gap.time.ms}).
     *
     * @param dataId the data id on the far side of the gap
     * @return {@code true} if the event's create time lies further in the past
     *         than the timeout
     */
    protected boolean isDataGapExpired(long dataId) {
        long gapTimeoutInMs = this.parameterService.getLong("routing.stale.dataid.gap.time.ms");
        Date createTime = this.dataService.findCreateTimeOfEvent(dataId);
        long ageInMs = System.currentTimeMillis() - createTime.getTime();
        return ageInMs > gapTimeoutInMs;
    }

    /**
     * Returns the nodes a trigger router may route to, computing and caching them
     * in the context on first use.
     * <p>
     * The enabled nodes of the router's target node group are used when a group
     * link exists between the router's source and target groups. Without such a
     * link an error is logged and an empty set is cached.
     *
     * @param triggerRouter the trigger router whose targets are wanted
     * @param context       routing context holding the per-trigger-router cache
     * @return the (possibly empty) set of candidate nodes
     */
    protected Set<Node> findAvailableNodes(TriggerRouter triggerRouter, RouterContext context) {
        Set<Node> cached = context.getAvailableNodes().get(triggerRouter);
        if (cached != null) {
            return cached;
        }

        Set<Node> targets = new HashSet<Node>();
        Router router = triggerRouter.getRouter();
        List<NodeGroupLink> links = this.configurationService.getGroupLinksFor(router.getSourceNodeGroupId(), router.getTargetNodeGroupId());
        if (links.isEmpty()) {
            this.log.error("RouterIllegalNodeGroupLink", router.getRouterId(), router.getSourceNodeGroupId(), router.getTargetNodeGroupId());
        } else {
            targets.addAll(this.nodeService.findEnabledNodesFromNodeGroup(router.getTargetNodeGroupId()));
        }
        context.getAvailableNodes().put(triggerRouter, targets);
        return targets;
    }

    /**
     * Reads the data to route for the context's channel and routes it row by row.
     * <p>
     * A {@link DataToRouteReader} is started on the read executor and rows are
     * taken from it until it hands back {@code null}. After each row, accumulated
     * data events are inserted once their number reaches
     * {@code routing.flush.jdbc.batch.size} or a commit is needed. When a commit
     * is needed the open batches are completed and committed, and routing stops
     * early if the channel's max-data-to-route limit (when positive) has been
     * exceeded. The reader is always told to stop reading on the way out.
     *
     * @param ref     data reference passed to the reader
     * @param context routing context for the channel being routed
     * @return the number of data rows taken from the reader and routed
     * @throws SQLException if routing or committing fails at the database level
     */
    protected int selectDataAndRoute(DataRef ref, RouterContext context) throws SQLException {
        DataToRouteReader reader = new DataToRouteReader(this.dataSource, this.dbDialect.getRouterDataPeekAheadCount(), this, this.dbDialect.getStreamingResultsFetchSize(), context, ref, this.dataService, this.jdbcTemplate.getQueryTimeout());
        this.readThread.execute(reader);
        Data data = null;
        int dataCount = 0;
        try {
            int numberOfEventsBeforeFlush = this.parameterService.getInt("routing.flush.jdbc.batch.size");
            do {
                data = reader.take();
                if (data != null) {
                    ++dataCount;
                    this.routeData(data, context);
                    long ts = System.currentTimeMillis();
                    try {
                        if (numberOfEventsBeforeFlush <= context.getDataEventList().size() || context.isNeedsCommitted()) {
                            this.dataService.insertDataEvents(context.getJdbcTemplate(), context.getDataEventList());
                            context.clearDataEventsList();
                        }
                        if (context.isNeedsCommitted()) {
                            this.completeBatchesAndCommit(context);
                            long maxDataToRoute = context.getChannel().getMaxDataToRoute();
                            if (maxDataToRoute > 0L && (long) dataCount > maxDataToRoute) {
                                this.log.info("RoutedMaxNumberData", dataCount, context.getChannel().getChannelId());
                                break;
                            }
                        }
                    } finally {
                        // Recorded on every path: normal, early break, and failure.
                        context.incrementStat(System.currentTimeMillis() - ts, "insert.data.events.ms");
                    }
                }
            } while (data != null);
        } finally {
            reader.setReading(false);
        }
        return dataCount;
    }

    /**
     * Routes a single data row through every trigger router that applies to it.
     * <p>
     * For each trigger router the target node ids are determined by its data
     * router, unless the channel is set to ignore or the trigger router does not
     * route this event type, in which case no node ids are produced. The row's
     * own source node is removed from the targets, and data events are then
     * recorded. If no trigger router is found, a warning is logged and nothing
     * else happens.
     *
     * @param data    the captured data row
     * @param context routing context for the current channel
     * @throws SQLException declared by the routing pipeline
     */
    protected void routeData(Data data, RouterContext context) throws SQLException {
        context.recordTransactionBoundaryEncountered(data);
        List<TriggerRouter> triggerRouters = this.getTriggerRoutersForData(data);
        if (triggerRouters == null || triggerRouters.isEmpty()) {
            this.log.warn("TriggerProcessingFailedMissing", data.getTriggerHistory().getTriggerId(), data.getDataId());
            return;
        }

        for (TriggerRouter triggerRouter : triggerRouters) {
            Table table = this.dbDialect.getTable(triggerRouter.getTrigger(), true);
            DataMetaData dataMetaData = new DataMetaData(data, table, triggerRouter, context.getChannel());
            Collection<String> nodeIds = null;
            boolean routable = !context.getChannel().isIgnoreEnabled() && triggerRouter.isRouted(data.getEventType());
            if (routable) {
                IDataRouter dataRouter = this.getDataRouter(triggerRouter);
                context.addUsedDataRouter(dataRouter);
                long routeStart = System.currentTimeMillis();
                nodeIds = dataRouter.routeToNodes(context, dataMetaData, this.findAvailableNodes(triggerRouter, context), false);
                context.incrementStat(System.currentTimeMillis() - routeStart, "data.router.ms");
                // Never send a change back to the node it came from.
                if (data.getSourceNodeId() != null && nodeIds != null) {
                    nodeIds.remove(data.getSourceNodeId());
                }
            }
            this.insertDataEvents(context, dataMetaData, nodeIds, triggerRouter);
        }
    }

    /**
     * Records one data event per target node for the given data row, creating the
     * node's outgoing batch (status {@code RT}) when none is open yet.
     * <p>
     * When no target node ids are given, the event is recorded against the
     * placeholder node id {@code "-1"}. After each event the channel's batch
     * algorithm is asked whether the batch is complete; if so the context is
     * flagged as needing a commit.
     *
     * @param context       routing context collecting events and batches
     * @param dataMetaData  the data row and its metadata
     * @param nodeIds       target node ids; may be {@code null} or empty
     * @param triggerRouter the trigger router that produced the targets
     */
    protected void insertDataEvents(RouterContext context, DataMetaData dataMetaData, Collection<String> nodeIds, TriggerRouter triggerRouter) {
        Collection<String> targetNodeIds = nodeIds;
        if (targetNodeIds == null || targetNodeIds.size() == 0) {
            targetNodeIds = new HashSet<String>(1);
            targetNodeIds.add("-1");
        }

        long startedAt = System.currentTimeMillis();
        for (String nodeId : targetNodeIds) {
            Map<String, OutgoingBatch> batchesByNode = context.getBatchesByNodes();
            OutgoingBatch batch = batchesByNode.get(nodeId);
            if (batch == null) {
                batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), OutgoingBatch.Status.RT);
                this.outgoingBatchService.insertOutgoingBatch(batch);
                context.getBatchesByNodes().put(nodeId, batch);
            }
            batch.incrementDataEventCount();
            context.addDataEvent(dataMetaData.getData().getDataId(), batch.getBatchId(), triggerRouter.getRouter().getRouterId());

            IBatchAlgorithm algorithm = this.batchAlgorithms.get(context.getChannel().getBatchAlgorithm());
            if (algorithm.isBatchComplete(batch, dataMetaData, context)) {
                context.setNeedsCommitted(true);
            }
        }
        context.incrementStat(System.currentTimeMillis() - startedAt, "insert.data.events.ms");
    }

    /**
     * Resolves the data router for a trigger router.
     * <p>
     * The router registered under the trigger router's router type is used when
     * the type is non-blank and known. A non-blank but unknown type is logged as
     * a warning. In every other case the router registered as {@code "default"}
     * is returned.
     *
     * @param trigger the trigger router whose data router is wanted
     * @return the matching router, or the {@code "default"} router as fallback
     */
    protected IDataRouter getDataRouter(TriggerRouter trigger) {
        IDataRouter router = null;
        if (!StringUtils.isBlank((String) trigger.getRouter().getRouterType())) {
            router = this.routers.get(trigger.getRouter().getRouterType());
            if (router == null) {
                this.log.warn("RouterMissing", trigger.getRouter().getRouterType(), trigger.getTrigger().getTriggerId());
            }
        }
        return router != null ? router : this.routers.get("default");
    }

    /**
     * Looks up the trigger routers registered for the trigger that captured the
     * given data.
     * <p>
     * The lookup is first made with the service flag set to {@code false}; if
     * that yields nothing it is repeated with the flag set to {@code true}.
     * NOTE(review): the flag presumably forces a cache refresh - confirm against
     * {@code ITriggerRouterService}.
     *
     * @param data the captured data row
     * @return the trigger routers for the data's trigger; may be {@code null} or
     *         empty when none are found
     */
    protected List<TriggerRouter> getTriggerRoutersForData(Data data) {
        List<TriggerRouter> found = this.triggerRouterService.getTriggerRoutersForCurrentNode(false).get(data.getTriggerHistory().getTriggerId());
        if (found != null && !found.isEmpty()) {
            return found;
        }
        return this.triggerRouterService.getTriggerRoutersForCurrentNode(true).get(data.getTriggerHistory().getTriggerId());
    }

    /**
     * Registers a data router under the given name, replacing any router already
     * registered under that name.
     *
     * @param name       key the router is looked up by (the router type)
     * @param dataRouter the router to register
     */
    @Override
    public void addDataRouter(String name, IDataRouter dataRouter) {
        this.routers.put(name, dataRouter);
    }

    /**
     * Registers a batch algorithm under the given name, replacing any algorithm
     * already registered under that name.
     *
     * @param name      key the algorithm is looked up by (the channel's batch algorithm)
     * @param algorithm the algorithm to register
     */
    @Override
    public void addBatchAlgorithm(String name, IBatchAlgorithm algorithm) {
        this.batchAlgorithms.put(name, algorithm);
    }

    /**
     * Sets the configuration service used to read node channels and group links.
     *
     * @param configurationService the service to use
     */
    public void setConfigurationService(IConfigurationService configurationService) {
        this.configurationService = configurationService;
    }

    /**
     * Sets the service used to insert and update outgoing batches.
     *
     * @param outgoingBatchService the service to use
     */
    public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
        this.outgoingBatchService = outgoingBatchService;
    }

    /**
     * Sets the cluster service that provides the "ROUTE" lock.
     *
     * @param clusterService the service to use
     */
    public void setClusterService(IClusterService clusterService) {
        this.clusterService = clusterService;
    }

    /**
     * Sets the node service used to find this node's identity and target nodes.
     *
     * @param nodeService the service to use
     */
    public void setNodeService(INodeService nodeService) {
        this.nodeService = nodeService;
    }

    /**
     * Sets the data service used for data refs, data events and gap lookups.
     *
     * @param dataService the service to use
     */
    public void setDataService(IDataService dataService) {
        this.dataService = dataService;
    }

    /**
     * Replaces the map of data routers, keyed by router type. The map is stored
     * as given (no copy is made); the {@code "default"} key is used as fallback
     * by {@code getDataRouter}.
     *
     * @param routers the router map to use
     */
    public void setRouters(Map<String, IDataRouter> routers) {
        this.routers = routers;
    }

    /**
     * Replaces the map of batch algorithms, keyed by algorithm name. The map is
     * stored as given (no copy is made).
     *
     * @param batchAlgorithms the algorithm map to use
     */
    public void setBatchAlgorithms(Map<String, IBatchAlgorithm> batchAlgorithms) {
        this.batchAlgorithms = batchAlgorithms;
    }

    /**
     * Sets the service used to look up trigger routers for captured data.
     *
     * @param triggerService the service to use
     */
    public void setTriggerRouterService(ITriggerRouterService triggerService) {
        this.triggerRouterService = triggerService;
    }

    /**
     * Sets the offset, in milliseconds, that is added to a data row's create time
     * before asking the dialect whether transactions are pending since then.
     * The field defaults to 5000 ms.
     *
     * @param transactionViewClockSyncThresholdInMs the offset in milliseconds
     */
    public void setTransactionViewClockSyncThresholdInMs(long transactionViewClockSyncThresholdInMs) {
        this.transactionViewClockSyncThresholdInMs = transactionViewClockSyncThresholdInMs;
    }
}

