Package org.qi4j.library.eventsourcing.domain.source.jdbm

Source Code of org.qi4j.library.eventsourcing.domain.source.jdbm.JdbmEventStoreService

/**
*
* Copyright 2009-2010 Rickard Öberg AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.qi4j.library.eventsourcing.domain.source.jdbm;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import jdbm.RecordManager;
import jdbm.RecordManagerFactory;
import jdbm.RecordManagerOptions;
import jdbm.Serializer;
import jdbm.btree.BTree;
import jdbm.helper.ByteArrayComparator;
import jdbm.helper.DefaultSerializer;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import jdbm.recman.CacheRecordManager;
import org.qi4j.api.activation.Activators;
import org.qi4j.api.injection.scope.Service;
import org.qi4j.api.mixin.Mixins;
import org.qi4j.api.service.ServiceComposite;
import org.qi4j.api.service.qualifier.Tagged;
import org.qi4j.api.value.ValueSerialization;
import org.qi4j.functional.Function;
import org.qi4j.io.Input;
import org.qi4j.io.Output;
import org.qi4j.io.Receiver;
import org.qi4j.io.Sender;
import org.qi4j.io.Transforms;
import org.qi4j.library.eventsourcing.domain.api.UnitOfWorkDomainEventsValue;
import org.qi4j.library.eventsourcing.domain.source.AbstractEventStoreMixin;
import org.qi4j.library.eventsourcing.domain.source.EventManagement;
import org.qi4j.library.eventsourcing.domain.source.EventSource;
import org.qi4j.library.eventsourcing.domain.source.EventStore;
import org.qi4j.library.eventsourcing.domain.source.EventStoreActivation;
import org.qi4j.library.eventsourcing.domain.source.EventStream;
import org.qi4j.library.fileconfig.FileConfiguration;

/**
* JAVADOC
*/
@Mixins( JdbmEventStoreService.JdbmEventStoreMixin.class )
@Activators( EventStoreActivation.Activator.class )
public interface JdbmEventStoreService
    extends EventSource, EventStore, EventStream, EventManagement, EventStoreActivation, ServiceComposite
{

    class JdbmEventStoreMixin
        extends AbstractEventStoreMixin
        implements EventManagement, EventSource
    {
        @Service
        private FileConfiguration fileConfig;

        @Service
        @Tagged( ValueSerialization.Formats.JSON )
        private ValueSerialization valueSerialization;

        private RecordManager recordManager;
        private BTree index;
        private Serializer serializer;
        private File dataFile;

        private long currentCount;

        @Override
        public void activateEventStore()
            throws Exception
        {
            super.activateEventStore();
            dataFile = new File( fileConfig.dataDirectory(), identity.identity() + "/events" );
            File directory = dataFile.getAbsoluteFile().getParentFile();
            directory.mkdirs();
            String name = dataFile.getAbsolutePath();
            Properties properties = new Properties();
            properties.put( RecordManagerOptions.AUTO_COMMIT, "false" );
            properties.put( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
            initialize( name, properties );
        }

        @Override
        public void passivateEventStore()
                throws Exception
        {
            super.passivateEventStore();
            recordManager.close();
        }

        @Override
        public Output<String, IOException> restore()
        {
            // Commit every 1000 events, convert from string to value, and then store. Put a lock around the whole thing
            Output<String, IOException> map = Transforms.map( new Transforms.ProgressLog<String>( 1000 )
            {
                @Override
                protected void logProgress()
                {
                    try
                    {
                        recordManager.commit(); // Commit every 1000 transactions to avoid OutOfMemory issues
                    }
                    catch( IOException e )
                    {
                        throw new IllegalStateException( "Could not commit data", e );
                    }
                }
            }, Transforms.map( new Function<String, UnitOfWorkDomainEventsValue>()
            {
                @Override
                public UnitOfWorkDomainEventsValue map( String item )
                {
                    return valueSerialization.<UnitOfWorkDomainEventsValue>deserialize( eventsType, item );
                }
            }, storeEvents0() ) );

            return Transforms.lock( JdbmEventStoreMixin.this.lock,
                                    map );
        }

        // EventStore implementation
        @Override
        public Input<UnitOfWorkDomainEventsValue, IOException> events( final long offset, long limit )
        {
            return new Input<UnitOfWorkDomainEventsValue, IOException>()
            {
                @Override
                public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super UnitOfWorkDomainEventsValue, ReceiverThrowableType> output )
                    throws IOException, ReceiverThrowableType
                {
                    output.receiveFrom( new Sender<UnitOfWorkDomainEventsValue, IOException>()
                    {
                        @Override
                        public <ReceiverThrowableType extends Throwable> void sendTo( Receiver<? super UnitOfWorkDomainEventsValue, ReceiverThrowableType> receiver )
                            throws ReceiverThrowableType, IOException
                        {
                            // Lock datastore first
                            lock();

                            try
                            {
                                final TupleBrowser browser = index.browse( offset + 1 );

                                Tuple tuple = new Tuple();

                                while( browser.getNext( tuple ) )
                                {
                                    // Get next transaction
                                    UnitOfWorkDomainEventsValue domainEvents = readTransactionEvents( tuple );

                                    receiver.receive( domainEvents );
                                }
                            }
                            catch( Exception e )
                            {
                                logger.warn( "Could not iterate events", e );
                            }
                            finally
                            {
                                lock.unlock();
                            }
                        }
                    } );
                }
            };
        }

        @Override
        public long count()
        {
            return currentCount;
        }

        @Override
        protected Output<UnitOfWorkDomainEventsValue, IOException> storeEvents0()
        {
            return new Output<UnitOfWorkDomainEventsValue, IOException>()
            {
                @Override
                public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends UnitOfWorkDomainEventsValue, SenderThrowableType> sender )
                    throws IOException, SenderThrowableType
                {
                    try
                    {
                        sender.sendTo( new Receiver<UnitOfWorkDomainEventsValue, IOException>()
                        {
                            @Override
                            public void receive( UnitOfWorkDomainEventsValue item )
                                throws IOException
                            {
                                String jsonString = valueSerialization.serialize( item );
                                currentCount++;
                                index.insert( currentCount, jsonString.getBytes( "UTF-8" ), false );
                            }
                        } );
                        recordManager.commit();
                    }
                    catch( IOException e )
                    {
                        recordManager.rollback();
                        throw e;
                    }
                    catch( Throwable e )
                    {
                        recordManager.rollback();
                        throw (SenderThrowableType) e;
                    }
                }
            };
        }

        private void initialize( String name, Properties properties )
            throws IOException
        {
            recordManager = RecordManagerFactory.createRecordManager( name, properties );
            serializer = DefaultSerializer.INSTANCE;
            recordManager = new CacheRecordManager( recordManager, 1000, false );
            long recid = recordManager.getNamedObject( "index" );
            if( recid != 0 )
            {
                index = BTree.load( recordManager, recid );
                currentCount = index.size();
            }
            else
            {
                ByteArrayComparator comparator = new ByteArrayComparator();
                index = BTree.createInstance( recordManager, comparator, serializer, DefaultSerializer.INSTANCE, 16 );
                recordManager.setNamedObject( "index", index.getRecid() );
                currentCount = 0;
            }
            recordManager.commit();
        }

        private UnitOfWorkDomainEventsValue readTransactionEvents( Tuple tuple )
            throws UnsupportedEncodingException
        {
            byte[] eventData = (byte[]) tuple.getValue();
            String eventJson = new String( eventData, "UTF-8" );
            return valueSerialization.<UnitOfWorkDomainEventsValue>deserialize( eventsType, eventJson );
        }
    }
}
TOP

Related Classes of org.qi4j.library.eventsourcing.domain.source.jdbm.JdbmEventStoreService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.