Package org.apache.activemq.broker.virtual

Source Code of org.apache.activemq.broker.virtual.MirroredQueueCorrectMemoryUsageTest

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.virtual.MirroredQueue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
* This test will determine that the producer flow control does not kick in.
* The original MirroredQueue implementation was causing the queue to update
* the topic memory usage instead of the queue memory usage.
* The reason is that the message memory usage instance will not be updated
* unless it is null.  This was the case when the message was initially sent
* to the topic but then it was non-null when it was being sent to the queue.
* When the region destination was set, the associated memory usage was not
* updated to the passed queue destination and thus the memory usage of the
* topic was being updated instead.
*
* @author Claudio Corsi
*/
public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {

    private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);

    private static final long ONE_MB = 0x0100000;
    private static final long TEN_MB = ONE_MB * 10;
    private static final long TWENTY_MB = TEN_MB * 2;

    private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";

    @Override
    protected boolean isPersistent() {
        return true;
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        // Create the broker service instance....
        BrokerService broker = super.createBroker();
        // Create and add the mirrored queue destination interceptor ....
        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1];
        MirroredQueue mq = new MirroredQueue();
        mq.setCopyMessage(true);
        mq.setPrefix("");
        mq.setPostfix(".qmirror");
        destinationInterceptors[0] = mq;
        broker.setDestinationInterceptors(destinationInterceptors);
        // Create the destination policy for the topics and queues
        PolicyMap policyMap = new PolicyMap();
        List<PolicyEntry> entries = new LinkedList<PolicyEntry>();
        // Create Topic policy entry
        PolicyEntry policyEntry = new PolicyEntry();
        super.useTopic = true;
        ActiveMQDestination destination = super.createDestination(">");
        Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
        policyEntry.setDestination(destination);
        policyEntry.setProducerFlowControl(true);
        policyEntry.setMemoryLimit(ONE_MB); // x10
        entries.add(policyEntry);
        // Create Queue policy entry
        policyEntry = new PolicyEntry();
        super.useTopic = false;
        destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
        Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
        policyEntry.setDestination(destination);
        policyEntry.setProducerFlowControl(true);
        policyEntry.setMemoryLimit(TEN_MB);
        entries.add(policyEntry);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        // Set destinations
        broker.setDestinations(new ActiveMQDestination[] { destination });
        // Set system usage
        SystemUsage memoryManager = new SystemUsage();
        MemoryUsage memoryUsage = new MemoryUsage();
        memoryUsage.setLimit(TEN_MB);
        memoryManager.setMemoryUsage(memoryUsage);
        StoreUsage storeUsage = new StoreUsage();
        storeUsage.setLimit(TWENTY_MB);
        memoryManager.setStoreUsage(storeUsage);
        TempUsage tempDiskUsage = new TempUsage();
        tempDiskUsage.setLimit(TEN_MB);
        memoryManager.setTempUsage(tempDiskUsage);
        broker.setSystemUsage(memoryManager);
        // Set the persistent adapter
        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
        persistenceAdapter.setJournalMaxFileLength((int)TEN_MB);
        // Delete all current messages...
        IOHelper.deleteFile(persistenceAdapter.getDirectory());
        broker.setPersistenceAdapter(persistenceAdapter);
        return broker;
    }

    @Before
    protected void setUp() throws Exception {
        super.setUp();
    }

    @After
    protected void tearDown() throws Exception {
        super.tearDown();
    }

    @Test(timeout=40000)
    public void testNoMemoryUsageIncreaseForTopic() throws Exception {
        Connection connection = super.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            char[] m = new char[1024];
            Arrays.fill(m, 'x');
            // create some messages that have 1k each
            for (int i = 1; i < 12000; i++) {
                 producer.send(session.createTextMessage(new String(m)));
                 logger.debug("Sent message: " + i);
            }
            producer.close();
            session.close();
            connection.stop();
            connection.close();
    }
}
TOP

Related Classes of org.apache.activemq.broker.virtual.MirroredQueueCorrectMemoryUsageTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.