/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationThrottler;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationWALReaderManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

@InterfaceAudience.Private
public class ReplicationSource
extends Thread
implements ReplicationSourceInterface {
    private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
    // WAL group prefix -> queue of WAL paths waiting to be replicated for that group.
    // NOTE(review): plain HashMap, yet enqueueLog() inserts into it and run() iterates over it;
    // presumably callers serialize those accesses - confirm, or consider a concurrent map.
    private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<String, PriorityBlockingQueue<Path>>();
    // Initial capacity of each per-group queue; read from "hbase.regionserver.maxlogs" (default 32).
    private int queueSizePerGroup;
    private ReplicationQueues replicationQueues;
    private ReplicationPeers replicationPeers;
    // Private copy of the configuration handed to init(), adjusted by decorateConf().
    private Configuration conf;
    // Parsed form of peerClusterZnode; tells whether this source serves a recovered queue.
    private ReplicationQueueInfo replicationQueueInfo;
    // Peer id extracted from peerClusterZnode via ReplicationQueueInfo.getPeerId().
    private String peerId;
    private ReplicationSourceManager manager;
    // Consulted by isSourceActive(); a stopped stopper makes the source inactive.
    private Stoppable stopper;
    // Base sleep in milliseconds between retries ("replication.source.sleepforretries", default 1000).
    private long sleepForRetries;
    // Batch cut-off by accumulated size ("replication.source.size.capacity", default 0x4000000).
    private long replicationQueueSizeCapacity;
    // Batch cut-off by number of entries ("replication.source.nb.capacity", default 25000).
    private int replicationQueueNbCapacity;
    private FileSystem fs;
    // Id of the local cluster; added to each WAL key that is queued for shipping.
    private UUID clusterId;
    // Id of the peer cluster, obtained from the endpoint in run(); null until then.
    private UUID peerClusterId;
    private AtomicLong totalReplicatedEdits = new AtomicLong(0L);
    private AtomicLong totalReplicatedOperations = new AtomicLong(0L);
    // Name of the replication queue znode this source reads from (may be "peerId-..." form).
    private String peerClusterZnode;
    // Upper bound for the sleep multiplier ("replication.source.maxretriesmultiplier", default 300).
    private int maxRetriesMultiplier;
    // Set to true at the start of run() and to false by terminate().
    private volatile boolean sourceRunning = false;
    private MetricsSource metrics;
    // Per-group queue length above which enqueueLog() logs a warning
    // ("replication.source.log.queue.warn", default 2).
    private int logQueueWarnThreshold;
    private ReplicationEndpoint replicationEndpoint;
    // Chain of the system-table filter plus the endpoint's own filter; built in run().
    private WALEntryFilter walEntryFilter;
    // Built in init() from "replication.source.per.peer.node.bandwidth" divided by 10.
    private ReplicationThrottler throttler;
    // WAL group prefix -> worker thread replicating that group.
    private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads = new ConcurrentHashMap();

    /**
     * Wires this source to its collaborators and reads its tunables from the configuration.
     * The passed configuration is copied first, so {@link #decorateConf()} never touches the
     * caller's instance.
     *
     * @param conf configuration to copy settings from
     * @param fs file system holding the WALs
     * @param manager replication manager that owns this source
     * @param replicationQueues store of queued WALs and their positions
     * @param replicationPeers peer registry
     * @param stopper object whose stopped state deactivates this source
     * @param peerClusterZnode name of the replication queue znode this source reads
     * @param clusterId id of the local cluster
     * @param replicationEndpoint endpoint that ships edits to the peer
     * @param metrics metrics sink for this source
     * @throws IOException declared by the interface
     */
    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) throws IOException {
        this.stopper = stopper;

        // Work on a private copy so the codec override stays local to replication.
        this.conf = HBaseConfiguration.create(conf);
        this.decorateConf();

        // Batch limits and retry policy.
        this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 64L * 1024L * 1024L);
        this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);

        final long configuredBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0L);
        this.throttler = new ReplicationThrottler((double) configuredBandwidth / 10.0);

        // Collaborators.
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.manager = manager;
        this.fs = fs;
        this.metrics = metrics;
        this.clusterId = clusterId;

        // Queue identity: the znode name is parsed to obtain the peer id.
        this.peerClusterZnode = peerClusterZnode;
        this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
        this.peerId = this.replicationQueueInfo.getPeerId();

        this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
        this.replicationEndpoint = replicationEndpoint;
    }

    /**
     * Copies the value of "hbase.replication.rpc.codec", when it is set to a non-empty string,
     * into "hbase.client.rpc.codec" of this source's private configuration.
     */
    private void decorateConf() {
        final String codec = this.conf.get("hbase.replication.rpc.codec");
        if (!StringUtils.isNotEmpty(codec)) {
            // Nothing configured: leave the client codec untouched.
            return;
        }
        this.conf.set("hbase.client.rpc.codec", codec);
    }

    /**
     * Adds a WAL to the queue of its WAL group, creating the queue on first use. When the
     * source is already running, a new group also gets its own worker thread. Emits a warning
     * when the group's queue grows beyond the configured threshold.
     *
     * @param log path of the WAL to enqueue
     */
    @Override
    public void enqueueLog(Path log) {
        final String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
        PriorityBlockingQueue<Path> groupQueue = this.queues.get(walGroupId);
        final boolean firstLogOfGroup = groupQueue == null;

        if (firstLogOfGroup) {
            groupQueue = new PriorityBlockingQueue<Path>(this.queueSizePerGroup, new LogsComparator());
            this.queues.put(walGroupId, groupQueue);
        }

        // Before run() has started, run() itself creates the workers for all known groups.
        if (firstLogOfGroup && this.sourceRunning) {
            final ReplicationSourceWorkerThread candidate = new ReplicationSourceWorkerThread(walGroupId, groupQueue, this.replicationQueueInfo, this);
            final ReplicationSourceWorkerThread registered = this.workerThreads.putIfAbsent(walGroupId, candidate);
            if (registered == null) {
                LOG.debug("Starting up worker for wal group " + walGroupId);
                candidate.startup();
            } else {
                LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
            }
        }

        groupQueue.put(log);
        this.metrics.incrSizeOfLogQueue();

        final int pending = groupQueue.size();
        if (pending > this.logQueueWarnThreshold) {
            LOG.warn("WAL group " + walGroupId + " queue size: " + pending + " exceeds value of replication.source.log.queue.warn: " + this.logQueueWarnThreshold);
        }
    }

    /**
     * Records bulk-loaded HFiles for replication to this source's peer, unless the peer's
     * table/column-family map excludes the given table or family.
     * <p>
     * The files are recorded when the peer has no table-CF map at all, or when the map
     * contains the table and either lists no families for it or lists this family.
     *
     * @param tableName table the HFiles belong to
     * @param family column family the HFiles belong to
     * @param files HFile references to record
     * @throws ReplicationException if the references cannot be stored
     */
    @Override
    public void addHFileRefs(TableName tableName, byte[] family, List<String> files) throws ReplicationException {
        // The znode name may carry a "-..." suffix; the peer id is the part before the first "-".
        String targetPeerId = this.peerClusterZnode;
        if (targetPeerId.contains("-")) {
            targetPeerId = this.peerClusterZnode.split("-")[0];
        }

        final Map<?, ?> tableCfMap = this.replicationPeers.getPeer(targetPeerId).getTableCFs();
        boolean replicate = true;
        if (tableCfMap != null) {
            final List<?> families = (List<?>) tableCfMap.get(tableName);
            replicate = tableCfMap.containsKey(tableName)
                && (families == null || families.contains(Bytes.toString(family)));
        }

        if (!replicate) {
            LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + targetPeerId);
            return;
        }

        this.replicationQueues.addHFileRefs(targetPeerId, files);
        this.metrics.incrSizeOfHFileRefsQueue(files.size());
    }

    /**
     * Clears this source's metrics and stops the replication endpoint if it is currently
     * starting or running.
     */
    private void uninitialize() {
        LOG.debug("Source exiting " + this.peerId);
        this.metrics.clear();

        boolean endpointLive = this.replicationEndpoint.state() == Service.State.STARTING;
        if (!endpointLive) {
            endpointLive = this.replicationEndpoint.state() == Service.State.RUNNING;
        }
        if (endpointLive) {
            this.replicationEndpoint.stopAndWait();
        }
    }

    /**
     * Body of the source thread: starts the endpoint, builds the WAL entry filter chain,
     * waits for the peer cluster id, refuses self-replication when the endpoint forbids it,
     * and finally starts one worker thread per known WAL group.
     *
     * @throws RuntimeException wrapping any exception raised while starting the endpoint
     */
    @Override
    public void run() {
        this.sourceRunning = true;

        // Step 1: bring the endpoint up; give up if it does not reach RUNNING.
        try {
            final Service.State endpointState = (Service.State) this.replicationEndpoint.start().get();
            if (endpointState != Service.State.RUNNING) {
                LOG.warn("ReplicationEndpoint was not started. Exiting");
                this.uninitialize();
                return;
            }
        } catch (Exception ex) {
            LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
            throw new RuntimeException(ex);
        }

        // Step 2: system-table filter first, then whatever filter the endpoint contributes.
        final List<WALEntryFilter> filterChain = new ArrayList<WALEntryFilter>();
        filterChain.add(new SystemTableWALEntryFilter());
        final WALEntryFilter endpointFilter = this.replicationEndpoint.getWALEntryfilter();
        if (endpointFilter != null) {
            filterChain.add(endpointFilter);
        }
        this.walEntryFilter = new ChainWALEntryFilter(filterChain);

        // Step 3: poll for the peer's cluster id, backing off between attempts.
        int sleepMultiplier = 1;
        while (this.isSourceActive() && this.peerClusterId == null) {
            this.peerClusterId = this.replicationEndpoint.getPeerUUID();
            if (this.isSourceActive() && this.peerClusterId == null
                && this.sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
                ++sleepMultiplier;
            }
        }

        // Step 4: a cluster may only replicate to itself if the endpoint explicitly allows it.
        final boolean selfReplication = this.clusterId.equals(this.peerClusterId);
        if (selfReplication && !this.replicationEndpoint.canReplicateToSameCluster()) {
            this.terminate("ClusterId " + this.clusterId + " is replicating to itself: peerClusterId " + this.peerClusterId + " which is not allowed by ReplicationEndpoint:" + this.replicationEndpoint.getClass().getName(), null, false);
            this.manager.closeQueue(this);
            return;
        }
        LOG.info("Replicating " + this.clusterId + " -> " + this.peerClusterId);

        // Step 5: one worker per WAL group that was enqueued before the source started.
        for (Map.Entry<String, PriorityBlockingQueue<Path>> group : this.queues.entrySet()) {
            final String walGroupId = group.getKey();
            final ReplicationSourceWorkerThread candidate = new ReplicationSourceWorkerThread(walGroupId, group.getValue(), this.replicationQueueInfo, this);
            if (this.workerThreads.putIfAbsent(walGroupId, candidate) == null) {
                LOG.debug("Starting up worker for wal group " + walGroupId);
                candidate.startup();
            } else {
                LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
            }
        }
    }

    /**
     * Sleeps for the base retry interval multiplied by {@code sleepMultiplier}. An interrupt
     * ends the sleep early and is re-asserted on the current thread.
     *
     * @param msg reason for sleeping, used only in the trace log
     * @param sleepMultiplier factor applied to the base retry interval
     * @return {@code true} if {@code sleepMultiplier} is still below the configured maximum,
     *         i.e. the caller may increase it
     */
    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(msg + ", sleeping " + this.sleepForRetries + " times " + sleepMultiplier);
        }
        final long pauseMillis = this.sleepForRetries * (long) sleepMultiplier;
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            LOG.debug("Interrupted while sleeping between retries");
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    /**
     * Asks the peer registry for the status of this source's peer.
     *
     * @return the value reported by {@code ReplicationPeers.getStatusOfPeer} for this peer id
     */
    protected boolean isPeerEnabled() {
        final boolean enabled = this.replicationPeers.getStatusOfPeer(this.peerId);
        return enabled;
    }

    /**
     * Starts this source as a daemon thread. The thread is named after the calling thread
     * plus ".replicationSource," and the queue znode; uncaught exceptions are logged.
     */
    @Override
    public void startup() {
        final String threadName = Thread.currentThread().getName() + ".replicationSource," + this.peerClusterZnode;
        final Thread.UncaughtExceptionHandler logUncaught = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable error) {
                LOG.error("Unexpected exception in ReplicationSource", error);
            }
        };
        Threads.setDaemonThreadRunning(this, threadName, logUncaught);
    }

    /**
     * Terminates this source without an error cause, waiting for its workers to finish.
     *
     * @param reason human-readable reason, written to the log
     */
    @Override
    public void terminate(String reason) {
        this.terminate(reason, (Exception) null);
    }

    /**
     * Terminates this source and waits for its workers to finish.
     *
     * @param reason human-readable reason, written to the log
     * @param cause error that triggered the termination, or {@code null} for a normal close
     */
    @Override
    public void terminate(String reason, Exception cause) {
        final boolean waitForWorkers = true;
        this.terminate(reason, cause, waitForWorkers);
    }

    /**
     * Stops this source: marks it as not running, flags and interrupts every worker, and asks
     * the endpoint to stop. When {@code join} is set, additionally waits for each worker to
     * shut down and for the endpoint stop to complete (bounded by
     * {@code sleepForRetries * maxRetriesMultiplier} milliseconds).
     *
     * @param reason human-readable reason, written to the log
     * @param cause error that triggered the termination, or {@code null} for a normal close
     * @param join whether to wait for workers and endpoint before returning
     */
    public void terminate(String reason, Exception cause, boolean join) {
        final String closing = "Closing source " + this.peerClusterZnode;
        if (cause != null) {
            LOG.error(closing + " because an error occurred: " + reason, cause);
        } else {
            LOG.info(closing + " because: " + reason);
        }

        this.sourceRunning = false;

        // Signal every worker first, so they all wind down in parallel.
        final Collection<ReplicationSourceWorkerThread> workers = this.workerThreads.values();
        for (ReplicationSourceWorkerThread worker : workers) {
            worker.setWorkerRunning(false);
            worker.interrupt();
        }

        final ListenableFuture<?> endpointStop = this.replicationEndpoint == null ? null : this.replicationEndpoint.stop();

        if (!join) {
            return;
        }

        for (ReplicationSourceWorkerThread worker : workers) {
            Threads.shutdown(worker, this.sleepForRetries);
            LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
        }

        if (endpointStop == null) {
            return;
        }
        try {
            endpointStop.get(this.sleepForRetries * (long) this.maxRetriesMultiplier, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" + this.peerClusterZnode, e);
        }
    }

    /**
     * @return the name of the replication queue znode this source was initialized with
     */
    @Override
    public String getPeerClusterZnode() {
        return peerClusterZnode;
    }

    /**
     * Returns the peer id parsed from the queue znode name. Despite the method name, this is
     * the {@code peerId} field, not the peer cluster's UUID.
     *
     * @return the peer id of this source
     */
    @Override
    public String getPeerClusterId() {
        return peerId;
    }

    /**
     * Returns the WAL path currently held by some worker of this source.
     * <p>
     * Each worker's path is read exactly once: the worker's path field is volatile and can be
     * reset by the worker thread at any time, so reading it a second time after a successful
     * null check could hand {@code null} to the caller even though a path had been seen.
     *
     * @return the current path of the first worker (in map iteration order) that has one,
     *         or {@code null} if no worker currently holds a path
     */
    @Override
    public Path getCurrentPath() {
        for (ReplicationSourceWorkerThread worker : this.workerThreads.values()) {
            final Path path = worker.getCurrentPath();
            if (path != null) {
                return path;
            }
        }
        return null;
    }

    /**
     * @return {@code true} while the stopper has not been stopped and this source is still
     *         flagged as running
     */
    private boolean isSourceActive() {
        if (this.stopper.isStopped()) {
            return false;
        }
        return this.sourceRunning;
    }

    /**
     * Builds a human-readable progress report: the total number of replicated edits followed
     * by one line per WAL group, stating either the WAL and position being replicated or that
     * the group is idle.
     * <p>
     * Every per-group entry ends with a line break, including the idle case, so that reports
     * for several WAL groups do not run together on one line.
     *
     * @return the multi-line statistics string
     */
    @Override
    public String getStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("Total replicated edits: ").append(this.totalReplicatedEdits).append(", current progress: \n");
        for (Map.Entry<String, ReplicationSourceWorkerThread> entry : this.workerThreads.entrySet()) {
            String walGroupId = entry.getKey();
            ReplicationSourceWorkerThread worker = entry.getValue();
            long position = worker.getCurrentPosition();
            Path currentPath = worker.getCurrentPath();
            sb.append("walGroup [").append(walGroupId).append("]: ");
            if (currentPath != null) {
                sb.append("currently replicating from: ").append(currentPath).append(" at position: ").append(position).append("\n");
            } else {
                // Terminate the idle entry too; otherwise the next group's entry is glued to it.
                sb.append("no replication ongoing, waiting for new log").append("\n");
            }
        }
        return sb.toString();
    }

    /**
     * @return the metrics object this source was initialized with
     */
    public MetricsSource getSourceMetrics() {
        return metrics;
    }

    public class ReplicationSourceWorkerThread
    extends Thread {
        // Owning source, passed back to the manager when a recovered queue is closed.
        private ReplicationSource source;
        // Prefix identifying the WAL group this worker replicates.
        private String walGroupId;
        // WALs of this group that still have to be replicated.
        private PriorityBlockingQueue<Path> queue;
        private ReplicationQueueInfo replicationQueueInfo;
        // Reader on the current WAL; reset to null after each read pass.
        private WAL.Reader reader;
        // Last reader position handed to the manager; -1 until the first report.
        private long lastLoggedPosition = -1L;
        // WAL currently being replicated; volatile because it is read from other threads
        // (e.g. by the enclosing source's getCurrentPath()/getStats()).
        private volatile Path currentPath;
        private ReplicationWALReaderManager repLogReader;
        // Per-batch counters, reset at the start of every read pass.
        private int currentNbOperations = 0;
        private int currentSize = 0;
        // Stop flag: cleared via setWorkerRunning(false) by the enclosing source's terminate(),
        // which runs on a different thread than this worker. Declared volatile so the worker
        // is guaranteed to observe the change, matching the enclosing source's sourceRunning flag.
        private volatile boolean workerRunning = true;
        private long currentNbHFiles = 0L;

        /**
         * Creates a worker for one WAL group. The WAL reader manager is built from the
         * enclosing source's file system and configuration.
         *
         * @param walGroupId prefix identifying the WAL group
         * @param queue queue of WALs belonging to that group
         * @param replicationQueueInfo parsed queue information shared with the source
         * @param source the owning replication source
         */
        public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
            this.source = source;
            this.walGroupId = walGroupId;
            this.queue = queue;
            this.replicationQueueInfo = replicationQueueInfo;
            this.repLogReader = new ReplicationWALReaderManager(fs, conf);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         *
         * (The line above is the decompiler's own notice: it restructured a try/catch in this
         * method, so the control flow below may differ from the original source.)
         *
         * Worker main loop. For a recovered queue it first restores the read position of the
         * head-of-queue WAL. Then, while the worker is active, each iteration: waits while the
         * peer is disabled, picks the next WAL, opens a reader, reads a batch of entries and
         * hands it to shipEdits(). After the loop, a worker of a recovered queue asks the manager
         * to close the queue once no other worker of this source is alive.
         */
        @Override
        public void run() {
            if (this.replicationQueueInfo.isQueueRecovered()) {
                try {
                    // Resume the first queued WAL at the position stored for it.
                    // NOTE(review): queue.peek() returns null on an empty queue, which would throw
                    // a NullPointerException here; presumably recovered queues are filled before
                    // the worker is started - confirm.
                    this.repLogReader.setPosition(ReplicationSource.this.replicationQueues.getLogPosition(ReplicationSource.this.peerClusterZnode, this.queue.peek().getName()));
                    if (LOG.isTraceEnabled()) {
                        LOG.trace((Object)("Recovered queue started with log " + this.queue.peek() + " at position " + this.repLogReader.getPosition()));
                    }
                }
                catch (ReplicationException e) {
                    this.terminate("Couldn't get the position of this recovered queue " + ReplicationSource.this.peerClusterZnode, (Exception)((Object)e));
                }
            }
            while (this.isWorkerActive()) {
                ArrayList<WAL.Entry> entries;
                boolean gotIOE;
                boolean currentWALisBeingWrittenTo;
                int sleepMultiplier;
                block41: {
                    // The multiplier is reset at the top of every iteration and each
                    // "++sleepMultiplier" below is immediately followed by "continue", so an
                    // increment never survives into the next iteration.
                    // NOTE(review): this defeats the back-off and makes the
                    // "sleepMultiplier == maxRetriesMultiplier" test further down true only when
                    // maxRetriesMultiplier is 1 - confirm against the original source.
                    sleepMultiplier = 1;
                    // Peer disabled: sleep and retry.
                    if (!ReplicationSource.this.isPeerEnabled()) {
                        if (!ReplicationSource.this.sleepForRetries("Replication is disabled", sleepMultiplier)) continue;
                        ++sleepMultiplier;
                        continue;
                    }
                    Path oldPath = this.getCurrentPath();
                    boolean hasCurrentPath = this.getNextPath();
                    // A WAL was just picked up where there was none before: restart the back-off.
                    if (this.getCurrentPath() != null && oldPath == null) {
                        sleepMultiplier = 1;
                    }
                    if (!hasCurrentPath) {
                        if (!ReplicationSource.this.sleepForRetries("No log to process", sleepMultiplier)) continue;
                        ++sleepMultiplier;
                        continue;
                    }
                    // For a live (non-recovered) queue with nothing else queued, the WAL just
                    // taken is treated as the one still being written to.
                    currentWALisBeingWrittenTo = false;
                    if (!this.replicationQueueInfo.isQueueRecovered() && this.queue.size() == 0) {
                        currentWALisBeingWrittenTo = true;
                    }
                    // openReader() returned false: start the iteration over.
                    if (!this.openReader(sleepMultiplier)) {
                        sleepMultiplier = 1;
                        continue;
                    }
                    if (this.reader == null) {
                        if (!ReplicationSource.this.sleepForRetries("Unable to open a reader", sleepMultiplier)) continue;
                        ++sleepMultiplier;
                        continue;
                    }
                    // Reset the per-batch state before reading.
                    gotIOE = false;
                    this.currentNbOperations = 0;
                    this.currentNbHFiles = 0L;
                    entries = new ArrayList<WAL.Entry>(1);
                    this.currentSize = 0;
                    try {
                        // A true result means the read moved on to another file: nothing to ship.
                        if (this.readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
                            continue;
                        }
                    }
                    catch (IOException ioe) {
                        LOG.warn((Object)(ReplicationSource.this.peerClusterZnode + " Got: "), (Throwable)ioe);
                        gotIOE = true;
                        // Only an IOException caused by an EOFException gets the handling below.
                        if (!(ioe.getCause() instanceof EOFException)) break block41;
                        boolean considerDumping = false;
                        if (this.replicationQueueInfo.isQueueRecovered()) {
                            try {
                                FileStatus stat = ReplicationSource.this.fs.getFileStatus(this.currentPath);
                                if (stat.getLen() == 0L) {
                                    LOG.warn((Object)(ReplicationSource.this.peerClusterZnode + " Got EOF and the file was empty"));
                                }
                                // Set whenever the file status could be fetched, empty file or not.
                                considerDumping = true;
                            }
                            catch (IOException e) {
                                LOG.warn((Object)(ReplicationSource.this.peerClusterZnode + " Got while getting file size: "), (Throwable)e);
                            }
                        }
                        // Give up on the file only after the retry budget is exhausted.
                        if (considerDumping && sleepMultiplier == ReplicationSource.this.maxRetriesMultiplier && this.processEndOfFile()) {
                            continue;
                        }
                    }
                    finally {
                        // Always drop and close the reader after a read pass.
                        try {
                            this.reader = null;
                            this.repLogReader.closeReader();
                        }
                        catch (IOException e) {
                            gotIOE = true;
                            LOG.warn((Object)"Unable to finalize the tailing of a file", (Throwable)e);
                        }
                        // NOTE(review): a "continue" that ends a finally block replaces whatever
                        // transfer was pending (normal completion, the "continue"s above, and
                        // "break block41"), so as written every path through this try statement
                        // jumps to the next loop iteration and the code after block41 (position
                        // logging and shipEdits) is never executed. This is very likely the
                        // decompilation artifact announced in the method header; verify against
                        // the original source before relying on this method.
                        continue;
                    }
                }
                // Nothing to ship (read error or empty batch): persist progress, then sleep.
                if (this.isWorkerActive() && (gotIOE || entries.isEmpty())) {
                    if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
                        ReplicationSource.this.manager.logPositionAndCleanOldLogs(this.currentPath, ReplicationSource.this.peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
                        this.lastLoggedPosition = this.repLogReader.getPosition();
                    }
                    if (!gotIOE) {
                        sleepMultiplier = 1;
                        ReplicationSource.this.metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), this.walGroupId);
                    }
                    if (!ReplicationSource.this.sleepForRetries("Nothing to replicate", sleepMultiplier)) continue;
                    ++sleepMultiplier;
                    continue;
                }
                sleepMultiplier = 1;
                this.shipEdits(currentWALisBeingWrittenTo, entries);
            }
            if (this.replicationQueueInfo.isQueueRecovered()) {
                // Holds the monitor of the workerThreads map while deciding whether this is the
                // last live worker; note that the 100 ms sleep happens with the monitor held.
                ConcurrentHashMap concurrentHashMap = ReplicationSource.this.workerThreads;
                synchronized (concurrentHashMap) {
                    Threads.sleep((long)100L);
                    boolean allOtherTaskDone = true;
                    for (ReplicationSourceWorkerThread worker : ReplicationSource.this.workerThreads.values()) {
                        if (worker.equals(this) || !worker.isAlive()) continue;
                        allOtherTaskDone = false;
                        break;
                    }
                    // Only the last worker standing closes the recovered queue.
                    if (allOtherTaskDone) {
                        ReplicationSource.this.manager.closeRecoveredQueue(this.source);
                        LOG.info((Object)("Finished recovering queue " + ReplicationSource.this.peerClusterZnode + " with the following stats: " + ReplicationSource.this.getStats()));
                    }
                }
            }
        }

        /**
         * Reads WAL entries from the current reader position into {@code entries} until the
         * file yields no more entries, a read fails, or the batch reaches the configured size
         * or entry-count capacity.
         * <p>
         * Entries that already carry the peer's cluster id are skipped unless the endpoint
         * allows replicating to the same cluster. Remaining entries pass through the WAL entry
         * filter; those that survive with a non-empty edit are stamped with the local cluster
         * id and added to the batch, all others are counted as filtered.
         * <p>
         * The batch size is accumulated in {@code long} arithmetic and saturates at
         * {@link Integer#MAX_VALUE}: bulk-load edits can reference store files of several
         * gigabytes, and letting the {@code int} counter wrap around would yield a bogus
         * (possibly negative) size and keep the capacity check from triggering.
         *
         * @param currentWALisBeingWrittenTo whether the current WAL is still being written; if
         *        so the method never tries to move on to the next file
         * @param entries output list receiving the entries to replicate
         * @return {@code true} only if the WAL is not being written to, no entry was seen and
         *         {@code processEndOfFile()} returned {@code true}
         * @throws IOException if seeking or the first read fails; failures of subsequent reads
         *         merely end the batch
         */
        protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) throws IOException {
            long seenEntries = 0L;
            if (LOG.isTraceEnabled()) {
                LOG.trace("Seeking in " + this.currentPath + " at position " + this.repLogReader.getPosition());
            }
            this.repLogReader.seek();
            long positionBeforeRead = this.repLogReader.getPosition();
            WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
            while (entry != null) {
                ReplicationSource.this.metrics.incrLogEditsRead();
                ++seenEntries;
                // Skip edits that originated from (or already passed through) the peer cluster.
                if (ReplicationSource.this.replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(ReplicationSource.this.peerClusterId)) {
                    entry = ReplicationSource.this.walEntryFilter.filter(entry);
                    WALEdit edit = null;
                    WALKey logKey = null;
                    if (entry != null) {
                        edit = entry.getEdit();
                        logKey = entry.getKey();
                    }
                    if (edit != null && edit.size() != 0) {
                        // Mark the edit as consumed by this cluster before shipping it.
                        logKey.addClusterId(ReplicationSource.this.clusterId);
                        this.currentNbOperations += this.countDistinctRowKeys(edit);
                        entries.add(entry);
                        // Sum in long and saturate instead of wrapping the int counter.
                        long batchSize = (long) this.currentSize + entry.getEdit().heapSize() + (long) this.calculateTotalSizeOfStoreFiles(edit);
                        this.currentSize = (int) Math.min(batchSize, (long) Integer.MAX_VALUE);
                    } else {
                        ReplicationSource.this.metrics.incrLogEditsFiltered();
                    }
                }
                // Stop once the batch is full, by size or by number of entries.
                if ((long) this.currentSize >= ReplicationSource.this.replicationQueueSizeCapacity || entries.size() >= ReplicationSource.this.replicationQueueNbCapacity) {
                    break;
                }
                try {
                    entry = this.repLogReader.readNextAndSetPosition();
                } catch (IOException ie) {
                    // Keep what was read so far; the next pass retries from the saved position.
                    LOG.debug("Break on IOE: " + ie.getMessage());
                    break;
                }
            }
            ReplicationSource.this.metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
            if (currentWALisBeingWrittenTo) {
                return false;
            }
            // An empty read on a closed WAL may mean the end of the file was reached.
            return seenEntries == 0L && this.processEndOfFile();
        }

        /**
         * Sums the sizes of all store files referenced by the bulk-load cells of an edit.
         * <p>
         * The sum is accumulated as a {@code long} and saturates at {@link Integer#MAX_VALUE}
         * on return. Truncating to {@code int} after every addition would wrap around as soon
         * as the referenced files total 2 GiB or more and could even yield a negative size.
         * <p>
         * A bulk-load cell whose descriptor cannot be deserialized is logged and left out of
         * the sum.
         *
         * @param edit the WAL edit to inspect
         * @return total size in bytes of the referenced store files, capped at
         *         {@link Integer#MAX_VALUE}
         */
        private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
            List<Cell> cells = edit.getCells();
            long totalStoreFilesSize = 0L;
            int totalCells = edit.size();
            for (int i = 0; i < totalCells; ++i) {
                Cell cell = cells.get(i);
                // Only bulk-load marker cells carry store file descriptors.
                if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                    continue;
                }
                try {
                    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
                    for (WALProtos.StoreDescriptor store : bld.getStoresList()) {
                        totalStoreFilesSize += store.getStoreFileSizeBytes();
                    }
                } catch (IOException e) {
                    LOG.error("Failed to deserialize bulk load entry from wal edit. Size of HFiles part of cell will not be considered in replication request size calculation.", e);
                }
            }
            return (int) Math.min(totalStoreFilesSize, (long) Integer.MAX_VALUE);
        }

        /**
         * Removes the HFile references of every bulk-load cell in the given edit from the
         * peer's reference queue and lowers the HFile-refs metric accordingly.
         *
         * @param edit the WAL edit whose bulk-load cells are processed
         * @throws IOException if a bulk-load descriptor cannot be deserialized
         */
        private void cleanUpHFileRefs(WALEdit edit) throws IOException {
            // The znode name may carry a "-..." suffix; the peer id is the part before the first "-".
            final String znode = ReplicationSource.this.peerClusterZnode;
            final String targetPeerId = znode.contains("-") ? znode.split("-")[0] : znode;

            for (Cell cell : edit.getCells()) {
                if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                    continue;
                }
                final WALProtos.BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
                for (WALProtos.StoreDescriptor store : descriptor.getStoresList()) {
                    final List<String> storeFiles = store.getStoreFileList();
                    ReplicationSource.this.manager.cleanUpHFileRefs(targetPeerId, storeFiles);
                    ReplicationSource.this.metrics.decrSizeOfHFileRefsQueue(storeFiles.size());
                }
            }
        }

        /**
         * Makes sure the worker has a current WAL. If none is set, waits up to the base retry
         * interval for the next queued WAL; when one arrives, the log-queue metric is
         * decremented and the manager is asked to clean logs older than it.
         * <p>
         * The metric is decremented only when a WAL was actually dequeued. {@code poll} returns
         * {@code null} on timeout, and decrementing in that case as well would make the
         * queue-size metric drift downwards on every idle poll.
         *
         * @return {@code true} if a current WAL is set when the method returns
         */
        protected boolean getNextPath() {
            try {
                if (this.currentPath == null) {
                    Path next = this.queue.poll(ReplicationSource.this.sleepForRetries, TimeUnit.MILLISECONDS);
                    this.currentPath = next;
                    if (next != null) {
                        // Mirrors the increment done when the WAL was enqueued.
                        ReplicationSource.this.metrics.decrSizeOfLogQueue();
                        ReplicationSource.this.manager.cleanOldLogs(next.getName(), ReplicationSource.this.peerClusterZnode, this.replicationQueueInfo.isQueueRecovered());
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("New log: " + next);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Left as in the original: the interrupt is logged but not re-asserted here.
                LOG.warn("Interrupted while reading edits", e);
            }
            return this.currentPath != null;
        }

        /**
         * Opens a reader on the current WAL, looking for the file in other locations when it is
         * not found at {@code currentPath}.
         *
         * <p>On {@link FileNotFoundException}:
         * <ul>
         *   <li>Recovered queue: each dead region server's WAL directory (and its "-splitting"
         *       variant) is probed. If the file exists there the method returns without opening
         *       it. When the stopper is a {@code ReplicationSyncUp.DummyServer}, every directory
         *       under the manager's log dir is scanned for a file with the same name; on a match
         *       {@code currentPath} is switched to it and the open is retried. If nothing is
         *       found an {@link IOException} is raised and handled by the generic handler.</li>
         *   <li>Normal queue: the old-log (archive) directory is checked; if the file is there,
         *       {@code currentPath} is switched to it and the open is retried.</li>
         * </ul>
         *
         * @param sleepMultiplier current retry multiplier; once it reaches
         *     {@code maxRetriesMultiplier} a failing file is handed to {@link #processEndOfFile()}
         * @return {@code true} in every case except when retries are exhausted and
         *     {@code processEndOfFile()} returned {@code true}
         */
        protected boolean openReader(int sleepMultiplier) {
            try {
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Opening log " + this.currentPath);
                    }
                    this.reader = this.repLogReader.openReader(this.currentPath);
                }
                catch (FileNotFoundException fnfe) {
                    if (this.replicationQueueInfo.isQueueRecovered()) {
                        List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
                        LOG.info("NB dead servers : " + deadRegionServers.size());
                        Path rootDir = FSUtils.getRootDir(ReplicationSource.this.conf);
                        for (String curDeadServerName : deadRegionServers) {
                            Path deadRsDirectory = new Path(rootDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName));
                            Path[] locs = new Path[]{
                                new Path(deadRsDirectory, this.currentPath.getName()),
                                new Path(deadRsDirectory.suffix("-splitting"), this.currentPath.getName())
                            };
                            for (Path possibleLogLocation : locs) {
                                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
                                if (!ReplicationSource.this.manager.getFs().exists(possibleLogLocation)) {
                                    continue;
                                }
                                // currentPath is left unchanged and no reader is opened here.
                                LOG.info("Log " + this.currentPath + " still exists at " + possibleLogLocation);
                                return true;
                            }
                        }
                        if (ReplicationSource.this.stopper instanceof ReplicationSyncUp.DummyServer) {
                            FileStatus[] rss = ReplicationSource.this.fs.listStatus(ReplicationSource.this.manager.getLogDir());
                            for (FileStatus rs : rss) {
                                Path serverDir = rs.getPath();
                                FileStatus[] logs = ReplicationSource.this.fs.listStatus(serverDir);
                                for (FileStatus log : logs) {
                                    // Always resolve against the directory itself. Reusing the
                                    // previous candidate as parent would yield dir/file1/file2.
                                    Path candidate = new Path(serverDir, log.getPath().getName());
                                    if (!candidate.getName().equals(this.currentPath.getName())) {
                                        continue;
                                    }
                                    this.currentPath = candidate;
                                    LOG.info("Log " + this.currentPath.getName() + " found at " + this.currentPath);
                                    this.openReader(sleepMultiplier);
                                    return true;
                                }
                            }
                        }
                        throw new IOException("File from recovered queue is nowhere to be found", fnfe);
                    }
                    Path archivedLogLocation = new Path(ReplicationSource.this.manager.getOldLogDir(), this.currentPath.getName());
                    if (ReplicationSource.this.manager.getFs().exists(archivedLogLocation)) {
                        // Log before switching so the message shows the old and the new location.
                        LOG.info("Log " + this.currentPath + " was moved to " + archivedLogLocation);
                        this.currentPath = archivedLogLocation;
                        this.openReader(sleepMultiplier);
                    }
                }
            }
            catch (LeaseNotRecoveredException lnre) {
                LOG.warn((Object)(ReplicationSource.this.peerClusterZnode + " Try to recover the WAL lease " + this.currentPath), (Throwable)((Object)lnre));
                this.recoverLease(ReplicationSource.this.conf, this.currentPath);
                this.reader = null;
            }
            catch (IOException ioe) {
                if (ioe instanceof EOFException && this.isCurrentLogEmpty()) {
                    return true;
                }
                LOG.warn(ReplicationSource.this.peerClusterZnode + " Got: ", ioe);
                this.reader = null;
                if (ioe.getCause() instanceof NullPointerException) {
                    LOG.warn("Got NPE opening reader, will retry.");
                }
                if (sleepMultiplier >= ReplicationSource.this.maxRetriesMultiplier) {
                    LOG.warn("Waited too long for this file, considering dumping");
                    return !this.processEndOfFile();
                }
            }
            return true;
        }

        /**
         * Asks the file system utilities to recover the lease on the given WAL file.
         * The progress callback logs at debug level and reports whether this worker is
         * still active. An {@link IOException} during recovery is logged as a warning
         * and not propagated.
         *
         * @param conf configuration used to look up the current file system
         * @param path WAL file whose lease should be recovered
         */
        private void recoverLease(Configuration conf, final Path path) {
            try {
                FileSystem currentFs = FSUtils.getCurrentFileSystem(conf);
                FSUtils leaseHelper = FSUtils.getInstance(currentFs, conf);
                CancelableProgressable reporter = new CancelableProgressable(){

                    @Override
                    public boolean progress() {
                        LOG.debug("recover WAL lease: " + path);
                        return ReplicationSourceWorkerThread.this.isWorkerActive();
                    }
                };
                leaseHelper.recoverFileLease(currentFs, path, conf, reporter);
            }
            catch (IOException recoveryFailure) {
                LOG.warn("unable to recover lease for WAL: " + path, recoveryFailure);
            }
        }

        /**
         * Tells whether the current log can be treated as empty.
         *
         * @return {@code true} only when the reader position is 0, the queue is not a
         *     recovered one, and no further logs are queued
         */
        private boolean isCurrentLogEmpty() {
            if (this.repLogReader.getPosition() != 0L) {
                return false;
            }
            if (this.replicationQueueInfo.isQueueRecovered()) {
                return false;
            }
            return this.queue.size() == 0;
        }

        /**
         * Counts the operations carried by a WAL edit: the number of runs of consecutive cells
         * sharing a row, plus the number of store files referenced by bulk-load marker cells.
         *
         * <p>Side effect: the number of bulk-loaded store files found is added to
         * {@code currentNbHFiles}. A bulk-load descriptor that cannot be deserialized is logged
         * (with its cause) and its files are not counted.
         *
         * @param edit the WAL edit to inspect
         * @return row-key changes plus bulk-loaded HFile count; {@code 0} for an edit without cells
         */
        private int countDistinctRowKeys(WALEdit edit) {
            List<Cell> cells = edit.getCells();
            if (cells.isEmpty()) {
                // Nothing to count; reading cells.get(0) below would throw on an empty edit.
                return 0;
            }
            int distinctRowKeys = 1;
            int totalHFileEntries = 0;
            Cell lastCell = cells.get(0);
            int totalCells = edit.size();
            for (int i = 0; i < totalCells; ++i) {
                Cell cell = cells.get(i);
                if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                    try {
                        WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
                        List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
                        for (WALProtos.StoreDescriptor store : stores) {
                            totalHFileEntries += store.getStoreFileList().size();
                        }
                    }
                    catch (IOException e) {
                        // Keep going, but preserve the cause so the failure can be diagnosed.
                        LOG.error("Failed to deserialize bulk load entry from wal edit. Then its hfiles count will not be added into metric.", e);
                    }
                }
                // A new row starts whenever this cell's row differs from the previous cell's row.
                if (!CellUtil.matchingRow(cell, lastCell)) {
                    ++distinctRowKeys;
                }
                lastCell = cell;
            }
            this.currentNbHFiles += totalHFileEntries;
            return distinctRowKeys + totalHFileEntries;
        }

        /**
         * Pushes a batch of WAL entries to the replication endpoint, retrying while this worker
         * is active until the endpoint reports success.
         *
         * <p>Before each attempt the throttler, when enabled, may impose a sleep. After a
         * successful push the replication position is recorded (if it changed since the last
         * recorded one), HFile references of the shipped edits are cleaned up, and shipping
         * metrics are updated. Any exception thrown during an attempt is logged and followed by
         * a retry sleep.
         *
         * @param currentWALisBeingWrittenTo forwarded to the manager when the position is logged
         * @param entries the entries to ship; an empty list is logged and ignored
         */
        protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
            if (entries.isEmpty()) {
                LOG.warn("Was given 0 edits to ship");
                return;
            }
            int retryMultiplier = 0;
            while (this.isWorkerActive()) {
                try {
                    if (ReplicationSource.this.throttler.isEnabled()) {
                        long throttleMs = ReplicationSource.this.throttler.getNextSleepInterval(this.currentSize);
                        if (throttleMs > 0L) {
                            try {
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("To sleep " + throttleMs + "ms for throttling control");
                                }
                                Thread.sleep(throttleMs);
                            }
                            catch (InterruptedException interrupted) {
                                LOG.debug("Interrupted while sleeping for throttling control");
                                // Restore the flag; the loop condition will then see the interrupt.
                                Thread.currentThread().interrupt();
                                continue;
                            }
                            ReplicationSource.this.throttler.resetStartTick();
                        }
                    }
                    ReplicationEndpoint.ReplicateContext context = new ReplicationEndpoint.ReplicateContext();
                    context.setEntries(entries).setSize(this.currentSize);
                    context.setWalGroupId(this.walGroupId);
                    long startTimeNs = System.nanoTime();
                    boolean shipped = ReplicationSource.this.replicationEndpoint.replicate(context);
                    long endTimeNs = System.nanoTime();
                    if (shipped) {
                        retryMultiplier = Math.max(retryMultiplier - 1, 0);
                        if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
                            for (WAL.Entry shippedEntry : entries) {
                                this.cleanUpHFileRefs(shippedEntry.getEdit());
                            }
                            ReplicationSource.this.manager.logPositionAndCleanOldLogs(this.currentPath, ReplicationSource.this.peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
                            this.lastLoggedPosition = this.repLogReader.getPosition();
                        }
                        if (ReplicationSource.this.throttler.isEnabled()) {
                            ReplicationSource.this.throttler.addPushSize(this.currentSize);
                        }
                        ReplicationSource.this.totalReplicatedEdits.addAndGet(entries.size());
                        ReplicationSource.this.totalReplicatedOperations.addAndGet(this.currentNbOperations);
                        ReplicationSource.this.metrics.shipBatch(this.currentNbOperations, this.currentSize, this.currentNbHFiles);
                        WAL.Entry newestEntry = entries.get(entries.size() - 1);
                        ReplicationSource.this.metrics.setAgeOfLastShippedOp(newestEntry.getKey().getWriteTime(), this.walGroupId);
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Replicated " + ReplicationSource.this.totalReplicatedEdits + " entries in total, or " + ReplicationSource.this.totalReplicatedOperations + " operations in " + (endTimeNs - startTimeNs) / 1000000L + " ms");
                        }
                        break;
                    }
                    // Not replicated: fall through and try again on the next loop iteration.
                }
                catch (Exception ex) {
                    LOG.warn(ReplicationSource.this.replicationEndpoint.getClass().getName() + " threw unknown exception:" + org.apache.hadoop.util.StringUtils.stringifyException(ex));
                    if (ReplicationSource.this.sleepForRetries("ReplicationEndpoint threw exception", retryMultiplier)) {
                        ++retryMultiplier;
                    }
                }
            }
        }

        /**
         * Handles reaching the end of the current WAL.
         *
         * <p>When more logs are queued, the current file is finished: its length is compared with
         * the reader position to detect unparsed data. A file without a trailer that has bytes
         * left is only reported in metrics. A file with a trailer whose position plus trailer is
         * short of the file length causes reading to restart from position 0. When no logs are
         * queued and this is a recovered queue, the worker is marked as finished.
         *
         * @return {@code true} if the current file was completed or the recovered queue is done;
         *     {@code false} if reading was restarted or there is nothing more to switch to
         */
        @SuppressWarnings(value={"DE_MIGHT_IGNORE"}, justification="Yeah, this is how it works")
        protected boolean processEndOfFile() {
            if (this.queue.size() == 0) {
                if (!this.replicationQueueInfo.isQueueRecovered()) {
                    return false;
                }
                LOG.debug("Finished recovering queue for group " + this.walGroupId + " of peer " + ReplicationSource.this.peerClusterZnode);
                ReplicationSource.this.metrics.incrCompletedRecoveryQueue();
                this.workerRunning = false;
                return true;
            }

            long trailerSize = this.repLogReader.currentTrailerSize();
            long currentPosition = this.repLogReader.getPosition();
            FileStatus stat;
            try {
                stat = ReplicationSource.this.fs.getFileStatus(this.currentPath);
            }
            catch (IOException exception) {
                stat = null;
                String closedState = trailerSize < 0L ? "was not" : "was";
                LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + closedState + " closed cleanly" + ", stats: " + ReplicationSource.this.getStats());
                ReplicationSource.this.metrics.incrUnknownFileLengthForClosedWAL();
            }

            if (stat != null) {
                long fileLength = stat.getLen();
                boolean hasTrailer = trailerSize >= 0L;
                if (hasTrailer) {
                    if (currentPosition + trailerSize < fileLength) {
                        LOG.warn("Processing end of WAL file '" + this.currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + fileLength + ". Restarting WAL reading (see HBASE-15983 for details). stats: " + ReplicationSource.this.getStats());
                        this.repLogReader.setPosition(0L);
                        ReplicationSource.this.metrics.incrRestartedWALReading();
                        ReplicationSource.this.metrics.incrRepeatedFileBytes(currentPosition);
                        return false;
                    }
                } else if (currentPosition < fileLength) {
                    long skippedBytes = fileLength - currentPosition;
                    LOG.info("Reached the end of WAL file '" + this.currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
                    ReplicationSource.this.metrics.incrUncleanlyClosedWALs();
                    ReplicationSource.this.metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
                }
            }

            if (LOG.isTraceEnabled()) {
                String lengthText = stat == null ? "N/A" : String.valueOf(stat.getLen());
                LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + ReplicationSource.this.getStats() + ", and the length of the file is " + lengthText);
            }
            this.currentPath = null;
            this.repLogReader.finishCurrentFile();
            this.reader = null;
            ReplicationSource.this.metrics.incrCompletedWAL();
            return true;
        }

        /**
         * Launches this worker and registers it with the owning source.
         *
         * <p>The thread name is derived from the calling thread's name, the WAL group id and the
         * peer cluster znode. An uncaught-exception handler is installed that checks for OOME,
         * logs the failure together with the current path, and stops the stopper. The thread is
         * then handed to {@code Threads.setDaemonThreadRunning} and stored in
         * {@code workerThreads} under its WAL group id.
         */
        public void startup() {
            final String callerThreadName = Thread.currentThread().getName();
            Thread.UncaughtExceptionHandler crashHandler = new Thread.UncaughtExceptionHandler(){

                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    RSRpcServices.exitIfOOME(e);
                    LOG.error("Unexpected exception in ReplicationSourceWorkerThread, currentPath=" + ReplicationSourceWorkerThread.this.getCurrentPath(), e);
                    ReplicationSource.this.stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
                }
            };
            String workerThreadName = callerThreadName + ".replicationSource." + this.walGroupId + "," + ReplicationSource.this.peerClusterZnode;
            Threads.setDaemonThreadRunning(this, workerThreadName, crashHandler);
            ReplicationSource.this.workerThreads.put(this.walGroupId, this);
        }

        /**
         * Returns the WAL path this worker is currently working on.
         *
         * @return the current path; may be {@code null} (it is cleared when a file is finished)
         */
        public Path getCurrentPath() {
            return this.currentPath;
        }

        /**
         * Returns the position currently reported by this worker's WAL reader.
         *
         * @return the value of {@code repLogReader.getPosition()}
         */
        public long getCurrentPosition() {
            return this.repLogReader.getPosition();
        }

        /**
         * Tells whether this worker should keep running.
         *
         * @return {@code true} when the stopper is not stopped, the worker is flagged as running,
         *     and this thread has not been interrupted
         */
        private boolean isWorkerActive() {
            if (ReplicationSource.this.stopper.isStopped()) {
                return false;
            }
            if (!this.workerRunning) {
                return false;
            }
            return !this.isInterrupted();
        }

        /**
         * Stops this worker: logs why, interrupts the thread and waits for it to shut down.
         *
         * @param reason human-readable explanation included in the log message
         * @param cause the error that triggered termination, or {@code null} for a normal close
         *     (logged at info level instead of error level)
         */
        private void terminate(String reason, Exception cause) {
            if (cause != null) {
                LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason, cause);
            } else {
                LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
            }
            this.interrupt();
            Threads.shutdown(this, ReplicationSource.this.sleepForRetries);
            String workerName = this.getName();
            LOG.info("ReplicationSourceWorker " + workerName + " terminated");
        }

        /**
         * Sets the flag that is checked, among other conditions, to decide whether this worker
         * is still active.
         *
         * @param workerRunning {@code false} to mark the worker as no longer running
         */
        public void setWorkerRunning(boolean workerRunning) {
            this.workerRunning = workerRunning;
        }
    }

    /**
     * Orders WAL paths by the numeric value that follows the last {@code '.'} in the file name.
     * A name whose suffix is not a valid {@code long} makes the comparison throw
     * {@link NumberFormatException}.
     */
    public static class LogsComparator
    implements Comparator<Path> {
        @Override
        public int compare(Path o1, Path o2) {
            long firstTs = LogsComparator.getTS(o1);
            long secondTs = LogsComparator.getTS(o2);
            return Long.compare(firstTs, secondTs);
        }

        /**
         * Parses the part of the file name after its last dot as a long. When the name has no
         * dot, the whole name is parsed.
         */
        private static long getTS(Path p) {
            String fileName = p.getName();
            int suffixStart = fileName.lastIndexOf('.') + 1;
            String suffix = fileName.substring(suffixStart);
            return Long.parseLong(suffix);
        }
    }
}

