Package org.apache.helix.messaging.handling

Source Code of org.apache.helix.messaging.handling.HelixTask$TimeoutCancelTask

package org.apache.helix.messaging.handling;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;


public class HelixTask implements Callable<HelixTaskResult>
{
  private static Logger             logger     = Logger.getLogger(HelixTask.class);
  private final Message             _message;
  private final MessageHandler      _handler;
  private final NotificationContext _notificationContext;
  private final HelixManager        _manager;
  StatusUpdateUtil                  _statusUpdateUtil;
  HelixTaskExecutor                 _executor;
  volatile boolean                  _isTimeout = false;

  public class TimeoutCancelTask extends TimerTask
  {
    HelixTaskExecutor   _executor;
    Message             _message;
    NotificationContext _context;

    public TimeoutCancelTask(HelixTaskExecutor executor,
                             Message message,
                             NotificationContext context)
    {
      _executor = executor;
      _message = message;
      _context = context;
    }

    @Override
    public void run()
    {
      _isTimeout = true;
      logger.warn("Message time out, canceling. id:" + _message.getMsgId()
          + " timeout : " + _message.getExecutionTimeout());
      _handler.onTimeout();
      _executor.cancelTask(_message, _context);
    }

  }

  public HelixTask(Message message,
                   NotificationContext notificationContext,
                   MessageHandler handler,
                   HelixTaskExecutor executor) throws Exception
  {
    this._notificationContext = notificationContext;
    this._message = message;
    this._handler = handler;
    this._manager = notificationContext.getManager();
    _statusUpdateUtil = new StatusUpdateUtil();
    _executor = executor;
  }

  @Override
  public HelixTaskResult call()
  {
    // Start the timeout TimerTask, if necessary
    Timer timer = null;
    if (_message.getExecutionTimeout() > 0)
    {
      timer = new Timer(true);
      timer.schedule(new TimeoutCancelTask(_executor, _message, _notificationContext),
                     _message.getExecutionTimeout());
      logger.info("Message starts with timeout " + _message.getExecutionTimeout()
          + " MsgId:" + _message.getMsgId());
    }
    else
    {
      logger.info("Message does not have timeout. MsgId:" + _message.getMsgId() + "/"
          + _message.getPartitionName());
    }

    HelixTaskResult taskResult = new HelixTaskResult();

    Exception exception = null;
    ErrorType type = ErrorType.INTERNAL;
    ErrorCode code = ErrorCode.ERROR;

    long start = System.currentTimeMillis();
    logger.info("msg:" + _message.getMsgId() + " handling task begin, at: " + start);
    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
    _statusUpdateUtil.logInfo(_message,
                              HelixTask.class,
                              "Message handling task begin execute",
                              accessor);
    _message.setExecuteStartTimeStamp(new Date().getTime());

    // Handle the message
    try
    {
      taskResult = _handler.handleMessage();
      exception = taskResult.getException();
    }
    catch (InterruptedException e)
    {
      _statusUpdateUtil.logError(_message,
                                 HelixTask.class,
                                 e,
                                 "State transition interrupted, timeout:" + _isTimeout,
                                 accessor);
      logger.info("Message " + _message.getMsgId() + " is interrupted");
      taskResult.setInterrupted(true);
      taskResult.setException(e);
      exception = e;
    }
    catch (Exception e)
    {
      String errorMessage =
          "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
              + " type: " + _message.getMsgType();
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
      taskResult.setSuccess(false);
      taskResult.setException(e);
      taskResult.setMessage(e.getMessage());
      exception = e;
    }

    // Cancel the timer since the handling is done
    // it is fine if the TimerTask for canceling is called already
    if (timer != null)
    {
      timer.cancel();
    }

    if (taskResult.isSucess())
    {
      _statusUpdateUtil.logInfo(_message,
                                _handler.getClass(),
                                "Message handling task completed successfully",
                                accessor);
      logger.info("Message " + _message.getMsgId() + " completed.");
    }
    else if (taskResult.isInterrupted())
    {
      logger.info("Message " + _message.getMsgId() + " is interrupted");
      code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
      if (_isTimeout)
      {
        int retryCount = _message.getRetryCount();
        logger.info("Message timeout, retry count: " + retryCount + " MSGID:"
            + _message.getMsgId());
        _statusUpdateUtil.logInfo(_message,
                                  _handler.getClass(),
                                  "Message handling task timeout, retryCount:"
                                      + retryCount,
                                  accessor);
        // Notify the handler that timeout happens, and the number of retries left
        // In case timeout happens (time out and also interrupted)
        // we should retry the execution of the message by re-schedule it in
        if (retryCount > 0)
        {
          _message.setRetryCount(retryCount - 1);
          _executor.scheduleTask(_message, _handler, _notificationContext);
          return taskResult;
        }
      }
    }
    else
    // logging for errors
    {
      String errorMsg =
          "Message execution failed. msgId: " + _message.getMsgId()
              + taskResult.getMessage();
      if (exception != null)
      {
        errorMsg += exception;
      }
      logger.error(errorMsg, exception);
      _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
    }

    // Post-processing for the finished task
    try
    {
      if (!_message.getGroupMessageMode())
      {
        removeMessageFromZk(accessor, _message);
        reportMessageStat(_manager, _message, taskResult);
        sendReply(accessor, _message, taskResult);
      }
      else
      {
        GroupMessageInfo info = _executor._groupMsgHandler.onCompleteSubMessage(_message);
        if (info != null)
        {
          // TODO: changed to async update
          // group update current state
          Map<PropertyKey, CurrentState> curStateMap = info.merge();
          for (PropertyKey key : curStateMap.keySet())
          {
            accessor.updateProperty(key, curStateMap.get(key));
          }

          // remove group message
          removeMessageFromZk(accessor, _message);
          reportMessageStat(_manager, _message, taskResult);
          sendReply(accessor, _message, taskResult);
        }
      }
      _executor.reportCompletion(_message);
    }

    // TODO: capture errors and log here
    catch (Exception e)
    {
      String errorMessage =
          "Exception after executing a message, msgId: " + _message.getMsgId() + e;
      logger.error(errorMessage, e);
      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
      exception = e;
      type = ErrorType.FRAMEWORK;
      code = ErrorCode.ERROR;
    }
    //
    finally
    {
      long end = System.currentTimeMillis();
      logger.info("msg:" + _message.getMsgId() + " handling task completed, results:"
          + taskResult.isSucess() + ", at: " + end + ", took:" + (end - start));

      // Notify the handler about any error happened in the handling procedure, so that
      // the handler have chance to finally cleanup
      if (exception != null)
      {
        _handler.onError(exception, code, type);
      }
    }
    return taskResult;
  }

  private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
  {
    Builder keyBuilder = accessor.keyBuilder();
    if (message.getTgtName().equalsIgnoreCase("controller"))
    {
      // TODO: removeProperty returns boolean
      accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
    }
    else
    {
      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
                                                 message.getMsgId()));
    }
  }

  private void sendReply(HelixDataAccessor accessor,
                         Message message,
                         HelixTaskResult taskResult)
  {
    if (_message.getCorrelationId() != null
        && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
    {
      logger.info("Sending reply for message " + message.getCorrelationId());
      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);

      taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSucess());
      taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
      if (!taskResult.isSucess())
      {
        taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
      }
      Message replyMessage =
          Message.createReplyMessage(_message,
                                     _manager.getInstanceName(),
                                     taskResult.getTaskResultMap());
      replyMessage.setSrcInstanceType(_manager.getInstanceType());

      if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
      {
        Builder keyBuilder = accessor.keyBuilder();
        accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
                                                replyMessage.getMsgId()),
                             replyMessage);
      }
      else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
      {
        Builder keyBuilder = accessor.keyBuilder();
        accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
                             replyMessage);
      }
      _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
          + replyMessage.getTgtName(), accessor);
    }
  }

  private void reportMessageStat(HelixManager manager,
                                 Message message,
                                 HelixTaskResult taskResult)
  {
    // report stat
    if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
    {
      return;
    }
    long now = new Date().getTime();
    long msgReadTime = message.getReadTimeStamp();
    long msgExecutionStartTime = message.getExecuteStartTimeStamp();
    if (msgReadTime != 0 && msgExecutionStartTime != 0)
    {
      long totalDelay = now - msgReadTime;
      long executionDelay = now - msgExecutionStartTime;
      if (totalDelay > 0 && executionDelay > 0)
      {
        String fromState = message.getFromState();
        String toState = message.getToState();
        String transition = fromState + "--" + toState;

        StateTransitionContext cxt =
            new StateTransitionContext(manager.getClusterName(),
                                       manager.getInstanceName(),
                                       message.getResourceName(),
                                       transition);

        StateTransitionDataPoint data =
            new StateTransitionDataPoint(totalDelay,
                                         executionDelay,
                                         taskResult.isSucess());
        _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
      }
    }
    else
    {
      logger.warn("message read time and start execution time not recorded.");
    }
  }

};
TOP

Related Classes of org.apache.helix.messaging.handling.HelixTask$TimeoutCancelTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.