/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.message.util;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.codehaus.activemq.capacity.CapacityMonitorEvent;
import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.Receipt;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
/**
 * Tests for {@link MemoryBoundedQueue} instances handed out by a
 * {@link MemoryBoundedQueueManager}.
 * <p>
 * Every test obtains the queue named {@link #QUEUE_NAME} from a manager that is
 * private to the test instance and closes that queue again before returning,
 * even when an assertion fails.
 *
 * @version $Revision: 1.2 $
 */
public class MemoryBoundedQueueTest extends TestCase {
    /** Memory usage, in the manager's units, reported by each packet in the load test. */
    private static final int TEST_INSTANCE_SIZE = 2048;
    /** A manager limit that is half of {@link #TEST_INSTANCE_SIZE}. */
    private static final int TEST_ENQUEUE_SIZE = TEST_INSTANCE_SIZE / 2;
    private static final String QUEUE_NAME = "TestQueue";
    /** Number of packets pushed through the queue by {@link #testLoad()}. */
    private static final int TOTAL_LOAD = 50000;
    /** Number of concurrent consumer threads started by {@link #testLoad()}. */
    private static final int NUMBER_CONSUMERS = 10;
    /** Upper bound on how long {@link #testLoad()} waits for its consumers to exit. */
    private static final long CONSUMER_SHUTDOWN_TIMEOUT_MILLIS = 5000;
    /** Upper bound on how long {@link #testClose()} waits for the background dequeue to return. */
    private static final long CLOSE_TIMEOUT_MILLIS = 2000;

    /** Total number of packets dequeued by all consumers. */
    private final SynchronizedInt count = new SynchronizedInt(0);
    /** Number of consumers whose run loop has ended. */
    private final SynchronizedInt stoppedCount = new SynchronizedInt(0);
    private final MemoryBoundedQueueManager queueManager = new MemoryBoundedQueueManager("testmanager", 1024 * 1024);

    /**
     * Consumer used by the load test. It dequeues until it has taken its own
     * quota of packets or until the queue hands back <code>null</code>.
     */
    private class Dequeue implements Runnable {
        private final MemoryBoundedQueue queue;
        private final Object mutex;
        private final int num;
        private final int localCount;
        private int internalCount;

        /**
         * @param q the queue to consume from
         * @param num identifier used only by {@link #toString()}
         * @param mutex monitor that is notified once the overall load has been consumed
         * @param localCount maximum number of packets this consumer takes
         */
        Dequeue(MemoryBoundedQueue q, int num, Object mutex, int localCount) {
            this.queue = q;
            this.num = num;
            this.mutex = mutex;
            this.localCount = localCount;
        }

        public void run() {
            try {
                while (internalCount < localCount) {
                    Packet obj = queue.dequeue();
                    if (obj == null) {
                        // nothing more to read (presumably the queue was stopped or closed)
                        break;
                    }
                    internalCount++;
                    // Use the value returned by increment() so that exactly one
                    // consumer observes the final total and stops the queue.
                    if (count.increment() == TOTAL_LOAD) {
                        synchronized (mutex) {
                            queue.stop();
                            mutex.notify();
                        }
                    }
                    Thread.yield();
                }
            }
            catch (InterruptedException ie) {
                // Preserve the interrupt for whoever owns this thread and stop consuming.
                Thread.currentThread().interrupt();
            }
            stoppedCount.increment();
        }

        public String toString() {
            return "Dequeue(" + num + ") count = " + internalCount;
        }
    }

    public MemoryBoundedQueueTest(String s) {
        super(s);
    }

    protected void setUp() {
    }

    protected void tearDown() {
    }

    /**
     * Pushes {@link #TOTAL_LOAD} packets through the queue while
     * {@link #NUMBER_CONSUMERS} threads drain it, then checks that every
     * consumer finished and that the manager reports no memory in use.
     */
    public void testLoad() throws Exception {
        final Object mutex = new Object();
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
            final int messagesPerConsumer = TOTAL_LOAD / NUMBER_CONSUMERS;
            final Thread[] consumers = new Thread[NUMBER_CONSUMERS];
            for (int i = 0; i < NUMBER_CONSUMERS; i++) {
                consumers[i] = new Thread(new Dequeue(queue, i, mutex, messagesPerConsumer));
                consumers[i].setPriority(Thread.NORM_PRIORITY - 1);
                consumers[i].start();
            }

            for (int i = 0; i < TOTAL_LOAD; i++) {
                Receipt rec = new Receipt();
                rec.setMemoryUsage(TEST_INSTANCE_SIZE);
                queue.enqueue(rec);
            }

            // The timed wait re-checks the counter, so a missed notify cannot hang the test.
            synchronized (mutex) {
                while (count.get() < TOTAL_LOAD) {
                    mutex.wait(250);
                }
            }

            // Wait for the consumers to actually exit instead of sleeping and hoping.
            joinAll(consumers, CONSUMER_SHUTDOWN_TIMEOUT_MILLIS);
            assertEquals("consumers that finished", NUMBER_CONSUMERS, stoppedCount.get());
            assertTrue("memory should be released once every packet is dequeued",
                    queueManager.getTotalMemoryUsedSize() == 0);
        }
        finally {
            queue.close();
        }
    }

    /** A cleared queue reports a size of zero. */
    public void testClear() {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            queueManager.setValueLimit(TEST_INSTANCE_SIZE);
            queue.enqueue(new Receipt());
            queue.clear();
            assertEquals("size after clear()", 0, queue.size());
        }
        finally {
            queue.close();
        }
    }

    /** The packet that was enqueued is the very instance that is dequeued. */
    public void testDequeue() throws Exception {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
            Receipt obj = new Receipt();
            queue.enqueue(obj);
            assertSame("dequeued packet", obj, queue.dequeue());
        }
        finally {
            queue.close();
        }
    }

    /**
     * After <code>close()</code> a dequeue issued from another thread must return
     * rather than block, and the manager must hand out a fresh queue for the same name.
     */
    public void testClose() {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
        final SynchronizedBoolean success = new SynchronizedBoolean(false);
        assertSame("same name should yield the same open queue",
                queue, queueManager.getMemoryBoundedQueue(QUEUE_NAME));

        Thread reader = new Thread(new Runnable() {
            public void run() {
                try {
                    // give the main thread time to close the queue first
                    Thread.sleep(250);
                    queue.dequeue();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                // Reached as soon as dequeue() comes back, whatever its outcome.
                synchronized (success) {
                    success.set(true);
                    success.notify();
                }
            }
        });
        reader.start();
        queue.close();

        try {
            long deadline = System.currentTimeMillis() + CLOSE_TIMEOUT_MILLIS;
            synchronized (success) {
                long remaining = CLOSE_TIMEOUT_MILLIS;
                // Loop to guard against spurious wake-ups while still honouring the deadline.
                while (!success.get() && remaining > 0) {
                    success.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail("interrupted while waiting for dequeue() on the closed queue to return");
        }
        assertTrue("dequeue() on a closed queue did not return in time", success.get());

        MemoryBoundedQueue reopened = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        assertNotSame("a closed queue must not be handed out again", queue, reopened);
        reopened.close();
    }

    /** A non-blocking dequeue on an empty queue yields <code>null</code>. */
    public void testDequeueNoWait() throws Exception {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            assertNull("dequeueNoWait() on an empty queue", queue.dequeueNoWait());
        }
        finally {
            queue.close();
        }
    }

    /** A packet added with <code>enqueueFirst</code> is dequeued ahead of earlier packets. */
    public void testEnqueueFirst() throws Exception {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            assertTrue("manager should start with no memory in use",
                    queueManager.getTotalMemoryUsedSize() == 0);
            queueManager.setValueLimit(TEST_INSTANCE_SIZE * 100);
            enqueueReceipts(queue, 10);
            Receipt test = new Receipt();
            test.setId("FIRST");
            queue.enqueueFirst(test);
            assertSame("head of the queue", test, queue.dequeue());
        }
        finally {
            queue.close();
        }
    }

    /**
     * Smoke test: <code>enqueueNoBlock</code> must return without throwing.
     * No further state is asserted here.
     */
    public void testEnqueueNoBlock() {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            queueManager.setValueLimit(TEST_ENQUEUE_SIZE);
            queue.enqueueNoBlock(new Receipt());
        }
        finally {
            queue.close();
        }
    }

    /** A queue that was filled and then cleared reports itself as empty. */
    public void testIsEmpty() {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            enqueueReceipts(queue, 10);
            queue.clear();
            assertTrue("queue should be empty after clear()", queue.isEmpty());
        }
        finally {
            queue.close();
        }
    }

    /** Removing a packet that is in the queue reports success. */
    public void testRemove() {
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            Receipt test = new Receipt();
            queue.enqueue(test);
            assertTrue("remove() of an enqueued packet", queue.remove(test));
        }
        finally {
            queue.close();
        }
    }

    /** The reported size matches the number of packets enqueued. */
    public void testSize() {
        final int size = 10;
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            enqueueReceipts(queue, size);
            assertEquals("queue size", size, queue.size());
        }
        finally {
            queue.close();
        }
    }

    /** Removing every enqueued packet by reference leaves the queue empty. */
    public void testRemovePacket() {
        final int size = 100;
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            List packets = enqueueNumberedReceipts(queue, size);
            for (int i = 0; i < size; i++) {
                queue.remove((Packet) packets.get(i));
            }
            assertEquals("size after removing every packet", 0, queue.size());
        }
        finally {
            queue.close();
        }
    }

    /** Removing by id returns the matching packet instance and leaves the queue empty. */
    public void testRemovePacketById() {
        final int size = 100;
        final MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue(QUEUE_NAME);
        try {
            List packets = enqueueNumberedReceipts(queue, size);
            for (int i = 0; i < size; i++) {
                Packet expected = (Packet) packets.get(i);
                Packet removed = queue.remove(expected.getId());
                assertNotNull("no packet removed for id " + expected.getId(), removed);
                assertSame("packet removed for id " + expected.getId(), expected, removed);
            }
            assertEquals("size after removing every packet", 0, queue.size());
        }
        finally {
            queue.close();
        }
    }

    public static Test suite() {
        return new TestSuite(MemoryBoundedQueueTest.class);
    }

    /** Runs {@link #testClose()} on its own, outside of a JUnit runner. */
    public static void main(String[] args) {
        MemoryBoundedQueueTest test = new MemoryBoundedQueueTest("test");
        test.setUp();
        try {
            test.testClose();
        }
        finally {
            test.tearDown();
        }
    }

    /** Enqueues <code>howMany</code> fresh receipts on the given queue. */
    private static void enqueueReceipts(MemoryBoundedQueue queue, int howMany) {
        for (int i = 0; i < howMany; i++) {
            queue.enqueue(new Receipt());
        }
    }

    /**
     * Enqueues <code>howMany</code> receipts whose ids are "0", "1", ... and
     * returns them in enqueue order.
     */
    private static List enqueueNumberedReceipts(MemoryBoundedQueue queue, int howMany) {
        List packets = new ArrayList(howMany);
        for (int i = 0; i < howMany; i++) {
            Packet p = new Receipt();
            p.setId("" + i);
            packets.add(p);
            queue.enqueue(p);
        }
        return packets;
    }

    /**
     * Waits for the given threads to finish, spending at most
     * <code>timeoutMillis</code> in total across all of them.
     */
    private static void joinAll(Thread[] threads, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        for (int i = 0; i < threads.length; i++) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                // join(0) would wait forever, so give up once the budget is spent
                break;
            }
            threads[i].join(remaining);
        }
    }
}