Package org.jasig.portal.jgroups.protocols

Source Code of org.jasig.portal.jgroups.protocols.DAO_PING

/**
* Licensed to Jasig under one or more contributor license
* agreements. See the NOTICE file distributed with this work
* for additional information regarding copyright ownership.
* Jasig licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jasig.portal.jgroups.protocols;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.PhysicalAddress;
import org.jgroups.View;
import org.jgroups.annotations.Property;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.Discovery;

/**
* Discovery protocol that delegates to a {@link PingDao} implementation. The {@link PingDao} impl
* MUST call the static {@link #setPingDao(PingDao)} method when it is ready to handle calls. Until that
* time the DAO_PING protocol will behave as if there are no other visible members.
*
* @author Eric Dalquist
*/
public class DAO_PING extends Discovery {
    static {
        //Register the protocol with jGroups
        ClassConfigurator.addProtocol((short) 600, DAO_PING.class);
    }
   
    private static volatile PingDao pingDao;
   
    public static void setPingDao(PingDao pingDao) {
        if (DAO_PING.pingDao != null) {
            LogFactory.getLog(DAO_PING.class).warn("A PingDao was already set. " + DAO_PING.pingDao + " will be replaced with " + pingDao);
        }
        DAO_PING.pingDao = pingDao;
    }
   
    @Property(description="Interval (in milliseconds) at which the own Address is written. 0 disables it.")
    protected long interval=60000;
   
    private Future<?> writer_future;

    @Override
    public boolean sendDiscoveryRequestsInParallel() {
        return true;
    }

    @Override
    public boolean isDynamic() {
        return true;
    }

    public void start() throws Exception {
        super.start();
        if(interval > 0) {
            writer_future=timer.scheduleWithFixedDelay(new WriterTask(), interval, interval, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        final Future<?> wf = writer_future;
        if(wf != null) {
            wf.cancel(false);
            writer_future=null;
        }
       
        super.stop();
    }
   
    @Override
    public void destroy() {
        super.destroy();
       
        pingDao = null;
    }

    @Override
    public Collection<PhysicalAddress> fetchClusterMembers(String clusterName) {
        //If no DAO has been set just return an empty list
        if (pingDao == null) {
            log.info("No PingDao set, returning empty set for current cluster members");
            return Collections.emptyList();
        }
       
        //Get the current cluster members
        final Map<Address, PhysicalAddress> existing_mbrs = pingDao.getAddresses(clusterName);

        //Add our address to the store
        getAndSavePhysicalAddress(clusterName);
       
        //Return current members
        return existing_mbrs.values();
    }

    public Object down(Event evt) {
        final Object retval = super.down(evt);
       
        if (evt.getType() == Event.VIEW_CHANGE) {
            //Handle view changes to make sure the dao store is consistent
            handleView((View) evt.getArg());
        }
       
        return retval;
    }

    protected void getAndSavePhysicalAddress(String clusterName) {
        if (pingDao == null) {
            log.info("No PingDao set, skiping save of physical address for cluster " + clusterName);
            return;
        }
       
        PhysicalAddress physicalAddr = (PhysicalAddress) down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
        pingDao.addAddress(clusterName, local_addr, physicalAddr);
    }

    protected void handleView(View view) {
        if (pingDao == null) {
            //If no dao is set yet ignore the view change.
            log.info("No PingDao set, ignoring view change.");
            return;
        }
       
        final Collection<Address> mbrs = view.getMembers();
        final boolean is_coordinator = !mbrs.isEmpty() && mbrs.iterator().next().equals(local_addr);
        if (is_coordinator) {
            //Delete all member addresses other than those in the current view
            pingDao.purgeOtherAddresses(group_addr, mbrs);
        }
    }

    protected final class WriterTask implements Runnable {
        public void run() {
            getAndSavePhysicalAddress(group_addr);
        }
    }
}
TOP

Related Classes of org.jasig.portal.jgroups.protocols.DAO_PING

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.