Package cc.redberry.pipe.blocks

Source Code of cc.redberry.pipe.blocks.AbstractOutputPort

/*
* cc.redberry.pipe: java library for implementation of concurrent pipelines
* Copyright (c) 2010-2012
* Bolotin Dmitriy <bolotin.dmitriy@gmail.com>
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cc.redberry.pipe.blocks;

import cc.redberry.pipe.OutputPort;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* Implementation of {@link cc.redberry.pipe.OutputPort} interface with proper synchronization for close() method.
*
* @param <T> type of stream object
*/
//TODO call _close after null was returned by _take ??
public abstract class AbstractOutputPort<T> implements OutputPort<T> {
    protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    protected boolean closed = false;

    @Override
    public final T take() throws InterruptedException {
        final Lock lock = readWriteLock.readLock();
        lock.lockInterruptibly();
        try {
            if (closed)
                return null;
            T value = _take();

            //Not synchronized...
            if (value == null)
                closed = true;

            return value;
        } finally {
            lock.unlock();
        }
    }

    public abstract T _take() throws InterruptedException;

    /**
     * (!) Do not run this method from inside {@link #_take()} method.
     */
    @Override
    public final void close() {
        final Lock lock = readWriteLock.writeLock();
        lock.lock();
        try {
            closed = true;
            _close();
        } finally {
            lock.unlock();
        }
    }

    public void _close() {
    }

    public final boolean isClosed() {
        final Lock lock = readWriteLock.readLock();
        lock.lock();
        try {
            return closed;
        } finally {
            lock.unlock();
        }
    }
}
TOP

Related Classes of cc.redberry.pipe.blocks.AbstractOutputPort

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.