/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import com.facebook.presto.hadoop.$internal.com.google.common.annotations.VisibleForTesting;
import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.Log;
import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.CryptoUtils;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryReader;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryWriter;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManager;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.LimitedPrivate(value={"MapReduce"})
@InterfaceStability.Unstable
public class MergeManagerImpl<K, V>
implements MergeManager<K, V> {
    private static final Log LOG = LogFactory.getLog(MergeManagerImpl.class);
    // Default fraction of memoryLimit a single shuffled map output may occupy
    // (same value as the 0.25f default used for
    // "mapreduce.reduce.shuffle.memory.limit.percent").
    private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT = 0.25f;
    // Reduce attempt id; used in log lines and as the name of the merge temp directory.
    private final TaskAttemptID reduceId;
    private final JobConf jobConf;
    // File system handed to the constructor; used for getFileStatus() and delete().
    private final FileSystem localFS;
    // Raw file system obtained from ((LocalFileSystem) localFS).getRaw(); merge outputs are created on it.
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    protected MapOutputFile mapOutputFile;
    // Outputs produced by the memory-to-memory merger (added in closeInMemoryMergedFile).
    Set<InMemoryMapOutput<K, V>> inMemoryMergedMapOutputs = new TreeSet<InMemoryMapOutput<K, V>>(new MapOutput.MapOutputComparator());
    // Null unless "mapreduce.reduce.merge.memtomem.enabled" is true.
    private IntermediateMemoryToMemoryMerger memToMemMerger;
    // Shuffled outputs currently held in memory (added in closeInMemoryFile).
    Set<InMemoryMapOutput<K, V>> inMemoryMapOutputs = new TreeSet<InMemoryMapOutput<K, V>>(new MapOutput.MapOutputComparator());
    private final MergeThread<InMemoryMapOutput<K, V>, K, V> inMemoryMerger;
    // On-disk outputs, ordered by CompressAwarePath.compareTo (compressed size first).
    Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
    private final OnDiskMerger onDiskMerger;
    // Byte budget for in-memory shuffle data, computed once in the constructor.
    @VisibleForTesting
    final long memoryLimit;
    // Bytes currently reserved: increased in unconditionalReserve, decreased in unreserve.
    private long usedMemory;
    // Bytes of closed in-memory outputs accumulated since the last in-memory merge was triggered.
    private long commitMemory;
    // A request is shuffled to memory only when it is strictly below this size.
    private final long maxSingleShuffleLimit;
    // Number of in-memory outputs that triggers a memory-to-memory merge (when enabled).
    private final int memToMemMergeOutputsThreshold;
    // commitMemory level at or above which the in-memory merger is started.
    private final long mergeThreshold;
    // Value of "mapreduce.task.io.sort.factor"; an on-disk merge starts at 2 * ioSortFactor - 1 files.
    private final int ioSortFactor;
    private final Reporter reporter;
    private final ExceptionReporter exceptionReporter;
    // Optional combiner; when null, in-memory merges write the merged stream directly.
    private final Class<? extends Reducer> combinerClass;
    private final Task.CombineOutputCollector<K, V> combineCollector;
    private final Counters.Counter spilledRecordsCounter;
    private final Counters.Counter reduceCombineInputCounter;
    private final Counters.Counter mergedMapOutputsCounter;
    // Codec passed to the IFile writers and to on-disk segments.
    private final CompressionCodec codec;
    // Progress object handed to Merger.merge during the final merge.
    private final Progress mergePhase;

    /**
     * Creates the merge manager for one reduce attempt, derives the shuffle memory
     * budgets from {@code jobConf} and starts the background merger threads.
     *
     * <p>Budgets computed here:
     * <ul>
     *   <li>{@code memoryLimit} = "mapreduce.reduce.memory.totalbytes" (default: JVM max memory)
     *       times "mapreduce.reduce.shuffle.input.buffer.percent" (default 0.7);</li>
     *   <li>{@code maxSingleShuffleLimit} = {@code memoryLimit} times
     *       "mapreduce.reduce.shuffle.memory.limit.percent" (default 0.25), capped at
     *       {@link Integer#MAX_VALUE} because accepted in-memory reservations are narrowed
     *       to {@code int} when the buffer is allocated;</li>
     *   <li>{@code mergeThreshold} = {@code memoryLimit} times
     *       "mapreduce.reduce.shuffle.merge.percent" (default 0.9).</li>
     * </ul>
     *
     * @param localFS must be a {@link LocalFileSystem}; its raw file system is used for merge output
     * @throws IllegalArgumentException if either shuffle percentage is outside its allowed range
     * @throws RuntimeException if {@code maxSingleShuffleLimit} is not below {@code mergeThreshold}
     */
    public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS, LocalDirAllocator localDirAllocator, Reporter reporter, CompressionCodec codec, Class<? extends Reducer> combinerClass, Task.CombineOutputCollector<K, V> combineCollector, Counters.Counter spilledRecordsCounter, Counters.Counter reduceCombineInputCounter, Counters.Counter mergedMapOutputsCounter, ExceptionReporter exceptionReporter, Progress mergePhase, MapOutputFile mapOutputFile) {
        this.reduceId = reduceId;
        this.jobConf = jobConf;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.reporter = reporter;
        this.codec = codec;
        this.combinerClass = combinerClass;
        this.combineCollector = combineCollector;
        this.reduceCombineInputCounter = reduceCombineInputCounter;
        this.spilledRecordsCounter = spilledRecordsCounter;
        this.mergedMapOutputsCounter = mergedMapOutputsCounter;
        this.mapOutputFile = mapOutputFile;
        this.mapOutputFile.setConf(jobConf);
        this.localFS = localFS;
        this.rfs = ((LocalFileSystem)localFS).getRaw();

        float maxInMemCopyUse = jobConf.getFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.7f);
        if ((double)maxInMemCopyUse > 1.0 || (double)maxInMemCopyUse < 0.0) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.input.buffer.percent: " + maxInMemCopyUse);
        }
        this.memoryLimit = (long)((float)jobConf.getLong("mapreduce.reduce.memory.totalbytes", Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
        this.ioSortFactor = jobConf.getInt("mapreduce.task.io.sort.factor", 100);

        float singleShuffleMemoryLimitPercent = jobConf.getFloat("mapreduce.reduce.shuffle.memory.limit.percent", DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
        if (singleShuffleMemoryLimitPercent <= 0.0f || singleShuffleMemoryLimitPercent > 1.0f) {
            throw new IllegalArgumentException("Invalid value for mapreduce.reduce.shuffle.memory.limit.percent: " + singleShuffleMemoryLimitPercent);
        }
        this.usedMemory = 0L;
        this.commitMemory = 0L;

        // A single in-memory map output is backed by one byte[] whose length is the
        // requested size narrowed to int, so the per-request limit must never exceed
        // Integer.MAX_VALUE; otherwise large heaps would let the narrowing overflow.
        long configuredSingleShuffleLimit = (long)((float)this.memoryLimit * singleShuffleMemoryLimitPercent);
        if (configuredSingleShuffleLimit > Integer.MAX_VALUE) {
            configuredSingleShuffleLimit = Integer.MAX_VALUE;
            LOG.info("The max number of bytes for a single in-memory shuffle cannot be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
        }
        this.maxSingleShuffleLimit = configuredSingleShuffleLimit;

        this.memToMemMergeOutputsThreshold = jobConf.getInt("mapreduce.reduce.merge.memtomem.threshold", this.ioSortFactor);
        this.mergeThreshold = (long)((float)this.memoryLimit * jobConf.getFloat("mapreduce.reduce.shuffle.merge.percent", 0.9f));
        LOG.info("MergerManager: memoryLimit=" + this.memoryLimit + ", " + "maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", " + "mergeThreshold=" + this.mergeThreshold + ", " + "ioSortFactor=" + this.ioSortFactor + ", " + "memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold);
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invalid configuration: maxSingleShuffleLimit should be less than mergeThreshold maxSingleShuffleLimit: " + this.maxSingleShuffleLimit + ", mergeThreshold: " + this.mergeThreshold);
        }

        // The memory-to-memory merger is optional and disabled by default.
        boolean allowMemToMemMerge = jobConf.getBoolean("mapreduce.reduce.merge.memtomem.enabled", false);
        if (allowMemToMemMerge) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold);
            this.memToMemMerger.start();
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = this.createInMemoryMerger();
        this.inMemoryMerger.start();
        this.onDiskMerger = new OnDiskMerger(this);
        this.onDiskMerger.start();
        this.mergePhase = mergePhase;
    }

    /**
     * Factory for the thread that merges in-memory map outputs; called once from the constructor.
     *
     * @return a new {@code InMemoryMerger} bound to this manager
     */
    protected MergeThread<InMemoryMapOutput<K, V>, K, V> createInMemoryMerger() {
        MergeThread<InMemoryMapOutput<K, V>, K, V> merger = new InMemoryMerger(this);
        return merger;
    }

    /**
     * Factory for a thread that merges on-disk map outputs.
     *
     * @return a new {@code OnDiskMerger} bound to this manager
     */
    protected MergeThread<CompressAwarePath, K, V> createOnDiskMerger() {
        MergeThread<CompressAwarePath, K, V> merger = new OnDiskMerger(this);
        return merger;
    }

    /** @return the reduce attempt id this manager was constructed with */
    TaskAttemptID getReduceId() {
        return reduceId;
    }

    /** @return the exception reporter handed to the merger threads */
    @VisibleForTesting
    ExceptionReporter getExceptionReporter() {
        return exceptionReporter;
    }

    /**
     * Blocks by delegating to the in-memory merger's {@code waitForMerge()}.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    @Override
    public void waitForResource() throws InterruptedException {
        inMemoryMerger.waitForMerge();
    }

    /**
     * Decides whether a map output of the given size may be shuffled into memory.
     *
     * @param requestedSize size of the map output in bytes
     * @return {@code true} only when the size is strictly below {@code maxSingleShuffleLimit}
     */
    private boolean canShuffleToMemory(long requestedSize) {
        boolean belowSingleShuffleLimit = requestedSize < maxSingleShuffleLimit;
        return belowSingleShuffleLimit;
    }

    /**
     * Reserves a destination for a map output that is about to be fetched.
     *
     * <ul>
     *   <li>If the output is too large for memory (see {@code canShuffleToMemory}), an
     *       on-disk output is returned and no memory is reserved.</li>
     *   <li>If more memory is already in use than {@code memoryLimit}, {@code null} is
     *       returned to signal that the shuffle must stall.</li>
     *   <li>Otherwise the bytes are reserved and an in-memory output is returned.</li>
     * </ul>
     *
     * @param mapId         map attempt whose output is being fetched
     * @param requestedSize size of the output in bytes
     * @param fetcher       id of the requesting fetcher (only passed on to the on-disk output)
     * @return the destination, or {@code null} when the caller has to wait for memory
     * @throws IOException declared for the on-disk output creation
     */
    @Override
    public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize, int fetcher) throws IOException {
        if (!this.canShuffleToMemory(requestedSize)) {
            LOG.info(mapId + ": Shuffling to disk since " + requestedSize + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")");
            return new OnDiskMapOutput(mapId, this.reduceId, this, requestedSize, this.jobConf, this.mapOutputFile, fetcher, true);
        }
        if (this.usedMemory > this.memoryLimit) {
            // Guarded so the message is not built on every stalled fetch when debug is off.
            if (LOG.isDebugEnabled()) {
                LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + ")." + " CommitMemory is (" + this.commitMemory + ")");
            }
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(mapId + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ")." + "CommitMemory is (" + this.commitMemory + ")");
        }
        return this.unconditionalReserve(mapId, requestedSize, true);
    }

    /**
     * Adds {@code requestedSize} to {@code usedMemory} without checking any limit and
     * creates the in-memory output that will hold the data.
     *
     * @param mapId            map attempt the output belongs to
     * @param requestedSize    bytes to reserve; narrowed to {@code int} for the output's size
     * @param primaryMapOutput flag forwarded to the created output
     * @return the new in-memory map output
     */
    private synchronized InMemoryMapOutput<K, V> unconditionalReserve(TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
        this.usedMemory = this.usedMemory + requestedSize;
        final int bufferSize = (int)requestedSize;
        return new InMemoryMapOutput(this.jobConf, mapId, this, bufferSize, this.codec, primaryMapOutput);
    }

    /**
     * Returns previously reserved bytes to the pool.
     *
     * @param size number of bytes to subtract from {@code usedMemory}
     */
    synchronized void unreserve(long size) {
        this.usedMemory = this.usedMemory - size;
    }

    /**
     * Registers a completely fetched in-memory map output and starts merges when
     * thresholds are reached.
     *
     * <p>The output's size is added to {@code commitMemory}. Once that reaches
     * {@code mergeThreshold}, all memory-to-memory merged outputs are folded into the
     * pending set, the in-memory merger is started and {@code commitMemory} is reset.
     * Independently, if the memory-to-memory merger exists and enough outputs are
     * pending, it is started too.
     *
     * @param mapOutput the in-memory output that has just been filled
     */
    public synchronized void closeInMemoryFile(InMemoryMapOutput<K, V> mapOutput) {
        inMemoryMapOutputs.add(mapOutput);
        // Note: logged before commitMemory is increased for this output.
        LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
                + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
                + ", commitMemory -> " + commitMemory
                + ", usedMemory ->" + usedMemory);
        commitMemory = commitMemory + mapOutput.getSize();

        boolean mergeThresholdReached = commitMemory >= mergeThreshold;
        if (mergeThresholdReached) {
            LOG.info("Starting inMemoryMerger's merge since commitMemory=" + commitMemory
                    + " > mergeThreshold=" + mergeThreshold
                    + ". Current usedMemory=" + usedMemory);
            inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
            inMemoryMergedMapOutputs.clear();
            inMemoryMerger.startMerge(inMemoryMapOutputs);
            commitMemory = 0L;
        }

        if (memToMemMerger == null) {
            return;
        }
        if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
            memToMemMerger.startMerge(inMemoryMapOutputs);
        }
    }

    /**
     * Registers the result of a memory-to-memory merge.
     *
     * @param mapOutput merged in-memory output to add to {@code inMemoryMergedMapOutputs}
     */
    public synchronized void closeInMemoryMergedFile(InMemoryMapOutput<K, V> mapOutput) {
        inMemoryMergedMapOutputs.add(mapOutput);
        LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize()
                + ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size());
    }

    /**
     * Registers a finished on-disk map output and starts an on-disk merge once
     * {@code 2 * ioSortFactor - 1} files are pending.
     *
     * @param file the on-disk output to register
     */
    public synchronized void closeOnDiskFile(CompressAwarePath file) {
        onDiskMapOutputs.add(file);
        final int mergeTrigger = 2 * ioSortFactor - 1;
        if (onDiskMapOutputs.size() < mergeTrigger) {
            return;
        }
        onDiskMerger.startMerge(onDiskMapOutputs);
    }

    /**
     * Shuts down the merger threads and performs the final merge over everything
     * that is still pending.
     *
     * <p>The mergers are closed first (memory-to-memory if present, then in-memory,
     * then on-disk). Afterwards the pending sets are drained into lists, merged
     * in-memory outputs first, and handed to {@code finalMerge}.
     *
     * @return iterator over the fully merged reduce input
     * @throws Throwable whatever closing the mergers or the final merge throws
     */
    @Override
    public RawKeyValueIterator close() throws Throwable {
        if (memToMemMerger != null) {
            memToMemMerger.close();
        }
        inMemoryMerger.close();
        onDiskMerger.close();

        // Drain the in-memory sets: merged outputs first, then plain shuffled outputs.
        ArrayList<InMemoryMapOutput<K, V>> pendingInMemory = new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
        inMemoryMergedMapOutputs.clear();
        pendingInMemory.addAll(inMemoryMapOutputs);
        inMemoryMapOutputs.clear();

        // Drain the on-disk set.
        ArrayList<CompressAwarePath> pendingOnDisk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
        onDiskMapOutputs.clear();

        return finalMerge(jobConf, rfs, pendingInMemory, pendingOnDisk);
    }

    /**
     * Runs the configured combiner over a merged key/value stream; the combiner
     * emits into {@code combineCollector}.
     *
     * <p>The combiner instance is created reflectively from {@code combinerClass}
     * and is always closed, even when combining fails.
     *
     * @param kvIter    merged, sorted input
     * @param inCounter counter passed to the values iterator
     * @throws IOException if combining or closing the combiner fails
     */
    private void combineAndSpill(RawKeyValueIterator kvIter, Counters.Counter inCounter) throws IOException {
        JobConf conf = this.jobConf;
        Reducer combiner = ReflectionUtils.newInstance(this.combinerClass, conf);
        Class<?> keyType = conf.getMapOutputKeyClass();
        Class<?> valueType = conf.getMapOutputValueClass();
        RawComparator groupingComparator = conf.getCombinerKeyGroupingComparator();
        try {
            Task.CombineValuesIterator grouped = new Task.CombineValuesIterator(kvIter, groupingComparator, keyType, valueType, conf, Reporter.NULL, inCounter);
            // One reduce() call per key group; advance to the next group afterwards.
            for (; grouped.more(); grouped.nextKey()) {
                combiner.reduce(grouped.getKey(), grouped, this.combineCollector, Reporter.NULL);
            }
        }
        finally {
            combiner.close();
        }
    }

    /**
     * Moves in-memory map outputs from the head of {@code inMemoryMapOutputs} into
     * merge segments until at most {@code leaveBytes} bytes remain in the list.
     *
     * <p>Each removed output is wrapped in an {@code InMemoryReader} over its whole
     * buffer. The merged-map-outputs counter is attached only to primary map outputs.
     *
     * @param inMemoryMapOutputs source list; converted entries are removed from its front
     * @param inMemorySegments   destination list the new segments are appended to
     * @param leaveBytes         number of bytes that may stay behind in the source list
     * @return total number of bytes converted into segments
     * @throws IOException declared for reader/segment construction
     */
    private long createInMemorySegments(List<InMemoryMapOutput<K, V>> inMemoryMapOutputs, List<Merger.Segment<K, V>> inMemorySegments, long leaveBytes) throws IOException {
        long bytesStillListed = 0L;
        for (InMemoryMapOutput<K, V> output : inMemoryMapOutputs) {
            bytesStillListed += (long)output.getMemory().length;
        }

        long bytesConverted = 0L;
        while (bytesStillListed > leaveBytes) {
            InMemoryMapOutput<K, V> head = inMemoryMapOutputs.remove(0);
            byte[] buffer = head.getMemory();
            long bufferLength = buffer.length;
            bytesConverted += bufferLength;
            bytesStillListed -= bufferLength;
            InMemoryReader reader = new InMemoryReader(this, head.getMapId(), buffer, 0, (int)bufferLength, this.jobConf);
            Counters.Counter mergedCounter = head.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null;
            inMemorySegments.add(new Merger.Segment(reader, true, mergedCounter));
        }
        return bytesConverted;
    }

    /**
     * Computes how many bytes of map output may stay in memory during the reduce,
     * as {@code memoryLimit} times "mapreduce.reduce.input.buffer.percent" (default 0).
     *
     * @return the in-memory byte budget for the reduce
     * @throws RuntimeException if the configured fraction is below 0 or above 1
     */
    @VisibleForTesting
    final long getMaxInMemReduceLimit() {
        final float reduceBufferFraction = this.jobConf.getFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        boolean outOfRange = (double)reduceBufferFraction < 0.0 || (double)reduceBufferFraction > 1.0;
        if (outOfRange) {
            throw new RuntimeException(reduceBufferFraction + ": mapreduce.reduce.input.buffer.percent must be a float between 0 and 1.0");
        }
        float budget = (float)this.memoryLimit * reduceBufferFraction;
        return (long)budget;
    }

    /**
     * Builds the iterator that feeds the reducer from the remaining in-memory and
     * on-disk map outputs.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>In-memory outputs beyond the budget returned by
     *       {@code getMaxInMemReduceLimit()} are turned into segments. If there are any and
     *       {@code ioSortFactor} exceeds the number of on-disk files, they are merged into
     *       one new on-disk file; otherwise they are kept to be merged together with the
     *       disk segments.</li>
     *   <li>Every on-disk file becomes a segment; the segments are sorted by length.</li>
     *   <li>The in-memory outputs that were left behind become the final segments.</li>
     *   <li>If there are any disk bytes, the disk segments (plus kept memory segments) are
     *       merged. That merge is returned directly when no final in-memory segments
     *       exist, otherwise it is wrapped as one more final segment.</li>
     *   <li>The final segments are merged and returned.</li>
     * </ol>
     *
     * @param job                job configuration
     * @param fs                 file system used to create and read merge files
     * @param inMemoryMapOutputs remaining in-memory outputs; entries are removed as they are converted
     * @param onDiskMapOutputs   remaining on-disk outputs; a newly written merge file may be appended
     * @return iterator over the merged reduce input
     * @throws IOException if merging or file access fails
     */
    private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List<InMemoryMapOutput<K, V>> inMemoryMapOutputs, List<CompressAwarePath> onDiskMapOutputs) throws IOException {
        CompressAwarePath[] onDisk;
        LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs");
        long maxInMemReduce = this.getMaxInMemReduceLimit();
        Class<?> keyClass = job.getMapOutputKeyClass();
        Class<?> valueClass = job.getMapOutputValueClass();
        boolean keepInputs = job.getKeepFailedTaskFiles();
        Path tmpDir = new Path(this.reduceId.toString());
        RawComparator comparator = job.getOutputKeyComparator();
        // Segments for in-memory data that does not fit into the reduce-time memory budget.
        ArrayList<Merger.Segment<K, V>> memDiskSegments = new ArrayList<Merger.Segment<K, V>>();
        long inMemToDiskBytes = 0L;
        boolean mergePhaseFinished = false;
        if (inMemoryMapOutputs.size() > 0) {
            // The first output's task id is only used to derive the name of the merge file.
            TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
            inMemToDiskBytes = this.createInMemorySegments(inMemoryMapOutputs, memDiskSegments, maxInMemReduce);
            int numMemDiskSegments = memDiskSegments.size();
            if (numMemDiskSegments > 0 && this.ioSortFactor > onDiskMapOutputs.size()) {
                // Spill the excess in-memory data to one additional on-disk file.
                mergePhaseFinished = true;
                Path outputPath = this.mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);
                RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, this.reporter, this.spilledRecordsCounter, null, this.mergePhase);
                FSDataOutputStream out = CryptoUtils.wrapIfNecessary((Configuration)job, fs.create(outputPath));
                IFile.Writer writer = new IFile.Writer(job, out, keyClass, valueClass, this.codec, null, true);
                try {
                    Merger.writeFile(rIter, writer, this.reporter, job);
                    writer.close();
                    onDiskMapOutputs.add(new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()));
                    // Cleared on success so the finally block does not close the writer again.
                    writer = null;
                }
                catch (IOException e) {
                    // Best-effort removal of the partial file; the original failure is rethrown.
                    if (null != outputPath) {
                        try {
                            fs.delete(outputPath, true);
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                    throw e;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
                LOG.info("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit");
                // The data now lives on disk, so nothing is carried over as memory segments.
                inMemToDiskBytes = 0L;
                memDiskSegments.clear();
            } else if (inMemToDiskBytes != 0L) {
                LOG.info("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge");
            }
        }
        ArrayList diskSegments = new ArrayList();
        // Both totals start with the bytes of memory segments kept for the disk merge (0 if spilled).
        long onDiskBytes = inMemToDiskBytes;
        long rawBytes = inMemToDiskBytes;
        for (CompressAwarePath file : onDisk = onDiskMapOutputs.toArray(new CompressAwarePath[onDiskMapOutputs.size()])) {
            long fileLength = fs.getFileStatus(file).getLen();
            onDiskBytes += fileLength;
            // Fall back to the file length when no positive raw length was recorded.
            rawBytes += file.getRawDataLength() > 0L ? file.getRawDataLength() : fileLength;
            LOG.debug("Disk file: " + file + " Length is " + fileLength);
            // Files whose name ends with the merged-output suffix get no merged-map-outputs counter.
            diskSegments.add(new Merger.Segment((Configuration)job, fs, (Path)file, this.codec, keepInputs, file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter, file.getRawDataLength()));
        }
        LOG.info("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk");
        // Order disk segments by ascending length.
        Collections.sort(diskSegments, new Comparator<Merger.Segment<K, V>>(){

            @Override
            public int compare(Merger.Segment<K, V> o1, Merger.Segment<K, V> o2) {
                if (o1.getLength() == o2.getLength()) {
                    return 0;
                }
                return o1.getLength() < o2.getLength() ? -1 : 1;
            }
        });
        // Everything still in memory (leaveBytes = 0) goes straight into the final merge.
        ArrayList<Merger.Segment<K, V>> finalSegments = new ArrayList<Merger.Segment<K, V>>();
        long inMemBytes = this.createInMemorySegments(inMemoryMapOutputs, finalSegments, 0L);
        LOG.info("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce");
        if (0L != onDiskBytes) {
            // Kept memory segments are placed in front of the disk segments and their
            // count is passed to the merger separately.
            int numInMemSegments = memDiskSegments.size();
            diskSegments.addAll(0, memDiskSegments);
            memDiskSegments.clear();
            // Report merge progress here only if the spill merge above did not already use mergePhase.
            Progress thisPhase = mergePhaseFinished ? null : this.mergePhase;
            RawKeyValueIterator diskMerge = Merger.merge((Configuration)job, fs, keyClass, valueClass, this.codec, diskSegments, this.ioSortFactor, numInMemSegments, tmpDir, comparator, (Progressable)this.reporter, false, this.spilledRecordsCounter, null, thisPhase);
            diskSegments.clear();
            if (0 == finalSegments.size()) {
                return diskMerge;
            }
            // Expose the disk merge as one more segment of the final merge.
            finalSegments.add(new Merger.Segment(new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
        }
        return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, this.reporter, this.spilledRecordsCounter, null, null);
    }

    /**
     * A {@link Path} that also carries the raw (uncompressed) and compressed sizes of
     * the file it points to.
     *
     * <p>Ordering compares the compressed size first and falls back to the
     * {@code Path} ordering; equality and hashing are those of {@code Path}.
     */
    static class CompressAwarePath
    extends Path {
        private long rawDataLength;
        private long compressedSize;

        /**
         * @param path          location of the file
         * @param rawDataLength length of the uncompressed data
         * @param compressSize  length of the data as stored
         */
        public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
            super(path.toUri());
            this.compressedSize = compressSize;
            this.rawDataLength = rawDataLength;
        }

        /** @return the uncompressed data length given at construction */
        public long getRawDataLength() {
            return rawDataLength;
        }

        /** @return the compressed size given at construction */
        public long getCompressedSize() {
            return compressedSize;
        }

        /** Delegates to {@code Path.equals}; the sizes do not take part in equality. */
        @Override
        public boolean equals(Object other) {
            return super.equals(other);
        }

        /** Delegates to {@code Path.hashCode}. */
        @Override
        public int hashCode() {
            return super.hashCode();
        }

        /**
         * Orders by compressed size when the argument is a {@code CompressAwarePath};
         * for equal sizes, or any other argument, the {@code Path} ordering decides.
         */
        @Override
        public int compareTo(Object obj) {
            if (!(obj instanceof CompressAwarePath)) {
                return super.compareTo(obj);
            }
            CompressAwarePath that = (CompressAwarePath)obj;
            if (this.compressedSize < that.getCompressedSize()) {
                return -1;
            }
            if (this.getCompressedSize() > that.getCompressedSize()) {
                return 1;
            }
            return super.compareTo(obj);
        }
    }

    /**
     * Adapts a {@link RawKeyValueIterator} to the {@code IFile.Reader} interface so the
     * result of one merge can be used as a segment of another merge.
     */
    class RawKVIteratorReader
    extends IFile.Reader<K, V> {
        private final RawKeyValueIterator kvIter;

        /**
         * @param kvIter iterator to read from
         * @param size   size passed to the {@code IFile.Reader} constructor
         * @throws IOException declared by the super constructor
         */
        public RawKVIteratorReader(RawKeyValueIterator kvIter, long size) throws IOException {
            super(null, null, size, null, MergeManagerImpl.this.spilledRecordsCounter);
            this.kvIter = kvIter;
        }

        /**
         * Advances the iterator and points {@code key} at the remaining bytes of the
         * current key buffer.
         *
         * @return {@code false} when the iterator is exhausted
         */
        @Override
        public boolean nextRawKey(DataInputBuffer key) throws IOException {
            if (!this.kvIter.next()) {
                return false;
            }
            DataInputBuffer source = this.kvIter.getKey();
            int offset = source.getPosition();
            int length = source.getLength() - offset;
            key.reset(source.getData(), offset, length);
            this.bytesRead += (long)length;
            return true;
        }

        /** Points {@code value} at the remaining bytes of the current value buffer. */
        @Override
        public void nextRawValue(DataInputBuffer value) throws IOException {
            DataInputBuffer source = this.kvIter.getValue();
            int offset = source.getPosition();
            int length = source.getLength() - offset;
            value.reset(source.getData(), offset, length);
            this.bytesRead += (long)length;
        }

        /** @return the number of key and value bytes handed out so far */
        @Override
        public long getPosition() throws IOException {
            return this.bytesRead;
        }

        /** Closes the wrapped iterator. */
        @Override
        public void close() throws IOException {
            this.kvIter.close();
        }
    }

    /**
     * Daemon thread that merges a batch of on-disk map outputs into one larger
     * on-disk file and registers the result through {@code closeOnDiskFile}.
     */
    private class OnDiskMerger
    extends MergeThread<CompressAwarePath, K, V> {
        public OnDiskMerger(MergeManagerImpl<K, V> manager) {
            super(manager, MergeManagerImpl.this.ioSortFactor, MergeManagerImpl.this.exceptionReporter);
            this.setName("OnDiskMerger - Thread to merge on-disk map-outputs");
            this.setDaemon(true);
        }

        /**
         * Merges {@code inputs} into a single file placed by the local dir allocator.
         *
         * <p>On an {@link IOException} the writer is closed (best effort) if closing has
         * not been attempted yet, the partial output file is deleted and the exception
         * is rethrown.
         *
         * @param inputs on-disk files to merge; nothing happens for {@code null} or empty input
         * @throws IOException if merging, writing or file-system access fails
         */
        @Override
        public void merge(List<CompressAwarePath> inputs) throws IOException {
            CompressAwarePath compressAwarePath;
            if (inputs == null || inputs.isEmpty()) {
                LOG.info("No ondisk files to merge...");
                return;
            }
            long approxOutputSize = 0L;
            int bytesPerSum = MergeManagerImpl.this.jobConf.getInt("io.bytes.per.checksum", 512);
            LOG.info("OnDiskMerger: We have  " + inputs.size() + " map outputs on disk. Triggering merge...");
            // Estimate the output size as the sum of the input lengths plus checksum overhead.
            for (CompressAwarePath file : inputs) {
                approxOutputSize += MergeManagerImpl.this.localFS.getFileStatus(file).getLen();
            }
            approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
            Path outputPath = MergeManagerImpl.this.localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, MergeManagerImpl.this.jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
            FSDataOutputStream out = CryptoUtils.wrapIfNecessary((Configuration)MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs.create(outputPath));
            IFile.Writer writer = new IFile.Writer(MergeManagerImpl.this.jobConf, out, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, null, true);
            RawKeyValueIterator iter = null;
            Path tmpDir = new Path(MergeManagerImpl.this.reduceId.toString());
            // Tracks whether writer.close() has been called, so it is never called twice.
            boolean closeAttempted = false;
            try {
                iter = Merger.merge((Configuration)MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, inputs.toArray(new Path[inputs.size()]), true, MergeManagerImpl.this.ioSortFactor, tmpDir, MergeManagerImpl.this.jobConf.getOutputKeyComparator(), (Progressable)MergeManagerImpl.this.reporter, MergeManagerImpl.this.spilledRecordsCounter, null, MergeManagerImpl.this.mergedMapOutputsCounter, null);
                Merger.writeFile(iter, writer, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
                closeAttempted = true;
                writer.close();
                compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength());
            }
            catch (IOException e) {
                // Release the writer and its stream before removing the partial file;
                // a failure while closing must not hide the original exception.
                if (!closeAttempted) {
                    try {
                        writer.close();
                    }
                    catch (IOException closeFailure) {
                        LOG.warn("Failed to close writer for " + outputPath + " after a failed on-disk merge", closeFailure);
                    }
                }
                MergeManagerImpl.this.localFS.delete(outputPath, true);
                throw e;
            }
            MergeManagerImpl.this.closeOnDiskFile(compressAwarePath);
            LOG.info(MergeManagerImpl.this.reduceId + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + MergeManagerImpl.this.localFS.getFileStatus(outputPath).getLen());
        }
    }

    /**
     * Daemon thread that merges in-memory map outputs into one on-disk file, running
     * the combiner if one is configured, and registers the result through
     * {@code closeOnDiskFile}.
     */
    private class InMemoryMerger
    extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
        public InMemoryMerger(MergeManagerImpl<K, V> manager) {
            super(manager, Integer.MAX_VALUE, MergeManagerImpl.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        /**
         * Merges all of {@code inputs} to a single on-disk file.
         *
         * <p>On an {@link IOException} the writer is closed (best effort) if closing has
         * not been attempted yet, the partial output file is deleted and the exception
         * is rethrown.
         *
         * @param inputs in-memory outputs to merge; entries are removed as they are
         *               converted to segments; nothing happens for {@code null} or empty input
         * @throws IOException if merging, combining, writing or file-system access fails
         */
        @Override
        public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
            CompressAwarePath compressAwarePath;
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            // Read the first map id before createInMemorySegments empties the list;
            // it is only used to derive the output file name.
            TaskAttemptID mapId = inputs.get(0).getMapId();
            TaskID mapTaskId = mapId.getTaskID();
            ArrayList inMemorySegments = new ArrayList();
            long mergeOutputSize = MergeManagerImpl.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            Path outputPath = MergeManagerImpl.this.mapOutputFile.getInputFileForWrite(mapTaskId, mergeOutputSize).suffix(Task.MERGED_OUTPUT_PREFIX);
            FSDataOutputStream out = CryptoUtils.wrapIfNecessary((Configuration)MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs.create(outputPath));
            IFile.Writer writer = new IFile.Writer(MergeManagerImpl.this.jobConf, out, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), MergeManagerImpl.this.codec, null, true);
            RawKeyValueIterator rIter = null;
            // Tracks whether writer.close() has been called, so it is never called twice.
            boolean closeAttempted = false;
            try {
                LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
                rIter = Merger.merge(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(MergeManagerImpl.this.reduceId.toString()), MergeManagerImpl.this.jobConf.getOutputKeyComparator(), MergeManagerImpl.this.reporter, MergeManagerImpl.this.spilledRecordsCounter, null, null);
                if (null == MergeManagerImpl.this.combinerClass) {
                    Merger.writeFile(rIter, writer, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
                } else {
                    // With a combiner, records reach the writer through the combine collector.
                    MergeManagerImpl.this.combineCollector.setWriter(writer);
                    MergeManagerImpl.this.combineAndSpill(rIter, MergeManagerImpl.this.reduceCombineInputCounter);
                }
                closeAttempted = true;
                writer.close();
                compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength());
                LOG.info(MergeManagerImpl.this.reduceId + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + MergeManagerImpl.this.localFS.getFileStatus(outputPath).getLen());
            }
            catch (IOException e) {
                // Release the writer and its stream before removing the partial file;
                // a failure while closing must not hide the original exception.
                if (!closeAttempted) {
                    try {
                        writer.close();
                    }
                    catch (IOException closeFailure) {
                        LOG.warn("Failed to close writer for " + outputPath + " after a failed in-memory merge", closeFailure);
                    }
                }
                MergeManagerImpl.this.localFS.delete(outputPath, true);
                throw e;
            }
            MergeManagerImpl.this.closeOnDiskFile(compressAwarePath);
        }
    }

    private class IntermediateMemoryToMemoryMerger
    extends MergeThread<InMemoryMapOutput<K, V>, K, V> {
        public IntermediateMemoryToMemoryMerger(MergeManagerImpl<K, V> manager, int mergeFactor) {
            super(manager, mergeFactor, MergeManagerImpl.this.exceptionReporter);
            this.setName("InMemoryMerger - Thread to do in-memory merge of in-memory shuffled map-outputs");
            this.setDaemon(true);
        }

        @Override
        public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
            if (inputs == null || inputs.size() == 0) {
                return;
            }
            TaskAttemptID dummyMapId = inputs.get(0).getMapId();
            ArrayList inMemorySegments = new ArrayList();
            long mergeOutputSize = MergeManagerImpl.this.createInMemorySegments(inputs, inMemorySegments, 0L);
            int noInMemorySegments = inMemorySegments.size();
            InMemoryMapOutput mergedMapOutputs = MergeManagerImpl.this.unconditionalReserve(dummyMapId, mergeOutputSize, false);
            InMemoryWriter writer = new InMemoryWriter(mergedMapOutputs.getArrayStream());
            LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize);
            RawKeyValueIterator rIter = Merger.merge(MergeManagerImpl.this.jobConf, MergeManagerImpl.this.rfs, MergeManagerImpl.this.jobConf.getMapOutputKeyClass(), MergeManagerImpl.this.jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(MergeManagerImpl.this.reduceId.toString()), MergeManagerImpl.this.jobConf.getOutputKeyComparator(), MergeManagerImpl.this.reporter, null, null, null);
            Merger.writeFile(rIter, writer, MergeManagerImpl.this.reporter, MergeManagerImpl.this.jobConf);
            ((IFile.Writer)writer).close();
            LOG.info(MergeManagerImpl.this.reduceId + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete.");
            MergeManagerImpl.this.closeInMemoryMergedFile(mergedMapOutputs);
        }
    }
}

