"containerId=" + containerId);
//////////////////////////////////////////////////////////////////////
// disk or host-local allocation
//////////////////////////////////////////////////////////////////////
QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
if (attemptId == null) { // if a local task cannot be found
HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
if(hostVolumeMapping != null) {
if(!hostVolumeMapping.isRemote(containerId)){
// assign to remote volume
hostVolumeMapping.decreaseConcurrency(containerId);
hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
}
// this part is remote concurrency management of a tail tasks
int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
//release container
hostVolumeMapping.decreaseConcurrency(containerId);
taskRequest.getCallback().run(stopTaskRunnerReq);
subQuery.releaseContainer(containerId);
continue;
}
}
//////////////////////////////////////////////////////////////////////
// rack-local allocation
//////////////////////////////////////////////////////////////////////
attemptId = allocateRackTask(host);
//////////////////////////////////////////////////////////////////////
// random node allocation
//////////////////////////////////////////////////////////////////////
if (attemptId == null && leafTaskNum() > 0) {
synchronized (leafTasks){
attemptId = leafTasks.iterator().next();
leafTasks.remove(attemptId);
rackLocalAssigned++;
totalAssigned++;
LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
hostLocalAssigned, rackLocalAssigned, totalAssigned,
((double) hostLocalAssigned / (double) totalAssigned) * 100));
}
}
}
if (attemptId != null) {
QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
new ArrayList<FragmentProto>(task.getAllFragments()),
"",
false,