HandleSpec streamOutputSpec = command.getOutputSpec();
loadFuncSpec = new FuncSpec(streamOutputSpec.getSpec());
} else {
int errCode = 2006;
String msg = "TypeCastInserter invoked with an invalid operator class name: " + lo.getClass().getSimpleName();
throw new OptimizerException(msg, errCode, PigException.BUG);
}
cast.setLoadFuncSpec(loadFuncSpec);
typeChanges.put(fs.canonicalName, fs.type);
if(determinedSchema == null) {
// Reset the load's field schema to byte array so that it
// will reflect reality.
fs.type = DataType.BYTEARRAY;
} else {
// Reset the type to what determinedSchema says it is
fs.type = determinedSchema.getField(i).type;
}
}
}
}
// Build a foreach to insert after the load, giving it a cast for each
// position that has a type other than byte array.
LOForEach foreach = new LOForEach(mPlan,
OperatorKey.genOpKey(scope), genPlans, flattens);
foreach.setAlias(lo.getAlias());
// Insert the foreach into the plan and patch up the plan.
insertAfter(lo, foreach, null);
rebuildSchemas();
} catch (OptimizerException oe) {
throw oe;
} catch (Exception e) {
int errCode = 2007;
String msg = "Unable to insert type casts into plan";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}