Package org.springframework.amqp.rabbit.retry

Source Code of org.springframework.amqp.rabbit.retry.MissingIdRetryTests

/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.retry;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.aopalliance.aop.Advice;
import org.junit.Rule;
import org.junit.Test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.policy.RetryContextCache;
import org.springframework.retry.support.RetryTemplate;

/**
* @author Gary Russell
* @since 1.1.2
*
*/
public class MissingIdRetryTests {

  private volatile CountDownLatch latch;

  @Rule
  public BrokerRunning brokerIsRunning = BrokerRunning.isRunning();

  @SuppressWarnings("rawtypes")
  @Test
  public void testWithNoId() throws Exception {
    // 2 messsages; each retried once by missing id interceptor
    this.latch = new CountDownLatch(4);
    ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext("retry-context.xml", this.getClass());
    RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
    ConnectionFactory connectionFactory = ctx.getBean(ConnectionFactory.class);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setMessageListener(new MessageListenerAdapter(new POJO()));
    container.setQueueNames("retry.test.queue");

    StatefulRetryOperationsInterceptorFactoryBean fb = new StatefulRetryOperationsInterceptorFactoryBean();

    // use an external template so we can share his cache
    RetryTemplate retryTemplate = new RetryTemplate();
    RetryContextCache cache = new MapRetryContextCache();
    retryTemplate.setRetryContextCache(cache);
    fb.setRetryOperations(retryTemplate);

    // give him a reference to the retry cache so he can clean it up
    MissingMessageIdAdvice missingIdAdvice = new MissingMessageIdAdvice(cache);

    Advice retryInterceptor = fb.getObject();
    // add both advices
    container.setAdviceChain(new Advice[] {missingIdAdvice, retryInterceptor});
    container.start();

    template.convertAndSend("retry.test.exchange", "retry.test.binding", "Hello, world!");
    template.convertAndSend("retry.test.exchange", "retry.test.binding", "Hello, world!");
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    Thread.sleep(2000);
    assertEquals(0, ((Map) new DirectFieldAccessor(cache).getPropertyValue("map")).size());
    container.stop();
    ctx.close();
  }

  @SuppressWarnings("rawtypes")
  @Test
  public void testWithId() throws Exception {
    // 2 messsages; each retried twice by retry interceptor
    this.latch = new CountDownLatch(6);
    ConfigurableApplicationContext ctx = new ClassPathXmlApplicationContext("retry-context.xml", this.getClass());
    RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
    ConnectionFactory connectionFactory = ctx.getBean(ConnectionFactory.class);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setMessageListener(new MessageListenerAdapter(new POJO()));
    container.setQueueNames("retry.test.queue");

    StatefulRetryOperationsInterceptorFactoryBean fb = new StatefulRetryOperationsInterceptorFactoryBean();

    // use an external template so we can share his cache
    RetryTemplate retryTemplate = new RetryTemplate();
    RetryContextCache cache = new MapRetryContextCache();
    retryTemplate.setRetryContextCache(cache);
    fb.setRetryOperations(retryTemplate);
    fb.setMessageRecoverer(new RejectAndDontRequeueRecoverer());

    // give him a reference to the retry cache so he can clean it up
    MissingMessageIdAdvice missingIdAdvice = new MissingMessageIdAdvice(cache);

    Advice retryInterceptor = fb.getObject();
    // add both advices
    container.setAdviceChain(new Advice[] {missingIdAdvice, retryInterceptor});
    container.start();

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    messageProperties.setMessageId("foo");
    Message message = new Message("Hello, world!".getBytes(), messageProperties);
    template.send("retry.test.exchange", "retry.test.binding", message);
    template.send("retry.test.exchange", "retry.test.binding", message);
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    Thread.sleep(2000);
    assertEquals(0, ((Map) new DirectFieldAccessor(cache).getPropertyValue("map")).size());
    container.stop();
    ctx.close();
  }

  public class POJO {
    public void handleMessage(String foo) {
      latch.countDown();
      throw new RuntimeException("fail");
    }
  }
}
TOP

Related Classes of org.springframework.amqp.rabbit.retry.MissingIdRetryTests

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.