/*
 * A task and its child task have both been converted from a join to a map join.
 * Check whether the two tasks can be merged into one.
 */
private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
MapredWork work = task.getWork();
MapredLocalWork localWork = work.getMapLocalWork();
MapredWork childWork = childTask.getWork();
MapredLocalWork childLocalWork = childWork.getMapLocalWork();
// Can the tasks be merged? The parent task must process exactly one alias.
Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
if (aliasToWork.size() > 1) {
return;
}
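// Walk the single operator chain down to its leaf.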
Operator<? extends OperatorDesc> op = aliasToWork.values().iterator().next();
while (op.getChildOperators() != null && !op.getChildOperators().isEmpty()) {
// Don't perform this optimization for multi-table inserts
if (op.getChildOperators().size() > 1) {
return;
}
op = op.getChildOperators().get(0);
}
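// The merge is only possible when the chain ends in a FileSinkOperator, i.e. the
// parent task just writes an intermediate directory that the child task reads.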
if (!(op instanceof FileSinkOperator)) {
return;
}
FileSinkOperator fop = (FileSinkOperator)op;
String workDir = fop.getConf().getDirName();
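// The child task must read exactly one input path, and it must be the
// directory the parent's FileSinkOperator writes to.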
Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
if (childPathToAliases.size() > 1) {
return;
}
// The child reads from a different directory, so the tasks cannot be merged
if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
return;
}
// Neither task may use a bucketed map join
if ((localWork.getBucketMapjoinContext() != null) ||
(childLocalWork.getBucketMapjoinContext() != null)) {
return;
}
// The child task must also process exactly one alias
if (childWork.getAliasToWork().size() > 1) {
return;
}
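// The small tables of both tasks must together stay under the map-join size
// threshold, since the merged task has to hold all of them in memory at once.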
long mapJoinSize = HiveConf.getLongVar(conf,
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
long localTableTotalSize = 0;
for (String alias : localWork.getAliasToWork().keySet()) {
Long tabSize = aliasToSize.get(alias);
if (tabSize == null) {
/* If the size is unavailable, conservatively treat it as exceeding
 * mapJoinSize; the merge cannot happen, so bail out.
 */
return;
}
localTableTotalSize += tabSize;
if (localTableTotalSize > mapJoinSize) {
return;
}
}
for (String alias : childLocalWork.getAliasToWork().keySet()) {
Long tabSize = aliasToSize.get(alias);
if (tabSize == null) {
/* If the size is unavailable, conservatively treat it as exceeding
 * mapJoinSize; the merge cannot happen, so bail out.
 */
return;
}
localTableTotalSize += tabSize;
if (localTableTotalSize > mapJoinSize) {
return;
}
}
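// The root operator of the child's single alias will be spliced onto the
// parent's operator tree in place of the FileSinkOperator.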
Operator<? extends OperatorDesc> childAliasOp =
childWork.getAliasToWork().values().iterator().next();
if (fop.getParentOperators().size() > 1) {
return;
}
// Merge the two trees: remove the FileSinkOperator from the first tree and
// splice the root of the second tree in its place
Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
parentFOp.getChildOperators().remove(fop);
parentFOp.getChildOperators().add(childAliasOp);
List<Operator<? extends OperatorDesc>> parentOps =
new ArrayList<Operator<? extends OperatorDesc>>();
parentOps.add(parentFOp);
childAliasOp.setParentOperators(parentOps);
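// Fold the child work's alias and partition metadata into the parent work.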
work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
for (Map.Entry<String, PartitionDesc> childWorkEntry :
childWork.getPathToPartitionInfo().entrySet()) {
// Copy a path entry only when its partition descriptor belongs to a merged alias
if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getValue())) {
work.getPathToPartitionInfo().put(childWorkEntry.getKey(), childWorkEntry.getValue());
}
}
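// Merge the local works so the merged task fetches the small tables of both joins.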
localWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
localWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
// Remove the child task from the task tree and adopt its children
List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
task.setChildTasks(oldChildTasks);
if (oldChildTasks != null) {
for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
oldChildTask.getParentTasks().remove(childTask);
oldChildTask.getParentTasks().add(task);
}
}
}