/*
* cc.redberry.pipe: java library for implementation of concurrent pipelines
* Copyright (c) 2010-2012
* Bolotin Dmitriy <bolotin.dmitriy@gmail.com>
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option)
* any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cc.redberry.pipe.blocks;
import cc.redberry.pipe.OutputPort;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Implementation of {@link cc.redberry.pipe.OutputPort} interface with proper synchronization for close() method.
 *
 * <p>{@link #take()} and {@link #isClosed()} run under the shared read lock, so several threads may be inside
 * {@link #_take()} at the same time. {@link #close()} runs under the exclusive write lock: it waits until no
 * thread holds the read lock, then marks the port closed and invokes {@link #_close()}.</p>
 *
 * @param <T> type of stream object
 */
//TODO call _close after null was returned by _take ??
public abstract class AbstractOutputPort<T> implements OutputPort<T> {
    /**
     * Guards the closed state: read lock for {@link #take()} / {@link #isClosed()}, write lock for
     * {@link #close()}.
     */
    protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * {@code true} once {@link #close()} has been called or {@link #_take()} has returned {@code null}.
     *
     * <p>Declared {@code volatile} because {@link #take()} sets it while holding only the shared read lock.
     * The {@link ReadWriteLock} contract guarantees that read lock holders see writes made under the write
     * lock, but not writes made by other read lock holders; {@code volatile} closes that gap. This class only
     * ever stores {@code true} into the field, so concurrent writers cannot conflict.</p>
     */
    protected volatile boolean closed = false;

    /**
     * Returns the next object produced by {@link #_take()}, or {@code null} if this port is closed.
     *
     * <p>When {@link #_take()} returns {@code null} the port is marked closed, so every later call returns
     * {@code null} without invoking {@link #_take()} again. {@link #_close()} is not invoked in that case.</p>
     *
     * @return the next object, or {@code null} if the port is closed or the source is exhausted
     * @throws InterruptedException if the calling thread is interrupted while waiting for the read lock, or
     *                              if {@link #_take()} throws it
     */
    @Override
    public final T take() throws InterruptedException {
        final Lock lock = readWriteLock.readLock();
        lock.lockInterruptibly();
        try {
            if (closed)
                return null;
            T value = _take();
            // Written under the shared read lock only; safe because the field is volatile
            // and the only value ever stored here is true.
            if (value == null)
                closed = true;
            return value;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Produces the next object of the stream. Invoked by {@link #take()} with the read lock held, and only
     * while the port has not been observed as closed.
     *
     * @return the next object, or {@code null} to signal that no more objects will be produced
     * @throws InterruptedException if the calling thread is interrupted while waiting for an object
     */
    public abstract T _take() throws InterruptedException;

    /**
     * Marks this port as closed and invokes {@link #_close()}, both under the write lock.
     *
     * <p>The call blocks until every thread currently inside {@link #take()} or {@link #isClosed()} has
     * released the read lock. {@link #_close()} is invoked on every call, including repeated ones.</p>
     *
     * <p>(!) Do not run this method from inside {@link #_take()} method: {@link #_take()} runs with the read
     * lock held, and {@link ReentrantReadWriteLock} does not allow upgrading a read lock to the write lock,
     * so the calling thread would block forever.</p>
     */
    @Override
    public final void close() {
        final Lock lock = readWriteLock.writeLock();
        lock.lock();
        try {
            closed = true;
            _close();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Hook invoked by {@link #close()} with the write lock held, after the port has been marked closed.
     * The default implementation does nothing.
     */
    public void _close() {
    }

    /**
     * Reports whether this port has been closed, either by {@link #close()} or because {@link #_take()}
     * returned {@code null}.
     *
     * <p>Acquires the read lock, so the call waits for a {@link #close()} that is in progress to finish.</p>
     *
     * @return {@code true} if the port is closed
     */
    public final boolean isClosed() {
        final Lock lock = readWriteLock.readLock();
        lock.lock();
        try {
            return closed;
        } finally {
            lock.unlock();
        }
    }
}