Package org.mule.util.queue

Source Code of org.mule.util.queue.LocalTxQueueTransactionRecovererTestCase

/*
* Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.util.queue;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import org.mule.api.MuleEvent;
import org.mule.api.context.MuleContextBuilder;
import org.mule.config.DefaultMuleConfiguration;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.util.journal.queue.LocalTxQueueTransactionJournal;
import org.mule.util.journal.queue.LocalTxQueueTransactionRecoverer;
import org.mule.util.xa.ResourceManagerException;

import java.io.Serializable;

import org.apache.commons.lang.NotImplementedException;
import org.hamcrest.core.IsNull;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class LocalTxQueueTransactionRecovererTestCase extends AbstractMuleContextTestCase
{

    public static final String QUEUE_NAME = "inQueue";
    public static final String MESSAGE_CONTENT = "data";

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private DefaultQueueStore inQueue;


    @Override
    protected void configureMuleContext(MuleContextBuilder contextBuilder)
    {
        DefaultMuleConfiguration muleConfiguration = new DefaultMuleConfiguration();
        muleConfiguration.setWorkingDirectory(temporaryFolder.getRoot().getAbsolutePath());
        contextBuilder.setMuleConfiguration(muleConfiguration);
    }

    @Test
    public void pollAndFailThenRecover() throws Exception
    {
        ((DefaultMuleConfiguration) muleContext.getConfiguration()).setWorkingDirectory(temporaryFolder.getRoot().getAbsolutePath());
        LocalTxQueueTransactionJournal txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        inQueue = new DefaultQueueStore(QUEUE_NAME, muleContext, new DefaultQueueConfiguration(0, true));
        PersistentQueueTransactionContext persistentTransactionContext = new PersistentQueueTransactionContext(txLog, createQueueProvider(inQueue));
        MuleEvent testEvent = getTestEvent(MESSAGE_CONTENT);
        inQueue.offer(testEvent, 0, 10);
        Serializable value = persistentTransactionContext.poll(inQueue, 100000);
        assertThat(inQueue.poll(10), IsNull.nullValue());
        assertThat(value, notNullValue());
        txLog.close();
        txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        LocalTxQueueTransactionRecoverer queueTransactionRecoverer = new LocalTxQueueTransactionRecoverer(txLog, createQueueProvider(inQueue));
        queueTransactionRecoverer.recover();
        Serializable muleEvent = inQueue.poll(10);
        assertThat(muleEvent, notNullValue());
        assertThat(testEvent.equals(muleEvent), is(true));
    }

    @Test
    public void failBetweenLogEntryWriteAndRealPoolThenRecover() throws Exception
    {
        ((DefaultMuleConfiguration) muleContext.getConfiguration()).setWorkingDirectory(temporaryFolder.getRoot().getAbsolutePath());
        LocalTxQueueTransactionJournal txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        final DefaultQueueStore inQueue = new DefaultQueueStore(QUEUE_NAME, muleContext, new DefaultQueueConfiguration(0, true));
        PersistentQueueTransactionContext persistentTransactionContext = new PersistentQueueTransactionContext(txLog, createQueueProvider(inQueue));
        MuleEvent testEvent = getTestEvent(MESSAGE_CONTENT);
        inQueue.offer(testEvent, 0, 10);
        persistentTransactionContext.poll(inQueue, 10);
        txLog.close();
        txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        LocalTxQueueTransactionRecoverer queueTransactionRecoverer = new LocalTxQueueTransactionRecoverer(txLog, createQueueProvider(inQueue));
        queueTransactionRecoverer.recover();
        Serializable muleEvent = inQueue.poll(10);
        assertThat(muleEvent, notNullValue());
        assertThat(testEvent.equals(muleEvent), is(true));
        muleEvent = inQueue.poll(10);
        assertThat(muleEvent, IsNull.nullValue());
    }

    private QueueProvider createQueueProvider(final DefaultQueueStore queue)
    {
        return new QueueProvider()
        {
            @Override
            public QueueStore getQueue(String queueName)
            {
                throw new NotImplementedException();
            }

            @Override
            public RecoverableQueueStore getRecoveryQueue(String queueName)
            {
                return queue;
            }
        };
    }

    @Test
    public void offerAndFailThenRecover() throws Exception
    {
        ((DefaultMuleConfiguration) muleContext.getConfiguration()).setWorkingDirectory(temporaryFolder.getRoot().getAbsolutePath());
        LocalTxQueueTransactionJournal txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        final DefaultQueueStore outQueue = new DefaultQueueStore(QUEUE_NAME, muleContext, new DefaultQueueConfiguration(0, true));
        PersistentQueueTransactionContext persistentTransactionContext = new PersistentQueueTransactionContext(txLog, createQueueProvider(outQueue));
        MuleEvent testEvent = getTestEvent(MESSAGE_CONTENT);
        persistentTransactionContext.offer(outQueue, testEvent, 10);
        assertThat(outQueue.poll(10), IsNull.nullValue());
        txLog.close();
        txLog = new LocalTxQueueTransactionJournal(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        LocalTxQueueTransactionRecoverer queueTransactionRecoverer = new LocalTxQueueTransactionRecoverer(txLog, createQueueProvider(outQueue));
        queueTransactionRecoverer.recover();
        Serializable muleEvent = outQueue.poll(10);
        assertThat(muleEvent, nullValue());
    }

    @Test
    public void offerAndFailBetweenRealOfferAndCommitThenRecover() throws Exception
    {
        ((DefaultMuleConfiguration) muleContext.getConfiguration()).setWorkingDirectory(temporaryFolder.getRoot().getAbsolutePath());
        TestTransactionLogger txLog = new TestTransactionLogger(temporaryFolder.getRoot().getAbsolutePath(), muleContext).failDuringLogCommit();
        final DefaultQueueStore outQueue = new DefaultQueueStore(QUEUE_NAME, muleContext, new DefaultQueueConfiguration(0, true));
        PersistentQueueTransactionContext persistentTransactionContext = new PersistentQueueTransactionContext(txLog, createQueueProvider(outQueue));
        MuleEvent testEvent = getTestEvent(MESSAGE_CONTENT);
        persistentTransactionContext.offer(outQueue, testEvent, 10);
        try
        {
            persistentTransactionContext.doCommit();
            fail();
        }
        catch (ResourceManagerException e)
        {
            //expected
        }
        txLog.close();
        txLog = new TestTransactionLogger(temporaryFolder.getRoot().getAbsolutePath(), muleContext);
        LocalTxQueueTransactionRecoverer queueTransactionRecoverer = new LocalTxQueueTransactionRecoverer(txLog, createQueueProvider(outQueue));
        queueTransactionRecoverer.recover();
        Serializable muleEvent = outQueue.poll(10);
        assertThat(muleEvent, nullValue());
    }

}
TOP

Related Classes of org.mule.util.queue.LocalTxQueueTransactionRecovererTestCase

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.