/*
 * Decompiled with CFR 0.152.
 */
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.db.postgresql.PostgreSqlDbDialect;
import org.jumpmind.symmetric.extract.DataExtractorContext;
import org.jumpmind.symmetric.extract.IDataExtractor;
import org.jumpmind.symmetric.extract.IExtractorFilter;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.SimpleRouterContext;
import org.jumpmind.symmetric.service.IAcknowledgeService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IExtractListener;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.AbstractService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.transport.file.FileOutgoingTransport;
import org.jumpmind.symmetric.util.CsvUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.support.JdbcUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DataExtractorService
extends AbstractService
implements IDataExtractorService,
BeanFactoryAware {
    private IOutgoingBatchService outgoingBatchService;
    private IRouterService routingService;
    private IDataService dataService;
    private IConfigurationService configurationService;
    private IAcknowledgeService acknowledgeService;
    private ITriggerRouterService triggerRouterService;
    private INodeService nodeService;
    private IDbDialect dbDialect;
    private BeanFactory beanFactory;
    private DataExtractorContext clonableContext;
    private List<IExtractorFilter> extractorFilters;

    @Override
    public void extractConfigurationStandalone(Node node, OutputStream out) throws IOException {
        this.extractConfigurationStandalone(node, TransportUtils.toWriter(out));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void extractConfigurationStandalone(Node node, BufferedWriter writer) throws IOException {
        try {
            OutgoingBatch batch = new OutgoingBatch(node.getNodeId(), "config", OutgoingBatch.Status.NE);
            if (Version.isOlderThanVersion(node.getSymmetricVersion(), "1.6.0")) {
                this.outgoingBatchService.insertOutgoingBatch(batch);
                this.acknowledgeService.ack(batch.getBatchInfo());
            } else {
                batch.setBatchId(-9999L);
            }
            IDataExtractor dataExtractor = this.getDataExtractor(node.getSymmetricVersion());
            DataExtractorContext ctxCopy = this.clonableContext.copy(dataExtractor);
            dataExtractor.init(writer, ctxCopy);
            dataExtractor.begin(batch, writer);
            this.extractConfiguration(node, writer, ctxCopy);
            dataExtractor.commit(batch, writer);
            Object var7_6 = null;
        }
        catch (Throwable throwable) {
            Object var7_7 = null;
            writer.flush();
            throw throwable;
        }
        writer.flush();
    }

    @Override
    public void extractConfiguration(Node node, BufferedWriter writer, DataExtractorContext ctx) throws IOException {
        TriggerRouter triggerRouter;
        int i;
        List<TriggerRouter> triggerRouters = this.triggerRouterService.getTriggerRoutersForRegistration(StringUtils.isBlank((String)node.getSymmetricVersion()) ? Version.version() : node.getSymmetricVersion(), this.parameterService.getNodeGroupId(), node.getNodeGroupId());
        if (node.isVersionGreaterThanOrEqualTo(1, 5, 0)) {
            for (i = triggerRouters.size() - 1; i >= 0; --i) {
                triggerRouter = triggerRouters.get(i);
                StringBuilder sql = new StringBuilder(this.dbDialect.createPurgeSqlFor(node, triggerRouter));
                this.addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger().getSourceTableName(), sql);
                CsvUtils.writeSql(sql.toString(), writer);
            }
        }
        for (i = 0; i < triggerRouters.size(); ++i) {
            triggerRouter = triggerRouters.get(i);
            IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : this.getDataExtractor(node.getSymmetricVersion());
            TriggerHistory triggerHistory = new TriggerHistory(this.dbDialect.getTable(triggerRouter.getTrigger(), false), triggerRouter.getTrigger());
            triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i);
            if (!triggerRouter.getTrigger().getSourceTableName().endsWith("node_identity")) {
                this.writeInitialLoad(node, triggerRouter, triggerHistory, writer, ctx);
                continue;
            }
            Data data = new Data(1L, null, node.getNodeId(), DataEventType.INSERT, triggerRouter.getTrigger().getSourceTableName(), null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null);
            dataExtractor.write(writer, data, triggerRouter.getRouter().getRouterId(), ctx);
        }
        if (triggerRouters.size() == 0) {
            this.log.error("RegistrationEmpty", node);
        }
    }

    private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) {
        Node me;
        if ((TableConstants.getTableName(this.dbDialect.getTablePrefix(), "node").equalsIgnoreCase(sourceTableName) || TableConstants.getTableName(this.dbDialect.getTablePrefix(), "node_security").equalsIgnoreCase(sourceTableName)) && (me = this.nodeService.findIdentity()) != null) {
            sql.append(String.format(" where created_at_node_id='%s'", me.getNodeId()));
        }
    }

    private IDataExtractor getDataExtractor(String version) {
        int[] versions;
        String beanName = "dataExtractor";
        if (version != null && (versions = Version.parseVersion(version))[0] == 1) {
            if (versions[1] <= 2) {
                beanName = beanName + "10";
            } else if (versions[1] <= 3) {
                beanName = beanName + "13";
            } else if (versions[1] <= 4 && !version.equals("1.4.1-appaji")) {
                beanName = beanName + "14";
            } else if (versions[1] <= 7) {
                beanName = beanName + "16";
            }
        }
        return (IDataExtractor)this.beanFactory.getBean(beanName);
    }

    @Override
    public void extractInitialLoadWithinBatchFor(Node node, TriggerRouter trigger, BufferedWriter writer, DataExtractorContext ctx) {
        this.writeInitialLoad(node, trigger, writer, ctx);
    }

    protected void writeInitialLoad(Node node, TriggerRouter trigger, BufferedWriter writer, DataExtractorContext ctx) {
        this.writeInitialLoad(node, trigger, this.triggerRouterService.getNewestTriggerHistoryForTrigger(trigger.getTrigger().getTriggerId()), writer, ctx);
    }

    protected void writeInitialLoad(final Node node, final TriggerRouter triggerRouter, TriggerHistory triggerHistory, final BufferedWriter writer, final DataExtractorContext ctx) {
        final boolean newExtractorCreated = ctx == null || ctx.getDataExtractor() == null;
        final IDataExtractor dataExtractor = !newExtractorCreated ? ctx.getDataExtractor() : this.getDataExtractor(node.getSymmetricVersion());
        Table tableForSql = this.dbDialect.getTable(triggerRouter.getTrigger().getSourceCatalogName(), triggerRouter.getTrigger().getSourceSchemaName(), dataExtractor.getLegacyTableName(triggerRouter.getTrigger().getSourceTableName()), true);
        final String sql = this.dbDialect.createInitalLoadSqlFor(node, triggerRouter, tableForSql);
        this.log.debug("Sql", sql);
        if (!tableForSql.getName().equals(triggerHistory.getSourceTableName())) {
            String tableName = triggerHistory.getSourceTableName();
            triggerHistory = new TriggerHistory(tableForSql, triggerRouter.getTrigger());
            triggerHistory.setSourceTableName(tableName);
        }
        final TriggerHistory triggerHistory2Use = triggerHistory;
        this.jdbcTemplate.execute((ConnectionCallback)new ConnectionCallback<Object>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
                try {
                    OutgoingBatch batch = ctx.getBatch();
                    Table table = DataExtractorService.this.dbDialect.getTable(triggerRouter.getTrigger(), true);
                    NodeChannel channel = batch != null ? DataExtractorService.this.configurationService.getNodeChannel(batch.getChannelId()) : new NodeChannel("reload");
                    HashSet<Node> oneNodeSet = new HashSet<Node>();
                    oneNodeSet.add(node);
                    boolean autoCommitFlag = conn.getAutoCommit();
                    PreparedStatement st = null;
                    ResultSet rs = null;
                    try {
                        DataExtractorContext ctxCopy;
                        if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                            conn.setAutoCommit(false);
                        }
                        st = conn.prepareStatement(sql, 1003, 1007);
                        st.setQueryTimeout(DataExtractorService.this.jdbcTemplate.getQueryTimeout());
                        st.setFetchSize(DataExtractorService.this.dbDialect.getStreamingResultsFetchSize());
                        rs = st.executeQuery();
                        DataExtractorContext dataExtractorContext = ctxCopy = ctx == null ? DataExtractorService.this.clonableContext.copy(dataExtractor) : ctx;
                        if (newExtractorCreated) {
                            dataExtractor.init(writer, ctxCopy);
                            dataExtractor.begin(batch, writer);
                        }
                        SimpleRouterContext routingContext = new SimpleRouterContext(node.getNodeId(), DataExtractorService.this.jdbcTemplate, channel);
                        int dataNotRouted = 0;
                        while (rs.next()) {
                            Data data = new Data(0L, null, rs.getString(1), DataEventType.INSERT, triggerHistory2Use.getSourceTableName(), null, triggerHistory2Use, "reload", null, null);
                            DataMetaData dataMetaData = new DataMetaData(data, table, triggerRouter, channel);
                            if (!StringUtils.isBlank((String)triggerRouter.getInitialLoadSelect()) || DataExtractorService.this.routingService.shouldDataBeRouted(routingContext, dataMetaData, oneNodeSet, true)) {
                                dataExtractor.write(writer, data, triggerRouter.getRouter().getRouterId(), ctxCopy);
                                continue;
                            }
                            ++dataNotRouted;
                        }
                        if (dataNotRouted > 0) {
                            DataExtractorService.this.log.info("RouterInitialLoadNotRouted", dataNotRouted, triggerRouter.getTrigger().getSourceTableName());
                        }
                        if (newExtractorCreated) {
                            dataExtractor.commit(batch, writer);
                        }
                        Object var15_16 = null;
                    }
                    catch (Throwable throwable) {
                        Object var15_17 = null;
                        if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                            conn.commit();
                            conn.setAutoCommit(autoCommitFlag);
                        }
                        JdbcUtils.closeResultSet(rs);
                        JdbcUtils.closeStatement((Statement)st);
                        throw throwable;
                    }
                    if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                        conn.commit();
                        conn.setAutoCommit(autoCommitFlag);
                    }
                    JdbcUtils.closeResultSet((ResultSet)rs);
                    JdbcUtils.closeStatement((Statement)st);
                    return null;
                }
                catch (SQLException e) {
                    throw new RuntimeException(e.getSQLState() + "Error during SQL: " + sql, e);
                }
                catch (Exception e) {
                    throw new RuntimeException("Error during SQL: " + sql, e);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOException {
        IDataExtractor dataExtractor = this.getDataExtractor(node.getSymmetricVersion());
        if (!this.parameterService.is("start.route.job")) {
            this.routingService.routeData();
        }
        long ts = System.currentTimeMillis();
        OutgoingBatches batches = this.outgoingBatchService.getOutgoingBatches(node);
        long delta = System.currentTimeMillis() - ts;
        if (delta > 30000L) {
            this.log.warn("LongRunningOperation", "selecting batches to extract", delta);
        }
        if (batches == null || batches.getBatches() == null || batches.getBatches().size() <= 0) return false;
        ChannelMap suspendIgnoreChannelsList = targetTransport.getSuspendIgnoreChannelLists(this.configurationService);
        List<OutgoingBatch> ignoredBatches = batches.filterBatchesForChannels(suspendIgnoreChannelsList.getIgnoreChannels());
        batches.filterBatchesForChannels(suspendIgnoreChannelsList.getSuspendChannels());
        FileOutgoingTransport fileTransport = null;
        try {
            if (this.parameterService.is("stream.to.file.enabled")) {
                fileTransport = new FileOutgoingTransport(this.parameterService.getLong("stream.to.file.threshold.bytes"), "extract");
            }
            ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, fileTransport != null ? fileTransport : targetTransport);
            this.databaseExtract(node, batches.getBatches(), handler);
            this.networkTransfer(fileTransport, targetTransport);
            for (OutgoingBatch batch : ignoredBatches) {
                batch.setStatus(OutgoingBatch.Status.IG);
            }
            this.outgoingBatchService.updateOutgoingBatches(ignoredBatches);
            Calendar now = Calendar.getInstance();
            for (NodeChannel nodeChannel : batches.getActiveChannels()) {
                nodeChannel.setLastExtractedTime(now.getTime());
                this.configurationService.saveNodeChannelControl(nodeChannel, false);
            }
            Object var17_14 = null;
            if (fileTransport == null) return true;
        }
        catch (Throwable throwable) {
            Object var17_15 = null;
            if (fileTransport == null) throw throwable;
            fileTransport.close();
            throw throwable;
        }
        fileTransport.close();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void networkTransfer(FileOutgoingTransport fileTransport, IOutgoingTransport targetTransport) throws IOException {
        if (fileTransport != null) {
            fileTransport.close();
            Reader reader = null;
            try {
                reader = fileTransport.getReader();
                IOUtils.copy((Reader)reader, (Writer)targetTransport.open());
                Object var5_4 = null;
            }
            catch (Throwable throwable) {
                Object var5_5 = null;
                IOUtils.closeQuietly((Reader)reader);
                fileTransport.delete();
                throw throwable;
            }
            IOUtils.closeQuietly((Reader)reader);
            fileTransport.delete();
            {
            }
        }
    }

    protected void databaseExtract(Node node, List<OutgoingBatch> batches, IExtractListener handler) throws IOException {
        OutgoingBatch currentBatch = null;
        try {
            try {
                boolean initialized = false;
                for (OutgoingBatch batch : batches) {
                    batch.resetStats();
                    currentBatch = batch;
                    long ts = System.currentTimeMillis();
                    if (!initialized) {
                        handler.init();
                        initialized = true;
                    }
                    handler.startBatch(batch);
                    this.selectEventDataToExtract(handler, batch);
                    handler.endBatch(batch);
                    batch.setExtractMillis(System.currentTimeMillis() - ts);
                    batch.setSentCount(batch.getSentCount() + 1L);
                    batch.setStatus(OutgoingBatch.Status.SE);
                    this.outgoingBatchService.updateOutgoingBatch(batch);
                }
                Object var11_11 = null;
            }
            catch (RuntimeException e) {
                SQLException se = this.unwrapSqlException(e);
                if (currentBatch != null) {
                    if (se != null) {
                        currentBatch.setSqlState(se.getSQLState());
                        currentBatch.setSqlCode(se.getErrorCode());
                        currentBatch.setSqlMessage(se.getMessage());
                    } else {
                        currentBatch.setSqlMessage(e.getMessage());
                    }
                    currentBatch.setStatus(OutgoingBatch.Status.ER);
                    this.outgoingBatchService.updateOutgoingBatch(currentBatch);
                } else {
                    this.log.error("BatchStatusLoggingFailed", e);
                }
                throw e;
            }
        }
        catch (Throwable throwable) {
            Object var11_12 = null;
            handler.done();
            throw throwable;
        }
        handler.done();
    }

    @Override
    public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId) throws IOException {
        IDataExtractor dataExtractor = this.getDataExtractor(null);
        ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
        return this.extractBatchRange(handler, startBatchId, endBatchId);
    }

    private boolean areNumeric(String ... data) {
        if (data != null) {
            for (String string : data) {
                try {
                    Long.parseLong(string);
                }
                catch (NumberFormatException e) {
                    return false;
                }
            }
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean extractBatchRange(IExtractListener handler, String startBatchId, String endBatchId) throws IOException {
        OutgoingBatches batches;
        if (this.areNumeric(startBatchId, endBatchId) && (batches = this.outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId)) != null && batches.getBatches() != null && batches.getBatches().size() > 0) {
            try {
                handler.init();
                for (OutgoingBatch batch : batches.getBatches()) {
                    handler.startBatch(batch);
                    this.selectEventDataToExtract(handler, batch);
                    handler.endBatch(batch);
                }
                Object var8_7 = null;
            }
            catch (Throwable throwable) {
                Object var8_8 = null;
                handler.done();
                throw throwable;
            }
            handler.done();
            return true;
        }
        return false;
    }

    private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
        this.jdbcTemplate.execute((ConnectionCallback)new ConnectionCallback<Object>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
                ResultSet rs = null;
                PreparedStatement ps = null;
                boolean autoCommitFlag = conn.getAutoCommit();
                try {
                    if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                        conn.setAutoCommit(false);
                    }
                    String sql = DataExtractorService.this.getSql("selectEventDataToExtractSql");
                    sql = DataExtractorService.this.dbDialect.massageDataExtractionSql(sql, batch.getChannelId());
                    ps = conn.prepareStatement(sql, 1003, 1007);
                    ps.setQueryTimeout(DataExtractorService.this.jdbcTemplate.getQueryTimeout());
                    ps.setFetchSize(DataExtractorService.this.dbDialect.getStreamingResultsFetchSize());
                    ps.setString(1, batch.getNodeId());
                    ps.setLong(2, batch.getBatchId());
                    long ts = System.currentTimeMillis();
                    rs = ps.executeQuery();
                    long delta = System.currentTimeMillis() - ts;
                    if (delta > 30000L) {
                        DataExtractorService.this.log.warn("LongRunningOperation", "selecting data to extract", delta);
                    }
                    while (rs.next()) {
                        try {
                            handler.dataExtracted(DataExtractorService.this.dataService.readData(rs), rs.getString(13));
                        }
                        catch (RuntimeException e) {
                            throw e;
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    Object var12_10 = null;
                }
                catch (Throwable throwable) {
                    Object var12_11 = null;
                    if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                        conn.commit();
                        conn.setAutoCommit(autoCommitFlag);
                    }
                    JdbcUtils.closeResultSet(rs);
                    JdbcUtils.closeStatement(ps);
                    throw throwable;
                }
                if (DataExtractorService.this.dbDialect instanceof PostgreSqlDbDialect) {
                    conn.commit();
                    conn.setAutoCommit(autoCommitFlag);
                }
                JdbcUtils.closeResultSet((ResultSet)rs);
                JdbcUtils.closeStatement((Statement)ps);
                return null;
            }
        });
    }

    public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
        this.outgoingBatchService = batchBuilderService;
    }

    public void setContext(DataExtractorContext context) {
        this.clonableContext = context;
    }

    @Override
    public void setDbDialect(IDbDialect dialect) {
        this.dbDialect = dialect;
    }

    public void setConfigurationService(IConfigurationService configurationService) {
        this.configurationService = configurationService;
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public void addExtractorFilter(IExtractorFilter extractorFilter) {
        if (this.extractorFilters == null) {
            this.extractorFilters = new ArrayList<IExtractorFilter>();
        }
        this.extractorFilters.add(extractorFilter);
    }

    public void setExtractorFilters(List<IExtractorFilter> extractorFilters) {
        this.extractorFilters = extractorFilters;
    }

    public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
        this.acknowledgeService = acknowledgeService;
    }

    public void setNodeService(INodeService nodeService) {
        this.nodeService = nodeService;
    }

    public void setRoutingService(IRouterService routingService) {
        this.routingService = routingService;
    }

    public void setDataService(IDataService dataService) {
        this.dataService = dataService;
    }

    public void setTriggerRouterService(ITriggerRouterService triggerService) {
        this.triggerRouterService = triggerService;
    }

    class ExtractStreamHandler
    implements IExtractListener {
        IOutgoingTransport transport;
        IDataExtractor dataExtractor;
        DataExtractorContext context;
        BufferedWriter writer;

        ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws IOException {
            this.transport = transport;
            this.dataExtractor = dataExtractor;
        }

        public void dataExtracted(Data data, String routerId) throws IOException {
            if (DataExtractorService.this.extractorFilters != null) {
                for (IExtractorFilter filter : DataExtractorService.this.extractorFilters) {
                    if (filter.filterData(data, routerId, this.context)) continue;
                    return;
                }
            }
            this.dataExtractor.write(this.writer, data, routerId, this.context);
        }

        public void done() throws IOException {
        }

        public void endBatch(OutgoingBatch batch) throws IOException {
            this.dataExtractor.commit(batch, this.writer);
        }

        public void init() throws IOException {
            this.writer = this.transport.open();
            this.context = DataExtractorService.this.clonableContext.copy(this.dataExtractor);
            this.dataExtractor.init(this.writer, this.context);
        }

        public void startBatch(OutgoingBatch batch) throws IOException {
            this.context.setBatch(batch);
            this.dataExtractor.begin(batch, this.writer);
        }
    }
}

