Package org.jboss.test.messaging.jms

Source Code of org.jboss.test.messaging.jms.DLQTest$FailingMessageListener

/*
* JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and individual contributors as indicated by the
* @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is free
* software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
* This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
* write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
* site: http://www.fsf.org.
*/
package org.jboss.test.messaging.jms;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.ObjectName;
import javax.naming.NameNotFoundException;

import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.message.JBossMessage;
import org.jboss.test.messaging.tools.ServerManagement;

/**
* A DLQTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision: 5025 $</tt> $Id: DLQTest.java 5025 2008-09-25 08:42:18Z gaohoward $
*/
public class DLQTest extends JMSTestCase
{
   // Constants -----------------------------------------------------

   // Static --------------------------------------------------------

   // Attributes ----------------------------------------------------

   // Constructors --------------------------------------------------

   public DLQTest(String name)
   {
      super(name);
   }

   // Public --------------------------------------------------------

   public void testDLQAlreadyDeployed() throws Exception
   {
      if (ServerManagement.isRemote())
      {
         return;
      }

      try
      {
         ServerManagement.deployQueue("DLQ");

         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

         ObjectName dlqObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultDLQ");

         assertNotNull(dlqObjectName);

         String name = (String)ServerManagement.getAttribute(dlqObjectName, "Name");

         assertNotNull(name);

         assertEquals("DLQ", name);

         String jndiName = (String)ServerManagement.getAttribute(dlqObjectName, "JNDIName");

         assertNotNull(jndiName);

         assertEquals("/queue/DLQ", jndiName);

         org.jboss.messaging.core.contract.Queue dlq = ServerManagement.getServer()
                                                                       .getServerPeer()
                                                                       .getDefaultDLQInstance();

         assertNotNull(dlq);

         JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");

         assertNotNull(q);

         assertEquals("DLQ", q.getName());
      }
      finally
      {
         ServerManagement.undeployQueue("DLQ");
      }
   }

   public void testDLQNotAlreadyDeployed() throws Exception
   {
      if (ServerManagement.isRemote())
      {
         return;
      }

      org.jboss.messaging.core.contract.Queue dlq = ServerManagement.getServer()
                                                                    .getServerPeer()
                                                                    .getDefaultDLQInstance();

      assertNull(dlq);

      try
      {
         ic.lookup("/queue/DLQ");

         fail();
      }
      catch (NameNotFoundException e)
      {
         // Ok
      }
   }

   public void testDefaultAndOverrideDLQ() throws Exception
   {
      if (ServerManagement.isRemote())
      {
         return;
      }

      final int NUM_MESSAGES = 5;

      final int MAX_DELIVERIES = 8;

      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";

      Connection conn = null;

      try
      {
         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";

         String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue3";

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);

         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");

         conn = cf.createConnection();

         {
            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer prod = sess.createProducer(queue1);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = sess.createTextMessage("Message:" + i);

               prod.send(tm);
            }

            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            MessageConsumer cons = sess2.createConsumer(queue1);

            conn.start();

            for (int i = 0; i < MAX_DELIVERIES; i++)
            {
               for (int j = 0; j < NUM_MESSAGES; j++)
               {
                  TextMessage tm = (TextMessage)cons.receive(1000);

                  assertNotNull(tm);

                  assertEquals("Message:" + j, tm.getText());
               }

               sess2.recover();
            }

            // Prompt them to go to DLQ
            cons.receive(100);

            // At this point all the messages have been delivered exactly MAX_DELIVERIES times

            checkEmpty(queue1);

            // Now should be in default dlq

            MessageConsumer cons3 = sess.createConsumer(queue2);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = (TextMessage)cons3.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + i, tm.getText());
            }

            conn.close();
         }

         {
            // Now try with overriding the default dlq

            conn = cf.createConnection();

            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);

            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer prod = sess.createProducer(queue1);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = sess.createTextMessage("Message:" + i);

               prod.send(tm);
            }

            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            MessageConsumer cons = sess2.createConsumer(queue1);

            conn.start();

            for (int i = 0; i < MAX_DELIVERIES; i++)
            {
               for (int j = 0; j < NUM_MESSAGES; j++)
               {
                  TextMessage tm = (TextMessage)cons.receive(1000);

                  assertNotNull(tm);

                  assertEquals("Message:" + j, tm.getText());
               }

               sess2.recover();
            }

            cons.receive(100);

            // At this point all the messages have been delivered exactly MAX_DELIVERIES times

            checkEmpty(queue1);

            // Now should be in override dlq

            MessageConsumer cons3 = sess.createConsumer(queue3);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = (TextMessage)cons3.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + i, tm.getText());
            }
         }
      }
      finally
      {
         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultDLQ",
                                       "jboss.messaging.destination:service=Queue,name=DLQ");

         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");

         if (conn != null)
         {
            conn.close();
         }
      }
   }

   public void testWithMessageListenerPersistent() throws Exception
   {
      testWithMessageListener(true);
   }

   public void testWithMessageListenerNonPersistent() throws Exception
   {
      testWithMessageListener(false);
   }

   public void testWithReceiveClientAckPersistent() throws Exception
   {
      this.testWithReceiveClientAck(true);
   }

   public void testWithReceiveClientAckNonPersistent() throws Exception
   {
      testWithReceiveClientAck(false);
   }

   public void testWithReceiveTransactionalPersistent() throws Exception
   {
      this.testWithReceiveTransactional(true);
   }

   public void testWithReceiveTransactionalNonPersistent() throws Exception
   {
      testWithReceiveTransactional(false);
   }

   public void testHeadersSet() throws Exception
   {
      Connection conn = null;

      try
      {
         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultDLQ",
                                       "jboss.messaging.destination:service=Queue,name=Queue2");

         final int MAX_DELIVERIES = 16;

         final int NUM_MESSAGES = 5;

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         int maxRedeliveryAttempts = ((Integer)ServerManagement.getAttribute(serverPeerObjectName,
                                                                             "DefaultMaxDeliveryAttempts")).intValue();

         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);

         conn = cf.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer prod = sess.createProducer(queue1);

         Map origIds = new HashMap();

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess.createTextMessage("Message:" + i);

            prod.send(tm);

            origIds.put(tm.getText(), tm.getJMSMessageID());
         }

         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);

         MessageConsumer cons = sess2.createConsumer(queue1);

         conn.start();

         for (int i = 0; i < MAX_DELIVERIES; i++)
         {
            for (int j = 0; j < NUM_MESSAGES; j++)
            {
               TextMessage tm = (TextMessage)cons.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + j, tm.getText());
            }

            sess2.rollback();
         }

         // At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
         // they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check

         checkEmpty(queue2);

         // So let's try and consume them - this should cause them to go to the DLQ - since they
         // will then exceed max delivery attempts
         Message m = cons.receive(100);

         assertNull(m);

         // All the messages should now be in the DLQ

         MessageConsumer cons3 = sess.createConsumer(queue2);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons3.receive(1000);

            assertNotNull(tm);

            assertEquals("Message:" + i, tm.getText());

            // Check the headers
            String origDest = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION);

            String origMessageId = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_MESSAGE_ID);

            assertEquals(queue1.toString(), origDest);

            String origId = (String)origIds.get(tm.getText());

            assertEquals(origId, origMessageId);
         }
      }
      finally
      {
         if (conn != null)
            conn.close();
      }
   }

   public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
   {
      int md = getDefaultMaxDeliveryAttempts();
      try
      {
         int maxDeliveryAttempts = md - 5;
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
                                maxDeliveryAttempts);
         testMaxDeliveryAttempts(queue1, maxDeliveryAttempts, true);
      }
      finally
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"), md);
      }
   }

   public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
   {
      int md = getDefaultMaxDeliveryAttempts();
      try
      {
         int maxDeliveryAttempts = md - 5;
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
                                maxDeliveryAttempts);

         testMaxDeliveryAttempts(topic1, maxDeliveryAttempts, false);
      }
      finally
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"), md);
      }
   }

   public void testUseDefaultMaxDeliveryAttemptsForQueue() throws Exception
   {
      int md = getDefaultMaxDeliveryAttempts();
      try
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"), -1);

         // Check that defaultMaxDeliveryAttempts takes effect
         testMaxDeliveryAttempts(queue1, getDefaultMaxDeliveryAttempts(), true);
      }
      finally
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"), md);
      }
   }

   public void testUseDefaultMaxDeliveryAttemptsForTopic() throws Exception
   {
      int md = getDefaultMaxDeliveryAttempts();
      try
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"), -1);

         // Check that defaultMaxDeliveryAttempts takes effect
         testMaxDeliveryAttempts(topic1, getDefaultMaxDeliveryAttempts(), false);
      }
      finally
      {
         setMaxDeliveryAttempts(new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"), md);
      }
   }

   // https://jira.jboss.org/jira/browse/JBMESSAGING-1374
   public void testForwardReferencedDLQ() throws Exception
   {
      if (ServerManagement.isRemote())
      {
         return;
      }

      final int NUM_MESSAGES = 5;

      final int MAX_DELIVERIES = 8;

      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

      String testQueueName = "testQueue0925";
      String testTopicName = "testTopic0925";

      String testDLQName = "testDLQ0925";
      String testExpiryQName = "testExpiryQ0925";

      Connection conn = null;

      try
      {
         ServerManagement.deployQueue("DLQ0925");
         ServerManagement.deployQueue("ExpiryQueue0925");
        
         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ0925");

         ObjectName myQueue = ServerManagement.deployQueueWithDLQnExpiryQ(testQueueName, testDLQName, testExpiryQName);
         ObjectName myTopic = ServerManagement.deployTopicWithDLQnExpiryQ(testTopicName, testDLQName, testExpiryQName);
         ObjectName myDLQ = ServerManagement.deployQueueWithoutStart(testDLQName);
         ObjectName myExpiryQueue = ServerManagement.deployQueueWithoutStart(testExpiryQName);

         assertNotNull(myQueue);
         assertNotNull(myTopic);
         assertNotNull(myDLQ);
         assertNotNull(myExpiryQueue);

         ServerManagement.startDestinationService(new ObjectName[] { myQueue, myTopic, myDLQ, myExpiryQueue });

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         conn = cf.createConnection();

         {
            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue myQRef = (Queue)ic.lookup("/queue/testQueue0925");

            MessageProducer prod = sess.createProducer(myQRef);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = sess.createTextMessage("Message:" + i);

               prod.send(tm);
            }

            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            MessageConsumer cons = sess2.createConsumer(myQRef);

            conn.start();

            for (int i = 0; i < MAX_DELIVERIES; i++)
            {
               for (int j = 0; j < NUM_MESSAGES; j++)
               {
                  TextMessage tm = (TextMessage)cons.receive(1000);

                  assertNotNull(tm);

                  assertEquals("Message:" + j, tm.getText());
               }

               sess2.recover();
            }

            // Prompt them to go to DLQ
            cons.receive(100);

            // At this point all the messages have been delivered exactly MAX_DELIVERIES times

            checkEmpty(myQRef);

            // Now should be in the dlq
            Queue myDLQRef = (Queue)ic.lookup("/queue/testDLQ0925");
            MessageConsumer cons3 = sess.createConsumer(myDLQRef);

            for (int i = 0; i < NUM_MESSAGES; i++)
            {
               TextMessage tm = (TextMessage)cons3.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + i, tm.getText());
            }

            conn.close();
         }
      }
      finally
      {
         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultDLQ",
                                       "jboss.messaging.destination:service=Queue,name=DLQ");

         ServerManagement.undeployQueue("DLQ0925");
         ServerManagement.undeployQueue("ExpiryQueue0925");
         ServerManagement.undeployQueue("testQueue0925");
         ServerManagement.undeployQueue("testDLQ0925");
         ServerManagement.undeployQueue("testExpiryQ0925");
         ServerManagement.undeployTopic("testTopic0925");
        
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   protected void testWithMessageListener(boolean persistent) throws Exception
   {
      Connection conn = null;

      try
      {
         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

         final int MAX_DELIVERIES = 16;

         final int NUM_MESSAGES = 5;

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";

         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);

         int maxRedeliveryAttempts = ((Integer)ServerManagement.getAttribute(serverPeerObjectName,
                                                                             "DefaultMaxDeliveryAttempts")).intValue();

         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);

         conn = cf.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer prod = sess.createProducer(queue1);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess.createTextMessage("Message:" + i);

            prod.send(tm);
         }

         MessageConsumer cons = sess.createConsumer(queue1);

         FailingMessageListener listener = new FailingMessageListener(MAX_DELIVERIES * NUM_MESSAGES);

         cons.setMessageListener(listener);

         conn.start();

         listener.waitForMessages();

         assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);

         // Message should all be in the dlq - let's check

         MessageConsumer cons2 = sess.createConsumer(queue2);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(1000);

            assertNotNull(tm);

            log.info("Got mnessage" + tm);

            assertEquals("Message:" + i, tm.getText());
         }

         checkEmpty(queue1);
      }
      finally
      {
         if (conn != null)
            conn.close();
      }
   }

   protected void testWithReceiveClientAck(boolean persistent) throws Exception
   {
      Connection conn = null;

      try
      {
         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

         final int MAX_DELIVERIES = 16;

         final int NUM_MESSAGES = 5;

         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";

         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         int maxRedeliveryAttempts = ((Integer)ServerManagement.getAttribute(serverPeerObjectName,
                                                                             "DefaultMaxDeliveryAttempts")).intValue();

         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);

         conn = cf.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer prod = sess.createProducer(queue1);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess.createTextMessage("Message:" + i);

            prod.send(tm);
         }

         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

         MessageConsumer cons = sess2.createConsumer(queue1);

         conn.start();

         for (int i = 0; i < MAX_DELIVERIES; i++)
         {
            for (int j = 0; j < NUM_MESSAGES; j++)
            {
               TextMessage tm = (TextMessage)cons.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + j, tm.getText());
            }

            sess2.recover();
         }

         // At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
         // they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check

         checkEmpty(queue2);

         // So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
         // delivery attempts

         Message m = cons.receive(100);

         assertNull(m);

         // Now, all the messages should now be in the DLQ

         MessageConsumer cons3 = sess.createConsumer(queue2);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons3.receive(1000);

            assertNotNull(tm);

            assertEquals("Message:" + i, tm.getText());
         }

         // No more should be available

         cons.close();

         checkEmpty(queue1);
      }
      finally
      {
         ServerManagement.undeployQueue("DLQ");

         if (conn != null)
            conn.close();
      }
   }

   protected void testWithReceiveTransactional(boolean persistent) throws Exception
   {
      Connection conn = null;

      try
      {
         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();

         final int MAX_DELIVERIES = 16;

         final int NUM_MESSAGES = 5;

         ServerManagement.setAttribute(serverPeerObjectName,
                                       "DefaultMaxDeliveryAttempts",
                                       String.valueOf(MAX_DELIVERIES));

         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";

         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);

         int maxRedeliveryAttempts = ((Integer)ServerManagement.getAttribute(serverPeerObjectName,
                                                                             "DefaultMaxDeliveryAttempts")).intValue();

         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);

         conn = cf.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer prod = sess.createProducer(queue1);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess.createTextMessage("Message:" + i);

            prod.send(tm);
         }

         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);

         MessageConsumer cons = sess2.createConsumer(queue1);

         conn.start();

         for (int i = 0; i < MAX_DELIVERIES; i++)
         {
            for (int j = 0; j < NUM_MESSAGES; j++)
            {
               TextMessage tm = (TextMessage)cons.receive(1000);

               assertNotNull(tm);

               assertEquals("Message:" + j, tm.getText());
            }

            sess2.rollback();
         }

         // At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
         // they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check

         checkEmpty(queue2);

         // So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
         // delivery attempts
         Message m = cons.receive(100);

         assertNull(m);

         // All the messages should now be in the DLQ

         MessageConsumer cons3 = sess.createConsumer(queue2);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons3.receive(1000);

            assertNotNull(tm);

            assertEquals("Message:" + i, tm.getText());
         }

         // No more should be available

         checkEmpty(queue1);
      }
      finally
      {
         ServerManagement.undeployQueue("DLQ");

         if (conn != null)
            conn.close();
      }
   }

   protected int getDefaultMaxDeliveryAttempts() throws Exception
   {
      return ((Integer)ServerManagement.getAttribute(ServerManagement.getServerPeerObjectName(),
                                                     "DefaultMaxDeliveryAttempts")).intValue();
   }

   protected void setMaxDeliveryAttempts(ObjectName dest, int maxDeliveryAttempts) throws Exception
   {
      ServerManagement.setAttribute(dest, "MaxDeliveryAttempts", Integer.toString(maxDeliveryAttempts));
   }

   protected void testMaxDeliveryAttempts(Destination destination, int destMaxDeliveryAttempts, boolean queue) throws Exception
   {
      Connection conn = cf.createConnection();

      if (!queue)
      {
         conn.setClientID("wib123");
      }

      try
      {
         ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
                                       "DefaultDLQ",
                                       "jboss.messaging.destination:service=Queue,name=Queue2");

         // Create the consumer before the producer so that the message we send doesn't
         // get lost if the destination is a Topic.
         Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         MessageConsumer destinationConsumer;

         if (queue)
         {
            destinationConsumer = consumingSession.createConsumer(destination);
         }
         else
         {
            // For topics we only keep a delivery record on the server side for durable subs
            destinationConsumer = consumingSession.createDurableSubscriber((Topic)destination, "testsub1");
         }

         {
            Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer prod = producingSession.createProducer(destination);
            TextMessage tm = producingSession.createTextMessage("Message");
            prod.send(tm);
         }

         conn.start();

         // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
         for (int i = 0; i < destMaxDeliveryAttempts; i++)
         {
            TextMessage tm = (TextMessage)destinationConsumer.receive(1000);
            assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
            assertEquals("Message", tm.getText());
            consumingSession.recover();
         }

         // At this point the message should not yet be in the DLQ
         checkEmpty(queue2);

         // Now we try to consume the message again from the destination, which causes it
         // to go to the DLQ instead.
         Message m = destinationConsumer.receive(100);
         assertNull(m);

         // The message should be in the DLQ now
         MessageConsumer dlqConsumer = consumingSession.createConsumer(queue2);
         m = dlqConsumer.receive(1000);
         assertNotNull(m);
         assertTrue(m instanceof TextMessage);
         assertEquals("Message", ((TextMessage)m).getText());

         m.acknowledge();

         if (!queue)
         {
            destinationConsumer.close();

            consumingSession.unsubscribe("testsub1");
         }
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

   class FailingMessageListener implements MessageListener
   {
      volatile int deliveryCount;

      int numMessages;

      FailingMessageListener(int numMessages)
      {
         this.numMessages = numMessages;
      }

      synchronized void waitForMessages() throws Exception
      {
         while (deliveryCount != numMessages)
         {
            this.wait();
         }
      }

      public synchronized void onMessage(Message msg)
      {
         deliveryCount++;

         this.notify();

         throw new RuntimeException("Your mum!");
      }

   }

}
TOP

Related Classes of org.jboss.test.messaging.jms.DLQTest$FailingMessageListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.