/* */ package org.jboss.jms.client;
/* */
/* */ import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
/* */ import java.util.ArrayList;
/* */ import java.util.List;
/* */ import javax.jms.ConnectionConsumer;
/* */ import javax.jms.JMSException;
/* */ import javax.jms.Message;
/* */ import javax.jms.MessageListener;
/* */ import javax.jms.ServerSession;
/* */ import javax.jms.ServerSessionPool;
/* */ import org.jboss.jms.client.delegate.DelegateSupport;
/* */ import org.jboss.jms.client.state.ConsumerState;
/* */ import org.jboss.jms.delegate.ConnectionDelegate;
/* */ import org.jboss.jms.delegate.ConsumerDelegate;
/* */ import org.jboss.jms.delegate.SessionDelegate;
/* */ import org.jboss.jms.destination.JBossDestination;
/* */ import org.jboss.jms.message.MessageProxy;
/* */ import org.jboss.logging.Logger;
/* */ import org.jboss.messaging.util.MessageQueueNameHelper;
/* */
/**
 * {@link ConnectionConsumer} implementation driven by a dedicated internal thread.
 *
 * <p>The constructor creates a session delegate and a consumer delegate on the
 * supplied connection and immediately starts a thread running {@link #run()}.
 * That thread repeatedly collects up to {@code maxMessages} messages from the
 * consumer delegate, obtains a {@link ServerSession} from the
 * {@link ServerSessionPool}, hands the messages to the pool's
 * {@link JBossSession} via {@code addAsfMessage} and then starts the server
 * session.
 *
 * <p>Thread-safety: {@code closed} is volatile and {@link #doClose()} is
 * synchronized, so closing may be requested from any thread while the internal
 * thread is polling.
 */
public class JBossConnectionConsumer
   implements ConnectionConsumer, Runnable
{
   private static final Logger log = Logger.getLogger(JBossConnectionConsumer.class);

   // Evaluated once at class initialisation; later changes to the log level are not picked up.
   private static final boolean trace = log.isTraceEnabled();

   /** Maximum time, in milliseconds, that {@link #close()} waits for the internal thread to end. */
   private static final int TIMEOUT = 20000;

   /** Timeout passed to the consumer delegate for the attempts this class logs as "receiveNoWait". */
   private static final long RECEIVE_NO_WAIT = -1L;

   /** Timeout passed to the consumer delegate for the attempt this class logs as blocking with no timeout. */
   private static final long RECEIVE_BLOCKING = 0L;

   /** Source of the numeric ids used in thread names and in {@link #toString()}. */
   private static final SynchronizedInt threadId = new SynchronizedInt(0);

   private final ConsumerDelegate cons;
   private final SessionDelegate sess;
   private final String consumerID;
   private final ServerSessionPool serverSessionPool;
   private final int maxMessages;
   private volatile boolean closed;
   private final Thread internalThread;
   private final int id;
   private final int maxDeliveries;
   private final String queueName;
   private final boolean shouldAck;

   /**
    * Creates the underlying session and consumer delegates and starts the internal polling thread.
    *
    * @param conn            connection delegate used to create the session delegate
    * @param dest            destination to consume from
    * @param subName         subscription name, or {@code null}; when non-null the queue name is
    *                        derived from the connection's client id and this name, otherwise the
    *                        destination's name is used
    * @param messageSelector selector passed through to the consumer delegate
    * @param sessPool        pool that supplies the server sessions messages are dispatched to
    * @param maxMessages     maximum number of messages collected per dispatch; values below 1 are
    *                        treated as 1
    * @throws JMSException if creating the delegates fails
    */
   public JBossConnectionConsumer(ConnectionDelegate conn, JBossDestination dest, String subName, String messageSelector, ServerSessionPool sessPool, int maxMessages)
      throws JMSException
   {
      this.serverSessionPool = sessPool;
      this.maxMessages = Math.max(maxMessages, 1);

      // NOTE(review): the literal 2 presumably is javax.jms.Session.CLIENT_ACKNOWLEDGE inlined
      // by the compiler - confirm against SessionDelegate/ConnectionDelegate.
      this.sess = conn.createSessionDelegate(false, 2, false);

      this.cons = this.sess.createConsumerDelegate(dest, messageSelector, false, subName, true, true);

      ConsumerState state = (ConsumerState)((DelegateSupport)this.cons).getState();

      this.consumerID = state.getConsumerID();
      this.maxDeliveries = state.getMaxDeliveries();
      this.shouldAck = state.isShouldAck();

      if (subName != null)
      {
         this.queueName = MessageQueueNameHelper.createSubscriptionName(conn.getClientID(), subName);
      }
      else
      {
         this.queueName = dest.getName();
      }

      this.id = threadId.increment();

      // The thread is started last so that every field it reads has already been assigned.
      this.internalThread = new Thread(this, "Connection Consumer for dest " + dest + " id=" + this.id);
      this.internalThread.start();

      if (trace) log.trace(this + " created");
   }

   /**
    * Returns the server session pool supplied at construction time.
    *
    * @return the pool messages are dispatched to
    * @throws JMSException never thrown by this implementation; declared by the interface
    */
   public ServerSessionPool getServerSessionPool()
      throws JMSException
   {
      return this.serverSessionPool;
   }

   /**
    * Closes this consumer and waits up to {@link #TIMEOUT} milliseconds for the internal thread
    * to finish.
    *
    * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
    * thread's interrupt status is restored so that callers can still observe it.
    *
    * @throws JMSException if the internal thread is still alive after the timeout
    */
   public void close() throws JMSException
   {
      if (trace) log.trace("close " + this);

      doClose();

      if (trace) log.trace(this + " Waiting for internal thread to complete");

      try
      {
         this.internalThread.join(TIMEOUT);

         if (this.internalThread.isAlive())
         {
            throw new JMSException(this + " Waited " + TIMEOUT + " ms for internal thread to complete, but it didn't");
         }
      }
      catch (InterruptedException e)
      {
         if (trace) log.trace(this + " Thread interrupted while waiting for internal thread to complete");

         // Do not swallow the interrupt: put the flag back for the caller.
         Thread.currentThread().interrupt();
      }

      if (trace) log.trace("Closed: " + this);
   }

   /**
    * Body of the internal thread: collects messages and dispatches them to server sessions until
    * this consumer is closed or an error occurs.
    *
    * <p>Each iteration first tries up to {@code maxMessages} non-waiting receives; if none yields
    * a message it falls back to a single blocking receive. A {@code null} from the blocking
    * receive ends the loop. Any {@link Throwable} escaping the loop is logged at debug level and
    * triggers {@link #doClose()}.
    */
   public void run()
   {
      if (trace) log.trace("running connection consumer");
      try
      {
         List mesList = new ArrayList();
         while (true)
         {
            if (this.closed)
            {
               if (trace) log.trace("Connection consumer is closed, breaking");
               break;
            }

            if (mesList.isEmpty())
            {
               // Drain whatever is immediately available, up to maxMessages.
               for (int i = 0; i < this.maxMessages; i++)
               {
                  if (trace) log.trace(this + " attempting to get message with receiveNoWait");

                  Message m = receiveUnlessClosed(RECEIVE_NO_WAIT);

                  if (m == null)
                  {
                     if (trace) log.trace("receiveNoWait did not retrieve any message");
                     break;
                  }

                  if (trace) log.trace("receiveNoWait got message " + m + " adding to queue");
                  mesList.add(m);
               }

               if (mesList.isEmpty())
               {
                  // Nothing immediately available: block until a message arrives.
                  if (trace) log.trace(this + " attempting to get message with blocking receive (no timeout)");

                  Message m = receiveUnlessClosed(RECEIVE_BLOCKING);

                  if (m == null)
                  {
                     if (trace) log.trace("blocking receive returned null, consumer must have closed");
                     break;
                  }

                  if (trace) log.trace("receive (no timeout) got message " + m + " adding to queue");
                  mesList.add(m);
               }
            }

            if (mesList.isEmpty())
            {
               continue;
            }

            if (trace) log.trace("there are " + mesList.size() + " messages to send to session");

            ServerSession serverSession = this.serverSessionPool.getServerSession();
            JBossSession session = (JBossSession)serverSession.getSession();

            MessageListener listener = session.getMessageListener();

            if (listener == null)
            {
               // Sanity check only: a missing listener is reported but does not stop dispatch.
               if (trace) log.trace(this + ": session " + session + " did not have a set MessageListener");
            }

            for (int i = 0; i < mesList.size(); i++)
            {
               MessageProxy m = (MessageProxy)mesList.get(i);
               session.addAsfMessage(m, this.consumerID, this.queueName, this.maxDeliveries, this.sess, this.shouldAck);
               if (trace) log.trace("added " + m + " to session");
            }

            if (trace) log.trace(this + " starting serverSession " + serverSession);

            serverSession.start();

            if (trace) log.trace(this + "'s serverSession processed messages");

            mesList.clear();
         }

         if (trace) log.trace("ConnectionConsumer run() exiting");
      }
      catch (Throwable t)
      {
         log.debug("Connection consumer closing due to error in listening thread " + this, t);
         try
         {
            doClose();
         }
         catch (JMSException e)
         {
            log.error("Failed to close connection consumer", e);
         }
      }
   }

   /**
    * Marks this consumer as closed and closes the underlying session delegate.
    *
    * <p>Subsequent calls are no-ops. Failures while closing the session are logged at trace
    * level and not propagated (best-effort close).
    *
    * @throws JMSException declared for callers and subclasses; not thrown by this implementation
    */
   protected synchronized void doClose() throws JMSException
   {
      if (this.closed)
      {
         return;
      }

      // Set the flag first so the internal thread treats receive failures as a normal shutdown.
      this.closed = true;
      try
      {
         this.sess.closing(-1L);
         this.sess.close();
      }
      catch (Throwable t)
      {
         log.trace("Failed to close session", t);
      }

      if (trace) log.trace(this + "Closed message handler");
   }

   /**
    * Returns a short description containing the consumer id and the numeric id of this instance.
    */
   public String toString()
   {
      return "JBossConnectionConsumer[" + this.consumerID + ", " + this.id + "]";
   }

   /**
    * Calls {@code receive(timeout)} on the consumer delegate, treating a failure that happens
    * after this consumer has been closed as "no message".
    *
    * @param timeout timeout value forwarded unchanged to the consumer delegate
    * @return the received message, or {@code null} if none was returned or the receive failed
    *         while this consumer was already closed
    * @throws JMSException if the receive fails while this consumer is still open
    */
   private Message receiveUnlessClosed(long timeout) throws JMSException
   {
      try
      {
         return this.cons.receive(timeout);
      }
      catch (JMSException e)
      {
         if (!this.closed)
         {
            throw e;
         }
         return null;
      }
   }
}
/* Location: /home/mnovotny/projects/EMBEDDED_JBOSS_BETA3_COMMUNITY/embedded/output/lib/embedded-jboss/lib/jboss-embedded-all.jar
* Qualified Name: org.jboss.jms.client.JBossConnectionConsumer
* JD-Core Version: 0.6.0
*/