Package io.fabric8.api.permit

Source Code of io.fabric8.api.permit.DefaultPermitManager$PermitState

/**
*  Copyright 2005-2014 Red Hat, Inc.
*
*  Red Hat licenses this file to you under the Apache License, version
*  2.0 (the "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
*  implied.  See the License for the specific language governing
*  permissions and limitations under the License.
*/
package io.fabric8.api.permit;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.jboss.gravia.utils.IllegalArgumentAssertion;
import org.jboss.gravia.utils.IllegalStateAssertion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The default implementation of a {@link PermitManager}
*
* @author thomas.diesler@jboss.com
* @since 05-Mar-2014
*/
public final class DefaultPermitManager implements PermitManager {

    public static long DEFAULT_TIMEOUT = 60000L;

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPermitManager.class);
    private final Map<PermitKey<?>, PermitState<?>> permits = new HashMap<PermitKey<?>, PermitState<?>>();

    @Override
    public <T> void activate(PermitKey<T> key, T instance) {
        getPermitState(key).activate(instance);
    }

    @Override
    public void deactivate(PermitKey<?> key) {
        getPermitState(key).deactivate(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    @Override
    public void deactivate(PermitKey<?> key, long timeout, TimeUnit unit) {
        getPermitState(key).deactivate(timeout, unit);
    }

    @Override
    public <T> Permit<T> aquirePermit(PermitKey<T> key, boolean exclusive) {
        return getPermitState(key).acquire(exclusive, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    @Override
    public <T> Permit<T> aquirePermit(PermitKey<T> key, boolean exclusive, long timeout, TimeUnit unit) {
        return getPermitState(key).acquire(exclusive, timeout, unit);
    }

    @SuppressWarnings("unchecked")
    private <T> PermitState<T> getPermitState(PermitKey<T> key) {
        IllegalArgumentAssertion.assertNotNull(key, "key");
        synchronized (permits) {
            PermitState<?> permitState = permits.get(key);
            if (permitState == null) {
                permitState = new PermitState<T>(key);
                permits.put(key, permitState);
            }
            return (PermitState<T>) permitState;
        }
    }

    static class PermitState<T> {

        private final Semaphore semaphore = new Semaphore(0);
        private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(true);
        private final AtomicReference<T> activeInstance = new AtomicReference<T>();
        private final AtomicBoolean exclusiveLock = new AtomicBoolean();
        private final AtomicBoolean active = new AtomicBoolean();
        private final PermitKey<T> key;

        PermitState(PermitKey<T> key) {
            this.key = key;
        }

        void activate(T instance) {
            boolean nowactive = active.compareAndSet(false, true);
            IllegalStateAssertion.assertTrue(nowactive, "Cannot activate an already active state");

            LOGGER.debug("activating: {}",  key);

            activeInstance.set(instance);
            semaphore.release(1);
        }

        Permit<T> acquire(final boolean exclusive, long timeout, TimeUnit unit) {

            final String exclstr = exclusive ? " exclusive" : "";
            if (LOGGER.isTraceEnabled()) {
                String timestr = unit != null ? " in " + unit.toMillis(timeout) + "ms" : "";
                LOGGER.trace("aquiring" + exclstr + timestr + ": {}", key);
            }

            // A Permit has a single semaphore permit - get it
            // Once we pass this, no other thread can get passed here
            // Same happens in deactivate, so no two threads can activate/deactivate at the same time
            getSinglePermit(timeout, unit);

            // Get a reference to the instance that is currently
            // associated with this Permit
            final T instance = activeInstance.get();

            final Lock lock;
            if (exclusive) {
                // For an exclusive Permit we aquire a write-lock
                // This will block until all readers/writers have returned their locks
                // We do not release the semaphore permit, this will get done when the exclusive Permit is released
                lock = writeLock(timeout, unit);
                exclusiveLock.set(true);
            } else {
                // For an non-exclusive Permit we aquire a read-lock
                // This will block until a writer returns its lock
                // Multiple readers can get passed this
                // We do release the semaphore permit
                lock = readLock(timeout, unit);

                // Release the single semahore permit
                semaphore.release(1);
            }

            LOGGER.trace("aquired" + exclstr + ": {}", key);

            return new Permit<T>() {

                @Override
                public PermitKey<T> getPermitKey() {
                    return key;
                }

                @Override
                public T getInstance() {
                    return instance;
                }

                @Override
                public void release() {
                    LOGGER.trace("releasing" + exclstr + ": {}", key);

                    // Always unlock the read/write lock
                    lock.unlock();

                    // Release the semaphore permit for a writer
                    if (exclusive) {
                        exclusiveLock.set(false);
                        semaphore.release(1);
                    }
                }
            };
        }

        void deactivate(long timeout, TimeUnit unit) {

            LOGGER.trace("deactivating: {}",  key);

            // Deactivate on an already inactive Permit has no effect
            if (!active.get()) {
                LOGGER.trace("not active: {}",  key);
                return;
            }

            // Deactivating while holding an exclusive lock
            if (exclusiveLock.get()) {
                LOGGER.debug("deactivated (exclusive): {}",  key);
                active.set(false);
                return;
            }

            // A Permit has a single semaphore permit - get it
            // Once we pass this, no other thread can get passed here
            // Same happens in aquire, so no two threads can activate/deactivate at the same time
            getSinglePermit(timeout, unit);

            // Deactivation requires a write-lock
            // We do not release the semaphore permit - this happens in activate
            try {
                writeLock(timeout, unit).unlock();
                LOGGER.debug("deactivated: {}",  key);
                active.set(false);
            } catch (PermitStateTimeoutException ex) {
                semaphore.release(1);
                throw new PermitStateTimeoutException("Cannot deactivate state [" + key.getName() + "] in time", key, timeout, unit);
            }
        }

        private void getSinglePermit(long timeout, TimeUnit unit) {
            try {
                if (!semaphore.tryAcquire(timeout, unit)) {
                    throw new PermitStateTimeoutException("Cannot aquire permit for [" + key.getName() + "] in time", key, timeout, unit);
                }
            } catch (InterruptedException ex) {
                throw new IllegalStateException(ex);
            }
        }

        private ReadLock readLock(long timeout, TimeUnit unit) {
            ReadLock lock = rwlock.readLock();
            try {
                if (!lock.tryLock() && !lock.tryLock(timeout, unit))
                    throw new PermitStateTimeoutException("Cannot aquire read lock for [" + key.getName() + "] in time", key, timeout, unit);
            } catch (InterruptedException ex) {
                throw new IllegalStateException(ex);
            }
            return lock;
        }

        private WriteLock writeLock(long timeout, TimeUnit unit) {
            WriteLock lock = rwlock.writeLock();
            try {
                if (!lock.tryLock() && !lock.tryLock(timeout, unit))
                    throw new PermitStateTimeoutException("Cannot aquire write lock for [" + key.getName() + "] in time", key, timeout, unit);
            } catch (InterruptedException ex) {
                throw new IllegalStateException(ex);
            }
            return lock;
        }
    }
}
TOP

Related Classes of io.fabric8.api.permit.DefaultPermitManager$PermitState

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.