: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
// Write one IFile segment per partition into a single spill file, and record
// each segment's (start, rawLength, compressedLength) in a TezSpillRecord index.
FSDataOutputStream out = null;
try {
// create spill file
final TezSpillRecord spillRec = new TezSpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// spindex walks the metadata entries in [mstart, mend); the per-partition
// while-loops below assume entries are grouped by ascending partition id
// — presumably guaranteed by an earlier sort, TODO confirm from caller
int spindex = mstart;
final InMemValBytes value = createInMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer writer = null;
try {
// segment for partition i starts at the current file position
long segmentStart = out.getPos();
writer = new Writer(conf, out, keyClass, valClass, codec,
spilledRecordsCounter, null);
if (combiner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < mend &&
kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
final int kvoff = offsetFor(spindex);
int keystart = kvmeta.get(kvoff + KEYSTART);
int valstart = kvmeta.get(kvoff + VALSTART);
// key bytes occupy [keystart, valstart) in kvbuffer; value bytes
// are resolved separately (may wrap the circular buffer —
// NOTE(review): inferred from getVBytesForOffset, confirm there)
key.reset(kvbuffer, keystart, valstart - keystart);
getVBytesForOffset(kvoff, value);
writer.append(key, value);
++spindex;
}
} else {
// locate the contiguous run [spstart, spindex) of records that
// belong to partition i, then feed it through the combiner
int spstart = spindex;
while (spindex < mend &&
kvmeta.get(offsetFor(spindex)
+ PARTITION) == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
TezRawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
if (LOG.isDebugEnabled()) {
LOG.debug("Running combine processor");
}
// combiner output goes to the same segment writer
runCombineProcessor(kvIter, writer);
}
}
// close the writer so raw/compressed lengths below are final
writer.close();
if (numSpills > 0) {
// every spill after the first counts toward "additional" spill bytes
additionalSpillBytesWritten.increment(writer.getCompressedLength());
numAdditionalSpills.increment(1);
// Reset; the real value will be set during the final merge.
outputBytesWithOverheadCounter.setValue(0);
} else {
// Set this up for the first write only. Subsequent ones will be handled in the final merge.
outputBytesWithOverheadCounter.increment(writer.getRawLength());
}
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
segmentStart,
writer.getRawLength(),
writer.getCompressedLength());
spillRec.putIndex(rec, i);
// null out so the finally clause does not close the writer twice
writer = null;
} finally {
if (null != writer) writer.close();
}
}
// Keep the spill index in memory while it fits under the cache limit;
// once the limit is reached, persist this spill's index to its own file.
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, conf);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();