Package org.drools.reteoo

Source Code of org.drools.reteoo.PartitionTaskManager$LeftTupleRetractAction

/*
* Copyright 2008 JBoss Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reteoo;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
import org.drools.spi.PropagationContext;

/**
* A class to control the tasks for a given rulebase partition.
* It requires a thread pool that is created in the working
* memory and injected in here.
*
* @author <a href="mailto:tirelli@post.com">Edson Tirelli</a>
*/
public class PartitionTaskManager {

    private PartitionTask task = null;
    private AtomicReference<ExecutorService> pool = new AtomicReference<ExecutorService>();

    public PartitionTaskManager( final InternalWorkingMemory workingMemory ) {
        this.task = new PartitionTask( workingMemory );
    }

    /**
     * Sets the thread pool to be used by this partition
     * @param pool
     */
    public void setPool(ExecutorService pool) {
        if( pool != null && this.pool.compareAndSet( null, pool ) ) {
            int size = this.task.queue.size();
            for( int i = 0; i < size; i++ ) {
                this.pool.get().execute( this.task );
            }
        } else {
            this.pool.set( pool );
        }
    }


    /**
     * Adds the given action to the processing queue
     *
     * @param action the action to be processed
     * @return true if the action was successfully added to the processing queue. false otherwise.
     */
    public boolean enqueue( final Action action ) {
        boolean result = this.task.enqueue( action );
        assert result : "result must be true";
        ExecutorService service = this.pool.get();
        ifservice != null ) {
            service.execute( this.task );
        }
        return result;
    }

    /**
     * A worker task that keeps processing the nodes queue.
     * The task uses a non-blocking queue and is re-submitted
     * for execution for each element that is added to the queue.
     */
    public static class PartitionTask implements Runnable {

        // the queue with the nodes that need to be processed
        private Queue<Action> queue;

        // the working memory reference
        private InternalWorkingMemory workingMemory;

        /**
         * Constructor
         *
         * @param workingMemory the working memory reference that is used for node processing
         */
        public PartitionTask( final InternalWorkingMemory workingMemory ) {
            this.queue = new ConcurrentLinkedQueue<Action>();
            this.workingMemory = workingMemory;
        }

        /**
         * Default execution method.
         *
         * @see Runnable
         */
        public void run() {
            Action action = queue.poll();
            if( action != null ) {
                action.execute( workingMemory );
            }
        }

        /**
         * Adds the given action to the processing queue returning true if the action
         * was correctly added or false otherwise.
         *
         * @param action the action to add to the processing queue
         * @return true if the node was successfully added to the queue. false otherwise.
         */
        public boolean enqueue( final Action action ) {
            return this.queue.add( action );
        }
    }

    /**
     * An interface for all actions to be executed by the PartitionTask
     */
    public static interface Action extends Externalizable {
        public abstract void execute( final InternalWorkingMemory workingMemory );
    }

    /**
     * An abstract super class for all handle-related actions
     */
    public static abstract class FactAction implements Action, Externalizable {

        protected InternalFactHandle handle;
        protected PropagationContext context;
        protected ObjectSink         sink;

        public FactAction() {
        }

        public FactAction( final InternalFactHandle handle, final PropagationContext context,
                           final ObjectSink sink ) {
            super();
            this.handle = handle;
            this.context = context;
            this.sink = sink;
        }

        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
            handle = (InternalFactHandle) in.readObject();
            context = (PropagationContext) in.readObject();
            sink = (ObjectSink) in.readObject();
        }

        public void writeExternal( ObjectOutput out ) throws IOException {
            out.writeObject( handle );
            out.writeObject( context );
            out.writeObject( sink );
        }

        public abstract void execute( final InternalWorkingMemory workingMemory );
    }

    public static class FactAssertAction extends FactAction {
        private static final long serialVersionUID = -8478488926430845209L;

        FactAssertAction() {
        }

        public FactAssertAction( final InternalFactHandle handle, final PropagationContext context,
                                 final ObjectSink sink ) {
            super( handle, context, sink );
        }

        public void execute( final InternalWorkingMemory workingMemory ) {
            sink.assertObject( this.handle, this.context, workingMemory );
        }
    }

    /**
     * An abstract super class for all leftTuple-related actions
     */
    public static abstract class LeftTupleAction implements Action, Externalizable {

        protected LeftTuple          leftTuple;
        protected PropagationContext context;
        protected LeftTupleSink      sink;

        public LeftTupleAction() {
        }

        public LeftTupleAction( final LeftTuple leftTuple, final PropagationContext context,
                           final LeftTupleSink sink ) {
            super();
            this.leftTuple = leftTuple;
            this.context = context;
            this.sink = sink;
        }

        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
            leftTuple = (LeftTuple) in.readObject();
            context = (PropagationContext) in.readObject();
            sink = (LeftTupleSink) in.readObject();
        }

        public void writeExternal( ObjectOutput out ) throws IOException {
            out.writeObject( leftTuple );
            out.writeObject( context );
            out.writeObject( sink );
        }

        public abstract void execute( final InternalWorkingMemory workingMemory );
    }

    public static class LeftTupleAssertAction extends LeftTupleAction {

        public LeftTupleAssertAction() {
        }
       
        public LeftTupleAssertAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
            super(leftTuple, context, sink );
        }

        public void execute( InternalWorkingMemory workingMemory ) {
            this.sink.assertLeftTuple( leftTuple, context, workingMemory );
        }
    }


    public static class LeftTupleRetractAction extends LeftTupleAction {
       
        public LeftTupleRetractAction() {
        }

        public LeftTupleRetractAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
            super(leftTuple, context, sink );
        }

        public void execute( InternalWorkingMemory workingMemory ) {
            this.sink.assertLeftTuple( leftTuple, context, workingMemory );
        }
    }
}
TOP

Related Classes of org.drools.reteoo.PartitionTaskManager$LeftTupleRetractAction

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.