Package com.lmax.disruptor.example

Source Code of com.lmax.disruptor.example.TwoDisruptors

package com.lmax.disruptor.example;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class TwoDisruptors
{
    private static class ValueEvent<T>
    {
        private T t;

        public T get()
        {
            return t;
        }

        public void set(final T t)
        {
            this.t = t;
        }
    }

    private static class Translator<T> implements EventTranslatorOneArg<ValueEvent<T>, ValueEvent<T>>
    {
        @Override
        public void translateTo(final ValueEvent<T> event, final long sequence, final ValueEvent<T> arg0)
        {
            event.set(arg0.get());
        }
    }

    private static class ValueEventHandler<T> implements EventHandler<ValueEvent<T>>
    {
        private final RingBuffer<ValueEvent<T>> ringBuffer;
        private final Translator<T> translator = new Translator<T>();

        public ValueEventHandler(final RingBuffer<ValueEvent<T>> ringBuffer)
        {
            this.ringBuffer = ringBuffer;
        }

        @Override
        public void onEvent(final ValueEvent<T> event, final long sequence, final boolean endOfBatch) throws Exception
        {
            ringBuffer.publishEvent(translator, event);
        }

        public static <T> EventFactory<ValueEvent<T>> factory()
        {
            return new EventFactory<ValueEvent<T>>()
            {
                @Override
                public ValueEvent<T> newInstance()
                {
                    return new ValueEvent<T>();
                }
            };
        }
    }

    @SuppressWarnings("unchecked")
    public static void main(final String[] args)
    {
        final Executor executor = Executors.newFixedThreadPool(2);
        final EventFactory<ValueEvent<String>> factory = ValueEventHandler.factory();

        final Disruptor<ValueEvent<String>> disruptorA =
                new Disruptor<ValueEvent<String>>(
                        factory,
                        1024,
                        executor,
                        ProducerType.MULTI,
                        new BlockingWaitStrategy());

        final Disruptor<ValueEvent<String>> disruptorB =
                new Disruptor<ValueEvent<String>>(
                        factory,
                        1024,
                        executor,
                        ProducerType.SINGLE,
                        new BlockingWaitStrategy());

        final ValueEventHandler<String> handlerA = new ValueEventHandler<String>(disruptorB.getRingBuffer());
        disruptorA.handleEventsWith(handlerA);

        final ValueEventHandler<String> handlerB = new ValueEventHandler<String>(disruptorA.getRingBuffer());
        disruptorB.handleEventsWith(handlerB);

        disruptorA.start();
        disruptorB.start();
    }
}
TOP

Related Classes of com.lmax.disruptor.example.TwoDisruptors

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.