/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.journal;
import java.io.DataOutputStream;
import junit.framework.TestCase;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.codehaus.activemq.io.impl.DefaultWireFormat;
import org.codehaus.activemq.message.ActiveMQBytesMessage;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.cache.SimpleCachePersistenceAdapter;
import EDU.oswego.cs.dl.util.concurrent.Callable;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
/**
* Used to micro benchmark the Journal Store operations.
*
* Make sure you run with jvm option -server (makes a big difference).
* The tests simulate storing 1000 1k jms messages to see the rate of
* processing msg/sec.
*
* @version $Revision: 1.5 $
*/
public class JournalStoreBenchmark extends TestCase {
private static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("MESSAGE_COUNT","100000"));
private PersistenceAdapter adapter;
private MessageStore store;
private ActiveMQBytesMessage message;
private MessageAck ack;
public static void main(String[] args) {
junit.textui.TestRunner.run(JournalStoreBenchmark.class);
}
protected void setUp() throws Exception {
JournalTestHelper helper = new JournalTestHelper();
adapter = helper.createPersistenceAdapter(JournalPersistenceAdapter.DEFAULT_JOURNAL_TYPE);
adapter = new SimpleCachePersistenceAdapter(adapter);
adapter.start();
store = adapter.createQueueMessageStore("TEST");
store.start();
message = new ActiveMQBytesMessage();
message.writeBytes(new byte[1024]);
message.setJMSDestination(new ActiveMQQueue("TEST"));
ack = new MessageAck();
ack.setDestination(message.getJMSActiveMQDestination());
}
protected void tearDown() throws Exception {
store.stop();
adapter.stop();
}
/**
*/
private void runConcurrentTest(int workers, final Callable test) throws InterruptedException, Throwable {
final Throwable workerError[] = new Throwable[1];
final Semaphore doneSemaphore = new Semaphore(1-workers);
for( int i=0; i < workers; i++ ) {
final String name = ""+i;
new Thread() {
public void run() {
try {
Thread.currentThread().setName(name);
test.call();
} catch (Throwable e) {
workerError[0] = e;
} finally {
doneSemaphore.release();
}
}
}.start();
}
doneSemaphore.acquire();
if( workerError[0] != null )
throw workerError[0];
}
static class ProgressPrinter {
private final int total;
private final int interval;
int percentDone=0;
int counter=0;
public ProgressPrinter(int total, int interval) {
this.total=total;
this.interval = interval;
}
synchronized public void increment() {
update(++counter);
}
synchronized public void update(int current) {
int at = 100*current/total;
if( (percentDone/interval) != (at/interval) ) {
percentDone=at;
System.out.println("Completed: "+percentDone+"%");
}
}
}
/**
* Runs at about 800 msg/sec on OS X G4 1.5ghz.
* Can we make this faster?
*/
public void testAsyncAddMessage() throws Exception {
message.setReceiptRequired(false);
ack.setReceiptRequired(false);
ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT, 5);
//System.out.println("ready.");
//System.in.read();System.in.read();
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
pp.increment();
message.setJMSMessageID("id:"+i);
store.addMessage(message);
ack.setMessageID(message.getJMSMessageID());
store.removeMessage(ack);
}
long end = System.currentTimeMillis();
System.out.println(getName() + ": test duration: " + (end - start) + " ms, published+acked msg/s: "
+ (MESSAGE_COUNT * 1000f / (end - start)));
//System.out.println("ready.");
//System.in.read();System.in.read();
}
/**
* Runs the above test in 10 concurrent threads.
*
* Runs at about 2200 msg/sec on OS X G4 1.5ghz.
* Can we make this faster?
*/
public void testConcurrentAsyncAddMessage() throws Throwable {
final ProgressPrinter pp = new ProgressPrinter(MESSAGE_COUNT, 5);
final int workers=2;
Callable task = new Callable() {
public Object call() throws Exception {
ActiveMQMessage messageCopy;
synchronized(message) {
messageCopy = message.deepCopy();
}
messageCopy.setReceiptRequired(false);
MessageAck ack = new MessageAck();
ack.setReceiptRequired(false);
ack.setDestination(messageCopy.getJMSActiveMQDestination());
messageCopy.setReceiptRequired(false);
int count = MESSAGE_COUNT/workers;
String id = "id:"+Thread.currentThread().getName()+":";
for (int i = 0; i < count; i++) {
pp.increment();
messageCopy.setJMSMessageID(id+i);
store.addMessage(messageCopy);
ack.setMessageID(message.getJMSMessageID());
store.removeMessage(ack);
}
return null;
}
};
//System.out.println("ready.");
//System.in.read();System.in.read();
long start = System.currentTimeMillis();
runConcurrentTest(workers, task);
long end = System.currentTimeMillis();
System.out.println(getName() + ": test duration: " + (end - start) + " ms, published+acked msg/s: "
+ (MESSAGE_COUNT * 1000f / (end - start)));
//System.out.println("ready.");
//System.in.read();System.in.read();
}
/**
* When a message is added to the journal. It's serialize. Test
* to see what the serialiation bottle neck is.
*
* Runs at about 2600 msg/sec on OS X G4 1.5ghz
*/
public void XtestMessageSerialization() throws Exception {
message.setReceiptRequired(false);
DefaultWireFormat wireFormat = new DefaultWireFormat();
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
message.setJMSMessageID("id:"+i);
PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(pos);
os.writeByte(1);
os.writeUTF("Test");
wireFormat.writePacket(message, os);
os.close();
}
long end = System.currentTimeMillis();
System.out.println(getName() + ": test duration: " + (end - start) + " ms, msg/s: "
+ (MESSAGE_COUNT * 1000f / (end - start)));
}
/**
* Runs the above test in 10 concurrent threads.
*
* Runs at about 3800 msg/sec on OS X G4 1.5ghz.
* Can we make this faster?
*/
public void XtestConcurrentMessageSerialization() throws Throwable {
int workers=10;
Callable task = new Callable() {
public Object call() throws Exception {
DefaultWireFormat wireFormat = new DefaultWireFormat();
ActiveMQMessage copy = message.deepCopy();
copy.setReceiptRequired(false);
for (int i = 0; i < MESSAGE_COUNT; i++) {
copy.setJMSMessageID("id:"+i);
PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(pos);
os.writeByte(1);
os.writeUTF("Test");
wireFormat.writePacket(copy, os);
os.close();
}
return null;
}
};
long start = System.currentTimeMillis();
runConcurrentTest(workers, task);
long end = System.currentTimeMillis();
System.out.println(getName() + ": test duration: " + (end - start) + " ms, msg/s: "
+ (MESSAGE_COUNT * workers * 1000f / (end - start)));
}
}