sorter.setup(context, sv2, incoming);
Stopwatch w = new Stopwatch();
w.start();
sorter.sort(sv2);
// logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), count);
RecordBatchData rbd = new RecordBatchData(incoming);
if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) {
rbd.setSv2(sv2);
}
batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
batchesSinceLastSpill++;
if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) {
mergeAndSpill();
batchesSinceLastSpill = 0;
}
long t = w.elapsed(TimeUnit.MICROSECONDS);
// logger.debug("Took {} us to sort {} records", t, count);
break;
case OUT_OF_MEMORY:
if (batchesSinceLastSpill > 2) mergeAndSpill();
batchesSinceLastSpill = 0;
break;
default:
throw new UnsupportedOperationException();
}
}
// if (schema == null || totalcount == 0){
// builder may be null at this point if the first incoming batch is empty
// useIncomingSchema = true;
// return IterOutcome.NONE;
// }
if (spillCount == 0) {
Stopwatch watch = new Stopwatch();
watch.start();
// if (schema == null){
// builder may be null at this point if the first incoming batch is empty
// useIncomingSchema = true;
// return IterOutcome.NONE;
// }
builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
for (BatchGroup group : batchGroups) {
RecordBatchData rbd = new RecordBatchData(group.getFirstContainer());
rbd.setSv2(group.getSv2());
builder.add(rbd);
}
builder.build(context, container);
sv4 = builder.getSv4();