Package net.timewalker.ffmq3.remote.session

Source Code of net.timewalker.ffmq3.remote.session.RemoteSession

/*
* This file is part of FFMQ.
*
* FFMQ is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* FFMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with FFMQ; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/
package net.timewalker.ffmq3.remote.session;

import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import net.timewalker.ffmq3.FFMQClientSettings;
import net.timewalker.ffmq3.FFMQException;
import net.timewalker.ffmq3.client.ClientEnvironment;
import net.timewalker.ffmq3.common.destination.DestinationTools;
import net.timewalker.ffmq3.common.destination.TemporaryQueueRef;
import net.timewalker.ffmq3.common.destination.TemporaryTopicRef;
import net.timewalker.ffmq3.common.message.AbstractMessage;
import net.timewalker.ffmq3.common.message.MessageTools;
import net.timewalker.ffmq3.common.session.AbstractSession;
import net.timewalker.ffmq3.remote.connection.RemoteConnection;
import net.timewalker.ffmq3.transport.PacketTransportEndpoint;
import net.timewalker.ffmq3.transport.packet.query.AcknowledgeQuery;
import net.timewalker.ffmq3.transport.packet.query.AsyncAcknowledgeQuery;
import net.timewalker.ffmq3.transport.packet.query.CloseSessionQuery;
import net.timewalker.ffmq3.transport.packet.query.CommitQuery;
import net.timewalker.ffmq3.transport.packet.query.CreateSessionQuery;
import net.timewalker.ffmq3.transport.packet.query.CreateTemporaryQueueQuery;
import net.timewalker.ffmq3.transport.packet.query.CreateTemporaryTopicQuery;
import net.timewalker.ffmq3.transport.packet.query.MultiplePutQuery;
import net.timewalker.ffmq3.transport.packet.query.PutQuery;
import net.timewalker.ffmq3.transport.packet.query.RecoverQuery;
import net.timewalker.ffmq3.transport.packet.query.RollbackQuery;
import net.timewalker.ffmq3.transport.packet.query.UnsubscribeQuery;
import net.timewalker.ffmq3.transport.packet.response.CreateTemporaryQueueResponse;
import net.timewalker.ffmq3.transport.packet.response.CreateTemporaryTopicResponse;
import net.timewalker.ffmq3.utils.ErrorTools;
import net.timewalker.ffmq3.utils.StringTools;
import net.timewalker.ffmq3.utils.id.IntegerID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* RemoteSession
*/
public class RemoteSession extends AbstractSession
{
    private static final Log log = LogFactory.getLog(RemoteSession.class);
   
    // Parent connection
    protected final PacketTransportEndpoint transportEndpoint;
   
    // Settings
    private int maxPendingMessages;
   
    // Runtime
    private List pendingMessages = new Vector();
    private List deliveredMessageIDs = new Vector();
    private boolean debugEnabled = log.isDebugEnabled();
    private boolean sendAcksAsync;
   
    /**
     * Constructor
     */
    public RemoteSession( IntegerID sessionId , RemoteConnection connection , PacketTransportEndpoint transportEndpoint , boolean transacted , int acknowledgeMode )
    {
        super(sessionId,connection,transacted,acknowledgeMode);
        this.transportEndpoint = transportEndpoint;
        this.maxPendingMessages =
            ClientEnvironment.getSettings().getIntProperty(FFMQClientSettings.PRODUCER_BUFFERING_SIZE,2);
        this.sendAcksAsync = ClientEnvironment.getSettings().getBooleanProperty(FFMQClientSettings.CONSUMER_SEND_ACKS_ASYNC, true);
        log.debug("New remote session ID is "+sessionId);
    }

    /**
     * Initialize the remote endpoint for this session
     */
    public void remoteInit() throws JMSException
    {
        CreateSessionQuery query = new CreateSessionQuery();
        query.setSessionId(id);
        query.setTransacted(transacted);
        query.setAcknowledgeMode(acknowledgeMode);
        transportEndpoint.blockingRequest(query);
    }
   
    public final PacketTransportEndpoint getTransportEndpoint()
    {
        return transportEndpoint;
    }
   
    protected final void dispatch( Message message ) throws JMSException
    {
        if (maxPendingMessages <= 0 || !transacted)
        {
            // Send immediately
            remoteSend(message);
        }
        else
        {
            // Client-side buffering
            List toBeDispatched = null;
            synchronized (pendingMessages)
            {
                Message msgCopy = MessageTools.makeInternalCopy(message);
                pendingMessages.add(msgCopy);
                if (pendingMessages.size() > maxPendingMessages)
                {
                    toBeDispatched = new ArrayList();
                    toBeDispatched.addAll(pendingMessages);
                    pendingMessages.clear();
                }
                else
                {
                  if (debugEnabled)
                    log.debug("#"+id+" Buffering message to be sent : "+message.getJMSMessageID());
                }
            }
            if (toBeDispatched != null)
                remoteSend(toBeDispatched);
        }
    }
   
    /**
     * Remotely send a message to a target destination
     */
    private void remoteSend( Message message ) throws JMSException
    {
      if (debugEnabled)
        log.debug("#"+id+" Sending message "+message.getJMSMessageID());
     
        PutQuery query = new PutQuery();
        query.setSessionId(id);
        query.setMessage((AbstractMessage)message);
        transportEndpoint.blockingRequest(query);
    }
   
    /**
     * Remotely send messages to a target destination
     */
    private void remoteSend( List messages ) throws JMSException
    {
      if (debugEnabled)
        log.debug("#"+id+" Sending messages ("+messages.size()+")");
     
        MultiplePutQuery query = new MultiplePutQuery();
        query.setSessionId(id);
        query.setMessages(messages);
        transportEndpoint.blockingRequest(query);
    }
   
    /**
     * Add a delivered message ID
     * @param deliveredMessageID
     */
    public final void addDeliveredMessageID( String deliveredMessageID )
    {
      synchronized (externalAccessLock)
    {
        if (closed)
          return;
       
        if (debugEnabled)
          log.debug(this+" Adding delivered message ID : "+deliveredMessageID);
       
        this.deliveredMessageIDs.add(deliveredMessageID);
    }
    }
   
    /* (non-Javadoc)
     * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
    {
      synchronized (externalAccessLock)
    {
        checkNotClosed();
        RemoteQueueBrowser browser = new RemoteQueueBrowser(idProvider.createID(),
                                                        this,
                                                        queue,
                                                        messageSelector);
        registerBrowser(browser);
        browser.remoteInit();
        return browser;
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean)
     */
    public final MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException
    {
      synchronized (externalAccessLock)
    {
          checkNotClosed();
          RemoteMessageConsumer consumer =  new RemoteMessageConsumer(idProvider.createID(),
                                                                  this,
                                                                  DestinationTools.asRef(destination),
                                                                  messageSelector,
                                                                  noLocal);
          registerConsumer(consumer);
          consumer.remoteInit();
          return consumer;
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException
    {
      synchronized (externalAccessLock)
    {
        checkNotClosed();
          RemoteDurableTopicSubscriber subscriber = new RemoteDurableTopicSubscriber(idProvider.createID(),
                                                                                 this,
                                                                                     topic,
                                                                                     messageSelector,
                                                                                     noLocal,
                                                                                     name);
          registerConsumer(subscriber);
          subscriber.remoteInit();
          return subscriber;
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#createProducer(javax.jms.Destination)
     */
    public final MessageProducer createProducer(Destination destination) throws JMSException
    {
      synchronized (externalAccessLock)
    {
          checkNotClosed();
          RemoteMessageProducer producer = new RemoteMessageProducer(this,
                                                                     DestinationTools.asRef(destination),
                                                                     idProvider.createID());
          registerProducer(producer);
          return producer;
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#createTemporaryQueue()
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException
    {
      synchronized (externalAccessLock)
    {
          checkNotClosed();
         
          CreateTemporaryQueueQuery query = new CreateTemporaryQueueQuery();
          query.setSessionId(id);
          CreateTemporaryQueueResponse response = (CreateTemporaryQueueResponse)transportEndpoint.blockingRequest(query);
         
          return new TemporaryQueueRef(connection,response.getQueueName());
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#createTemporaryTopic()
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException
    {
      synchronized (externalAccessLock)
    {
          checkNotClosed();
         
          CreateTemporaryTopicQuery query = new CreateTemporaryTopicQuery();
          query.setSessionId(id);
          CreateTemporaryTopicResponse response =
              (CreateTemporaryTopicResponse)transportEndpoint.blockingRequest(query);
         
          return new TemporaryTopicRef(connection,response.getTopicName());
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#recover()
     */
    public final void recover() throws JMSException
    {
      if (transacted)
            throw new IllegalStateException("Session is transacted"); // [JMS SPEC]
     
      synchronized (externalAccessLock)
    {
          checkNotClosed();

          RecoverQuery query = new RecoverQuery();
          query.setSessionId(id);
          query.setDeliveredMessageIDs(deliveredMessageIDs);
          transportEndpoint.blockingRequest(query);
          deliveredMessageIDs.clear();
    }
    }

    /* (non-Javadoc)
     * @see javax.jms.Session#unsubscribe(java.lang.String)
     */
    public void unsubscribe(String subscriptionName) throws JMSException
    {
      if (StringTools.isEmpty(subscriptionName))
            throw new FFMQException("Empty subscription name","INVALID_SUBSCRIPTION_NAME");
     
      synchronized (externalAccessLock)
    {
          checkNotClosed();
         
          UnsubscribeQuery query = new UnsubscribeQuery();
          query.setSessionId(id);
          query.setSubscriptionName(subscriptionName);
          transportEndpoint.blockingRequest(query);
    }
    }

    /*
     * (non-Javadoc)
     * @see javax.jms.Session#commit()
     */
    public final void commit() throws JMSException
    {
      if (!transacted)
            throw new IllegalStateException("Session is not transacted"); // [JMS SPEC]
     
      log.debug("#"+id+" commit()");
     
      synchronized (externalAccessLock)
    {
          checkNotClosed();
         
          // Flush pending messages if any
          List toBeDispatched = null;
          synchronized (pendingMessages)
            {
              if (!pendingMessages.isEmpty())
                {
                    toBeDispatched = new ArrayList();
                    toBeDispatched.addAll(pendingMessages);
                    pendingMessages.clear();
                }
            }
         
          CommitQuery query = new CommitQuery();
          query.setSessionId(id);
          query.setDeliveredMessageIDs(deliveredMessageIDs);
          query.setMessages(toBeDispatched);
          transportEndpoint.blockingRequest(query);
          deliveredMessageIDs.clear();
    }
    }

    /*
     * (non-Javadoc)
     * @see javax.jms.Session#rollback()
     */
    public final void rollback() throws JMSException
    {
      if (!transacted)
            throw new IllegalStateException("Session is not transacted"); // [JMS SPEC]
     
      log.debug("#"+id+" rollback()");
     
      synchronized (externalAccessLock)
    {
          checkNotClosed();
 
          pendingMessages.clear();
         
          RollbackQuery query = new RollbackQuery();
          query.setSessionId(id);
          query.setDeliveredMessageIDs(deliveredMessageIDs);
          transportEndpoint.blockingRequest(query);
          deliveredMessageIDs.clear();
    }
    }

    /* (non-Javadoc)
     * @see net.timewalker.ffmq3.common.session.AbstractSession#onSessionClose()
     */
    protected void onSessionClose()
    {
      super.onSessionClose();
     
      try
      {
          CloseSessionQuery query = new CloseSessionQuery();
          query.setSessionId(id);
          transportEndpoint.blockingRequest(query);
      }
        catch (JMSException e)
        {
            ErrorTools.log(e, log);
        }
    }
   
    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.common.session.AbstractSession#onSessionClosed()
     */
    protected void onSessionClosed()
    {
        super.onSessionClosed();
        transportEndpoint.close();
    }
   
    /*
     * (non-Javadoc)
     * @see net.timewalker.ffmq3.common.session.AbstractSession#acknowledge()
     */
    public final void acknowledge() throws JMSException
    {
      if (transacted)
            throw new IllegalStateException("Session is transacted"); // [JMS SPEC]
     
      synchronized (externalAccessLock)
    {
          checkNotClosed();
          if (deliveredMessageIDs.isEmpty())
            throw new FFMQException("No received message to acknowledge","INTERNAL_ERROR");
         
          if (sendAcksAsync)
          {
            // Copy message list
            List messageIDs = new ArrayList(deliveredMessageIDs.size());
            for(int n=0;n<deliveredMessageIDs.size();n++)
              messageIDs.add(deliveredMessageIDs.get(n));
            deliveredMessageIDs.clear();
           
            AsyncAcknowledgeQuery query = new AsyncAcknowledgeQuery();
            query.setSessionId(id);
            query.setDeliveredMessageIDs(messageIDs);
            transportEndpoint.nonBlockingRequest(query);
          }
          else
          {
            AcknowledgeQuery query = new AcknowledgeQuery();
            query.setSessionId(id);
            query.setDeliveredMessageIDs(deliveredMessageIDs);
            transportEndpoint.blockingRequest(query);
            deliveredMessageIDs.clear();
          }
    }
    }
}
TOP

Related Classes of net.timewalker.ffmq3.remote.session.RemoteSession

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.