Package com.lmax.disruptor.dsl

Source Code of com.lmax.disruptor.dsl.ConsumerRepositoryTest

/*
* Copyright 2011 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lmax.disruptor.dsl;

import com.lmax.disruptor.EventProcessor;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.stubs.SleepingEventHandler;
import com.lmax.disruptor.support.TestEvent;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;

public class ConsumerRepositoryTest
{
    private final Mockery mockery = new Mockery();

    private ConsumerRepository<TestEvent> consumerRepository;
    private EventProcessor eventProcessor1;
    private EventProcessor eventProcessor2;
    private SleepingEventHandler handler1;
    private SleepingEventHandler handler2;
    private SequenceBarrier barrier1;
    private SequenceBarrier barrier2;

    @Before
    public void setUp() throws Exception
    {
        consumerRepository = new ConsumerRepository<TestEvent>();
        eventProcessor1 = mockery.mock(EventProcessor.class, "eventProcessor1");
        eventProcessor2 = mockery.mock(EventProcessor.class, "eventProcessor2");

        final Sequence sequence1 = new Sequence();
        final Sequence sequence2 = new Sequence();
        mockery.checking(new Expectations()
        {
            {
                allowing(eventProcessor1).getSequence();
                will(returnValue(sequence1));

                allowing(eventProcessor1).isRunning();
                will(returnValue(true));

                allowing(eventProcessor2).getSequence();
                will(returnValue(sequence2));

                allowing(eventProcessor2).isRunning();
                will(returnValue(true));
            }
        });
        handler1 = new SleepingEventHandler();
        handler2 = new SleepingEventHandler();

        barrier1 = mockery.mock(SequenceBarrier.class, "barrier1");
        barrier2 = mockery.mock(SequenceBarrier.class, "barrier2");
    }

    @Test
    public void shouldGetBarrierByHandler() throws Exception
    {
        consumerRepository.add(eventProcessor1, handler1, barrier1);

        assertThat(consumerRepository.getBarrierFor(handler1), sameInstance(barrier1));
    }

    @Test
    public void shouldReturnNullForBarrierWhenHandlerIsNotRegistered() throws Exception
    {
        assertThat(consumerRepository.getBarrierFor(handler1), is(nullValue()));
    }

    @Test
    public void shouldGetLastEventProcessorsInChain() throws Exception
    {
        consumerRepository.add(eventProcessor1, handler1, barrier1);
        consumerRepository.add(eventProcessor2, handler2, barrier2);

        consumerRepository.unMarkEventProcessorsAsEndOfChain(eventProcessor2.getSequence());


        final Sequence[] lastEventProcessorsInChain = consumerRepository.getLastSequenceInChain(true);
        assertThat(lastEventProcessorsInChain.length, equalTo(1));
        assertThat(lastEventProcessorsInChain[0], sameInstance(eventProcessor1.getSequence()));
    }

    @Test
    public void shouldRetrieveEventProcessorForHandler() throws Exception
    {
        consumerRepository.add(eventProcessor1, handler1, barrier1);

        assertThat(consumerRepository.getEventProcessorFor(handler1), sameInstance(eventProcessor1));
    }

    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowExceptionWhenHandlerIsNotRegistered() throws Exception
    {
        consumerRepository.getEventProcessorFor(new SleepingEventHandler());
    }

    @Test
    public void shouldIterateAllEventProcessors() throws Exception
    {
        consumerRepository.add(eventProcessor1, handler1, barrier1);
        consumerRepository.add(eventProcessor2, handler2, barrier2);

        boolean seen1 = false;
        boolean seen2 = false;
        for (ConsumerInfo testEntryEventProcessorInfo : consumerRepository)
        {
            final EventProcessorInfo<?> eventProcessorInfo = (EventProcessorInfo<?>) testEntryEventProcessorInfo;
            if (!seen1 && eventProcessorInfo.getEventProcessor() == eventProcessor1 &&
                eventProcessorInfo.getHandler() == handler1)
            {
                seen1 = true;
            }
            else if (!seen2 && eventProcessorInfo.getEventProcessor() == eventProcessor2 &&
                     eventProcessorInfo.getHandler() == handler2)
            {
                seen2 = true;
            }
            else
            {
                fail("Unexpected eventProcessor info: " + testEntryEventProcessorInfo);
            }
        }

        assertTrue("Included eventProcessor 1", seen1);
        assertTrue("Included eventProcessor 2", seen2);
    }
}
TOP

Related Classes of com.lmax.disruptor.dsl.ConsumerRepositoryTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.