/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.internal.soa.esb.couriers.tests;
import junit.framework.Assert;
import junit.framework.JUnit4TestAdapter;
import junit.framework.TestCase;
import org.jboss.internal.soa.esb.couriers.InVMCourier;
import org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport;
import org.jboss.internal.soa.esb.couriers.transport.InVMTransport;
import org.jboss.internal.soa.esb.services.registry.InVMRegistryInterceptor;
import org.jboss.internal.soa.esb.services.registry.MockRegistry;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
import org.jboss.soa.esb.couriers.CourierFactoryUtil;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.ServiceNotFoundException;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.junit.Test;
import java.net.URI;
public class InVMCourierUnitTest extends TestCase {
/**
 * Exposes this class as a JUnit 3 style suite by wrapping it in a
 * {@link JUnit4TestAdapter}.
 *
 * @return the adapter built for {@code InVMCourierUnitTest}
 */
public static junit.framework.Test suite() {
    final Class<InVMCourierUnitTest> testClass = InVMCourierUnitTest.class;
    return new JUnit4TestAdapter(testClass);
}
/**
 * Per-test cleanup hook: resets the courier factory through
 * {@link CourierFactoryUtil#resetCourierFactory()}.
 *
 * @throws Exception if the reset fails
 */
protected void tearDown() throws Exception {
CourierFactoryUtil.resetCourierFactory();
}
/**
 * Delivers and then picks up the default number of messages on the calling
 * thread: {@code run()} is invoked directly on the producer and then on the
 * consumer, so no additional threads are started.
 *
 * @throws Exception if the EPR URI is invalid or a transport call fails
 */
@Test
public void testUnthreadedDeliver() throws Exception {
    final InVMEpr serviceEpr = new InVMEpr(new URI("invm://serviceid1"));
    InVMTransport.getInstance().registerEPR("x", "y", serviceEpr);
    try {
        final InVMCourier sharedCourier = new InVMCourier(serviceEpr);
        final Producer sender = new Producer(sharedCourier);
        final Consumer receiver = new Consumer(sharedCourier);

        // Plain method calls rather than Thread.start(): strictly sequential.
        sender.run();
        receiver.run();

        Assert.assertEquals(true, receiver.valid());
    } finally {
        // Always undo the registration, even when the assertion fails.
        InVMTransport.getInstance().unRegisterEPR("x", "y", serviceEpr);
    }
}
/**
 * Runs a producer and a consumer on their own threads against a registered
 * InVM EPR and checks that the consumer picked up every message.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testThreadedDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        InVMCourier courier = new InVMCourier(epr);
        Producer producer = new Producer(courier);
        Consumer consumer = new Consumer(courier);
        producer.start();
        consumer.start();
        // Bounded wait: a consumer that is still running has not marked
        // itself valid yet, so a timeout surfaces as a failed assertion.
        consumer.join(TIMEOUT);
        Assert.assertTrue("Consumer valid", consumer.valid());
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * Starts the consumer first, waits half a second, then starts the producer,
 * and checks that the consumer still picks up every message.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         test thread is interrupted while sleeping or joining
 */
@Test
public void testDelayedThreadedDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        InVMCourier courier = new InVMCourier(epr);
        Producer producer = new Producer(courier);
        Consumer consumer = new Consumer(courier);
        consumer.start();
        // Give the consumer a head start so it is already waiting in pickup.
        // An interrupt is allowed to propagate instead of being swallowed.
        Thread.sleep(500);
        producer.start();
        consumer.join(TIMEOUT);
        Assert.assertTrue("Consumer valid", consumer.valid());
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * Starts a consumer with no producer and checks that it terminates without
 * becoming valid, i.e. that a pickup with nothing delivered ends the
 * consumer instead of blocking it indefinitely.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         test thread is interrupted while sleeping or joining
 */
@Test
public void testThreadedNullDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        InVMCourier courier = new InVMCourier(epr);
        Consumer consumer = new Consumer(courier);
        consumer.start();
        // An interrupt is allowed to propagate instead of being swallowed.
        Thread.sleep(500);
        consumer.join(TIMEOUT);
        Assert.assertFalse("Consumer terminated", consumer.isAlive());
        Assert.assertFalse("Consumer valid", consumer.valid());
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * Threaded delivery against the EPR {@code invm://serviceid5?true#2000}.
 * NOTE(review): presumably the {@code true} query enables lockstep delivery
 * and the {@code 2000} fragment is the lockstep wait time - confirm against
 * InVMEpr.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testLockstepDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        InVMCourier courier = new InVMCourier(epr);
        Producer producer = new Producer(courier);
        Consumer consumer = new Consumer(courier);
        consumer.start();
        producer.start();
        consumer.join(TIMEOUT);
        Assert.assertTrue("Consumer valid", consumer.valid());
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * Threaded delivery against the EPR {@code invm://serviceid5/true?true#2000}.
 * NOTE(review): presumably the {@code /true} path segment switches the EPR
 * to pass-by-value on top of lockstep delivery - confirm against InVMEpr.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testPassByValueDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5/true?true#2000"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        InVMCourier courier = new InVMCourier(epr);
        Producer producer = new Producer(courier);
        Consumer consumer = new Consumer(courier);
        consumer.start();
        producer.start();
        consumer.join(TIMEOUT);
        Assert.assertTrue("Consumer valid", consumer.valid());
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * Starts 50 producers of 1000 messages each against a single consumer that
 * expects all 50000 messages, then reports the elapsed time and throughput
 * on {@code System.err}.
 *
 * @throws Exception if the EPR URI is invalid, a transport call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testLockstepMultiProducerPerformance() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
    InVMTransport.getInstance().registerEPR("x", "y", epr);
    try {
        int iters = 1000;
        int numberOfProducers = 50;
        int totalMessages = iters * numberOfProducers;
        Producer[] producer = new Producer[numberOfProducers];
        InVMCourier courier = new InVMCourier(epr);
        Consumer consumer = new Consumer(courier, totalMessages, false);
        long stime = System.currentTimeMillis();
        for (int i = 0; i < numberOfProducers; i++) {
            producer[i] = new Producer(courier, iters, false);
        }
        consumer.start();
        for (int j = 0; j < numberOfProducers; j++) {
            producer[j].start();
        }
        consumer.join(TIMEOUT);
        // Stop the clock as soon as the wait ends, before any reporting.
        long ftime = System.currentTimeMillis();
        // Print the diagnostic before asserting; after a failed assertion
        // it would never be reached.
        if (!consumer.valid()) {
            System.err.println("Completed " + consumer.itersCompleted());
        }
        Assert.assertTrue("Consumer valid", consumer.valid());
        long elapsed = ftime - stime;
        // Treat a sub-millisecond run as 1 ms so the rate stays finite.
        double msgsPerSecond = totalMessages * (1000.00 / Math.max(1L, elapsed));
        System.err.println("Time for " + totalMessages
                + " messages is " + elapsed + " milliseconds.");
        System.err.println("Messages per second: " + msgsPerSecond);
    } finally {
        InVMTransport.getInstance().unRegisterEPR("x", "y", epr);
    }
}
/**
 * JBESB-2108, case 1: registers an equal InVM EPR twice for service
 * "x":"y" and checks the number of EPRs the InVM transport reports after
 * each registration and each unregistration (1, 2, 1, 0).
 *
 * @throws ServiceNotFoundException if a lookup through the interceptor fails
 * @throws RegistryException if a registry operation fails
 */
@Test
public void test_JBESB_2108_01() throws ServiceNotFoundException, RegistryException {
    final InVMRegistryInterceptor interceptor = new InVMRegistryInterceptor();
    try {
        interceptor.setRegistry(new MockRegistry());
        final URI address = URI.create("x://123");

        // First registration: one EPR visible, and the lookup succeeds.
        interceptor.registerEPR("x", "y", "blah", new InVMEpr(new EPR(address)), "blah");
        interceptor.findEPR("x", "y");
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("x", "y").size());

        // Second registration with a fresh but equal EPR instance.
        interceptor.registerEPR("x", "y", "blah", new InVMEpr(new EPR(address)), "blah");
        Assert.assertEquals("InVM EPR count", 2,
                InVMTransport.getInstance().findEPRs("x", "y").size());

        // First unregistration: the service must still be resolvable.
        interceptor.unRegisterEPR("x", "y", new InVMEpr(new EPR(address)));
        interceptor.findEPR("x", "y");
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("x", "y").size());

        // Second unregistration: nothing left.
        interceptor.unRegisterEPR("x", "y", new InVMEpr(new EPR(address)));
        Assert.assertEquals("InVM EPR count", 0,
                InVMTransport.getInstance().findEPRs("x", "y").size());
    } finally {
        InVMTransport.getInstance().unRegisterService("x", "y");
    }
}
/**
 * JBESB-2108, case 2: after an InVM EPR has been registered for service
 * "xy":"z", registering an equal EPR for the different service "x":"yz"
 * is expected to be rejected with a {@link RegistryException}.
 *
 * @throws ServiceNotFoundException declared by the registry API
 * @throws RegistryException if the first, legitimate registration fails
 */
@Test
public void test_JBESB_2108_02() throws ServiceNotFoundException, RegistryException {
    final InVMRegistryInterceptor interceptor = new InVMRegistryInterceptor();
    try {
        interceptor.setRegistry(new MockRegistry());
        interceptor.registerEPR("xy", "z", "blah", new InVMEpr(new EPR(URI.create("x://123"))), "blah");

        boolean rejected = false;
        try {
            interceptor.registerEPR("x", "yz", "blah", new InVMEpr(new EPR(URI.create("x://123"))), "blah");
        } catch (RegistryException expected) {
            // This is the outcome the test is looking for.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Expected RegistryException");
        }
    } finally {
        InVMTransport.getInstance().unRegisterService("xy", "z");
        InVMTransport.getInstance().unRegisterService("x", "yz");
    }
}
/**
 * JBESB-2108, case 3: registers two different InVM EPRs under the services
 * "xy":"z" and "x":"yz" and checks that each service only ever sees its own
 * EPR while the two are registered and unregistered in turn.
 * NOTE(review): the service names look chosen so that category + name
 * concatenate to the same text - presumably to catch key collisions; confirm.
 *
 * @throws ServiceNotFoundException if a lookup through the interceptor fails
 * @throws RegistryException if a registry operation fails
 */
@Test
public void test_JBESB_2108_03() throws ServiceNotFoundException, RegistryException {
    final InVMRegistryInterceptor interceptor = new InVMRegistryInterceptor();
    try {
        final EPR firstEpr = new InVMEpr(new EPR(URI.create("x://123")));
        final EPR secondEpr = new InVMEpr(new EPR(URI.create("x://1234")));
        interceptor.setRegistry(new MockRegistry());

        // Only "xy":"z" registered.
        interceptor.registerEPR("xy", "z", "blah", firstEpr, "blah");
        Assert.assertEquals(firstEpr, interceptor.findEPR("xy", "z"));
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("xy", "z").size());
        Assert.assertEquals("InVM EPR count", 0,
                InVMTransport.getInstance().findEPRs("x", "yz").size());

        // Both services registered.
        interceptor.registerEPR("x", "yz", "blah", secondEpr, "blah");
        Assert.assertEquals(secondEpr, interceptor.findEPR("x", "yz"));
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("xy", "z").size());
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("x", "yz").size());

        // "xy":"z" removed, "x":"yz" untouched.
        interceptor.unRegisterEPR("xy", "z", firstEpr);
        Assert.assertEquals("InVM EPR count", 0,
                InVMTransport.getInstance().findEPRs("xy", "z").size());
        Assert.assertEquals("InVM EPR count", 1,
                InVMTransport.getInstance().findEPRs("x", "yz").size());

        // Both removed.
        interceptor.unRegisterEPR("x", "yz", secondEpr);
        Assert.assertEquals("InVM EPR count", 0,
                InVMTransport.getInstance().findEPRs("xy", "z").size());
        Assert.assertEquals("InVM EPR count", 0,
                InVMTransport.getInstance().findEPRs("x", "yz").size());
    } finally {
        InVMTransport.getInstance().unRegisterService("x", "yz");
        InVMTransport.getInstance().unRegisterService("xy", "z");
    }
}
/**
 * Same sequential deliver/pickup as the unthreaded test, but on an EPR
 * flagged as temporary. The temporary transport must report no ordered
 * entries and no service ids both before and after the exchange.
 *
 * @throws Exception if the EPR URI is invalid or a courier call fails
 */
@Test
public void testTemporaryUnthreadedDeliver() throws Exception {
    final InVMEpr temporaryEpr = new InVMEpr(new URI("invm://serviceid1"));
    temporaryEpr.setTemporaryEPR(true);

    // Precondition: the temporary transport starts out empty.
    Assert.assertEquals("Ordered entry size", 0,
            InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0,
            InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());

    final InVMCourier sharedCourier = new InVMCourier(temporaryEpr);
    final Producer sender = new Producer(sharedCourier);
    final Consumer receiver = new Consumer(sharedCourier);

    // Plain method calls rather than Thread.start(): strictly sequential.
    sender.run();
    receiver.run();
    Assert.assertEquals(true, receiver.valid());

    // Postcondition: nothing is left behind once every message is consumed.
    Assert.assertEquals("Ordered entry size", 0,
            InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0,
            InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Threaded producer/consumer exchange on an EPR flagged as temporary. The
 * temporary transport must be empty before the exchange and again once the
 * consumer has picked up every message.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testTemporaryThreadedDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid2"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    InVMCourier courier = new InVMCourier(epr);
    Producer producer = new Producer(courier);
    Consumer consumer = new Consumer(courier);
    producer.start();
    consumer.start();
    // Bounded wait: a consumer that is still running has not marked itself
    // valid yet, so a timeout surfaces as a failed assertion.
    consumer.join(TIMEOUT);
    Assert.assertTrue("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Temporary-EPR variant of the delayed test: the consumer is started half a
 * second before the producer. The temporary transport must be empty before
 * and after the exchange.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         test thread is interrupted while sleeping or joining
 */
@Test
public void testTemporaryDelayedThreadedDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid3"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    InVMCourier courier = new InVMCourier(epr);
    Producer producer = new Producer(courier);
    Consumer consumer = new Consumer(courier);
    consumer.start();
    // Give the consumer a head start so it is already waiting in pickup.
    // An interrupt is allowed to propagate instead of being swallowed.
    Thread.sleep(500);
    producer.start();
    consumer.join(TIMEOUT);
    Assert.assertTrue("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Temporary-EPR variant of the null-delivery test: a consumer with no
 * producer must terminate without becoming valid, and the temporary
 * transport must be empty before and after.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         test thread is interrupted while sleeping or joining
 */
@Test
public void testTemporaryThreadedNullDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid4"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    InVMCourier courier = new InVMCourier(epr);
    Consumer consumer = new Consumer(courier);
    consumer.start();
    // An interrupt is allowed to propagate instead of being swallowed.
    Thread.sleep(500);
    consumer.join(TIMEOUT);
    Assert.assertFalse("Consumer terminated", consumer.isAlive());
    Assert.assertFalse("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Temporary-EPR variant of the lockstep test, using the EPR
 * {@code invm://serviceid5?true#2000}. The temporary transport must be
 * empty before and after the exchange.
 * NOTE(review): presumably the {@code true} query enables lockstep delivery
 * and the {@code 2000} fragment is the lockstep wait time - confirm against
 * InVMEpr.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testTemporaryLockstepDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    InVMCourier courier = new InVMCourier(epr);
    Producer producer = new Producer(courier);
    Consumer consumer = new Consumer(courier);
    consumer.start();
    producer.start();
    consumer.join(TIMEOUT);
    Assert.assertTrue("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Temporary-EPR variant of the pass-by-value test, using the EPR
 * {@code invm://serviceid5/true?true#2000}. The temporary transport must be
 * empty before and after the exchange.
 * NOTE(review): presumably the {@code /true} path segment switches the EPR
 * to pass-by-value - confirm against InVMEpr.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testTemporaryPassByValueDeliver() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5/true?true#2000"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    InVMCourier courier = new InVMCourier(epr);
    Producer producer = new Producer(courier);
    Consumer consumer = new Consumer(courier);
    consumer.start();
    producer.start();
    consumer.join(TIMEOUT);
    Assert.assertTrue("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
}
/**
 * Temporary-EPR variant of the multi-producer performance test: 50
 * producers of 1000 messages each feed one consumer expecting all 50000
 * messages. Elapsed time and throughput are reported on {@code System.err};
 * the temporary transport must be empty before and after the exchange.
 *
 * @throws Exception if the EPR URI is invalid, a courier call fails or the
 *         waiting thread is interrupted
 */
@Test
public void testTemporaryLockstepMultiProducerPerformance() throws Exception {
    InVMEpr epr = new InVMEpr(new URI("invm://serviceid5?true#2000"));
    epr.setTemporaryEPR(true);
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    int iters = 1000;
    int numberOfProducers = 50;
    int totalMessages = iters * numberOfProducers;
    Producer[] producer = new Producer[numberOfProducers];
    InVMCourier courier = new InVMCourier(epr);
    Consumer consumer = new Consumer(courier, totalMessages, false);
    long stime = System.currentTimeMillis();
    for (int i = 0; i < numberOfProducers; i++) {
        producer[i] = new Producer(courier, iters, false);
    }
    consumer.start();
    for (int j = 0; j < numberOfProducers; j++) {
        producer[j].start();
    }
    consumer.join(TIMEOUT);
    // Stop the clock as soon as the wait ends, before any reporting.
    long ftime = System.currentTimeMillis();
    // Print the diagnostic before asserting; after a failed assertion it
    // would never be reached.
    if (!consumer.valid()) {
        System.err.println("Completed " + consumer.itersCompleted());
    }
    Assert.assertTrue("Consumer valid", consumer.valid());
    Assert.assertEquals("Ordered entry size", 0, InVMTemporaryTransport.getInstance().getOrderedEntriesSize());
    Assert.assertEquals("Service id size", 0, InVMTemporaryTransport.getInstance().getServiceIdToEntrySize());
    long elapsed = ftime - stime;
    // Treat a sub-millisecond run as 1 ms so the rate stays finite.
    double msgsPerSecond = totalMessages * (1000.00 / Math.max(1L, elapsed));
    System.err.println("Time for " + totalMessages
            + " messages is " + elapsed + " milliseconds.");
    System.err.println("Messages per second: " + msgsPerSecond);
}
/** Number of messages the single-argument Producer and Consumer constructors use. */
public static final int ITERATIONS = 10;
/** Upper bound, in milliseconds, passed to {@code Thread.join} when waiting for a consumer thread. */
public static final int TIMEOUT = 30000; // on slow machines this may need to be increased.
}
/**
 * Test helper thread that delivers a fixed number of freshly created
 * messages through an {@link InVMCourier}.
 */
class Producer extends Thread {
    /**
     * @param courier courier used to deliver the messages
     * @param iters   number of messages to deliver
     * @param debug   when {@code true}, each delivered message is echoed to
     *                {@code System.err}
     */
    public Producer(InVMCourier courier, int iters, boolean debug) {
        this.courier = courier;
        _iters = iters;
        _debug = debug;
    }

    /**
     * Creates a producer that delivers {@link InVMCourierUnitTest#ITERATIONS}
     * messages with debug output enabled.
     *
     * @param courier courier used to deliver the messages
     */
    public Producer(InVMCourier courier) {
        this(courier, InVMCourierUnitTest.ITERATIONS, true);
    }

    /**
     * Delivers the configured number of messages. Message {@code n} carries
     * the body entry {@code "Message: n"}, counting from 0. Any exception
     * stops the loop and is printed rather than rethrown; the courier
     * reference is dropped afterwards either way.
     */
    public void run() {
        try {
            for (int number = 0; number < _iters; number++) {
                Message msg = MessageFactory.getInstance().getMessage();
                // The concatenation already yields a new String; no copy needed.
                msg.getBody().add("Message: " + number);
                courier.deliver(msg);
                if (_debug) {
                    System.err.println("Delivered " + msg);
                }
                Thread.yield();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        courier = null;
    }

    // Not final: cleared at the end of run().
    private InVMCourier courier;
    private final int _iters;
    private final boolean _debug;
}
/**
 * Test helper thread that picks up a fixed number of messages from an
 * {@link InVMCourier} and records whether it received all of them.
 */
class Consumer extends Thread {
    /**
     * Creates a consumer that expects {@link InVMCourierUnitTest#ITERATIONS}
     * messages with debug output enabled.
     *
     * @param courier courier to pick messages up from
     */
    public Consumer(InVMCourier courier) {
        this(courier, InVMCourierUnitTest.ITERATIONS, true);
    }

    /**
     * @param courier courier to pick messages up from
     * @param iters   number of messages that must be received for the
     *                consumer to count as valid
     * @param debug   when {@code true}, progress is echoed to the console
     */
    public Consumer(InVMCourier courier, int iters, boolean debug) {
        this.courier = courier;
        _iters = iters;
        _debug = debug;
    }

    /**
     * Picks up messages until the expected number has arrived or a pickup
     * returns {@code null}. The running count is published after every
     * message so that {@link #itersCompleted()} is meaningful even when the
     * consumer gives up early. Exceptions are printed rather than rethrown;
     * the courier reference is dropped on every exit path.
     */
    public void run() {
        try {
            if (_debug) {
                System.out.println("Consumer Iters: " + _iters);
            }
            int received = 0;
            while (received < _iters) {
                // 2000 is presumably a timeout in milliseconds - confirm
                // against InVMCourier.pickup.
                Message msg = courier.pickup(2000);
                if (_debug) {
                    System.err.println("Received: " + msg);
                }
                if (msg == null) {
                    // Nothing arrived: stop without marking the run valid.
                    break;
                }
                _itersCompleted = ++received;
                Thread.yield();
            }
            if (received == _iters) {
                _valid = true;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            courier = null;
        }
    }

    /**
     * @return the number of messages received so far
     */
    public int itersCompleted() {
        return _itersCompleted;
    }

    /**
     * @return {@code true} once every expected message has been received
     */
    public boolean valid() {
        return _valid;
    }

    // Not final: cleared when run() exits.
    private InVMCourier courier;
    // volatile: written by this thread, read by the test thread, possibly
    // after a join that timed out while this thread is still running.
    private volatile boolean _valid = false;
    private final int _iters;
    private final boolean _debug;
    private volatile int _itersCompleted = 0;
}