*/
private static void finalizeJob(FinalizeJobTask finalizeJobTask) {
Key jobKey = finalizeJobTask.getJobKey();
// Get the JobRecord, its finalize Barrier, all the slots in the
// finalize Barrier, and the job's output Slot.
JobRecord jobRecord = queryJobOrAbandonTask(jobKey, JobRecord.InflationType.FOR_FINALIZE);
jobRecord.getQueueSettings().merge(finalizeJobTask.getQueueSettings());
switch (jobRecord.getState()) {
case WAITING_TO_FINALIZE:
// OK, proceed
break;
case WAITING_TO_RUN:
case RETRY:
throw new RuntimeException("" + jobRecord + " is in RETRY state");
case STOPPED:
logger.info("This job has been stoped " + jobRecord);
return;
case CANCELED:
logger.info("This job has already been canceled " + jobRecord);
return;
case FINALIZED:
logger.info("This job has already been run " + jobRecord);
return;
}
Barrier finalizeBarrier = jobRecord.getFinalizeBarrierInflated();
if (null == finalizeBarrier) {
throw new RuntimeException("" + jobRecord + " has not been inflated");
}
Slot outputSlot = jobRecord.getOutputSlotInflated();
if (null == outputSlot) {
throw new RuntimeException("" + jobRecord + " has not been inflated.");
}
// release the finalize barrier now so that any concurrent
// HandleSlotFilled tasks will stop trying
finalizeBarrier.setReleased();
UpdateSpec updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
updateSpec.getOrCreateTransaction("releaseFinalizeBarrier").includeBarrier(finalizeBarrier);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
updateSpec = new UpdateSpec(jobRecord.getRootJobKey());
// Copy the finalize value to the output slot
List<Object> finalizeArguments = finalizeBarrier.buildArgumentList();
int numFinalizeArguments = finalizeArguments.size();
if (1 != numFinalizeArguments) {
throw new RuntimeException(
"Internal logic error: numFinalizeArguments=" + numFinalizeArguments);
}
Object finalizeValue = finalizeArguments.get(0);
logger.finest("Finalizing " + jobRecord + " with value=" + finalizeValue);
outputSlot.fill(finalizeValue);
// Change state of the job to FINALIZED and set the end time
jobRecord.setState(State.FINALIZED);
jobRecord.setEndTime(new Date());
// Propagate the filler of the finalize slot to also be the filler of the
// output slot. If there is no unique filler of the finalize slot then we
// resort to assigning the current job as the filler job.
Key fillerJobKey = getFinalizeSlotFiller(finalizeBarrier);
if (null == fillerJobKey) {
fillerJobKey = jobKey;
}
outputSlot.setSourceJobKey(fillerJobKey);
// Save the job and the output slot
updateSpec.getNonTransactionalGroup().includeJob(jobRecord);
updateSpec.getNonTransactionalGroup().includeSlot(outputSlot);
backEnd.save(updateSpec, jobRecord.getQueueSettings());
// enqueue a HandleSlotFilled task
HandleSlotFilledTask task =
new HandleSlotFilledTask(outputSlot.getKey(), jobRecord.getQueueSettings());
backEnd.enqueue(task);
}