Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
for (Path file : onDisk) {
onDiskBytes += fs.getFileStatus(file).getLen();
LOG.debug("Disk file: " + file + " Length is " +
fs.getFileStatus(file).getLen());
diskSegments.add(new Segment(job, fs, file, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false,
(file.toString().endsWith(
Constants.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter)
));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
// Sort the disk segments in place, ascending by the value of getLength().
Collections.sort(diskSegments, new Comparator<Segment>() {
  public int compare(Segment left, Segment right) {
    // Read each length once and compare the two long values directly.
    final long leftLength = left.getLength();
    final long rightLength = right.getLength();
    if (leftLength < rightLength) {
      return -1;
    }
    if (leftLength > rightLength) {
      return 1;
    }
    return 0;
  }
});
// build the final list of segments: the in-memory segments plus, when there
// is on-disk data, one segment backed by the merged on-disk output
List<Segment> finalSegments = new ArrayList<Segment>();
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
TezRawKeyValueIterator diskMerge = TezMerger.merge(
job, fs, keyClass, valueClass, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
nullProgressable, false, spilledRecordsCounter, null, null);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment(
new RawKVIteratorReader(diskMerge, onDiskBytes), true));
}
return TezMerger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, nullProgressable, spilledRecordsCounter, null,