/*
 * Decompiled with CFR 0.152.
 */
package org.dbunit.dataset.stream;

import org.dbunit.dataset.AbstractTable;
import org.dbunit.dataset.DataSetException;
import org.dbunit.dataset.ITable;
import org.dbunit.dataset.ITableIterator;
import org.dbunit.dataset.ITableMetaData;
import org.dbunit.dataset.RowOutOfBoundsException;
import org.dbunit.dataset.stream.IDataSetConsumer;
import org.dbunit.dataset.stream.IDataSetProducer;
import org.dbunit.util.concurrent.BoundedBuffer;
import org.dbunit.util.concurrent.Puttable;
import org.dbunit.util.concurrent.Takable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingIterator
implements ITableIterator {
    private static final Logger logger = LoggerFactory.getLogger(StreamingIterator.class);
    private static final Object EOD = new Object();
    private final Takable _channel;
    private StreamingTable _activeTable;
    private Object _taken = null;
    private boolean _eod = false;
    private Exception _asyncException;

    public StreamingIterator(IDataSetProducer source) throws DataSetException {
        BoundedBuffer channel = new BoundedBuffer(30);
        this._channel = channel;
        AsynchronousConsumer consumer = new AsynchronousConsumer(source, channel, this);
        Thread thread = new Thread((Runnable)consumer, "StreamingIterator");
        thread.setDaemon(true);
        thread.start();
        try {
            this._taken = this._channel.take();
        }
        catch (InterruptedException e) {
            logger.debug("Thread '" + Thread.currentThread() + "' was interrupted");
            throw this.resolveException(e);
        }
    }

    private DataSetException resolveException(InterruptedException cause) throws DataSetException {
        String msg = "Current thread was interrupted (Thread=" + Thread.currentThread() + ")";
        if (this._asyncException != null) {
            return new DataSetException(msg, this._asyncException);
        }
        return new DataSetException(msg, cause);
    }

    public boolean next() throws DataSetException {
        logger.debug("next() - start");
        if (this._eod) {
            return false;
        }
        while (this._activeTable != null && this._activeTable.next()) {
        }
        if (this._taken == EOD) {
            this._eod = true;
            this._activeTable = null;
            logger.debug("End of iterator.");
            return false;
        }
        if (this._taken instanceof ITableMetaData) {
            this._activeTable = new StreamingTable((ITableMetaData)this._taken);
            return true;
        }
        throw new IllegalStateException("Unexpected object taken from asyncronous handler: " + this._taken);
    }

    public ITableMetaData getTableMetaData() throws DataSetException {
        logger.debug("getTableMetaData() - start");
        return this._activeTable.getTableMetaData();
    }

    public ITable getTable() throws DataSetException {
        logger.debug("getTable() - start");
        return this._activeTable;
    }

    private void handleException(Exception e) {
        this._asyncException = e;
    }

    private static class AsynchronousConsumer
    implements Runnable,
    IDataSetConsumer {
        private static final Logger logger = LoggerFactory.getLogger(AsynchronousConsumer.class);
        private final IDataSetProducer _producer;
        private final Puttable _channel;
        private final StreamingIterator _exceptionHandler;
        private final Thread _invokerThread;

        public AsynchronousConsumer(IDataSetProducer source, Puttable channel, StreamingIterator exceptionHandler) {
            this._producer = source;
            this._channel = channel;
            this._exceptionHandler = exceptionHandler;
            this._invokerThread = Thread.currentThread();
        }

        public void run() {
            logger.debug("run() - start");
            try {
                this._producer.setConsumer(this);
                this._producer.produce();
            }
            catch (Exception e) {
                this._exceptionHandler.handleException(e);
                this._invokerThread.interrupt();
            }
            logger.debug("End of thread " + Thread.currentThread());
        }

        public void startDataSet() throws DataSetException {
        }

        public void endDataSet() throws DataSetException {
            logger.debug("endDataSet() - start");
            try {
                this._channel.put(EOD);
            }
            catch (InterruptedException e) {
                throw new DataSetException("Operation was interrupted");
            }
        }

        public void startTable(ITableMetaData metaData) throws DataSetException {
            logger.debug("startTable(metaData={}) - start", (Object)metaData);
            try {
                this._channel.put(metaData);
            }
            catch (InterruptedException e) {
                throw new DataSetException("Operation was interrupted");
            }
        }

        public void endTable() throws DataSetException {
        }

        public void row(Object[] values) throws DataSetException {
            logger.debug("row(values={}) - start", values);
            try {
                this._channel.put(values);
            }
            catch (InterruptedException e) {
                throw new DataSetException("Operation was interrupted");
            }
        }
    }

    private class StreamingTable
    extends AbstractTable {
        private final Logger logger = LoggerFactory.getLogger(StreamingTable.class);
        private ITableMetaData _metaData;
        private int _lastRow = -1;
        private boolean _eot = false;
        private Object[] _rowValues;

        public StreamingTable(ITableMetaData metaData) {
            this._metaData = metaData;
        }

        boolean next() throws DataSetException {
            this.logger.debug("next() - start");
            if (this._eot) {
                return false;
            }
            try {
                StreamingIterator.this._taken = StreamingIterator.this._channel.take();
                if (!(StreamingIterator.this._taken instanceof Object[])) {
                    this._eot = true;
                    return false;
                }
                ++this._lastRow;
                this._rowValues = (Object[])StreamingIterator.this._taken;
                return true;
            }
            catch (InterruptedException e) {
                throw StreamingIterator.this.resolveException(e);
            }
        }

        public ITableMetaData getTableMetaData() {
            this.logger.debug("getTableMetaData() - start");
            return this._metaData;
        }

        public int getRowCount() {
            this.logger.debug("getRowCount() - start");
            throw new UnsupportedOperationException();
        }

        public Object getValue(int row, String columnName) throws DataSetException {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("getValue(row={}, columnName={}) - start", (Object)Integer.toString(row), (Object)columnName);
            }
            while (!this._eot && row > this._lastRow) {
                this.next();
            }
            if (row < this._lastRow) {
                throw new UnsupportedOperationException("Cannot go backward!");
            }
            if (this._eot || row > this._lastRow) {
                throw new RowOutOfBoundsException(row + " > " + this._lastRow);
            }
            return this._rowValues[this.getColumnIndex(columnName)];
        }
    }
}

