this.template.setQueue(ROUTE);
this.template.setRoutingKey(ROUTE);
this.template.setReplyQueue(REPLY_QUEUE);
this.template.setReplyTimeout(10000);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.template.getConnectionFactory());
container.setQueues(REPLY_QUEUE);
container.setMessageListener(this.template);
container.start();
int count = 10;
final Map<Double, Object> results = new HashMap<Double, Object>();
ExecutorService executor = Executors.newFixedThreadPool(10);
this.template.setCorrelationKey("CorrelationKey");
for (int i = 0; i < count; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
Double request = Math.random() * 100;
Object reply = template.convertSendAndReceive(request);
results.put(request, reply);
}
});
}
for (int i = 0; i < count; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
Double request = Math.random() * 100;
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
Message reply = template.sendAndReceive(new Message(SerializationUtils.serialize(request), messageProperties));
results.put(request, SerializationUtils.deserialize(reply.getBody()));
}
});
}
final AtomicInteger receiveCount = new AtomicInteger();
long start = System.currentTimeMillis();
do {
template.receiveAndReply(new ReceiveAndReplyCallback<Double, Double>() {
@Override
public Double handle(Double payload) {
receiveCount.incrementAndGet();
return payload * 3;
}
});
if (System.currentTimeMillis() > start + 10000) {
fail("Something wrong with RabbitMQ");
}
} while (receiveCount.get() < count * 2);
executor.shutdown();
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
container.stop();
assertEquals(count * 2, results.size());
for (Map.Entry<Double, Object> entry : results.entrySet()) {
assertEquals(entry.getKey() * 3, entry.getValue());