te connection and channel. {@link ConnectionFactory} factory = new ConnectionFactory();Connection conn = factory.newConnection(); {@link Channel} ch1 = conn.createChannel();// Declare a queue and bind it to an exchange. String queueName = ch1.queueDeclare(). {@link AMQP.Queue.DeclareOk#getQueue getQueue}(); ch1. {@link Channel#queueBind queueBind}(queueName, exchangeName, queueName); // Create the QueueingConsumer and have it consume from the queue QueueingConsumer consumer = new {@link QueueingConsumer#QueueingConsumer QueueingConsumer}(ch1); ch1. {@link Channel#basicConsume basicConsume}(queueName, false, consumer); // Process deliveries while (/* some condition * /) { {@link QueueingConsumer.Delivery} delivery = consumer.{@link QueueingConsumer#nextDelivery nextDelivery}(); // process delivery ch1. {@link Channel#basicAck basicAck}(delivery. {@link QueueingConsumer.Delivery#getEnvelope getEnvelope}(). {@link Envelope#getDeliveryTag getDeliveryTag}(), false); }
For a more complete example, see LogTail in the test/src/com/rabbitmq/examples directory of the source distribution.
deprecated QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock. QueueingConsumer provided client code with an easy way to obviate this problem by queueing incoming messages and processing them on a separate, application-managed thread.
The threading behaviour of
Connection and
Channel has been changed so that each
Channel uses a distinct thread for dispatching to
Consumers. This prevents
Consumers on one
Channel holding up
Consumers on another and it also prevents recursive calls from deadlocking the client.
As such, it is now safe to implement
Consumer directly or to extend
DefaultConsumer.