Package org.apache.tuscany.sca.core.scope

Source Code of org.apache.tuscany.sca.core.scope.ConversationalScopeContainer$ConversationalInstanceReaper

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.   
*/

package org.apache.tuscany.sca.core.scope;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.tuscany.sca.core.context.InstanceWrapper;
import org.apache.tuscany.sca.core.conversation.ConversationListener;
import org.apache.tuscany.sca.core.conversation.ConversationManager;
import org.apache.tuscany.sca.core.conversation.ExtendedConversation;
import org.apache.tuscany.sca.core.invocation.ThreadMessageContext;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.store.Store;
import org.osoa.sca.ConversationEndedException;

/**
* A scope context which manages atomic component instances keyed on ConversationID
*/
public class ConversationalScopeContainer extends AbstractScopeContainer<Object> implements ConversationListener {
    private ConversationManager conversationManager;
    private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection =
        new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();

    //TODO: This needs to observe the value set by ConversationalAttributes for now we will hard code it.
    private long max_age = 60 * 60 * 1000; // 1 hour;
    private long max_idle_time = 60 * 60 * 1000; // 1 hour;
    private long reaper_interval = 60; // every minute;
    private ScheduledExecutorService scheduler;

    public ConversationalScopeContainer(Store aStore, RuntimeComponent component) {
        super(Scope.CONVERSATION, component);

        // Note: aStore is here to preserve the original factory interface. It is not currently used in this
        // implementation since we do not support instance persistence.

        // Check System properties to see if timeout values have been specified. All timeout values
        // will be specified in seconds.
        //
        String aProperty;
        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime");
        if (aProperty != null) {
            try {
                max_idle_time = (new Long(aProperty) * 1000);
            } catch (NumberFormatException nfe) {
                // Ignore
            }
        }

        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge");
        if (aProperty != null) {
            try {
                max_age = (new Long(aProperty) * 1000);
            } catch (NumberFormatException nfe) {
                // Ignore
            }
        }

        aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.ReaperInterval");
        if (aProperty != null) {
            try {
                reaper_interval = new Long(aProperty);
            } catch (NumberFormatException nfe) {
                // Ignore
            }
        }

        // Check to see if the maxAge and/or maxIdleTime have been specified using @ConversationAttributes. 
        // Implementation annoated attributes are honored first.
        if (this.getComponent().getImplementationProvider() instanceof ScopedImplementationProvider) {
            ScopedImplementationProvider aScopedImpl =
                (ScopedImplementationProvider)this.getComponent().getImplementationProvider();

            long maxAge = aScopedImpl.getMaxAge();
            if (maxAge > 0) {
                max_age = maxAge;
            }
            long maxIdleTime = aScopedImpl.getMaxIdleTime();
            if (maxIdleTime > 0) {
                max_idle_time = maxIdleTime;
            }
        }

    }

    @Override
    public synchronized void start() {
        if (lifecycleState != UNINITIALIZED && lifecycleState != STOPPED) {
            throw new IllegalStateException("Scope must be in UNINITIALIZED or STOPPED state [" + lifecycleState + "]");
        }

        // Get a scheduler and scheduled a task to be run in the future indefinitely until its explicitly shutdown.
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection),
                                      3,
                                      reaper_interval,
                                      TimeUnit.SECONDS);

        lifecycleState = RUNNING;
    }

    @Override
    public synchronized void stop() {

        // Prevent the scheduler from submitting any additional reapers, initiate an orderly shutdown if a reaper task is in progress.
        if (this.scheduler != null)
            this.scheduler.shutdown();

        lifecycleState = STOPPED;
    }

    protected InstanceWrapper getInstanceWrapper(boolean create, Object contextId) throws TargetResolutionException {

        // we might get a null context if the target service has
        // conversational scope but only its callback interface
        // is conversational. In this case we need to invent a
        // conversation Id here to store the service against
        // and populate the thread context
        if (contextId == null) {
            contextId = UUID.randomUUID().toString();
            Message msgContext = ThreadMessageContext.getMessageContext();

            if (msgContext != null) {
                msgContext.getTo().getReferenceParameters().setConversationID(contextId);
            }
        }

        InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);

        if (anInstanceWrapper == null && !create)
            return null;

        if (anInstanceWrapper == null) {
            anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);
            this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
        }
        // If an existing instance is found return it only if its not expired and update its
        // last referenced time.
        else {
            if (anInstanceWrapper.isExpired())
                throw new ConversationEndedException();
            anInstanceWrapper.updateLastReferencedTime();
        }

        return anInstanceWrapper.getInstanceWrapper(contextId);

    }

    @Override
    public InstanceWrapper getWrapper(Object contextId) throws TargetResolutionException {
        return getInstanceWrapper(true, contextId);
    }

    /**
     * This method allows a new context id to be registered alongside an existing one. This happens in
     * one case, when a conversation includes a stateful callback. The client component instance
     * must be registered against all outgoing conversation ids so that the component instance
     * can be found when the callback arrives
     *
     * @param existingContextId the context id against which the component is already registered
     * @param context this should be a conversation object so that the conversation can b stored
     *                and reset when the component instance is removed
     */
    public void addWrapperReference(Object existingContextId, Object contextId) throws TargetResolutionException {
        // get the instance wrapper via the existing id
        InstanceLifeCycleWrapper existingInstanceWrapper = this.instanceLifecycleCollection.get(existingContextId);
        InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(contextId);

        // only add the extra reference once
        if (newInstanceWrapper == null) {
            // add the id to the list of ids that the wrapper holds. Used for reference
            // counting and conversation resetting on destruction.
            existingInstanceWrapper.addCallbackConversation(contextId);

            // add the reference to the collection
            this.instanceLifecycleCollection.put(contextId, existingInstanceWrapper);
        }
    }

    public void registerWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException {
        // if a wrapper for a different instance is already registered for this contextId, remove it
        InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);
        if (anInstanceWrapper != null) {
            if (anInstanceWrapper.getInstanceWrapper(contextId).getInstance() != wrapper.getInstance()) {
                remove(contextId);
            } else {
                return;
            }
        }

        anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId);
        this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
    }

    // The remove is invoked when a conversation is explicitly ended.  This can occur by using the @EndsConversation or API. 
    // In this case the instance is immediately removed.  A new conversation will be started on the next operation
    // associated with this conversationId's service reference.
    //
    @Override
    public void remove(Object contextId) throws TargetDestructionException {
        if (contextId != null) {
            if (this.instanceLifecycleCollection.containsKey(contextId)) {
                InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId);
                this.instanceLifecycleCollection.remove(contextId);
                anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId);
            }
        }
    }

    /*
     *  This is an inner class that keeps track of the lifecycle of a conversation scoped
     *  implementation instance.  
     *
     */

    private class InstanceLifeCycleWrapper {
        private Object clientConversationId;
        private List<Object> callbackConversations = new ArrayList<Object>();
        private long creationTime;
        private long lastReferencedTime;
        private long expirationInterval;
        private long maxIdleTime;

        private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException {
            this.clientConversationId = contextId;
            this.creationTime = System.currentTimeMillis();
            this.lastReferencedTime = this.creationTime;
            this.expirationInterval = max_age;
            this.maxIdleTime = max_idle_time;
            this.createInstance(contextId);
        }

        private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException {
            this.clientConversationId = contextId;
            this.creationTime = System.currentTimeMillis();
            this.lastReferencedTime = this.creationTime;
            this.expirationInterval = max_age;
            this.maxIdleTime = max_idle_time;
            wrappers.put(contextId, wrapper);
        }

        private boolean isExpired() {
            long currentTime = System.currentTimeMillis();
            if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) // max idle time exceeded
                return true;
            if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded
                return true;

            return false;
        }

        private void updateLastReferencedTime() {
            this.lastReferencedTime = System.currentTimeMillis();
        }

        // Associates a callback conversation with this instance. Each time the scope container
        // is asked to remove an object given a ontextId an associated conversation object will
        // have its conversationId reset to null. When the list of ids is empty the component instance
        // will be removed from the scope container
        private void addCallbackConversation(Object conversationID) {
            InstanceWrapper ctx = getInstanceWrapper(clientConversationId);
            callbackConversations.add(conversationID);
            wrappers.put(conversationID, ctx);
        }

        //
        // Return the backing implementation instance 
        //
        private InstanceWrapper getInstanceWrapper(Object contextId) {
            InstanceWrapper ctx = wrappers.get(contextId);
            return ctx;
        }

        private void removeInstanceWrapper(Object contextId) throws TargetDestructionException {
            InstanceWrapper ctx = getInstanceWrapper(contextId);
            wrappers.remove(contextId);

            // find out if we are dealing with the original client conversation id
            // and reset accordingly
            if (clientConversationId.equals(contextId)) {
                clientConversationId = null;
            } else {
                // reset the conversationId in the conversation object if present
                // so that and ending callback causes the conversation in the originating
                // service reference in the client to be reset
                callbackConversations.remove(contextId);
            }

            // stop the component if this removes the last reference
            if (clientConversationId == null && callbackConversations.isEmpty()) {
                ctx.stop();
            }
        }

        private void createInstance(Object contextId) throws TargetResolutionException {
            InstanceWrapper instanceWrapper = createInstanceWrapper();
            instanceWrapper.start();
            wrappers.put(contextId, instanceWrapper);
        }

    }

    //
    // This inner class is an instance reaper.  It periodically iterates over the InstanceLifeCycleCollection
    // and for any instances that have expired removes the backing instance and the entry in the InstanceLifeCycle
    // Collection.
    //
    class ConversationalInstanceReaper implements Runnable {
        private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection;

        public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap) {
            this.instanceLifecycleCollection = aMap;
        }

        public void run() {
            Iterator<Map.Entry<Object, InstanceLifeCycleWrapper>> anIterator =
                this.instanceLifecycleCollection.entrySet().iterator();

            while (anIterator.hasNext()) {
                Map.Entry<Object, InstanceLifeCycleWrapper> anEntry = anIterator.next();
                InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = anEntry.getValue();
                if (anInstanceLifeCycleWrapper.isExpired()) {
                    try {
                        // cycle through all the references to this instance and
                        // remove them from the underlying wrappers collection and
                        // from the lifecycle wrappers collection
                        for (Object conversationID : anInstanceLifeCycleWrapper.callbackConversations) {
                            anInstanceLifeCycleWrapper.removeInstanceWrapper(conversationID);
                            this.instanceLifecycleCollection.remove(conversationID);
                        }

                        if (anInstanceLifeCycleWrapper.clientConversationId != null) {
                            anInstanceLifeCycleWrapper
                                .removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId);
                            this.instanceLifecycleCollection.remove(anInstanceLifeCycleWrapper.clientConversationId);
                        }
                    } catch (Exception ex) {
                        // TODO - what to do with any asynchronous exceptions?
                    }
                }
            }
        }
    }

    /**
     * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
     */
    public void conversationEnded(ExtendedConversation conversation) {
        stopContext(conversation.getConversationID());
    }

    /**
     * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
     */
    public void conversationExpired(ExtendedConversation conversation) {
    }

    /**
     * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
     */
    public void conversationStarted(ExtendedConversation conversation) {
        startContext(conversation.getConversationID());
    }

    /**
     * @return the conversationManager
     */
    public ConversationManager getConversationManager() {
        return conversationManager;
    }

    /**
     * @param conversationManager the conversationManager to set
     */
    public void setConversationManager(ConversationManager conversationManager) {
        this.conversationManager = conversationManager;
    }

}
TOP

Related Classes of org.apache.tuscany.sca.core.scope.ConversationalScopeContainer$ConversationalInstanceReaper

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.