return (new GenMRRedSink1()).process(nd, stack, opProcCtx, nodeOutputs);
// union consisted on a bunch of map-reduce jobs, and it has been split at the union
Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
Map<Operator<? extends Serializable>, GenMapRedCtx> mapCurrCtx = ctx.getMapCurrCtx();
GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
mapredWork plan = (mapredWork) currTask.getWork();
HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
ctx.setCurrTask(currTask);
// If the plan for this reducer does not exist, initialize the plan
if (opMapTask == null) {
// When the reducer is encountered for the first time
if (plan.getReducer() == null)
GenMapRedUtils.initUnionPlan(op, ctx);
// When union is followed by a multi-table insert
else
GenMapRedUtils.splitPlan(op, ctx);
}
// The union is already initialized. However, the union is walked from another input
// initUnionPlan is idempotent
else if (plan.getReducer() == reducer)
GenMapRedUtils.initUnionPlan(op, ctx);
// There is a join after union. One of the branches of union has already been initialized.
// Initialize the current branch, and join with the original plan.
else {
GenMapRedUtils.initUnionPlan(ctx, currTask, false);
GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
}
mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));
// the union operator has been processed
ctx.setCurrUnionOp(null);
return null;
}