Package org.apache.activegroups

Source Code of org.apache.activegroups.Group$ElectionService

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activegroups;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activegroups.command.AsyncMapRequest;
import org.apache.activegroups.command.ElectionMessage;
import org.apache.activegroups.command.EntryKey;
import org.apache.activegroups.command.EntryMessage;
import org.apache.activegroups.command.EntryValue;
import org.apache.activegroups.command.MapRequest;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* <P>
* A <CODE>Group</CODE> is a distributed collaboration implementation that is
* used to shared state and process messages amongst a distributed group of
* other <CODE>Group</CODE> instances. Membership of a group is handled
* automatically using discovery.
* <P>
* The underlying transport is JMS and there are some optimizations that occur
* for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
* with any JMS implementation.
*
* <P>
* Updates to the group shared map are controlled by a coordinator. The
* coordinator is elected by the member with the lowest lexicographical id -
* based on the bully algorithm [Silberschatz et al. 1993]
* <P>
* The {@link #selectCordinator(Collection<Member> members)} method may be
* overridden to implement a custom mechanism for choosing how the coordinator
* is elected for the map.
* <P>
* New <CODE>Group</CODE> instances have their state updated by the
* coordinator, and coordinator failure is handled automatically within the
* group.
* <P>
* All map updates are totally ordered through the coordinator, whilst read
* operations happen locally.
* <P>
* A <CODE>Group</CODE> supports the concept of owner only updates(write
* locks), shared updates, entry expiration times and removal on owner exit -
* all of which are optional. In addition, you can grab and release locks for
* values in the map, independently of who created them.
* <P>
* In addition, members of a group can broadcast messages and implement
* request/response with other <CODE>Group</CODE> instances.
*
* <P>
*
* @param <K>
*            the key type
* @param <V>
*            the value type
*
*/
public class Group<K, V> implements Map<K, V>, Service {
    /**
     * default interval within which to detect a member failure
     */
    public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000;
    private static final long EXPIRATION_SWEEP_INTERVAL = 500;
    private static final Log LOG = LogFactory.getLog(Group.class);
    private static final String STATE_PREFIX = "STATE." + Group.class.getName()
            + ".";
    private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."
            + Group.class.getName() + ".";
    private static final String STATE_TYPE = "state";
    private static final String MESSAGE_TYPE = "message";
    private static final String MEMBER_ID_PROPERTY = "memberId";
    protected Member local;
    private final Object mapMutex = new Object();
    private Map<K, EntryValue<V>> localMap;
    Map<String, Member> members = new ConcurrentHashMap<String, Member>();
    private Map<String, MapRequest> stateRequests = new HashMap<String, MapRequest>();
    private Map<String, MapRequest> messageRequests = new HashMap<String, MapRequest>();
    private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
    private List<GroupStateChangedListener> mapChangedListeners = new CopyOnWriteArrayList<GroupStateChangedListener>();
    private List<GroupMessageListener> groupMessageListeners = new CopyOnWriteArrayList<GroupMessageListener>();
    private Member coordinator;
    private String groupName;
    private boolean alwaysLock;
    private Connection connection;
    private Session stateSession;
    private Session messageSession;
    private Topic stateTopic;
    private Topic heartBeatTopic;
    private Topic inboxTopic;
    private Topic messageTopic;
    private Queue messageQueue;
    private MessageProducer stateProducer;
    private MessageProducer messageProducer;
    private ConsumerEventSource consumerEvents;
    private AtomicBoolean started = new AtomicBoolean();
    private SchedulerTimerTask heartBeatTask;
    private SchedulerTimerTask checkMembershipTask;
    private SchedulerTimerTask expirationTask;
    private Timer timer;
    private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
    private IdGenerator idGenerator = new IdGenerator();
    private boolean removeOwnedObjectsOnExit;
    private boolean releaseLockOnExit = true;
    private int timeToLive;
    private int lockTimeToLive;
    private int minimumGroupSize = 1;
    private int coordinatorWeight = 0;
    private final AtomicBoolean electionFinished = new AtomicBoolean(true);
    private ExecutorService stateExecutor;
    private ExecutorService messageExecutor;
    private ThreadPoolExecutor electionExecutor;
    private final Object memberMutex = new Object();

    /**
     * @param connection
     * @param name
     */
    public Group(Connection connection, String name) {
        this(connection, "default", name);
    }

    /**
     * @param connection
     * @param groupName
     * @param name
     */
    public Group(Connection connection, String groupName, String name) {
        this.connection = connection;
        this.local = new Member(name);
        this.coordinator = this.local;
        this.groupName = groupName;
    }

    /**
     * Set the local map implementation to be used By default its a HashMap -
     * but you could use a Cache for example
     *
     * @param map
     */
    public void setLocalMap(Map map) {
        synchronized (this.mapMutex) {
            this.localMap = map;
        }
    }

    /**
     * Start membership to the group
     *
     * @throws Exception
     *
     */
    public void start() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            synchronized (this.mapMutex) {
                if (this.localMap == null) {
                    this.localMap = new HashMap<K, EntryValue<V>>();
                }
            }
            this.connection.start();
            this.stateSession = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.messageSession = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.stateProducer = this.stateSession.createProducer(null);
            this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            this.inboxTopic = this.stateSession.createTemporaryTopic();
            String stateTopicName = STATE_PREFIX + this.groupName;
            this.stateTopic = this.stateSession.createTopic(stateTopicName);
            this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
                    + ".heartbeat");
            String messageDestinationName = GROUP_MESSAGE_PREFIX
                    + this.groupName;
            this.messageTopic = this.messageSession
                    .createTopic(messageDestinationName);
            this.messageQueue = this.messageSession
                    .createQueue(messageDestinationName);
            MessageConsumer privateInbox = this.messageSession
                    .createConsumer(this.inboxTopic);
            MessageConsumer memberChangeConsumer = this.stateSession
                    .createConsumer(this.stateTopic);
            String memberId = null;
            if (memberChangeConsumer instanceof ActiveMQMessageConsumer) {
                memberId = ((ActiveMQMessageConsumer) memberChangeConsumer)
                        .getConsumerId().toString();
            } else {
                memberId = this.idGenerator.generateId();
            }
            this.local.setId(memberId);
            this.local.setInBoxDestination(this.inboxTopic);
            this.local.setCoordinatorWeight(getCoordinatorWeight());
            privateInbox.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    processJMSMessage(message);
                }
            });
            memberChangeConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    processJMSMessage(message);
                }
            });
            this.messageProducer = this.messageSession.createProducer(null);
            this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            MessageConsumer topicMessageConsumer = this.messageSession
                    .createConsumer(this.messageTopic);
            topicMessageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    processJMSMessage(message);
                }
            });
            MessageConsumer queueMessageConsumer = this.messageSession
                    .createConsumer(this.messageQueue);
            queueMessageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    processJMSMessage(message);
                }
            });
            MessageConsumer heartBeatConsumer = this.stateSession
                    .createConsumer(this.heartBeatTopic);
            heartBeatConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    handleHeartbeats(message);
                }
            });
            this.consumerEvents = new ConsumerEventSource(this.connection,
                    this.stateTopic);
            this.consumerEvents.setConsumerListener(new ConsumerListener() {
                public void onConsumerEvent(ConsumerEvent event) {
                    handleConsumerEvents(event);
                }
            });
            this.consumerEvents.start();
            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                    new ThreadFactory() {
                        public Thread newThread(Runnable runnable) {
                            Thread thread = new Thread(runnable, "Election{"
                                    + Group.this.local + "}");
                            thread.setDaemon(true);
                            return thread;
                        }
                    });
            this.stateExecutor = Executors
                    .newSingleThreadExecutor(new ThreadFactory() {
                        public Thread newThread(Runnable runnable) {
                            Thread thread = new Thread(runnable, "Group State{"
                                    + Group.this.local + "}");
                            thread.setDaemon(true);
                            return thread;
                        }
                    });
            this.messageExecutor = Executors
                    .newSingleThreadExecutor(new ThreadFactory() {
                        public Thread newThread(Runnable runnable) {
                            Thread thread = new Thread(runnable,
                                    "Group Messages{" + Group.this.local + "}");
                            thread.setDaemon(true);
                            return thread;
                        }
                    });
            sendHeartBeat();
            this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
                public void run() {
                    sendHeartBeat();
                }
            });
            this.checkMembershipTask = new SchedulerTimerTask(new Runnable() {
                public void run() {
                    checkMembership();
                }
            });
            this.expirationTask = new SchedulerTimerTask(new Runnable() {
                public void run() {
                    expirationSweep();
                }
            });
            this.timer = new Timer("Distributed heart beat", true);
            this.timer.scheduleAtFixedRate(this.heartBeatTask,
                    getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
            this.timer.scheduleAtFixedRate(this.checkMembershipTask,
                    getHeartBeatInterval(), getHeartBeatInterval());
            this.timer.scheduleAtFixedRate(this.expirationTask,
                    EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
            // await for members to join
            long timeout = (long) (this.heartBeatInterval
                    * this.minimumGroupSize *1.5);
            long deadline = System.currentTimeMillis() + timeout;
            while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
                    .get())
                    && timeout > 0) {
                synchronized (this.electionFinished) {
                    this.electionFinished.wait(timeout);
                }
                timeout = Math.max(deadline - System.currentTimeMillis(), 0);
            }
        }
    }

    /**
     * stop membership to the group
     *
     * @throws Exception
     */
    public void stop() {
        if (this.started.compareAndSet(true, false)) {
            this.expirationTask.cancel();
            this.checkMembershipTask.cancel();
            this.heartBeatTask.cancel();
            this.expirationTask.cancel();
            this.timer.purge();
            if (this.electionExecutor != null) {
                this.electionExecutor.shutdownNow();
            }
            if (this.stateExecutor != null) {
                this.stateExecutor.shutdownNow();
            }
            if (this.messageExecutor != null) {
                this.messageExecutor.shutdownNow();
            }
            try {
                this.consumerEvents.stop();
                this.stateSession.close();
                this.messageSession.close();
                this.connection.close();
            } catch (Exception e) {
                LOG.debug("Caught exception stopping", e);
            }
           
        }
    }

    /**
     * @return true if started
     */
    public boolean isStarted() {
        return this.started.get();
    }

    /**
     * @return true if there is elections have finished
     */
    public boolean isElectionFinished() {
        return this.electionFinished.get();
    }

    void setElectionFinished(boolean flag) {
        this.electionFinished.set(flag);
    }

    /**
     * @return the partitionName
     */
    public String getGroupName() {
        return this.groupName;
    }

    /**
     * @return the name ofthis map
     */
    public String getName() {
        return this.local.getName();
    }

    /**
     * @return true if by default always lock objects (default is false)
     */
    public boolean isAlwaysLock() {
        return this.alwaysLock;
    }

    /**
     * @param alwaysLock -
     *            set true if objects inserted will always be locked (default is
     *            false)
     */
    public void setAlwaysLock(boolean alwaysLock) {
        this.alwaysLock = alwaysLock;
    }

    /**
     * @return the heartBeatInterval
     */
    public long getHeartBeatInterval() {
        return this.heartBeatInterval;
    }

    /**
     * @param heartBeatInterval
     *            the heartBeatInterval to set
     */
    public void setHeartBeatInterval(long heartBeatInterval) {
        this.heartBeatInterval = heartBeatInterval;
    }

    /**
     * Add a listener for membership changes
     *
     * @param l
     */
    public void addMemberChangedListener(MemberChangedListener l) {
        this.membershipListeners.add(l);
    }

    /**
     * Remove a listener for membership changes
     *
     * @param l
     */
    public void removeMemberChangedListener(MemberChangedListener l) {
        this.membershipListeners.remove(l);
    }

    /**
     * Add a listener for map changes
     *
     * @param l
     */
    public void addMapChangedListener(GroupStateChangedListener l) {
        this.mapChangedListeners.add(l);
    }

    /**
     * Remove a listener for map changes
     *
     * @param l
     */
    public void removeMapChangedListener(GroupStateChangedListener l) {
        this.mapChangedListeners.remove(l);
    }

    /**
     * Add a listener for group messages
     *
     * @param l
     */
    public void addGroupMessageListener(GroupMessageListener l) {
        this.groupMessageListeners.add(l);
    }

    /**
     * remove a listener for group messages
     *
     * @param l
     */
    public void removeGroupMessageListener(GroupMessageListener l) {
        this.groupMessageListeners.remove(l);
    }

    /**
     * @return the timeToLive
     */
    public int getTimeToLive() {
        return this.timeToLive;
    }

    /**
     * @param timeToLive
     *            the timeToLive to set
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the removeOwnedObjectsOnExit
     */
    public boolean isRemoveOwnedObjectsOnExit() {
        return this.removeOwnedObjectsOnExit;
    }

    /**
     * Sets the policy for owned objects in the group If set to true, when this
     * <code>GroupMap<code> stops,
     * any objects it owns will be removed from the group map
     * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
     */
    public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
        this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
    }

    /**
     * @return releaseLockOnExit - true by default
     */
    public boolean isReleaseLockOnExit() {
        return releaseLockOnExit;
    }

    /**
     * set release lock on exit - true by default
     *
     * @param releaseLockOnExit
     *            the releaseLockOnExit to set
     */
    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
        this.releaseLockOnExit = releaseLockOnExit;
    }

    /**
     * @return the lockTimeToLive
     */
    public int getLockTimeToLive() {
        return lockTimeToLive;
    }

    /**
     * @param lockTimeToLive
     *            the lockTimeToLive to set
     */
    public void setLockTimeToLive(int lockTimeToLive) {
        this.lockTimeToLive = lockTimeToLive;
    }

    /**
     * @return the minimumGroupSize
     */
    public int getMinimumGroupSize() {
        return this.minimumGroupSize;
    }

    /**
     * @param minimumGroupSize
     *            the minimumGroupSize to set
     */
    public void setMinimumGroupSize(int minimumGroupSize) {
        this.minimumGroupSize = minimumGroupSize;
    }

    /**
     * @return the coordinatorWeight
     */
    public int getCoordinatorWeight() {
        return this.coordinatorWeight;
    }

    /**
     * @param coordinatorWeight
     *            the coordinatorWeight to set
     */
    public void setCoordinatorWeight(int coordinatorWeight) {
        this.coordinatorWeight = coordinatorWeight;
    }

    /**
     * clear entries from the Map
     *
     * @throws IllegalStateException
     */
    public void clear() throws IllegalStateException {
        checkStatus();
        if (this.localMap != null && !this.localMap.isEmpty()) {
            Set<K> keys = null;
            synchronized (this.mapMutex) {
                keys = new HashSet<K>(this.localMap.keySet());
            }
            for (K key : keys) {
                remove(key);
            }
        }
        this.localMap.clear();
    }

    public boolean containsKey(Object key) {
        synchronized (this.mapMutex) {
            return this.localMap != null ? this.localMap.containsKey(key)
                    : false;
        }
    }

    public boolean containsValue(Object value) {
        EntryValue entryValue = new EntryValue(null, value);
        synchronized (this.mapMutex) {
            return this.localMap != null ? this.localMap
                    .containsValue(entryValue) : false;
        }
    }

    public Set<java.util.Map.Entry<K, V>> entrySet() {
        Map<K, V> result = new HashMap<K, V>();
        synchronized (this.mapMutex) {
            if (this.localMap != null) {
                for (EntryValue<V> entry : this.localMap.values()) {
                    result.put((K) entry.getKey(), entry.getValue());
                }
            }
        }
        return result.entrySet();
    }

    public V get(Object key) {
        EntryValue<V> value = null;
        synchronized (this.mapMutex) {
            value = this.localMap != null ? this.localMap.get(key) : null;
        }
        return value != null ? value.getValue() : null;
    }

    public boolean isEmpty() {
        synchronized (this.mapMutex) {
            return this.localMap != null ? this.localMap.isEmpty() : true;
        }
    }

    public Set<K> keySet() {
        Set<K> result = null;
        synchronized (this.mapMutex) {
            result = new HashSet<K>(this.localMap.keySet());
        }
        return result;
    }

    /**
     * Puts an value into the map associated with the key
     *
     * @param key
     * @param value
     * @return the old value or null
     * @throws GroupUpdateException
     * @throws IllegalStateException
     *
     */
    public V put(K key, V value) throws GroupUpdateException,
            IllegalStateException {
        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
    }

    /**
     * Puts an value into the map associated with the key
     *
     * @param key
     * @param value
     * @param lock
     * @param removeOnExit
     * @param releaseLockOnExit
     * @param timeToLive
     * @param leaseTime
     * @return the old value or null
     * @throws GroupUpdateException
     * @throws IllegalStateException
     *
     */
    public V put(K key, V value, boolean lock, boolean removeOnExit,
            boolean releaseLockOnExit, long timeToLive, long leaseTime)
            throws GroupUpdateException, IllegalStateException {
        checkStatus();
        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
        entryKey.setLocked(lock);
        entryKey.setRemoveOnExit(removeOnExit);
        entryKey.setReleaseLockOnExit(releaseLockOnExit);
        entryKey.setTimeToLive(timeToLive);
        entryKey.setLockLeaseTime(leaseTime);
        EntryMessage entryMsg = new EntryMessage();
        entryMsg.setKey(entryKey);
        entryMsg.setValue(value);
        entryMsg.setType(EntryMessage.MessageType.INSERT);
        return (V) sendStateRequest(getCoordinator(), entryMsg);
    }

    /**
     * Remove a lock on a key
     *
     * @param key
     * @throws GroupUpdateException
     */
    public void unlock(K key) throws GroupUpdateException {
        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
        entryKey.setLocked(false);
        EntryMessage entryMsg = new EntryMessage();
        entryMsg.setKey(entryKey);
        entryMsg.setLockUpdate(true);
        sendStateRequest(getCoordinator(), entryMsg);
    }

    /**
     * Lock a key in the distributed map
     *
     * @param key
     * @throws GroupUpdateException
     */
    public void lock(K key) throws GroupUpdateException {
        lock(key, getLockTimeToLive());
    }

    /**
     * Lock a key in the distributed map
     *
     * @param key
     * @param leaseTime
     * @throws GroupUpdateException
     */
    public void lock(K key, long leaseTime) throws GroupUpdateException {
        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
        entryKey.setLocked(true);
        entryKey.setLockLeaseTime(leaseTime);
        EntryMessage entryMsg = new EntryMessage();
        entryMsg.setKey(entryKey);
        entryMsg.setLockUpdate(true);
        sendStateRequest(getCoordinator(), entryMsg);
    }

    /**
     * Add the Map to the distribution
     *
     * @param t
     * @throws GroupUpdateException
     * @throws IllegalStateException
     */
    public void putAll(Map<? extends K, ? extends V> t)
            throws GroupUpdateException, IllegalStateException {
        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
    }

    /**
     * Add the Map to the distribution
     *
     * @param t
     * @param lock
     * @param removeOnExit
     * @param releaseLockOnExit
     * @param timeToLive
     * @param lockTimeToLive
     * @throws GroupUpdateException
     * @throws IllegalStateException
     */
    public void putAll(Map<? extends K, ? extends V> t, boolean lock,
            boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
            long lockTimeToLive) throws GroupUpdateException,
            IllegalStateException {
        for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
            put(entry.getKey(), entry.getValue(), lock, removeOnExit,
                    releaseLockOnExit, timeToLive, lockTimeToLive);
        }
    }

    /**
     * remove a value from the map associated with the key
     *
     * @param key
     * @return the Value or null
     * @throws GroupUpdateException
     * @throws IllegalStateException
     *
     */
    public V remove(Object key) throws GroupUpdateException,
            IllegalStateException {
        EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
        return doRemove(entryKey);
    }

    V doRemove(EntryKey<K> key) throws GroupUpdateException,
            IllegalStateException {
        checkStatus();
        EntryMessage entryMsg = new EntryMessage();
        entryMsg.setKey(key);
        entryMsg.setType(EntryMessage.MessageType.DELETE);
        return (V) sendStateRequest(getCoordinator(), entryMsg);
    }

    public int size() {
        synchronized (this.mapMutex) {
            return this.localMap != null ? this.localMap.size() : 0;
        }
    }

    public Collection<V> values() {
        List<V> result = new ArrayList<V>();
        synchronized (this.mapMutex) {
            if (this.localMap != null) {
                for (EntryValue<V> value : this.localMap.values()) {
                    result.add(value.getValue());
                }
            }
        }
        return result;
    }

    /**
     * @return a set of the members
     */
    public Set<Member> getMembers() {
        Set<Member> result = new HashSet<Member>();
        result.addAll(this.members.values());
        return result;
    }

    /**
     * Get a member by its unique id
     *
     * @param id
     * @return
     */
    public Member getMemberById(String id) {
        return this.members.get(id);
    }

    /**
     * Return a member of the Group with the matching name
     *
     * @param name
     * @return
     */
    public Member getMemberByName(String name) {
        if (name != null) {
            for (Member member : this.members.values()) {
                if (member.getName().equals(name)) {
                    return member;
                }
            }
        }
        return null;
    }

    /**
     * @return the local member that represents this <CODE>Group</CODE>
     *         instance
     */
    public Member getLocalMember() {
        return this.local;
    }

    /**
     * @param key
     * @return true if this is the owner of the key
     */
    public boolean isOwner(K key) {
        EntryValue<V> entryValue = null;
        synchronized (this.mapMutex) {
            entryValue = this.localMap != null ? this.localMap.get(key) : null;
        }
        boolean result = false;
        if (entryValue != null) {
            result = entryValue.getKey().getOwner().getId().equals(
                    this.local.getId());
        }
        return result;
    }

    /**
     * Get the owner of a key
     *
     * @param key
     * @return the owner - or null if the key doesn't exist
     */
    EntryKey getKey(Object key) {
        EntryValue<V> entryValue = null;
        synchronized (this.mapMutex) {
            entryValue = this.localMap != null ? this.localMap.get(key) : null;
        }
        return entryValue != null ? entryValue.getKey() : null;
    }

    /**
     * @return true if the coordinator for the map
     */
    protected boolean isCoordinator() {
        return isCoordinatorMatch() && this.electionFinished.get();
    }

    /**
     * @return true if the coordinator for the map
     */
    protected boolean isCoordinatorMatch() {
        return this.local.equals(this.coordinator);
    }

    /**
     * @return the coordinator
     */
    public Member getCoordinator() {
        return this.coordinator;
    }

    void setCoordinator(Member member) {
        this.coordinator = member;
    }

    /**
     * Broadcast a message to the group
     *
     * @param message
     * @throws JMSException
     */
    public void broadcastMessage(Object message) throws JMSException {
        checkStatus();
        ObjectMessage objMsg = this.messageSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(this.messageTopic, objMsg);
    }

    /**
     * As the group for a response - one will be selected from the group
     *
     * @param member
     * @param message
     * @param timeout
     *            in milliseconds - a value if 0 means wait until complete
     * @return
     * @throws JMSException
     */
    public Serializable broadcastMessageRequest(Object message, long timeout)
            throws JMSException {
        checkStatus();
        Object result = null;
        MapRequest request = new MapRequest();
        String id = this.idGenerator.generateId();
        synchronized (this.messageRequests) {
            this.messageRequests.put(id, request);
        }
        ObjectMessage objMsg = this.stateSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSReplyTo(this.inboxTopic);
        objMsg.setJMSCorrelationID(id);
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(this.messageQueue, objMsg);
        result = request.get(timeout);
        return (Serializable) result;
    }

    /**
     * Send a message to the group - but only the least loaded member will
     * process it
     *
     * @param message
     * @throws JMSException
     */
    public void sendMessage(Object message) throws JMSException {
        checkStatus();
        ObjectMessage objMsg = this.messageSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(this.messageQueue, objMsg);
    }

    /**
     * Send a message to an individual member
     *
     * @param member
     * @param message
     * @throws JMSException
     */
    public void sendMessage(Member member, Object message) throws JMSException {
        checkStatus();
        ObjectMessage objMsg = this.messageSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(member.getInBoxDestination(), objMsg);
    }

    /**
     * Send a request to a member
     *
     * @param member
     * @param message
     * @param timeout
     *            in milliseconds - a value if 0 means wait until complete
     * @return the request or null
     * @throws JMSException
     */
    public Object sendMessageRequest(Member member, Object message, long timeout)
            throws JMSException {
        checkStatus();
        Object result = null;
        MapRequest request = new MapRequest();
        String id = this.idGenerator.generateId();
        synchronized (this.messageRequests) {
            this.messageRequests.put(id, request);
        }
        ObjectMessage objMsg = this.stateSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSReplyTo(this.inboxTopic);
        objMsg.setJMSCorrelationID(id);
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(member.getInBoxDestination(), objMsg);
        result = request.get(timeout);
        return result;
    }

    /**
     * send a response to a message
     *
     * @param member
     * @param replyId
     * @param message
     * @throws JMSException
     */
    public void sendMessageResponse(Member member, String replyId,
            Object message) throws JMSException {
        checkStatus();
        ObjectMessage objMsg = this.messageSession
                .createObjectMessage((Serializable) message);
        objMsg.setJMSCorrelationID(replyId);
        objMsg.setJMSType(MESSAGE_TYPE);
        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
        this.messageProducer.send(member.getInBoxDestination(), objMsg);
    }

    /**
     * Select a coordinator - coordinator weighting is used - or if everything
     * is equal - a comparison of member ids.
     *
     * @param members
     * @return
     */
    protected Member selectCordinator(List<Member> list) {
        List<Member> sorted = sortMemberList(list);
        Member result = sorted.isEmpty() ? this.local : sorted
                .get(list.size() - 1);
        return result;
    }

    protected List<Member> sortMemberList(List<Member> list) {
        Collections.sort(list, new Comparator<Member>() {
            public int compare(Member m1, Member m2) {
                int result = m1.getCoordinatorWeight()
                        - m2.getCoordinatorWeight();
                if (result == 0) {
                    result = m1.getId().compareTo(m2.getId());
                }
                return result;
            }
        });
        return list;
    }

    Object sendStateRequest(Member member, Serializable payload) {
        Object result = null;
        MapRequest request = new MapRequest();
        String id = this.idGenerator.generateId();
        synchronized (this.stateRequests) {
            this.stateRequests.put(id, request);
        }
        try {
            ObjectMessage objMsg = this.stateSession
                    .createObjectMessage(payload);
            objMsg.setJMSReplyTo(this.inboxTopic);
            objMsg.setJMSCorrelationID(id);
            objMsg.setJMSType(STATE_TYPE);
            this.stateProducer.send(member.getInBoxDestination(), objMsg);
            result = request.get(getHeartBeatInterval());
        } catch (JMSException e) {
            if (this.started.get()) {
                LOG.error("Failed to send request " + payload, e);
            }
        }
        if (result instanceof GroupUpdateException) {
            throw (GroupUpdateException) result;
        }
        if (result instanceof EntryMessage) {
            EntryMessage entryMsg = (EntryMessage) result;
            result = entryMsg.getOldValue();
        }
        return result;
    }

    void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
            Serializable payload) {
        MapRequest request = new MapRequest();
        String id = this.idGenerator.generateId();
        asyncRequest.add(id, request);
        synchronized (this.stateRequests) {
            this.stateRequests.put(id, request);
        }
        try {
            ObjectMessage objMsg = this.stateSession
                    .createObjectMessage(payload);
            objMsg.setJMSReplyTo(this.inboxTopic);
            objMsg.setJMSCorrelationID(id);
            objMsg.setJMSType(STATE_TYPE);
            this.stateProducer.send(member.getInBoxDestination(), objMsg);
        } catch (JMSException e) {
            if (this.started.get()) {
                LOG.error("Failed to send async request " + payload, e);
            }
        }
    }

    void sendReply(Object reply, Destination replyTo, String id) {
        if (this.started.get()) {
            if (replyTo != null) {
                if (replyTo.equals(this.local.getInBoxDestination())) {
                    processRequest(id, reply);
                } else {
                    try {
                        ObjectMessage replyMsg = this.stateSession
                                .createObjectMessage((Serializable) reply);
                        replyMsg.setJMSCorrelationID(id);
                        replyMsg.setJMSType(STATE_TYPE);
                        this.stateProducer.send(replyTo, replyMsg);
                    } catch (JMSException e) {
                        LOG.error("Couldn't send reply from co-ordinator", e);
                    }
                }
            } else {
                LOG.error("NULL replyTo destination");
            }
        }
    }

    void broadcastMapUpdate(EntryMessage entry, String correlationId) {
        if (this.started.get()) {
            try {
                EntryMessage copy = entry.copy();
                copy.setMapUpdate(true);
                ObjectMessage objMsg = this.stateSession
                        .createObjectMessage(copy);
                objMsg.setJMSCorrelationID(correlationId);
                objMsg.setJMSType(STATE_TYPE);
                this.stateProducer.send(this.stateTopic, objMsg);
            } catch (JMSException e) {
                if (this.started.get()) {
                    LOG.error("Failed to send EntryMessage " + entry, e);
                }
            }
        }
    }

    void processJMSMessage(Message message) {
        if (message instanceof ObjectMessage) {
            ObjectMessage objMsg = (ObjectMessage) message;
            try {
                String messageType = objMsg.getJMSType();
                String id = objMsg.getJMSCorrelationID();
                String memberId = objMsg.getStringProperty(MEMBER_ID_PROPERTY);
                Destination replyTo = objMsg.getJMSReplyTo();
                Object payload = objMsg.getObject();
                if (messageType != null) {
                    if (messageType.equals(STATE_TYPE)) {
                        if (payload instanceof Member) {
                            handleHeartbeats((Member) payload);
                        } else if (payload instanceof EntryMessage) {
                            EntryMessage entryMsg = (EntryMessage) payload;
                            entryMsg = entryMsg.copy();
                            if (entryMsg.isLockUpdate()) {
                                processLockUpdate(entryMsg, replyTo, id);
                            } else if (entryMsg.isMapUpdate()) {
                                processMapUpdate(entryMsg);
                            } else {
                                processEntryMessage(entryMsg, replyTo, id);
                            }
                        } else if (payload instanceof ElectionMessage) {
                            ElectionMessage electionMsg = (ElectionMessage) payload;
                            electionMsg = electionMsg.copy();
                            processElectionMessage(electionMsg, replyTo, id);
                        }
                    } else if (messageType.equals(MESSAGE_TYPE)) {
                        processGroupMessage(memberId, id, replyTo, payload);
                    } else {
                        LOG.error("Unknown message type: " + messageType);
                    }
                    processRequest(id, payload);
                } else {
                    LOG.error("Can't process a message type of null");
                }
            } catch (JMSException e) {
                LOG.warn("Failed to process message: " + message, e);
            }
        }
    }

    void processRequest(String id, Object value) {
        if (id != null) {
            MapRequest result = null;
            synchronized (this.stateRequests) {
                result = this.stateRequests.remove(id);
            }
            if (result != null) {
                result.put(id, value);
            }
        }
    }

    void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
            String correlationId) {
        waitForElection();
        synchronized (this.mapMutex) {
            boolean newLock = entryMsg.getKey().isLocked();
            Member newOwner = entryMsg.getKey().getOwner();
            long newLockExpiration = newLock ? entryMsg.getKey()
                    .getLockExpiration() : 0l;
            if (isCoordinator() && !entryMsg.isMapUpdate()) {
                EntryKey originalKey = getKey(entryMsg.getKey().getKey());
                if (originalKey != null) {
                    if (originalKey.isLocked()) {
                        if (!originalKey.getOwner().equals(
                                entryMsg.getKey().getOwner())) {
                            Serializable reply = new GroupUpdateException(
                                    "Owned by " + originalKey.getOwner());
                            sendReply(reply, replyTo, correlationId);
                        } else {
                            originalKey.setLocked(newLock);
                            originalKey.setOwner(newOwner);
                            originalKey.setLockExpiration(newLockExpiration);
                            broadcastMapUpdate(entryMsg, correlationId);
                        }
                    } else {
                        originalKey.setLocked(newLock);
                        originalKey.setOwner(newOwner);
                        originalKey.setLockExpiration(newLockExpiration);
                        broadcastMapUpdate(entryMsg, correlationId);
                    }
                }
            } else {
                EntryKey originalKey = getKey(entryMsg.getKey().getKey());
                if (originalKey != null) {
                    originalKey.setLocked(newLock);
                    originalKey.setOwner(newOwner);
                    originalKey.setLockExpiration(newLockExpiration);
                }
            }
        }
    }

    void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
            String correlationId) {
        waitForElection();
        if (isCoordinator()) {
            EntryKey<K> key = entryMsg.getKey();
            EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg
                    .getValue());
            boolean insert = entryMsg.isInsert();
            boolean containsKey = false;
            synchronized (this.mapMutex) {
                containsKey = this.localMap.containsKey(key.getKey());
            }
            if (containsKey) {
                EntryKey originalKey = getKey(key.getKey());
                if (originalKey.equals(key.getOwner())
                        || !originalKey.isLocked()) {
                    EntryValue<V> old = null;
                    if (insert) {
                        synchronized (this.mapMutex) {
                            old = this.localMap.put(key.getKey(), value);
                        }
                    } else {
                        synchronized (this.mapMutex) {
                            old = this.localMap.remove(key.getKey());
                        }
                    }
                    entryMsg.setOldValue(old.getValue());
                    broadcastMapUpdate(entryMsg, correlationId);
                    fireMapChanged(key.getOwner(), key.getKey(),
                            old.getValue(), value.getValue(), false);
                } else {
                    Serializable reply = new GroupUpdateException(
                            "Owned by " + originalKey.getOwner());
                    sendReply(reply, replyTo, correlationId);
                }
            } else {
                if (insert) {
                    synchronized (this.mapMutex) {
                        this.localMap.put(key.getKey(), value);
                    }
                    broadcastMapUpdate(entryMsg, correlationId);
                    fireMapChanged(key.getOwner(), key.getKey(), null, value
                            .getValue(), false);
                } else {
                    sendReply(null, replyTo, correlationId);
                }
            }
        }
    }

    void processMapUpdate(EntryMessage entryMsg) {
        boolean containsKey = false;
        EntryKey<K> key = entryMsg.getKey();
        EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg.getValue());
        boolean insert = entryMsg.isInsert() || entryMsg.isSync();
        synchronized (this.mapMutex) {
            containsKey = this.localMap.containsKey(key.getKey());
        }
        waitForElection();
        if (!isCoordinator() || entryMsg.isSync()) {
            if (containsKey) {
                if (key.isLockExpired()) {
                    EntryValue old = this.localMap.get(key.getKey());
                    if (old != null) {
                        old.getKey().setLocked(false);
                    }
                } else {
                    EntryValue<V> old = null;
                    if (insert) {
                        synchronized (this.mapMutex) {
                            old = this.localMap.put(key.getKey(), value);
                        }
                    } else {
                        synchronized (this.mapMutex) {
                            old = this.localMap.remove(key.getKey());
                            value.setValue(null);
                        }
                    }
                    fireMapChanged(key.getOwner(), key.getKey(),
                            old.getValue(), value.getValue(), entryMsg
                                    .isExpired());
                }
            } else {
                if (insert) {
                    synchronized (this.mapMutex) {
                        this.localMap.put(key.getKey(), value);
                    }
                    fireMapChanged(key.getOwner(), key.getKey(), null, value
                            .getValue(), false);
                }
            }
        }
    }

    void processGroupMessage(String memberId, String replyId,
            Destination replyTo, Object payload) {
        Member member = this.members.get(memberId);
        if (member != null) {
            fireMemberMessage(member, replyId, payload);
        }
        if (replyId != null) {
            MapRequest result = null;
            synchronized (this.messageRequests) {
                result = this.messageRequests.remove(replyId);
            }
            if (result != null) {
                result.put(replyId, payload);
            }
        }
    }

    void handleHeartbeats(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                ObjectMessage objMsg = (ObjectMessage) message;
                Member member = (Member) objMsg.getObject();
                handleHeartbeats(member);
            } else {
                LOG.warn("Unexpected message: " + message);
            }
        } catch (JMSException e) {
            LOG.warn("Failed to handle heart beat", e);
        }
    }

    void handleHeartbeats(Member member) {
        member.setTimeStamp(System.currentTimeMillis());
        if (this.members.put(member.getId(), member) == null) {
            fireMemberStarted(member);
            if (!member.equals(this.local)) {
                sendHeartBeat(member.getInBoxDestination());
            }
            election(member, true);
            synchronized (this.memberMutex) {
                this.memberMutex.notifyAll();
            }
        }
    }

    void handleConsumerEvents(ConsumerEvent event) {
        if (!event.isStarted()) {
            Member member = this.members.remove(event.getConsumerId()
                    .toString());
            if (member != null) {
                fireMemberStopped(member);
                election(member, false);
            }
        }
    }

    void checkMembership() {
        if (this.started.get() && this.electionFinished.get()) {
            long checkTime = System.currentTimeMillis()
                    - getHeartBeatInterval();
            boolean doElection = false;
            for (Member member : this.members.values()) {
                if (member.getTimeStamp() < checkTime) {
                    LOG.info("Member timestamp expired " + member);
                    this.members.remove(member.getId());
                    fireMemberStopped(member);
                    doElection = true;
                }
            }
            if (doElection) {
                election(null, false);
            }
        }
    }

    void expirationSweep() {
        waitForElection();
        if (isCoordinator() && this.started.get()
                && this.electionFinished.get()) {
            List<EntryKey> expiredMessages = null;
            List<EntryKey> expiredLocks = null;
            synchronized (this.mapMutex) {
                Map<K, EntryValue<V>> map = this.localMap;
                if (map != null) {
                    long currentTime = System.currentTimeMillis();
                    for (EntryValue value : map.values()) {
                        EntryKey k = value.getKey();
                        if (k.isExpired(currentTime)) {
                            if (expiredMessages == null) {
                                expiredMessages = new ArrayList<EntryKey>();
                                expiredMessages.add(k);
                            }
                        } else if (k.isLockExpired(currentTime)) {
                            k.setLocked(false);
                            if (expiredLocks == null) {
                                expiredLocks = new ArrayList<EntryKey>();
                                expiredLocks.add(k);
                            }
                            expiredLocks.add(k);
                        }
                    }
                }
            }
            // do the actual removal of entries in a separate thread
            if (expiredMessages != null) {
                final List<EntryKey> expire = expiredMessages;
                this.stateExecutor.execute(new Runnable() {
                    public void run() {
                        doMessageExpiration(expire);
                    }
                });
            }
            if (expiredLocks != null) {
                final List<EntryKey> expire = expiredLocks;
                this.stateExecutor.execute(new Runnable() {
                    public void run() {
                        doLockExpiration(expire);
                    }
                });
            }
        }
    }

    void doMessageExpiration(List<EntryKey> list) {
        if (this.started.get() && this.electionFinished.get()
                && isCoordinator()) {
            for (EntryKey k : list) {
                EntryValue<V> old = null;
                synchronized (this.mapMutex) {
                    old = this.localMap.remove(k.getKey());
                }
                if (old != null) {
                    EntryMessage entryMsg = new EntryMessage();
                    entryMsg.setType(EntryMessage.MessageType.DELETE);
                    entryMsg.setExpired(true);
                    entryMsg.setKey(k);
                    entryMsg.setValue(old.getValue());
                    broadcastMapUpdate(entryMsg, "");
                    fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
                            null, true);
                }
            }
        }
    }

    void doLockExpiration(List<EntryKey> list) {
        if (this.started.get() && this.electionFinished.get()
                && isCoordinator()) {
            for (EntryKey k : list) {
                EntryMessage entryMsg = new EntryMessage();
                entryMsg.setType(EntryMessage.MessageType.DELETE);
                entryMsg.setLockExpired(true);
                entryMsg.setKey(k);
                broadcastMapUpdate(entryMsg, "");
            }
        }
    }

    void sendHeartBeat() {
        sendHeartBeat(this.heartBeatTopic);
    }

    void sendHeartBeat(Destination destination) {
        if (this.started.get()) {
            try {
                ObjectMessage msg = this.stateSession
                        .createObjectMessage(this.local);
                msg.setJMSType(STATE_TYPE);
                this.stateProducer.send(destination, msg);
            } catch (javax.jms.IllegalStateException e) {
                // ignore - as we are probably stopping
            } catch (Throwable e) {
                if (this.started.get()) {
                    LOG.warn("Failed to send heart beat", e);
                }
            }
        }
    }

    void updateNewMemberMap(Member member) {
        List<Map.Entry<K, EntryValue<V>>> list = new ArrayList<Map.Entry<K, EntryValue<V>>>();
        synchronized (this.mapMutex) {
            if (this.localMap != null) {
                for (Map.Entry<K, EntryValue<V>> entry : this.localMap
                        .entrySet()) {
                    list.add(entry);
                }
            }
        }
        try {
            for (Map.Entry<K, EntryValue<V>> entry : list) {
                EntryMessage entryMsg = new EntryMessage();
                entryMsg.setKey(entry.getValue().getKey());
                entryMsg.setValue(entry.getValue().getValue());
                entryMsg.setType(EntryMessage.MessageType.SYNC);
                entryMsg.setMapUpdate(true);
                ObjectMessage objMsg = this.stateSession
                        .createObjectMessage(entryMsg);
                if (!member.equals(entry.getValue().getKey().getOwner())) {
                    objMsg.setJMSType(STATE_TYPE);
                    this.stateProducer.send(member.getInBoxDestination(),
                            objMsg);
                }
            }
        } catch (javax.jms.IllegalStateException e) {
            // ignore - as closing
        } catch (JMSException e) {
            if (started.get()) {
                LOG.warn("Failed to update new member ", e);
            }
        }
    }

    void fireMemberStarted(Member member) {
        LOG.info(this.local.getName() + " Member started " + member);
        for (MemberChangedListener l : this.membershipListeners) {
            l.memberStarted(member);
        }
    }

    void fireMemberStopped(Member member) {
        LOG.info(this.local.getName() + " Member stopped " + member);
        for (MemberChangedListener l : this.membershipListeners) {
            l.memberStopped(member);
        }
        // remove all entries owned by the stopped member
        List<EntryKey<K>> tmpList = new ArrayList<EntryKey<K>>();
        boolean mapExists = false;
        synchronized (this.mapMutex) {
            mapExists = this.localMap != null;
            if (mapExists) {
                for (EntryValue value : this.localMap.values()) {
                    EntryKey entryKey = value.getKey();
                    if (entryKey.getOwner().equals(member)) {
                        if (entryKey.isRemoveOnExit()) {
                            tmpList.add(entryKey);
                        }
                        if (entryKey.isReleaseLockOnExit()) {
                            entryKey.setLocked(false);
                        }
                    }
                }
            }
        }
        if (mapExists) {
            for (EntryKey<K> entryKey : tmpList) {
                EntryValue<V> value = null;
                synchronized (this.mapMutex) {
                    value = this.localMap.remove(entryKey);
                }
                fireMapChanged(member, entryKey.getKey(), value.getValue(),
                        null, false);
            }
        }
    }

    void fireMemberMessage(final Member member, final String replyId,
            final Object message) {
        if (this.started.get() && this.stateExecutor != null
                && !this.messageExecutor.isShutdown()) {
            this.messageExecutor.execute(new Runnable() {
                public void run() {
                    doFireMemberMessage(member, replyId, message);
                }
            });
        }
    }

    void doFireMemberMessage(Member sender, String replyId, Object message) {
        if (this.started.get()) {
            for (GroupMessageListener l : this.groupMessageListeners) {
                l.messageDelivered(sender, replyId, message);
            }
        }
    }

    void fireMapChanged(final Member owner, final Object key,
            final Object oldValue, final Object newValue, final boolean expired) {
        if (this.started.get() && this.stateExecutor != null
                && !this.stateExecutor.isShutdown()) {
            this.stateExecutor.execute(new Runnable() {
                public void run() {
                    doFireMapChanged(owner, key, oldValue, newValue, expired);
                }
            });
        }
    }

    void doFireMapChanged(Member owner, Object key, Object oldValue,
            Object newValue, boolean expired) {
        if (this.started.get()) {
            for (GroupStateChangedListener l : this.mapChangedListeners) {
                if (oldValue == null) {
                    l.mapInsert(owner, key, newValue);
                } else if (newValue == null) {
                    l.mapRemove(owner, key, oldValue, expired);
                } else {
                    l.mapUpdate(owner, key, oldValue, newValue);
                }
            }
        }
    }

    void checkStatus() throws IllegalStateException {
        if (!started.get()) {
            throw new IllegalStateException("GroupMap " + this.local.getName()
                    + " not started");
        }
        waitForElection();
    }

    public String toString() {
        return "Group:" + getName() + "{id=" + this.local.getId()
                + ",coordinator=" + isCoordinator() + ",inbox="
                + this.local.getInBoxDestination() + "}";
    }

    void election(final Member member, final boolean memberStarted) {
        if (this.started.get() && this.stateExecutor != null
                && !this.electionExecutor.isShutdown()) {
            synchronized (this.electionFinished) {
                this.electionFinished.set(false);
            }
            synchronized (this.electionExecutor) {
                // remove any queued election tasks
                List<Runnable> list = new ArrayList<Runnable>(
                        this.electionExecutor.getQueue());
                for (Runnable r : list) {
                    ElectionService es = (ElectionService) r;
                    es.stop();
                    this.electionExecutor.remove(es);
                }
            }
            ElectionService es = new ElectionService(member, memberStarted);
            es.start();
            this.electionExecutor.execute(es);
        }
    }

    boolean callElection() {
        List<Member> members = new ArrayList<Member>(this.members.values());
        List<Member> sorted = sortMemberList(members);
        AsyncMapRequest request = new AsyncMapRequest();
        boolean doCall = false;
        for (Member member : sorted) {
            if (this.local.equals(member)) {
                doCall = true;
            } else if (doCall) {
                ElectionMessage msg = new ElectionMessage();
                msg.setMember(this.local);
                msg.setType(ElectionMessage.MessageType.ELECTION);
                sendAsyncStateRequest(request, member, msg);
            }
        }
        boolean result = request.isSuccess(getHeartBeatInterval());
        return result;
    }

    void processElectionMessage(ElectionMessage msg, Destination replyTo,
            String correlationId) {
        if (msg.isElection()) {
            msg.setType(ElectionMessage.MessageType.ANSWER);
            msg.setMember(this.local);
            sendReply(msg, replyTo, correlationId);
            election(null, false);
        } else if (msg.isCoordinator()) {
            synchronized (this.electionFinished) {
                this.coordinator = msg.getMember();
                this.electionFinished.set(true);
                this.electionFinished.notifyAll();
            }
        }
    }

    void broadcastElectionType(ElectionMessage.MessageType type) {
        if (started.get()) {
            try {
                ElectionMessage msg = new ElectionMessage();
                msg.setMember(this.local);
                msg.setType(type);
                ObjectMessage objMsg = this.stateSession
                        .createObjectMessage(msg);
                objMsg.setJMSType(STATE_TYPE);
                this.stateProducer.send(this.stateTopic, objMsg);
            } catch (javax.jms.IllegalStateException e) {
                // ignore - we are stopping
            } catch (JMSException e) {
                if (this.started.get()) {
                    LOG.error("Failed to broadcast election message: " + type,
                            e);
                }
            }
        }
    }

    void waitForElection() {
        synchronized (this.electionFinished) {
            while (started.get() && !this.electionFinished.get()) {
                try {
                    this.electionFinished.wait(200);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted in waitForElection");
                    stop();
                }
            }
        }
    }
    class ElectionService implements Runnable {
        private AtomicBoolean started = new AtomicBoolean();
        private Member member;
        private boolean memberStarted;

        ElectionService(Member member, boolean memberStarted) {
            this.member = member;
            this.memberStarted = memberStarted;
        }

        void start() {
            this.started.set(true);
        }

        void stop() {
            this.started.set(false);
        }

        public void run() {
            doElection();
        }

        void doElection() {
            if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members
                    .size() == getMinimumGroupSize()))) {
                boolean wasCoordinator = isCoordinatorMatch() && !isEmpty();
                // call an election
                while (!callElection() && isStarted() && this.started.get())
                    ;
                if (isStarted() && this.started.get()) {
                    List<Member> members = new ArrayList<Member>(
                            Group.this.members.values());
                    Group.this.coordinator = selectCordinator(members);
                    if (isCoordinatorMatch()) {
                        broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
                    }
                    if (this.memberStarted && this.member != null) {
                        if (wasCoordinator || isCoordinator()
                                && this.started.get()) {
                            updateNewMemberMap(this.member);
                        }
                    }
                    if (!isElectionFinished() && this.started.get()) {
                        try {
                            synchronized (Group.this.electionFinished) {
                                Group.this.electionFinished
                                        .wait(Group.this.heartBeatInterval * 2);
                            }
                        } catch (InterruptedException e) {
                        }
                    }
                    if (!isElectionFinished() && this.started.get()) {
                        // we must be the coordinator
                        setCoordinator(getLocalMember());
                        setElectionFinished(true);
                        broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
                    }
                }
            }
        }
    }
}
TOP

Related Classes of org.apache.activegroups.Group$ElectionService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.