Package org.codehaus.activemq.ra

Source Code of org.codehaus.activemq.ra.ActiveMQAsfEndpointWorker$ServerSessionImpl

/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.ra;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;

import javax.jms.*;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.*;
import javax.transaction.xa.XAResource;
import java.util.ArrayList;
import java.util.LinkedList;

/**
* @version $Revision: 1.3 $ $Date: 2004/07/31 21:11:00 $
*/
public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {

  private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
    private static final int MAX_MSGS_PER_SESSION = 1;
    private static final int MAX_SESSION = 10;


    ConnectionConsumer consumer;
  private ServerSessionPoolImpl serverSessionPool;

    /**
   * @param adapter
   * @param key
     * @throws ResourceException
   */
  public ActiveMQAsfEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
    super(adapter, key);
  }

 
    public void start() throws WorkException, ResourceException {
      log.debug("Starting");
        boolean ok = false;
        try {
        serverSessionPool = new ServerSessionPoolImpl();
            ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();       
         
            Destination dest = null;

            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQQueue(activationSpec.getDestinationName());
            } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQTopic(activationSpec.getDestinationName());
            } else {
                throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
            }
           
            if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
                consumer = adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
            } else {
                consumer = adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
            }

            ok = true;
          log.debug("Started");

        } catch (JMSException e) {
            throw new ResourceException("Could not start the endpoint.", e);
        } finally {
            // We don't want to leak sessions on errors.  Keep them around only if
            // there were no errors.
            if (!ok) {
                safeClose(consumer);
            }
        }
    }
   
    /**
   *
   */
    public void stop() throws InterruptedException {
      safeClose(consumer);
      serverSessionPool.close();
    }
   
  class ServerSessionPoolImpl implements ServerSessionPool {

    ServerSessionImpl ss;
    ArrayList idleSessions = new ArrayList();
    LinkedList activeSessions = new LinkedList();
    int sessionIds=0;
    int nextUsedSession;
    boolean closing=false;
   
    public ServerSessionPoolImpl() {
    }
   
    public ServerSessionImpl createServerSessionImpl() throws JMSException {
      Session session = adapter.getPhysicalConnection().createSession(true, Session.SESSION_TRANSACTED);
      return new ServerSessionImpl(this, session);
    }
   
    /**
     */
    synchronized public ServerSession getServerSession() throws JMSException {
          log.debug("ServerSession requested.");
      if( closing )
        throw new JMSException("Session Pool Shutting Down.");
     
      if( idleSessions.size()>0 ) {
        ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size()-1);
        activeSessions.addLast(ss);
            log.debug("Using idle session: "+ss);
        return ss;
      } else {
        // Are we at the upper limit?
        if( activeSessions.size() >= MAX_SESSION ) {
          // then reuse the allready created sessions..
          // This is going to queue up messages into a session for processing.
          ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();         
          activeSessions.addLast(ss);
              log.debug("Reusing an active session: "+ss);
          return ss;
        } else {
          ServerSessionImpl ss = createServerSessionImpl();         
          activeSessions.addLast(ss);
              log.debug("Created a new session: "+ss);
          return ss;
        }
      }
    }
   
    synchronized public void returnToPool(ServerSessionImpl ss) {
          log.debug("Session returned to pool: "+ss);
      idleSessions.add(ss);
    }
   
    public void close() {
      synchronized( this ) {
        closing = true;
      }     
    }
  }
 
  class ServerSessionImpl implements ServerSession, Work, MessageListener {
   
    Session session;
    private final ServerSessionPoolImpl pool;

    private Object runControlMutex = new Object();
      boolean workPendingFlag=false;
      boolean runningFlag=false;
    int runCounter=0;
    XAResource xaResource;
     

    public ServerSessionImpl(ServerSessionPoolImpl pool, Session session) throws JMSException {
      this.pool = pool;
      this.session=session;
      this.session.setMessageListener(this);
      if( session instanceof XASession ) {
        xaResource = ((XASession)session).getXAResource();
      }
    }

    /**
     * @see javax.jms.ServerSession#getSession()
     */
    public Session getSession() throws JMSException {
      return session;
    }

   
    /**
     * @see javax.jms.ServerSession#start()
     */
    public void start() throws JMSException {

          log.debug("ServerSession started.");
      synchronized(runControlMutex) {
        runCounter++;
        // Is our thread allready running
        if( runningFlag || workPendingFlag ) {
          // let it know that it should do more work..
          workPendingFlag=true;
              log.debug("ServerSession allready running.");
          return;
        }
        workPendingFlag=true;
      }
     
      // We get here because we need to start a async worker.
          log.debug("ServerSession queuing request for a run.");
      try {
              workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
                  new WorkListener() {
                      //The work listener is useful only for debugging...
                      public void workAccepted(WorkEvent event) {
                          log.debug("Work accepted: " + event);
                      }
 
                      public void workRejected(WorkEvent event) {
                          log.debug("Work rejected: " + event);
                      }
 
                      public void workStarted(WorkEvent event) {
                          log.debug("Work started: " + event);
                      }
 
                      public void workCompleted(WorkEvent event) {
                          log.debug("Work completed: " + event);
                      }
 
                  });
      } catch ( WorkException e ) {
        throw (JMSException)new JMSException("Work could not be started: "+e).initCause(e);
      }
    }

    /**
     * @see java.lang.Runnable#run()
     */
    public void run() {
      while(true) {
        synchronized(runControlMutex) {
          workPendingFlag=false;
          runningFlag=true;
        }

        log.debug("Running: " + this);       
        session.run();
       
        synchronized(runControlMutex) {
          runCounter--;
          runningFlag=false;
          if( !workPendingFlag ) {
            if( runCounter==0 )
              pool.returnToPool(this);           
            break;
           
          }
        }
      }
    }

    /**
     * @see javax.resource.spi.work.Work#release()
     */
    public void release() {
      log.debug("release called");       
    }

    /**
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message message) {     
            try {
             
                MessageEndpoint endpoint = endpointFactory.createEndpoint(xaResource);
                MessageListener listener = (MessageListener) endpoint;

                endpoint.beforeDelivery(ON_MESSAGE_METHOD);
                try {
                    listener.onMessage(message);
                } finally {
                    endpoint.afterDelivery();
                }
               
            } catch (NoSuchMethodException e) {
                log.info(e);
            } catch (ResourceException e) {
                log.info(e);
            }     
    }
   
    /**
     * @see java.lang.Object#toString()
     */
    public String toString() {
      return "ServerSessionImpl[session="+session+"]";
    }
 
  }

    private String emptyToNull(String value) {
        if ("".equals(value)) {
            return null;
        }
        return value;
    }

}
TOP

Related Classes of org.codehaus.activemq.ra.ActiveMQAsfEndpointWorker$ServerSessionImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.