Package org.springframework.amqp.rabbit.connection

Source Code of org.springframework.amqp.rabbit.connection.CachingConnectionFactoryIntegrationTests

/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.amqp.rabbit.connection;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.BrokerTestUtils;
import org.springframework.amqp.utils.test.TestUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author Dave Syer
* @author Gunnar Hillert
* @author Gary Russell
* @since 1.0
*
*/
public class CachingConnectionFactoryIntegrationTests {

  private static Log logger = LogFactory.getLog(CachingConnectionFactoryIntegrationTests.class);

  private CachingConnectionFactory connectionFactory;

  @Rule
  public BrokerRunning brokerIsRunning = BrokerRunning.isRunning();

  @Rule
  public ExpectedException exception = ExpectedException.none();

  @Before
  public void open() {
    connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(BrokerTestUtils.getPort());
  }

  @After
  public void close() {
    connectionFactory.destroy();
  }

  @Test
  public void testCachedConnections() {
    connectionFactory.setCacheMode(CacheMode.CONNECTION);
    connectionFactory.setConnectionCacheSize(5);
    connectionFactory.setExecutor(Executors.newCachedThreadPool());
    List<Connection> connections = new ArrayList<Connection>();
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    assertNotSame(connections.get(0), connections.get(1));
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    Set<?> openConnections = TestUtils.getPropertyValue(connectionFactory, "openConnections", Set.class);
    assertEquals(6, openConnections.size());
    for (Connection connection : connections) {
      connection.close();
    }
    assertEquals(5, openConnections.size());
    BlockingQueue<?> idleConnections = TestUtils.getPropertyValue(connectionFactory, "idleConnections", BlockingQueue.class);
    assertEquals(5, idleConnections.size());
    connections.clear();
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    assertEquals(5, openConnections.size());
    assertEquals(3, idleConnections.size());
    for (Connection connection : connections) {
      connection.close();
    }
  }

  @Test
  public void testCachedConnectionsAndChannels() throws Exception {
    connectionFactory.setCacheMode(CacheMode.CONNECTION);
    connectionFactory.setConnectionCacheSize(1);
    connectionFactory.setChannelCacheSize(3);
    List<Connection> connections = new ArrayList<Connection>();
    connections.add(connectionFactory.createConnection());
    connections.add(connectionFactory.createConnection());
    Set<?> openConnections = TestUtils.getPropertyValue(connectionFactory, "openConnections", Set.class);
    assertEquals(2, openConnections.size());
    assertNotSame(connections.get(0), connections.get(1));
    List<Channel> channels = new ArrayList<Channel>();
    for (int i = 0; i < 5; i++) {
      channels.add(connections.get(0).createChannel(false));
      channels.add(connections.get(1).createChannel(false));
      channels.add(connections.get(0).createChannel(true));
      channels.add(connections.get(1).createChannel(true));
    }
    @SuppressWarnings("unchecked")
    Map<?, List<?>> cachedChannels = TestUtils.getPropertyValue(connectionFactory, "openConnectionNonTransactionalChannels", Map.class);
    assertEquals(0, cachedChannels.get(connections.get(0)).size());
    assertEquals(0, cachedChannels.get(connections.get(1)).size());
    @SuppressWarnings("unchecked")
    Map<?, List<?>> cachedTxChannels = TestUtils.getPropertyValue(connectionFactory, "openConnectionTransactionalChannels", Map.class);
    assertEquals(0, cachedTxChannels.get(connections.get(0)).size());
    assertEquals(0, cachedTxChannels.get(connections.get(1)).size());
    for (Channel channel : channels) {
      channel.close();
    }
    assertEquals(3, cachedChannels.get(connections.get(0)).size());
    assertEquals(3, cachedChannels.get(connections.get(1)).size());
    assertEquals(3, cachedTxChannels.get(connections.get(0)).size());
    assertEquals(3, cachedTxChannels.get(connections.get(1)).size());
    for (int i = 0; i < 3; i++) {
      assertEquals(channels.get(i * 4), connections.get(0).createChannel(false));
      assertEquals(channels.get(i * 4 + 1), connections.get(1).createChannel(false));
      assertEquals(channels.get(i * 4 + 2), connections.get(0).createChannel(true));
      assertEquals(channels.get(i * 4 + 3), connections.get(1).createChannel(true));
    }
    assertEquals(0, cachedChannels.get(connections.get(0)).size());
    assertEquals(0, cachedChannels.get(connections.get(1)).size());
    assertEquals(0, cachedTxChannels.get(connections.get(0)).size());
    assertEquals(0, cachedTxChannels.get(connections.get(1)).size());
    for (Channel channel : channels) {
      channel.close();
    }
    for (Connection connection : connections) {
      connection.close();
    }
    assertEquals(3, cachedChannels.get(connections.get(0)).size());
    assertNull(cachedChannels.get(connections.get(1)));
    assertEquals(3, cachedTxChannels.get(connections.get(0)).size());
    assertNull(cachedTxChannels.get(connections.get(1)));

    assertEquals(1, openConnections.size());

    Connection connection = connectionFactory.createConnection();
    Connection rabbitConnection = TestUtils.getPropertyValue(connection, "target", Connection.class);
    rabbitConnection.close();
    Channel channel = connection.createChannel(false);
    assertEquals(1, openConnections.size());
    channel.close();
    connection.close();
    assertEquals(1, openConnections.size());
  }

  @Test
  public void testSendAndReceiveFromVolatileQueue() throws Exception {

    RabbitTemplate template = new RabbitTemplate(connectionFactory);

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Queue queue = admin.declareQueue();
    template.convertAndSend(queue.getName(), "message");
    String result = (String) template.receiveAndConvert(queue.getName());
    assertEquals("message", result);

  }

  @Test
  public void testReceiveFromNonExistentVirtualHost() throws Exception {

    connectionFactory.setVirtualHost("non-existent");
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // Wrong vhost is very unfriendly to client - the exception has no clue (just an EOF)
    exception.expect(AmqpIOException.class);
    String result = (String) template.receiveAndConvert("foo");
    assertEquals("message", result);

  }

  @Test
  public void testSendAndReceiveFromVolatileQueueAfterImplicitRemoval() throws Exception {

    RabbitTemplate template = new RabbitTemplate(connectionFactory);

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Queue queue = admin.declareQueue();
    template.convertAndSend(queue.getName(), "message");

    // Force a physical close of the channel
    connectionFactory.destroy();

    // The queue was removed when the channel was closed
    exception.expect(AmqpIOException.class);

    String result = (String) template.receiveAndConvert(queue.getName());
    assertEquals("message", result);

  }

  @Test
  public void testMixTransactionalAndNonTransactional() throws Exception {

    RabbitTemplate template1 = new RabbitTemplate(connectionFactory);
    RabbitTemplate template2 = new RabbitTemplate(connectionFactory);
    template1.setChannelTransacted(true);

    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Queue queue = admin.declareQueue();

    template1.convertAndSend(queue.getName(), "message");
    String result = (String) template2.receiveAndConvert(queue.getName());
    assertEquals("message", result);

    // The channel is not transactional
    exception.expect(AmqpIOException.class);

    template2.execute(new ChannelCallback<Void>() {
      @Override
      public Void doInRabbit(Channel channel) throws Exception {
        // Should be an exception because the channel is not transactional
        channel.txRollback();
        return null;
      }
    });

  }

  @Test
  public void testHardErrorAndReconnect() throws Exception {

    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    Queue queue = new Queue("foo");
    admin.declareQueue(queue);
    final String route = queue.getName();

    final CountDownLatch latch = new CountDownLatch(1);
    try {
      template.execute(new ChannelCallback<Object>() {
        @Override
        public Object doInRabbit(Channel channel) throws Exception {
          channel.getConnection().addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
              logger.info("Error", cause);
              latch.countDown();
              // This will be thrown on the Connection thread just before it dies, so basically ignored
              throw new RuntimeException(cause);
            }
          });
          String tag = channel.basicConsume(route, new DefaultConsumer(channel));
          // Consume twice with the same tag is a hard error (connection will be reset)
          String result = channel.basicConsume(route, false, tag, new DefaultConsumer(channel));
          fail("Expected IOException, got: " + result);
          return null;
        }
      });
      fail("Expected AmqpIOException");
    } catch (AmqpIOException e) {
      // expected
    }
    template.convertAndSend(route, "message");
    assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
    String result = (String) template.receiveAndConvert(route);
    assertEquals("message", result);
    result = (String) template.receiveAndConvert(route);
    assertEquals(null, result);
  }

  /**
   * Manual (ignored) scenario that puts a byte-forwarding proxy between a
   * {@link CachingConnectionFactory} and the broker, then stops forwarding
   * broker-to-client bytes just before the factory is destroyed.
   * <p>
   * The method contains no assertions; it only completes if {@code factory.destroy()}
   * returns. NOTE(review): presumably meant to be watched by hand to confirm that
   * destroy does not block forever - confirm intent.
   * <p>
   * NOTE(review): the broker port is hard-coded to 5672 here, whereas {@link #open()}
   * uses {@code BrokerTestUtils.getPort()}. Neither of the two executors nor the
   * {@code ServerSocket} is shut down or closed within this method.
   */
  @Test
  @Ignore // Don't run this on the CI build server
  public void hangOnClose() throws Exception {
    // Despite its name, "proxy" is the socket connected to the broker side (localhost:5672).
    final Socket proxy = SocketFactory.getDefault().createSocket("localhost", 5672);
    // Local listening port that the factory under test connects to.
    final ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(2765);
    // While true, bytes read from the broker side are discarded instead of forwarded.
    final AtomicBoolean hangOnClose = new AtomicBoolean();
    // create a simple proxy so we can drop the close response
    Executors.newSingleThreadExecutor().execute(new Runnable() {

      @Override
      public void run() {
        try {
          // "socket" is the connection accepted from the factory under test.
          final Socket socket = server.accept();
          // Client -> broker direction: forward every byte, one at a time.
          Executors.newSingleThreadExecutor().execute(new Runnable() {

            @Override
            public void run() {
              while (!socket.isClosed()) {
                try {
                  int c = socket.getInputStream().read();
                  // A negative value (end of stream) is skipped and the loop simply continues.
                  if (c >= 0) {
                    proxy.getOutputStream().write(c);
                  }
                }
                catch (Exception e) {
                  // Any I/O failure tears down both sides; failures while closing are ignored.
                  try {
                    socket.close();
                    proxy.close();
                  }
                  catch (Exception ee) {}
                }
              }
            }
          });
          // Broker -> client direction: forward bytes unless hangOnClose has been set.
          while (!proxy.isClosed()) {
            try {
              int c = proxy.getInputStream().read();
              // End of stream (negative value) is likewise skipped rather than treated as a close.
              if (c >= 0 && !hangOnClose.get()) {
                socket.getOutputStream().write(c);
              }
            }
            catch (Exception e) {
              // Any I/O failure tears down both sides; failures while closing are ignored.
              try {
                socket.close();
                proxy.close();
              }
              catch (Exception ee) {}
            }
          }
          socket.close();
        }
        catch (Exception e) {
          e.printStackTrace();
        }
      }
    });
    // Connect through the proxy port (host is whatever this constructor defaults to).
    CachingConnectionFactory factory = new CachingConnectionFactory(2765);
    factory.createConnection();
    // From here on, responses from the broker side are dropped by the proxy.
    hangOnClose.set(true);
    factory.destroy();
  }

}
TOP

Related Classes of org.springframework.amqp.rabbit.connection.CachingConnectionFactoryIntegrationTests

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.