Package org.apache.uima.adapter.jms.activemq

Source Code of org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.uima.adapter.jms.activemq;

import java.io.InvalidClassException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
import org.apache.uima.aae.controller.AnalysisEngineController;
import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
import org.apache.uima.aae.delegate.Delegate;
import org.apache.uima.aae.message.AsynchAEMessage;
import org.apache.uima.adapter.jms.JmsConstants;
import org.apache.uima.util.Level;
import org.springframework.jms.listener.SessionAwareMessageListener;

/**
* Message listener injected at runtime into Aggregate to handle a race condition when multiple
* threads simultaneously process messages from a Cas Multiplier. It is only used to process
* messages from a Cas Multiplier and only if the reply queue has more than one consumer thread
* configured in a deployment descriptor. The listener creates a pool of threads equal to the number
* of concurrent consumers defined in the DD for the listener on the reply queue. Once the message
* is handled in onMessage(), it is than delegated for processing to one of the available threads
* from the pool.
*
* This listener guarantees processing order. It receives messages from Spring in a single thread
* and if it finds a child CAS in the message, it increments the parent (input) CAS child count and
* delegates processing to the InputChannel instance.
*
* The race condition: The Cas Multiplier sends the last child and the parent almost at the same
* time. Both are received by the aggregate and are processed in different threads, if a scaleout is
* used on the reply queue. One thread may start processing the input CAS while the other thread
* (with the last child) is not yet allowed to run. The first thread takes the input CAS all the way
* to the final step and since at this time, the input CAS has no children ( the thread processing
* the last child has not updated the child count yet), it will be prematurely released. When the
* thread with the last child is allowed to run, it finds that the parent no longer exists in the
* cache.
*
*
*/
public class ConcurrentMessageListener implements SessionAwareMessageListener {
  private static final Class CLASS_NAME = ConcurrentMessageListener.class;

  private SessionAwareMessageListener delegateListener;

  private int concurrentThreadCount = 0;

  private AnalysisEngineController controller;

  private ThreadPoolExecutor executor = null;

  private LinkedBlockingQueue<Runnable> workQueue;

  private CountDownLatch controllerLatch = new CountDownLatch(1);

  /**
   * Creates a listener with a given number of process threads. This listener is injected between
   * Spring and JmsInputChannel to enable orderly processing of CASes. This listener is only used on
   * reply queues that have scale out attribute in DD greater than 1. Its main job is to increment
   * number of child CASes for a given input CAS. It does so in a single thread, and once it
   * completes the update this listener submits the CAS for further processing up to the
   * JmsInputChannel. The CAS is submitted to a queue where the executor assigns a free thread to
   * process the CAS.
   *
   * @param concurrentThreads
   *          - number of threads to use to process CASes
   * @param delegateListener
   *          - JmsInputChannel instance to delegate CAS to
   * @throws InvalidClassException
   */
  public ConcurrentMessageListener(int concurrentThreads, Object delegateListener)
          throws InvalidClassException {
    if (!(delegateListener instanceof SessionAwareMessageListener)) {
      throw new InvalidClassException("Invalid Delegate Listener. Expected Object of Type:"
              + SessionAwareMessageListener.class + " Received:" + delegateListener.getClass());
    }
    concurrentThreadCount = concurrentThreads;
    this.delegateListener = (SessionAwareMessageListener) delegateListener;
    if (concurrentThreads > 1) {
      workQueue = new LinkedBlockingQueue<Runnable>(concurrentThreadCount);
      executor = new ThreadPoolExecutor(concurrentThreads, concurrentThreads, Long.MAX_VALUE,
              TimeUnit.NANOSECONDS, workQueue);
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      executor.prestartAllCoreThreads();
    }
  }

  public void stop() {
    if (executor != null) {
      executor.shutdownNow();
      while (!executor.isTerminated()) {
        try {
          executor.awaitTermination(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          break;
        }
      }
    }
  }

  public void setAnalysisEngineController(AnalysisEngineController controller) {
    this.controller = controller;
    controllerLatch.countDown();
  }

  private boolean isMessageFromCasMultiplier(final Message message) throws JMSException {
    return message.propertyExists(AsynchAEMessage.CasSequence);
  }

  /**
   * Intercept a message to increment a child count of the input CAS. This method is always called
   * in a single thread, guaranteeing order of processing. The child CAS will always come here
   * first. Once the count is updated, or this CAS is not an child, the message will be delegated to
   * one of the threads in the pool that will eventually call InputChannel object where the actual
   * processing of the message begins.
   *
   */
  public void onMessage(final Message message, final Session session) throws JMSException {
    try {
      // Wait until the controller is plugged in
      controllerLatch.await();
    } catch (InterruptedException e) {
    }
    if (isMessageFromCasMultiplier(message)) {
      // Check if the message came from a Cas Multiplier and it contains a new Process Request
      int command = message.getIntProperty(AsynchAEMessage.Command);
      int messageType = message.getIntProperty(AsynchAEMessage.MessageType);
      // Intercept Cas Process Request from a Cas Multiplier
      if (command == AsynchAEMessage.Process && messageType == AsynchAEMessage.Request) {
        String msgFrom = (String) message.getStringProperty(AsynchAEMessage.MessageFrom);
        if (msgFrom != null && controller instanceof AggregateAnalysisEngineController) {
          String delegateKey = ((AggregateAnalysisEngineController) controller)
                  .lookUpDelegateKey(msgFrom);
          if (delegateKey != null) {
            Delegate delegate = ((AggregateAnalysisEngineController) controller)
                    .lookupDelegate(delegateKey);
            delegate.setConcurrentConsumersOnReplyQueue();
          }
        }
        try {
          String parentCasReferenceId = message
                  .getStringProperty(AsynchAEMessage.InputCasReference);
          // Fetch parent CAS entry from the local cache
          CasStateEntry parentEntry = controller.getLocalCache().lookupEntry(parentCasReferenceId);
          // increment number of child CASes this parent has in play
          parentEntry.incrementSubordinateCasInPlayCount();
        } catch (Exception e) {
          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                    "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                    "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
          }
        }
      }

    }
    if (concurrentThreadCount > 1) {
      // Delegate meesage to the JmsInputChannel
      executor.execute(new Runnable() {
        public void run() {
          try {
            delegateListener.onMessage(message, session);
          } catch (Exception e) {
            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(),
                      "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                      "UIMAJMS_exception__WARNING", new Object[] { JmsConstants.threadName(), e });
            }
          }
        }
      });
    } else {
      // Just handoff the message to the InputChannel object
      delegateListener.onMessage(message, session);
    }
  }
}
TOP

Related Classes of org.apache.uima.adapter.jms.activemq.ConcurrentMessageListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.