/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and others contributors as indicated
* by the @authors tag. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*
* (C) 2005-2006, JBoss Inc.
*/
package org.jboss.soa.esb.actions.aggregation;
import junit.framework.TestCase;
import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
import org.jboss.internal.soa.esb.services.registry.MockRegistry;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.*;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.testutils.ESBConfigUtil;
import org.xml.sax.SAXException;
import java.util.Map;
import java.io.IOException;
/**
* Tests for JBESB-1201 re timeout management.
* <p/>
* Make sure the message aggregation info flows in the following scenario...
* <pre>
*
* |----- service1 -----|
* | |
* splitter --| |-- aggregator
* | |
* |----- service2 -----|
*
* </pre>
*
* @author <a href="mailto:tom.fennelly@gmail.com">tom.fennelly@gmail.com</a>
*/
public class JBESB_1201_UnitTest extends TestCase {
private TestCourier service1Courier;
private TestCourier service2Courier;
private TestCourier aggregatorCourier;
private TestCourier dlqServiceCourier;
private StaticRouter splitter;
private StaticRouter service1;
private StaticRouter service2;
private Aggregator aggregator;
protected void setUp() throws Exception {
MockCourierFactory.install();
MockRegistry.install();
service1Courier = new TestCourier();
service2Courier = new TestCourier();
aggregatorCourier = new TestCourier();
dlqServiceCourier = new TestCourier();
MockRegistry.register("test", "service1", service1Courier);
MockRegistry.register("test", "service2", service2Courier);
MockRegistry.register("test", "aggregator", aggregatorCourier);
MockRegistry.register(ServiceInvoker.INTERNAL_SERVICE_CATEGORY, ServiceInvoker.DEAD_LETTER_SERVICE_NAME, dlqServiceCourier);
initaliseServices();
}
private void initaliseServices() throws ConfigurationException, RegistryException, ActionLifecycleException, IOException, SAXException {
ESBConfigUtil esbConfig = new ESBConfigUtil(getClass().getResourceAsStream("action-configs-01.xml"));
ConfigTree splitterConfig = esbConfig.getActionConfig("null-listener", "splitter1-action");
ConfigTree service1Config = esbConfig.getActionConfig("null-listener", "service1-config");
ConfigTree service2Config = esbConfig.getActionConfig("null-listener", "service2-config");
ConfigTree aggregatorConfig = esbConfig.getActionConfig("null-listener", "aggregator-config");
splitter = new StaticRouter(splitterConfig);
splitter.initialise();
service1 = new StaticRouter(service1Config);
service1.initialise();
service2 = new StaticRouter(service2Config);
service2.initialise();
aggregator = new Aggregator(aggregatorConfig);
aggregator.initialise();
}
protected void tearDown() throws Exception {
splitter.destroy();
service1.destroy();
service2.destroy();
aggregator.destroy();
MockRegistry.uninstall();
MockCourierFactory.uninstall();
}
public void test_not_timed_out() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException {
Message messageIn = MessageFactory.getInstance().getMessage();
// Get the aggregators message map and make sure it's empty...
Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
assertTrue(aggrMessageMap.isEmpty());
// Manually deliver the message to the splitter service...
splitter.process(messageIn);
AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0, false);
assertNotNull(service1Message);
// Manually deliver the message in service1Courier into service1...
service1.process(service1Courier.messages.get(0));
// Manually deliver the message in service2Courier into service2...
service2.process(service2Courier.messages.get(0));
// 2 messages should arrive at the aggregatorCourier...
assertEquals(2, aggregatorCourier.messages.size());
// Aggregators message map should be empty...
assertTrue(aggrMessageMap.isEmpty());
// Pump the 1st message into the aggregater...
aggregator.process(aggregatorCourier.messages.get(0));
assertService1MessageDetailsOK(aggrMessageMap, service1Message);
// Pump the 2nd (last) message into the aggregater...
Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
assertNotNull(aggregateMessage);
assertEquals(2, aggregateMessage.getAttachment().getUnnamedCount());
// Make sure the aggregators message map is empty again...
assertTrue(aggrMessageMap.isEmpty());
}
public void test_timed_out_non_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
Message messageIn = MessageFactory.getInstance().getMessage();
// Get the aggregators message map and make sure it's empty...
Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
assertTrue(aggrMessageMap.isEmpty());
// Manually deliver the message to the splitter service...
splitter.process(messageIn);
AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0, false);
assertNotNull(service1Message);
// The aggregator timeout is 2000ms... sleep here for 3000ms to force all
// messages delivered to the aggregator to be timed out - should all be
// ignored...
Thread.sleep(3000);
// Manually deliver the message in service1Courier into service1...
service1.process(service1Courier.messages.get(0));
// Manually deliver the message in service2Courier into service2...
service2.process(service2Courier.messages.get(0));
// 2 messages should arrive at the aggregatorCourier...
assertEquals(2, aggregatorCourier.messages.size());
// Aggregators message map should be empty...
assertTrue(aggrMessageMap.isEmpty());
// Pump the 1st message into the aggregater...
aggregator.process(aggregatorCourier.messages.get(0));
// Aggregators message map should still be empty because the message should
// have been ignored...
assertTrue(aggrMessageMap.isEmpty());
// Pump the 2nd (last) message into the aggregater...
aggregator.process(aggregatorCourier.messages.get(1));
// Aggregators message map should still be empty because the message should
// have been ignored...
assertTrue(aggrMessageMap.isEmpty());
}
public void test_timed_out_some_delivered() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
Message messageIn = MessageFactory.getInstance().getMessage();
// Get the aggregators message map and make sure it's empty...
Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
assertTrue(aggrMessageMap.isEmpty());
// Manually deliver the message to the splitter service...
splitter.process(messageIn);
AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0, false);
assertNotNull(service1Message);
// Manually deliver the message in service1Courier into service1...
service1.process(service1Courier.messages.get(0));
// 1 messages should arrive at the aggregatorCourier...
assertEquals(1, aggregatorCourier.messages.size());
// Aggregators message map should be empty...
assertTrue(aggrMessageMap.isEmpty());
// Pump the 1st message into the aggregater and make sure it's
// details are added properly to the map...
aggregator.process(aggregatorCourier.messages.get(0));
assertService1MessageDetailsOK(aggrMessageMap, service1Message);
// The deadServiceCourier should be empty...
assertEquals(0, dlqServiceCourier.messages.size());
// The aggregator timeout is 2000ms... sleep here for 3000ms to force the 2nd
// message delivered to the aggregator to be timed out...
Thread.sleep(3000);
// Should have timed out and "notified" on the first message...
assertEquals(1, dlqServiceCourier.messages.size());
// Aggregators message map should be empty again...
assertTrue(aggrMessageMap.isEmpty());
// Manually deliver the message in service2Courier into service2...
service2.process(service2Courier.messages.get(0));
// 2 messages should be at the aggregatorCourier...
assertEquals(2, aggregatorCourier.messages.size());
// Pump the 2nd message into the aggregater...
Message aggregateMessage = aggregator.process(aggregatorCourier.messages.get(1));
assertNull(aggregateMessage);
// Aggregators message map should be empty because the message should
// have been ignored...
assertTrue(aggrMessageMap.isEmpty());
}
public void test_timeoutchecker() throws RegistryException, ConfigurationException, ActionProcessingException, MessageDeliverException, InterruptedException {
Message messageIn = MessageFactory.getInstance().getMessage();
// Get the aggregators message map and make sure it's empty...
Map<String, Map<Integer, Message>> aggrMessageMap = aggregator.getAggregatedMessageMap();
assertTrue(aggrMessageMap.isEmpty());
// Manually deliver the message to the splitter service...
splitter.process(messageIn);
AggregationDetails service1Message = Aggregator.getAggregatorDetails(service1Courier.messages.get(0), 0, false);
assertNotNull(service1Message);
// Manually deliver the message in service1Courier into service1...
service1.process(service1Courier.messages.get(0));
// 1 messages should arrive at the aggregatorCourier...
assertEquals(1, aggregatorCourier.messages.size());
// Aggregators message map should be empty...
assertTrue(aggrMessageMap.isEmpty());
// Pump the message into the aggregater and make sure it's
// details are added properly to the map...
aggregator.process(aggregatorCourier.messages.get(0));
assertService1MessageDetailsOK(aggrMessageMap, service1Message);
// The aggregator timeout is 2000ms... sleep here for 3000ms...
Thread.sleep(4000);
// Aggregators message map should be empty because the message should
// have timed out...
assertTrue("Message didn't get removed from map after timeout", aggrMessageMap.isEmpty());
}
private void assertService1MessageDetailsOK(Map<String, Map<Integer, Message>> aggrMessageMap, AggregationDetails service1Message) {
// Aggregators message map should have 1 entry for the above message...
assertEquals(1, aggrMessageMap.size());
Map<Integer, Message> messageSeries = aggrMessageMap.get(service1Message.getSeriesUuid());
assertNotNull(messageSeries);
assertTrue(messageSeries.get(service1Message.getMessageNumber()) != null);
}
}