Package org.apache.sirona.cube

Source Code of org.apache.sirona.cube.DisruptorPathTrackingDataStore$PathTrackingEntryEventHandler

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.sirona.cube;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.sirona.configuration.Configuration;
import org.apache.sirona.configuration.ioc.AutoSet;
import org.apache.sirona.configuration.ioc.Created;
import org.apache.sirona.configuration.ioc.Destroying;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
import org.apache.sirona.tracking.PathTrackingEntry;
import org.apache.sirona.util.SerializeUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
*/
@AutoSet
public class DisruptorPathTrackingDataStore
    extends BatchPathTrackingDataStore
    implements CollectorPathTrackingDataStore
{
    private static final Cube CUBE = IoCs.findOrCreateInstance( CubeBuilder.class ).build();

    private static boolean USE_SINGLE_STORE = Boolean.parseBoolean(
        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.singlestore", "false" ) );

    private RingBuffer<PathTrackingEntry> ringBuffer;

    private Disruptor<PathTrackingEntry> disruptor;

    private int ringBufferSize = 4096;

    private int numberOfConsumers = 4;

    @Created
    public void initialize()
    {
        ExecutorService exec = Executors.newCachedThreadPool();

        // FIXME make configurable: WaitStrategy

        disruptor = new Disruptor<PathTrackingEntry>( new EventFactory<PathTrackingEntry>()
        {
            @Override
            public PathTrackingEntry newInstance()
            {
                return new PathTrackingEntry();
            }
        }, ringBufferSize, exec, ProducerType.SINGLE, new BusySpinWaitStrategy()
        );

        for ( int i = 0; i < numberOfConsumers; i++ )
        {
            disruptor.handleEventsWith( new PathTrackingEntryEventHandler( i, numberOfConsumers ) );
        }
        ringBuffer = disruptor.start();

    }

    private static class PathTrackingEntryEventHandler
        implements EventHandler<PathTrackingEntry>
    {

        private final long ordinal;

        private final long numberOfConsumers;

        public PathTrackingEntryEventHandler( final long ordinal, final long numberOfConsumers )
        {
            this.ordinal = ordinal;
            this.numberOfConsumers = numberOfConsumers;
        }

        public void onEvent( final PathTrackingEntry entry, final long sequence, final boolean endOfBatch )
            throws Exception
        {
            if ( ( sequence % numberOfConsumers ) == ordinal )
            {
                CUBE.doPostBytes( SerializeUtils.serialize( entry ), PathTrackingEntry.class.getName() );
            }
        }

    }

    @Override
    public void store( final PathTrackingEntry pathTrackingEntry )
    {

        ringBuffer.publishEvent( new EventTranslator<PathTrackingEntry>()
        {
            @Override
            public void translateTo( PathTrackingEntry event, long sequence )
            {
                event.setClassName( pathTrackingEntry.getClassName() );
                event.setExecutionTime( pathTrackingEntry.getExecutionTime() );
                event.setLevel( pathTrackingEntry.getLevel() );
                event.setMethodName( pathTrackingEntry.getMethodName() );
                event.setNodeId( pathTrackingEntry.getNodeId() );
                event.setStartTime( pathTrackingEntry.getStartTime() );
                event.setTrackingId( pathTrackingEntry.getTrackingId() );
            }
        } );
    }

    @Override
    protected void pushEntriesByBatch( Map<String, List<Pointer>> pathTrackingEntries )
    {
        if ( !USE_SINGLE_STORE )
        {

            for ( Map.Entry<String, List<Pointer>> entry : pathTrackingEntries.entrySet() )
            {
                for ( Pointer pointer : entry.getValue() )
                {
                    if ( !pointer.isFree() )
                    {
                        CUBE.doPostBytes( readBytes( pointer ), PathTrackingEntry.class.getName() );
                        pointer.freeMemory();
                    }
                }
            }
        }
    }

    public RingBuffer<PathTrackingEntry> getRingBuffer()
    {
        return ringBuffer;
    }

    public void setRingBuffer( RingBuffer<PathTrackingEntry> ringBuffer )
    {
        this.ringBuffer = ringBuffer;
    }

    public int getNumberOfConsumers()
    {
        return numberOfConsumers;
    }

    public void setNumberOfConsumers( int numberOfConsumers )
    {
        this.numberOfConsumers = numberOfConsumers;
    }

    public int getRingBufferSize()
    {
        return ringBufferSize;
    }

    public void setRingBufferSize( int ringBufferSize )
    {
        this.ringBufferSize = ringBufferSize;
    }

    @Destroying
    public void destroy()
    {
        // FIXME timeout??
        disruptor.shutdown();
    }


}
TOP

Related Classes of org.apache.sirona.cube.DisruptorPathTrackingDataStore$PathTrackingEntryEventHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.