Package org.springframework.amqp.rabbit.core

Source Code of org.springframework.amqp.rabbit.core.RabbitBindingIntegrationTests

/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.core;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.listener.ActiveObjectCounter;
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

import com.rabbitmq.client.Channel;

/**
* @author Dave Syer
* @author Gunnar Hillert
* @author Gary Russell
*/
public class RabbitBindingIntegrationTests {

  private static Queue queue = new Queue("test.queue");

  private CachingConnectionFactory connectionFactory;

  private RabbitTemplate template;

  @Rule
  public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(queue);

  @Before
  public void setup() {
    connectionFactory = new CachingConnectionFactory(BrokerTestUtils.getPort());
    connectionFactory.setHost("localhost");
    template = new RabbitTemplate(connectionFactory);
  }

  @After
  public void cleanUp() {
    if (connectionFactory != null) {
      connectionFactory.destroy();
    }
  }

  @Test
  public void testSendAndReceiveWithTopicSingleCallback() throws Exception {

    final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    final TopicExchange exchange = new TopicExchange("topic");
    admin.declareExchange(exchange);
    template.setExchange(exchange.getName());

    admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("*.end"));

    template.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        template.convertAndSend("foo", "message");

        try {

          String result = getResult(consumer);
          assertEquals(null, result);

          template.convertAndSend("foo.end", "message");
          result = getResult(consumer);
          assertEquals("message", result);

        } finally {
          consumer.getChannel().basicCancel(tag);
        }

        return null;

      }

    });

  }

  @Test
  public void testSendAndReceiveWithNonDefaultExchange() throws Exception {

    final RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    final TopicExchange exchange = new TopicExchange("topic");
    admin.declareExchange(exchange);

    admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("*.end"));

    template.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        template.convertAndSend("topic", "foo", "message");

        try {

          String result = getResult(consumer);
          assertEquals(null, result);

          template.convertAndSend("topic", "foo.end", "message");
          result = getResult(consumer);
          assertEquals("message", result);

        } finally {
          consumer.getChannel().basicCancel(tag);
        }

        return null;

      }
    });

  }

  @Test
  // @Ignore("Not sure yet if we need to support a use case like this")
  public void testSendAndReceiveWithTopicConsumeInBackground() throws Exception {

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    TopicExchange exchange = new TopicExchange("topic");
    admin.declareExchange(exchange);
    template.setExchange(exchange.getName());

    admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("*.end"));

    final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setHost("localhost");
    final RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
    template.setExchange(exchange.getName());

    BlockingQueueConsumer consumer = template.execute(new ChannelCallback<BlockingQueueConsumer>() {
      @Override
      public BlockingQueueConsumer doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        return consumer;

      }
    });

    template.convertAndSend("foo", "message");
    String result = getResult(consumer);
    assertEquals(null, result);

    template.convertAndSend("foo.end", "message");
    result = getResult(consumer);
    assertEquals("message", result);

    consumer.stop();
    cachingConnectionFactory.destroy();

  }

  @Test
  public void testSendAndReceiveWithTopicTwoCallbacks() throws Exception {

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    TopicExchange exchange = new TopicExchange("topic");
    admin.declareExchange(exchange);
    template.setExchange(exchange.getName());

    admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("*.end"));

    template.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        try {
          template.convertAndSend("foo", "message");
          String result = getResult(consumer);
          assertEquals(null, result);
        } finally {
          consumer.stop();
        }

        return null;

      }
    });

    template.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        try {
          template.convertAndSend("foo.end", "message");
          String result = getResult(consumer);
          assertEquals("message", result);
        } finally {
          consumer.stop();
        }

        return null;

      }
    });

  }

  @Test
  public void testSendAndReceiveWithFanout() throws Exception {

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    FanoutExchange exchange = new FanoutExchange("fanout");
    admin.declareExchange(exchange);
    template.setExchange(exchange.getName());

    admin.declareBinding(BindingBuilder.bind(queue).to(exchange));

    template.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {

        BlockingQueueConsumer consumer = createConsumer(template);
        String tag = consumer.getConsumerTag();
        assertNotNull(tag);

        try {
          template.convertAndSend("message");
          String result = getResult(consumer);
          assertEquals("message", result);
        } finally {
          consumer.stop();
        }

        return null;

      }
    });

  }

  private BlockingQueueConsumer createConsumer(RabbitAccessor accessor) {
    BlockingQueueConsumer consumer = new BlockingQueueConsumer(
        accessor.getConnectionFactory(), new DefaultMessagePropertiesConverter(),
        new ActiveObjectCounter<BlockingQueueConsumer>(), AcknowledgeMode.AUTO, true, 1, queue.getName());
    consumer.start();
    // wait for consumeOk...
    int n = 0;
    while (n++ < 100) {
      if (consumer.getConsumerTag() == null) {
        try {
          Thread.sleep(100);
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return consumer;
  }

  private String getResult(final BlockingQueueConsumer consumer) throws InterruptedException {
    Message response = consumer.nextMessage(2000L);
    if (response == null) {
      return null;
    }
    return (String) new SimpleMessageConverter().fromMessage(response);
  }
}
TOP

Related Classes of org.springframework.amqp.rabbit.core.RabbitBindingIntegrationTests

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.