Package com.alu.e3.data

Source Code of com.alu.e3.data.TopologyClient

/**
* Copyright © 2012 Alcatel-Lucent.
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
* Licensed to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*          http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.alu.e3.data;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alu.e3.common.caching.ICacheManager;
import com.alu.e3.common.caching.ICacheTable;
import com.alu.e3.common.caching.IEntryListener;
import com.alu.e3.common.osgi.api.ITopologyClient;
import com.alu.e3.common.tools.CanonicalizedIpAddress;
import com.alu.e3.common.tools.CommonTools;
import com.alu.e3.data.model.Instance;
import com.alu.e3.data.topology.IInstanceListener;
import com.alu.e3.data.topology.InstanceEvent;

/**
* Class to access/set topology information in cache
*/
public class TopologyClient implements ITopologyClient {

  private static final Logger logger = LoggerFactory.getLogger(TopologyClient.class);
 
  private ICacheTable<String, ArrayList<Instance>> cachingTableTopology;

  /* area of this instance. */
  private String currentArea;
  private Set<String> currentAreaList;
 
  private ICacheManager cacheManager;

  private Map<String,Instance> whoIAm = new HashMap<String,Instance>();

  public TopologyClient() {}
 
  /**
   * Set the cache manager
   */
  public void setCacheManager(ICacheManager cacheManager) {
    if (logger.isDebugEnabled()) {
      logger.debug("Set ICacheManager on TopologyClient");
    }
    this.cacheManager = cacheManager;
  }
 
  public void init() {
    if (logger.isDebugEnabled()) {
      logger.debug("TopologyClient initialization");
    }
    cachingTableTopology = cacheManager.createTable("cachingTableTopology", true, null);
  }

  /**
   * Adds an {@link Instance} to the local topology.
   * If the given {@link Instance} already present in this topology,
   * the given {@link Instance} will be ignored.
   * @see instanceEquals method
   * @param inst the {@link Instance} to add
   */
  public void addInstance(Instance inst) {
    if (logger.isDebugEnabled()) {
      logger.debug("Adding instance: {}", inst);
    }
   
    if (inst == null || inst.getType().isEmpty() || inst.getInternalIP().isEmpty())
      throw new IllegalArgumentException("Invalid type or ip");
   
    ArrayList<Instance> list = cachingTableTopology.get(inst.getType());
    if (list == null)
    {
      if (logger.isDebugEnabled()) {
        logger.debug("Creating cachingTableTopology element for type: {}", inst.getType());
      }
      list = new ArrayList<Instance>();
    }
   
    for (Instance i : list)
    {
      if (instanceEquals(i, inst)) {
        if (logger.isDebugEnabled()) {
          logger.debug("Intance:{} already presents in topology for type: {}", inst, inst.getType());
        }
        return;
      }
    }
   
    list.add(inst);
    cachingTableTopology.set(inst.getType(), list);
   
    fireInstanceAdded(new InstanceEvent(inst));
   
    // This is to clean the current cache
    synchronized (this) {
      currentAreaList = null;
    }
   
    if (logger.isDebugEnabled()) {
      logger.debug("Instance added: {}", inst);
    }
  }

  /**
   * Deletes an instance
   */
  public boolean deleteInstance(Instance inst) {
    ArrayList<Instance> list = cachingTableTopology.get(inst.getType());
    if (list == null)
      return false;

    Iterator<Instance> itr = list.iterator();
    while (itr.hasNext()) {
      if (itr.next().getName().equals(inst.getName())) {
        itr.remove();
        cachingTableTopology.set(inst.getType(), list);
        fireInstanceRemoved(new InstanceEvent(inst));
        // This is to clean the current cache
        currentAreaList = null;
       
        return true;
      }
    }
    return false;
  }

  /**
   * Get all instances info of a type.
   */
  @Override
  public List<Instance> getAllInstancesOfType(String type) {

    if (type == null)
      throw new IllegalArgumentException("Invalid type");

    return cachingTableTopology.get(type);
  }

  /**
   * Get all instances info of a type.
   */
  @Override
  public Set<String> getAllInternalIPsOfType(String type) {

    return this.getInternalIPsOfType(type, null);
  }

  /**
   * Get all instances info of a type in a given area
   */
  @Override
  public Set<String> getInternalIPsOfType(String type, String area) {

    if (type == null)
      throw new IllegalArgumentException("Invalid type");

    Set<String> set = new HashSet<String>();

    ArrayList<Instance> list = cachingTableTopology.get(type);
    if (list != null)
    {
      /* copy now the array to avoid concurency problems. */
      Instance[] tabInstances = new Instance[0];
      tabInstances = list.toArray(tabInstances);

      for (int i = 0; i < tabInstances.length; i++) {
        Instance inst = tabInstances[i];

        if (area == null || area.equals(inst.getArea()))
          set.add(inst.getInternalIP());
      }
    }

    return set;
  }

  /**
   * Get all instances info of a type.
   */
  @Override
  public Set<String> getAllExternalIPsOfType(String type) {

    return this.getExternalIPsOfType(type, null);
  }

  /**
   * Get all instances info of a type in a given area
   */
  @Override
  public Set<String> getExternalIPsOfType(String type, String area) {

    if (type == null)
      throw new IllegalArgumentException("Invalid type");

    Set<String> set = new HashSet<String>();

    ArrayList<Instance> list = cachingTableTopology.get(type);
    if (list != null)
    {
      /* copy now the array to avoid concurency problems. */
      Instance[] tabInstances = new Instance[0];
      tabInstances = list.toArray(tabInstances);

      for (int i = 0; i < tabInstances.length; i++) {
        Instance inst = tabInstances[i];

        if (area == null || area.equals(inst.getArea()))
          set.add(inst.getExternalIP());
      }
    }

    return set;
  }

  /**
   * Get all instances info of a type in a given area.
   */
  @Override
  public List<Instance> getInstancesOfType(String type, String area) {

    if (type == null)
      throw new IllegalArgumentException("Invalid type");

    /* copy now the array to avoid concurency problems. */
    Instance[] tabInstances = new Instance[0];
   
    ArrayList<Instance> tmp = cachingTableTopology.get(type);
    if(tmp != null)
      tabInstances = tmp.toArray(tabInstances);
   
   
    ArrayList<Instance> list = new ArrayList<Instance>();

    for (int i = 0; i < tabInstances.length; i++) {
      Instance inst = tabInstances[i];

      if (area == null || area.equals(inst.getArea()))
        list.add(inst);
    }

    return list;
  }

  /**
   * Get the current area.
   */
  @Override
  public String getMyArea() {

    if (currentArea != null)
      return currentArea;

    Iterator<String> itKeys = cachingTableTopology.getAllKeys().iterator();
   
    while (itKeys.hasNext()) {
      Instance[] tabInstances = new Instance[0];
      tabInstances = cachingTableTopology.get(itKeys.next()).toArray(tabInstances);

      for (int i = 0; i < tabInstances.length; i++) {
        Instance inst = tabInstances[i];

        if (CommonTools.isLocal(inst.getExternalIP()) || CommonTools.isLocal(inst.getInternalIP())) {
          currentArea = inst.getArea();
          return currentArea;
        }
      }
    }

    /* Area not found or error while fetching ip address. */
    return null;
  }

  /**
   * Get all the area available
   */
  @Override
  public synchronized Set<String> getAllAreas() {

    if (currentAreaList != null)
      return currentAreaList;

      currentAreaList = new HashSet<String>();

      Iterator<String> itKeys = cachingTableTopology.getAllKeys().iterator();
      while (itKeys.hasNext()) {

        Instance[] tabInstances = new Instance[0];
        tabInstances = cachingTableTopology.get(itKeys.next()).toArray(tabInstances);

        for (int i = 0; i < tabInstances.length; i++) {
          String area = tabInstances[i].getArea();
          if (area != null && !area.isEmpty())
            currentAreaList.add(area);
        }
      }
    return currentAreaList;
  }

  /**
   * Get all the other areas available
   */
  @Override
  public Set<String> getAllOtherAreas() {

    Set<String> allAreas = this.getAllAreas();
    Set<String> tmp = new HashSet<String>(allAreas);
    String myArea = this.getMyArea();
    tmp.remove(myArea);

    return tmp;
  }

  /**
   * Add listener to any change of the topology in cache
   */
  public void addInstanceTypeListener(IEntryListener<String, ArrayList<Instance>> listener) {
    cachingTableTopology.addEntryListener(listener);
  }

  /**
   * Remove listener.
   */
  public void removeInstanceTypeListener(IEntryListener<String, ArrayList<Instance>> listener) {
    cachingTableTopology.removeEntryListener(listener);
  }

  /**
   * The list of instance listeners
   */
  private LinkedList<IInstanceListener> instanceListeners = new LinkedList<IInstanceListener>();

  /**
   * Add an instance listener
   */
  @Override
  public void addInstanceListener(IInstanceListener listener) {
    instanceListeners.add(listener);
  }

  /**
   * Remove an instance listener
   */
  @Override
  public void removeInstanceListener(IInstanceListener listener) {
    instanceListeners.remove(listener);
  }

  /**
   * Fire the instanceAdded event
   */
  private void fireInstanceAdded(final InstanceEvent event) {
    if (logger.isDebugEnabled()) {
      logger.debug("Firing instance added event for instance:{}", event.getInstance());
    }

    IInstanceListener[] currentListOfListeners = instanceListeners.toArray(new IInstanceListener[]{});
   
    for(IInstanceListener listener : currentListOfListeners) {
      listener.instanceAdded(event);
    }
  }

  /**
   * Fire the instanceRemoved event
   */
  private void fireInstanceRemoved(final InstanceEvent event) {
    if (logger.isDebugEnabled()) {
      logger.debug("Firing instance removed event for instance:{}", event.getInstance());
    }
   
    IInstanceListener[] currentListOfListeners = instanceListeners.toArray(new IInstanceListener[]{});
   
    for(IInstanceListener listener : currentListOfListeners) {
      listener.instanceRemoved(event);
    }
  }

  @Override
  public synchronized void addAreas(Set<String> areas) {         
    if(currentAreaList == null){
      currentAreaList = new HashSet<String>();     
    }
    this.currentAreaList.addAll(areas);
  }
 
  public synchronized void setMyArea(String area) {         
   
    this.currentArea = area;
  }
 
  // If the topology hasn't entered the data cache, this function will
  // NOT return whoIAm, it will return null.  So, if you use this, you
  // MUST include some sort of failover ID for this case
  public Instance whoAmI(String type) {
    if (whoIAm.get(type) != null) {
      return whoIAm.get(type);
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Trying to find my Instance for type: " + type);
   

    List<Instance> instances;
    instances = getAllInstancesOfType(type);
    if (instances == null) {
      if (logger.isDebugEnabled()) {
        logger.debug("Found no Instances of type.");
      }
      return null;
    }
    //add all possible IPs (and hostname) to this map for easy retrieval
    Map<String,Instance> ipToInstance = new HashMap<String,Instance>();
    for (Instance inst : instances) {
      String internalIP = inst.getInternalIP();
      if (!CanonicalizedIpAddress.isValidIp(internalIP)) {
        try {
          internalIP = InetAddress.getByName(internalIP).getHostAddress();
        } catch (UnknownHostException e) {;}
      }
      if (!internalIP.isEmpty()) ipToInstance.put(internalIP, inst);
     
      String externalIP = inst.getExternalIP();
      // if externalIP is a hostname, try to resolve it
      if (!CanonicalizedIpAddress.isValidIp(externalIP)) {
        try {
          externalIP = InetAddress.getByName(externalIP).getHostAddress();
        } catch (UnknownHostException e) {;}
      }
      if (!externalIP.isEmpty()) ipToInstance.put(externalIP, inst);
     
      String externalDNS = inst.getExternalDNS();
      if (!externalDNS.isEmpty()) ipToInstance.put(externalDNS, inst);
    }
   
    //check network interfaces
    Enumeration<NetworkInterface> nets;
    try {
      nets = NetworkInterface.getNetworkInterfaces();
      for (NetworkInterface netint : Collections.list(nets)) {
        if (logger.isDebugEnabled()) {
          logger.debug("Checking Interface " + netint.getDisplayName());
        }
       
        //check ips for each interface
            Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
            for (InetAddress inetAddress : Collections.list(inetAddresses)) {
              String myIp = inetAddress.getHostAddress();
              if (logger.isDebugEnabled()) {
                logger.debug("Checking IP : " + myIp);
              }
             
              //if there is an instance associated with myIP, that is me
              Instance instance = ipToInstance.get(myIp);
              if (instance != null) {
                whoIAm.put(type, instance);
                if (logger.isDebugEnabled()) {
                  logger.debug("Found an IP match with Instance " + instance.getName());
                }
                return instance;
              }
            }
          }
    } catch (SocketException e) {
      logger.error("Got an exception checking my Network Interfaces for matching IPs: " +
          e.getMessage());
      logger.warn(e.getMessage(), e);
    }
    if (logger.isDebugEnabled()) {
      logger.debug("Could not find an Instance with a matching IP");
    }
    return null;
  }

  @Override
  public void reloadInstanceTopology(Instance toReloadInstance){
    logger.info("Reloading topology for node: {}", toReloadInstance);
   
    String reloadIP = toReloadInstance.getInternalIP();
    if( !currentArea.equals(toReloadInstance.getArea()) ){
      reloadIP = toReloadInstance.getExternalIP();
    }
   
    if (logger.isDebugEnabled()) {
      logger.debug("Reloading topology for ip: {}", reloadIP);
    }
    reloadInstanceTopology(reloadIP);
  }
 
  @Override
  public void reloadInstanceTopology(String ip) {
    cachingTableTopology.reloadSlave(ip);
  }
 
  /**
   * Tells if two {@link Instance} object are in fact same
   * @param a the first instance to compare
   * @param b the second instance to compare
   * @return true if a and b are in fact the same
   */
  public boolean instanceEquals(final Instance a, final Instance b) {
    return a != null && b != null && a.getName() != null && a.getName().equals(b.getName());
  }
}
TOP

Related Classes of com.alu.e3.data.TopologyClient

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.