Package org.axonframework.gae.eventstore

Source Code of org.axonframework.gae.eventstore.GaeSnapshotter

/*
* Copyright (c) 2010-2014. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.gae.eventstore;

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import org.axonframework.common.io.IOUtils;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventsourcing.Snapshotter;
import org.axonframework.eventstore.SnapshotEventStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author Jettro Coenradie
*/
public class GaeSnapshotter implements Snapshotter, InitializingBean, ApplicationContextAware {

    private static final Logger logger = LoggerFactory.getLogger(GaeSnapshotter.class);

    private SnapshotEventStore eventStore;
    private Map<String, AggregateFactory<?>> aggregateFactories = new ConcurrentHashMap<String, AggregateFactory<?>>();
    private ApplicationContext applicationContext;

    @Autowired
    public GaeSnapshotter(SnapshotEventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void scheduleSnapshot(String typeIdentifier, Object aggregateIdentifier) {
        logger.debug("Schedule a new task to create a snapshot for type {} and aggregate {}",
                     typeIdentifier, aggregateIdentifier);

        Queue queue = QueueFactory.getQueue("snapshotter");

        queue.add(TaskOptions.Builder.withUrl("/task/snapshot")
                             .param("typeIdentifier", typeIdentifier)
                             .param("aggregateIdentifier", aggregateIdentifier.toString())
                             .method(TaskOptions.Method.POST)
        );
    }

    public void createSnapshot(String typeIdentifier, String aggregateIdentifier) {
        DomainEventStream eventStream = null;
        try {
            eventStream = eventStore.readEvents(typeIdentifier, aggregateIdentifier);
            DomainEventMessage snapshotEvent = createSnapshot(typeIdentifier, eventStream);
            if (snapshotEvent != null) {
                eventStore.appendSnapshotEvent(typeIdentifier, snapshotEvent);
            }
        } finally {
            IOUtils.closeQuietlyIfCloseable(eventStream);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String, AggregateFactory> factories = applicationContext.getBeansOfType(AggregateFactory.class);

        for (AggregateFactory factory : factories.values()) {
            this.aggregateFactories.put(factory.getTypeIdentifier(), factory);
        }
    }

    private DomainEventMessage createSnapshot(String typeIdentifier, DomainEventStream eventStream) {
        AggregateFactory<?> aggregateFactory = aggregateFactories.get(typeIdentifier);

        DomainEventMessage firstEvent = eventStream.peek();
        Object aggregateIdentifier = firstEvent.getAggregateIdentifier();

        EventSourcedAggregateRoot aggregate = aggregateFactory.createAggregate(aggregateIdentifier, firstEvent);
        aggregate.initializeState(eventStream);

        return new GenericDomainEventMessage<EventSourcedAggregateRoot>(
                aggregate.getIdentifier(), aggregate.getVersion(), aggregate);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
TOP

Related Classes of org.axonframework.gae.eventstore.GaeSnapshotter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.