}
@Override
public boolean cancelTask(MessageTask task) {
Message message = task.getMessage();
NotificationContext notificationContext = task.getNotificationContext();
String taskId = task.getTaskId();
synchronized (_lock) {
if (_taskMap.containsKey(taskId)) {
MessageTaskInfo taskInfo = _taskMap.get(taskId);
// cancel timeout task
if (taskInfo._timerTask != null) {
taskInfo._timerTask.cancel();
}
// cancel task
Future<HelixTaskResult> future = taskInfo.getFuture();
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId,
notificationContext.getManager().getHelixDataAccessor());
// If the thread is still running it will be interrupted if cancel(true)
// is called. So state transition callbacks should implement logic to
// return
// if it is interrupted.
if (future.cancel(true)) {
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId,
notificationContext.getManager().getHelixDataAccessor());
_taskMap.remove(taskId);
return true;
} else {
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: "
+ taskId, notificationContext.getManager().getHelixDataAccessor());
}
} else {
_statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "fail to cancel task: "
+ taskId + ", future not found", notificationContext.getManager()
.getHelixDataAccessor());
}
}
return false;
}