Package org.jboss.as.clustering.ejb3.cache.backing.infinispan

Source Code of org.jboss.as.clustering.ejb3.cache.backing.infinispan.InfinispanBackingCacheEntryStore$Operation

/*
* JBoss, Home of Professional Open Source.
* Copyright 2011, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.jboss.as.clustering.ejb3.cache.backing.infinispan;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.affinity.KeyAffinityService;
import org.infinispan.affinity.KeyGenerator;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DataLocality;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated;
import org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryPassivatedEvent;
import org.infinispan.remoting.transport.Address;
import org.jboss.as.clustering.MarshalledValue;
import org.jboss.as.clustering.MarshalledValueFactory;
import org.jboss.as.clustering.infinispan.affinity.KeyAffinityServiceFactory;
import org.jboss.as.clustering.infinispan.invoker.BatchOperation;
import org.jboss.as.clustering.infinispan.invoker.CacheInvoker;
import org.jboss.as.clustering.lock.SharedLocalYieldingClusterLockManager;
import org.jboss.as.clustering.lock.SharedLocalYieldingClusterLockManager.LockResult;
import org.jboss.as.clustering.lock.TimeoutException;
import org.jboss.as.clustering.registry.Registry;
import org.jboss.as.ejb3.cache.Cacheable;
import org.jboss.as.ejb3.cache.IdentifierFactory;
import org.jboss.as.ejb3.cache.PassivationManager;
import org.jboss.as.ejb3.cache.impl.backing.clustering.ClusteredBackingCacheEntryStoreConfig;
import org.jboss.as.ejb3.cache.spi.BackingCacheEntry;
import org.jboss.as.ejb3.cache.spi.GroupCompatibilityChecker;
import org.jboss.as.ejb3.cache.spi.impl.AbstractBackingCacheEntryStore;
import org.jboss.as.ejb3.component.stateful.StatefulTimeoutInfo;
import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.NodeAffinity;
import org.jboss.logging.Logger;

/**
* Infinispan-based backing cache entry store.
* @author Paul Ferraro
*/
@Listener
public class InfinispanBackingCacheEntryStore<K extends Serializable, V extends Cacheable<K>, E extends BackingCacheEntry<K, V>, C> extends AbstractBackingCacheEntryStore<K, V, E> implements KeyGenerator<K> {
    private final Logger log = Logger.getLogger(getClass());

    private final SharedLocalYieldingClusterLockManager lockManager;
    private final LockKeyFactory<K> lockKeyFactory;
    private final MarshalledValueFactory<C> valueFactory;
    private final C context;
    private final boolean controlCacheLifecycle;
    private final Cache<K, MarshalledValue<E, C>> cache;
    private final CacheInvoker invoker;
    private final PassivationManager<K, E> passivationManager;
    private final boolean clustered;
    private final Random random = new Random(System.currentTimeMillis());
    private final Registry<String, ?> registry;
    private final IdentifierFactory<K> identifierFactory;
    private final KeyAffinityService<K> affinity;

    public InfinispanBackingCacheEntryStore(Cache<K, MarshalledValue<E, C>> cache, CacheInvoker invoker, IdentifierFactory<K> identifierFactory, KeyAffinityServiceFactory affinityFactory, PassivationManager<K, E> passivationManager, StatefulTimeoutInfo timeout, ClusteredBackingCacheEntryStoreConfig config, boolean controlCacheLifecycle, MarshalledValueFactory<C> valueFactory, C context, SharedLocalYieldingClusterLockManager lockManager, LockKeyFactory<K> lockKeyFactory, Registry<String, ?> registry) {
        super(timeout, config);
        this.cache = cache;
        this.invoker = invoker;
        this.identifierFactory = identifierFactory;
        this.passivationManager = passivationManager;
        this.controlCacheLifecycle = controlCacheLifecycle;
        this.clustered = cache.getCacheConfiguration().clustering().cacheMode().isClustered();
        this.valueFactory = valueFactory;
        this.context = context;
        this.lockManager = this.clustered ? lockManager : null;
        this.lockKeyFactory = lockKeyFactory;
        this.registry = registry;
        this.affinity = affinityFactory.createService(cache, this);
    }

    @Override
    public void start() {
        if (this.controlCacheLifecycle) {
            this.cache.start();
        }
        this.affinity.start();
    }

    @Override
    public void stop() {
        this.affinity.stop();
        if (this.controlCacheLifecycle) {
            this.cache.stop();
        }
    }

    @Override
    public K createIdentifier() {
        return this.affinity.getKeyForAddress(this.cache.getCacheManager().getAddress());
    }

    @Override
    public K getKey() {
        return this.identifierFactory.createIdentifier();
    }

    @Override
    public boolean hasAffinity(K key) {
        DistributionManager dist = this.cache.getAdvancedCache().getDistributionManager();
        if (dist != null) {
            DataLocality locality = dist.getLocality(key);
            return locality.isLocal() || locality.isUncertain();
        }
        return true;
    }

    @Override
    public Affinity getStrictAffinity() {
        return new ClusterAffinity(this.cache.getCacheManager().getClusterName());
    }

    @Override
    public Affinity getWeakAffinity(K key) {
        if (!this.hasAffinity(key)) {
            // Locate nodes on which the cache entry will reside
            List<Address> addresses = this.cache.getAdvancedCache().getDistributionManager().locate(key);
            if (!addresses.contains(this.cache.getCacheManager().getAddress())) {
                // Otherwise choose random node from hash targets
                Map.Entry<String, ?> entry = this.registry.getRemoteEntry(addresses.get(random.nextInt(addresses.size())));
                if (entry != null) {
                    return new NodeAffinity(entry.getKey());
                }
            }
        }
        return new NodeAffinity(this.registry.getLocalEntry().getKey());
    }

    @Override
    public Set<K> insert(E entry) {
        final K id = entry.getId();
        this.trace("insert(%s)", id);

        this.acquireSessionOwnership(id, true);
        try {
            final MarshalledValue<E, C> value = this.marshalEntry(entry);
            Operation<Void> operation = new Operation<Void>() {
                @Override
                public Void invoke(Cache<K, MarshalledValue<E, C>> cache) {
                    cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP).put(id, value);
                    return null;
                }
            };
            this.invoke(operation);
        } finally {
            this.releaseSessionOwnership(id, false);
        }
        return Collections.emptySet();
    }

    @Override
    public E get(final K id, boolean lock) {
        this.trace("get(%s. %s)", id, lock);

        if (lock) {
            this.acquireSessionOwnership(id, false);
        }

        Operation<MarshalledValue<E, C>> operation = new Operation<MarshalledValue<E, C>>() {
            @Override
            public MarshalledValue<E, C> invoke(Cache<K, MarshalledValue<E, C>> cache) {
                return cache.get(id);
            }
        };
        return this.unmarshalEntry(id, this.invoke(operation));
    }

    @Override
    public void update(E entry, boolean modified) {
        final K id = entry.getId();
        this.trace("update(%s, %s)", id, modified);
        try {
            if (modified) {
                final MarshalledValue<E, C> value = this.marshalEntry(entry);
                Operation<Void> operation = new Operation<Void>() {
                    @Override
                    public Void invoke(Cache<K, MarshalledValue<E, C>> cache) {
                        cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP).put(id, value);
                        return null;
                    }
                };
                this.invoke(operation);
            }
        } finally {
            this.releaseSessionOwnership(id, false);
        }
    }

    @Override
    public E remove(final K id) {
        this.trace("remove(%s)", id);
        Operation<MarshalledValue<E, C>> operation = new Operation<MarshalledValue<E, C>>() {
            @Override
            public MarshalledValue<E, C> invoke(Cache<K, MarshalledValue<E, C>> cache) {
                return cache.remove(id);
            }
        };
        try {
            return this.unmarshalEntry(id, this.invoke(operation));
        } finally {
            this.releaseSessionOwnership(id, true);
        }
    }

    private <R> R invoke(Operation<R> operation) {
        return this.invoker.invoke(this.cache, new BatchOperation<K, MarshalledValue<E, C>, R>(operation));
    }

    private K unmarshalKey(MarshalledValue<K, C> key) {
        try {
            return key.get(this.context);
        } catch (Exception e) {
            throw InfinispanEjbMessages.MESSAGES.deserializationFailure(e, key);
        }
    }

    private MarshalledValue<E, C> marshalEntry(E value) {
        try {
            return this.valueFactory.createMarshalledValue(value);
        } catch (IOException e) {
            throw InfinispanEjbMessages.MESSAGES.serializationFailure(e, value.getId());
        }
    }

    private E unmarshalEntry(K id, MarshalledValue<E, C> value) {
        if (value == null) return null;
        try {
            return value.get(this.context);
        } catch (Exception e) {
            throw InfinispanEjbMessages.MESSAGES.deserializationFailure(e, id);
        }
    }

    private LockResult acquireSessionOwnership(K key, boolean newLock) {
        if (this.lockManager == null) return null;

        Serializable lockKey = this.lockKeyFactory.createLockKey(key);

        this.trace("Acquiring %slock on %s", newLock ? "new " : "", lockKey);

        long timeout = this.cache.getCacheConfiguration().locking().lockAcquisitionTimeout();
        try {
            LockResult result = this.lockManager.lock(lockKey, timeout, newLock);
            this.trace("Lock acquired (%s) on %s", result, lockKey);
            return result;
        } catch (TimeoutException e) {
            throw InfinispanEjbMessages.MESSAGES.lockAcquisitionTimeout(lockKey, timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw InfinispanEjbMessages.MESSAGES.lockAcquisitionInterruption(e, lockKey);
        }
    }

    private void releaseSessionOwnership(K key, boolean remove) {
        if (this.lockManager != null) {
            Serializable lockKey = this.lockKeyFactory.createLockKey(key);
            this.trace("Releasing %slock on %s", remove ? "and removing " : "", lockKey);
            this.lockManager.unlock(lockKey, remove);
            this.trace("Released %slock on %s", remove ? "and removed " : "", lockKey);
        }
    }

    @Override
    public void passivate(final E entry) {
        Operation<Void> operation = new Operation<Void>() {
            @Override
            public Void invoke(Cache<K, MarshalledValue<E, C>> cache) {
                cache.evict(entry.getId());
                return null;
            }
        };
        this.invoker.invoke(this.cache, operation);
    }

    @Override
    public boolean isClustered() {
        return this.clustered;
    }

    @Override
    public boolean isCompatibleWith(GroupCompatibilityChecker other) {
        if (other instanceof InfinispanBackingCacheEntryStore) {
            InfinispanBackingCacheEntryStore<?, ?, ?, ?> store = (InfinispanBackingCacheEntryStore<?, ?, ?, ?>) other;
            return this.cache.getCacheManager() == store.cache.getCacheManager();
        }
        return false;
    }

    @CacheEntryActivated
    public void activated(CacheEntryActivatedEvent<MarshalledValue<K, C>, MarshalledValue<E, C>> event) {
        if ((this.passivationManager != null) && !event.isPre()){
            K key = this.unmarshalKey(event.getKey());
            this.trace("activated(%s)", key);
            this.passivationManager.postActivate(this.unmarshalEntry(key, event.getValue()));
        }
    }

    @CacheEntryPassivated
    public void passivated(CacheEntryPassivatedEvent<MarshalledValue<K, C>, MarshalledValue<E, C>> event) {
        if ((this.passivationManager != null) && event.isPre()) {
            K key = this.unmarshalKey(event.getKey());
            this.trace("passivated(%s)", key);
            this.passivationManager.prePassivate(this.unmarshalEntry(key, event.getValue()));
        }
    }

    abstract class Operation<R> implements CacheInvoker.Operation<K, MarshalledValue<E, C>, R> {
    }

    private void trace(String message, Object... args) {
        if (this.log.isTraceEnabled()) {
            this.log.tracef(message, args);
        }
    }
}
TOP

Related Classes of org.jboss.as.clustering.ejb3.cache.backing.infinispan.InfinispanBackingCacheEntryStore$Operation

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.