Package org.apache.ambari.server.controller.ganglia

Source Code of org.apache.ambari.server.controller.ganglia.GangliaPropertyProvider$ResourceKey

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.server.controller.ganglia;

import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.controller.internal.PropertyInfo;
import org.apache.ambari.server.controller.spi.*;
import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Abstract property provider implementation for a Ganglia source.
*/
public abstract class GangliaPropertyProvider extends AbstractPropertyProvider {

  private final StreamProvider streamProvider;

  private final GangliaHostProvider hostProvider;

  private final String clusterNamePropertyId;

  private final String hostNamePropertyId;

  private final String componentNamePropertyId;

  private final ComponentSSLConfiguration configuration;

  /**
   * Map of Ganglia cluster names keyed by component type.
   */
  static final Map<String, List<String>> GANGLIA_CLUSTER_NAME_MAP = new HashMap<String, List<String>>();

 
  static {
    GANGLIA_CLUSTER_NAME_MAP.put("NAMENODE",           Arrays.asList("HDPNameNode"));
    GANGLIA_CLUSTER_NAME_MAP.put("DATANODE",           Arrays.asList("HDPDataNode", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("JOBTRACKER",         Arrays.asList("HDPJobTracker"));
    GANGLIA_CLUSTER_NAME_MAP.put("TASKTRACKER",        Arrays.asList("HDPTaskTracker", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("RESOURCEMANAGER",    Arrays.asList("HDPResourceManager"));
    GANGLIA_CLUSTER_NAME_MAP.put("NODEMANAGER",        Arrays.asList("HDPNodeManager", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("HISTORYSERVER",      Arrays.asList("HDPHistoryServer"));
    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_MASTER",       Arrays.asList("HDPHBaseMaster"));
    GANGLIA_CLUSTER_NAME_MAP.put("HBASE_REGIONSERVER", Arrays.asList("HDPHBaseRegionServer", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("FLUME_SERVER",       Arrays.asList("HDPFlumeServer", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("JOURNALNODE",        Arrays.asList("HDPJournalNode", "HDPSlaves"));
    GANGLIA_CLUSTER_NAME_MAP.put("NIMBUS",             Arrays.asList("HDPNimbus"));
    GANGLIA_CLUSTER_NAME_MAP.put("SUPERVISOR",         Arrays.asList("HDPSupervisor"));
  }

  protected final static Logger LOG =
      LoggerFactory.getLogger(GangliaPropertyProvider.class);

  // ----- Constructors ------------------------------------------------------

  public GangliaPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap,
                                 StreamProvider streamProvider,
                                 ComponentSSLConfiguration configuration,
                                 GangliaHostProvider hostProvider,
                                 String clusterNamePropertyId,
                                 String hostNamePropertyId,
                                 String componentNamePropertyId) {

    super(componentPropertyInfoMap);

    this.streamProvider           = streamProvider;
    this.configuration            = configuration;
    this.hostProvider             = hostProvider;
    this.clusterNamePropertyId    = clusterNamePropertyId;
    this.hostNamePropertyId       = hostNamePropertyId;
    this.componentNamePropertyId  = componentNamePropertyId;
  }


  // ----- PropertyProvider --------------------------------------------------

  @Override
  public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
      throws SystemException {

    Set<String> ids = getRequestPropertyIds(request, predicate);
    if (ids.isEmpty()) {
      return resources;
    }

    Set<Resource> keepers = new HashSet<Resource>();

    Map<String, Map<TemporalInfo, RRDRequest>> requestMap = getRRDRequests(resources, request, ids);

    // For each cluster...
    for (Map.Entry<String, Map<TemporalInfo, RRDRequest>> clusterEntry : requestMap.entrySet()) {
      // For each request ...
      for (RRDRequest rrdRequest : clusterEntry.getValue().values() ) {
        //todo: property provider can reduce set of resources
        keepers.addAll(rrdRequest.populateResources());
      }
    }
    //todo: ignoring keepers returned by the provider
    return resources;
  }


  // ----- GangliaPropertyProvider -------------------------------------------

  /**
   * Get the host name for the given resource.
   *
   * @param resource  the resource
   *
   * @return the host name
   */
  protected abstract String getHostName(Resource resource);

  /**
   * Get the component name for the given resource.
   *
   * @param resource  the resource
   *
   * @return the component name
   */
  protected abstract String getComponentName(Resource resource);

  /**
   * Get the ganglia cluster name for the given resource.
   *
   *
   * @param resource  the resource
   *
   * @return the ganglia cluster name
   */
  protected abstract Set<String> getGangliaClusterNames(Resource resource, String clusterName);


  /**
   * Get the component name property id.
   *
   * @return the component name property id
   */
  protected String getComponentNamePropertyId() {
    return componentNamePropertyId;
  }

  /**
   * Get the host name property id.
   *
   * @return the host name property id
   */
  protected String getHostNamePropertyId() {
    return hostNamePropertyId;
  }

  /**
   * Get the stream provider.
   *
   * @return the stream provider
   */
  public StreamProvider getStreamProvider() {
    return streamProvider;
  }


  // ----- helper methods ----------------------------------------------------

  /**
   * Get the request objects containing all the information required to
   * make single requests to the Ganglia rrd script.
   * Requests are created per cluster name / temporal information but
   * can span multiple resources and metrics.
   *
   * @param resources  the resources being populated
   * @param request    the request
   * @param ids        the relevant property ids
   *
   * @return a map of maps of rrd requests keyed by cluster name / temporal info
   */
  private Map<String, Map<TemporalInfo, RRDRequest>> getRRDRequests(Set<Resource> resources,
                                                                    Request request,
                                                                    Set<String> ids) {

    Map<String, Map<TemporalInfo, RRDRequest>> requestMap =
        new HashMap<String, Map<TemporalInfo, RRDRequest>>();

    for (Resource resource : resources) {
      String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
      Map<TemporalInfo, RRDRequest> requests = requestMap.get(clusterName);
      if (requests == null) {
        requests = new HashMap<TemporalInfo, RRDRequest>();
        requestMap.put(clusterName, requests);
      }

      Set<String> gangliaClusterNames = getGangliaClusterNames(resource, clusterName);

      for (String gangliaClusterName : gangliaClusterNames) {
        ResourceKey key =
            new ResourceKey(getHostName(resource), gangliaClusterName);

        for (String id : ids) {
          Map<String, PropertyInfo> propertyInfoMap = new HashMap<String, PropertyInfo>();

          Map<String, PropertyInfo> componentMetricMap =
            getComponentMetrics().get(getComponentName(resource));

          // Not all components have metrics
          if (componentMetricMap != null &&
              !componentMetricMap.containsKey(id)) {
            updateComponentMetricMap(componentMetricMap, id);
          }

          getPropertyInfoMap(getComponentName(resource), id, propertyInfoMap);

          for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
            String propertyId = entry.getKey();
            PropertyInfo propertyInfo = entry.getValue();

            TemporalInfo temporalInfo = request.getTemporalInfo(id);

            if ((temporalInfo == null && propertyInfo.isPointInTime()) || (temporalInfo != null && propertyInfo.isTemporal())) {
              RRDRequest rrdRequest = requests.get(temporalInfo);
              if (rrdRequest == null) {
                rrdRequest = new RRDRequest(clusterName, temporalInfo);
                requests.put(temporalInfo, rrdRequest);
              }
              rrdRequest.putResource(key, resource);             
              rrdRequest.putPropertyId(propertyInfo.getPropertyId(), propertyId);
            }
          }
        }
      }
    }
    return requestMap;
  }

  /**
   * Get the spec to locate the Ganglia stream from the given
   * request info.
   *
   * @param clusterName   the cluster name
   * @param clusterSet    the set of ganglia cluster names
   * @param hostSet       the set of host names
   * @param metricSet     the set of metric names
   * @param temporalInfo  the temporal information
   *
   * @return the spec, like http://example.com/path?param1=val1&paramn=valn
   *
   * @throws SystemException if unable to get the Ganglia Collector host name
   */
  private String getSpec(String clusterName,
                         Set<String> clusterSet,
                         Set<String> hostSet,
                         Set<String> metricSet,
                         TemporalInfo temporalInfo) throws SystemException {
   
    String clusters = getSetString(clusterSet, -1);
    String hosts = getSetString(hostSet, -1);
    String metrics = getSetString(metricSet, -1);
   
    URIBuilder uriBuilder = new URIBuilder();

    if (configuration.isGangliaSSL()) {
      uriBuilder.setScheme("https");
    } else {
      uriBuilder.setScheme("http");
    }

    uriBuilder.setHost(hostProvider.getGangliaCollectorHostName(clusterName));

    uriBuilder.setPath("/cgi-bin/rrd.py");
   
    uriBuilder.setParameter("c", clusters);

    if (hosts.length() > 0) {
      uriBuilder.setParameter("h", hosts);
    }

    if (metrics.length() > 0) {
      uriBuilder.setParameter("m", metrics);
    } else {
      // get all metrics
      uriBuilder.setParameter("m", ".*");
    }

    if (temporalInfo != null) {
      long startTime = temporalInfo.getStartTime();
      if (startTime != -1) {
        uriBuilder.setParameter("s", String.valueOf(startTime));
      }

      long endTime = temporalInfo.getEndTime();
      if (endTime != -1) {
        uriBuilder.setParameter("e", String.valueOf(endTime));
      }

      long step = temporalInfo.getStep();
      if (step != -1) {
        uriBuilder.setParameter("r", String.valueOf(step));
      }
    } else {
      uriBuilder.setParameter("e", "now");
      uriBuilder.setParameter("pt", "true");
    }

    return uriBuilder.toString();
  }
 

  /**
   * Get value from the given metric.
   *
   * @param metric      the metric
   * @param isTemporal  indicates whether or not this a temporal metric
   *
   * @return a range of temporal data or a point in time value if not temporal
   */
  private static Object getValue(GangliaMetric metric, boolean isTemporal) {
    Number[][] dataPoints = metric.getDatapoints();

    int length = dataPoints.length;
    if (isTemporal) {
      return length > 0 ? dataPoints : null;
    } else {
      // return the value of the last data point
      return length > 0 ? dataPoints[length - 1][0] : 0;
    }
  }

  /**
   * Get a comma delimited string from the given set of strings or
   * an empty string if the size of the given set is greater than
   * the given limit.
   *
   * @param set    the set of strings
   * @param limit  the upper size limit for the list
   *
   * @return a comma delimited string of strings
   */
  private static String getSetString(Set<String> set, int limit) {
    StringBuilder sb = new StringBuilder();

    if (limit == -1 || set.size() <= limit) {
      for (String cluster : set) {
        if (sb.length() > 0) {
          sb.append(",");
        }
        sb.append(cluster);
      }
    }
    return sb.toString();
  }


  // ----- inner classes -----------------------------------------------------


  // ----- RRDRequest ----------------------------------------------------

  /**
   * The information required to make a single RRD request.
   */
  private class RRDRequest {
    private static final int POPULATION_TIME_UPPER_LIMIT = 5;
    private final String clusterName;
    private final TemporalInfo temporalInfo;
    private final Map<ResourceKey, Set<Resource>> resources = new HashMap<ResourceKey, Set<Resource>>();
    private final Map<String, Set<String>> metrics = new HashMap<String, Set<String>>();
    private final Set<String> clusterSet = new HashSet<String>();
    private final Set<String> hostSet = new HashSet<String>();


    private RRDRequest(String clusterName, TemporalInfo temporalInfo) {
      this.clusterName  = clusterName;
      this.temporalInfo = temporalInfo;
    }

    public void putResource(ResourceKey key, Resource resource) {
      clusterSet.add(key.getClusterName());
      hostSet.add(key.getHostName());
      Set<Resource> resourceSet = resources.get(key);
      if (resourceSet == null) {
        resourceSet = new HashSet<Resource>();
        resources.put(key, resourceSet);
      }
      resourceSet.add(resource);
    }

    public void putPropertyId(String metric, String id) {
      Set<String> propertyIds = metrics.get(metric);

      if (propertyIds == null) {
        propertyIds = new HashSet<String>();
        metrics.put(metric, propertyIds);
      }
      propertyIds.add(id);
    }

    /**
     * Populate the associated resources by making the rrd request.
     *
     * @return a collection of populated resources
     *
     * @throws SystemException if unable to populate the resources
     */
    public Collection<Resource> populateResources() throws SystemException {

      //Get full url with parameters
      String specWithParams = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
     
      //URL
      String spec = null;
      //Parameters
      String params = null;
     
      String[] tokens = specWithParams.split("\\?", 2);

      try {
        spec = tokens[0];
        params = tokens[1];
      } catch (ArrayIndexOutOfBoundsException e) {
        LOG.info(e.toString());
      }
     

      BufferedReader reader = null;
      try {
       
        //Check if host is live
        if (!hostProvider.isGangliaCollectorHostLive(clusterName)) {
          LOG.info("Ganglia host is not live");
          return Collections.emptySet();
        }

        //Check if Ganglia server component is live
        if (!hostProvider.isGangliaCollectorComponentLive(clusterName)) {
          LOG.info("Ganglia server component is not live");
          return Collections.emptySet();
        }

        reader = new BufferedReader(new InputStreamReader(
            getStreamProvider().readFrom(spec, "POST", params)));

        String feedStart = reader.readLine();
        if (feedStart == null || feedStart.isEmpty()) {
          LOG.info("Empty feed while getting ganglia metrics for spec => "+
            spec);
          return Collections.emptySet();
        }
        int startTime = convertToNumber(feedStart).intValue();

        String dsName = reader.readLine();
        if (dsName == null || dsName.isEmpty()) {
          LOG.info("Feed without body while reading ganglia metrics for spec " +
            "=> " + spec);
          return Collections.emptySet();
        }

        while(!"[~EOF]".equals(dsName)) {
          GangliaMetric metric = new GangliaMetric();
          List<GangliaMetric.TemporalMetric> listTemporalMetrics =
              new ArrayList<GangliaMetric.TemporalMetric>();

          metric.setDs_name(dsName);
          metric.setCluster_name(reader.readLine());
          metric.setHost_name(reader.readLine());
          metric.setMetric_name(reader.readLine());

          int time = convertToNumber(reader.readLine()).intValue();
          int step = convertToNumber(reader.readLine()).intValue();

          String val     = reader.readLine();
          String lastVal = null;

          while(val!=null && !"[~EOM]".equals(val)) {
            if (val.startsWith("[~r]")) {
              Integer repeat = Integer.valueOf(val.substring(4)) - 1;
              for (int i = 0; i < repeat; ++i) {
                if (! "[~n]".equals(lastVal)) {
                  GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(lastVal, time);
                  if (tm.isValid()) listTemporalMetrics.add(tm);
                }
                time += step;
              }
            } else {
              if (! "[~n]".equals(val)) {
                GangliaMetric.TemporalMetric tm = new GangliaMetric.TemporalMetric(val, time);
                if (tm.isValid()) listTemporalMetrics.add(tm);
              }
              time += step;
            }
            lastVal = val;
            val = reader.readLine();
          }

          metric.setDatapointsFromList(listTemporalMetrics);

          ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
          Set<Resource> resourceSet = resources.get(key);
          if (resourceSet != null) {
            for (Resource resource : resourceSet) {
              populateResource(resource, metric);
            }
          }

          dsName = reader.readLine();
          if (dsName == null || dsName.isEmpty()) {
            LOG.info("Unexpected end of stream reached while getting ganglia " +
              "metrics for spec => " + spec);
            return Collections.emptySet();
          }
        }
        String feedEnd = reader.readLine();
        if (feedEnd == null || feedEnd.isEmpty()) {
          LOG.info("Error reading end of feed while getting ganglia metrics " +
            "for spec => " + spec);
        } else {

          int endTime = convertToNumber(feedEnd).intValue();
          int totalTime = endTime - startTime;
          if (LOG.isInfoEnabled() && totalTime > POPULATION_TIME_UPPER_LIMIT) {
            LOG.info("Ganglia resource population time: " + totalTime);
          }
        }
      } catch (IOException e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
        }
      } finally {
        if (reader != null) {
          try {
            reader.close();
          } catch (IOException e) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Unable to close http input steam : spec=" + spec, e);
            }
          }
        }
      }
      //todo: filter out resources and return keepers
      return Collections.emptySet();
    }


    /**
     * Populate the given resource with the given Ganglia metric.
     *
     * @param resource       the resource
     * @param gangliaMetric  the Ganglia metrics
     */
    private void populateResource(Resource resource, GangliaMetric gangliaMetric) {
      String metric_name = gangliaMetric.getMetric_name();

      Set<String> propertyIdSet = metrics.get(metric_name);
      List<String> parameterList  = new LinkedList<String>();

      if (propertyIdSet == null) {
        for (Map.Entry<String, Set<String>> entry : metrics.entrySet()) {

          String key = entry.getKey();

          Pattern pattern = Pattern.compile(key);
          Matcher matcher = pattern.matcher(metric_name);

          if (matcher.matches()) {
            propertyIdSet = entry.getValue();
            // get parameters
            for (int i = 0; i < matcher.groupCount(); ++i) {
              parameterList.add(matcher.group(i + 1));
            }
            break;
          }
        }
      }
      if (propertyIdSet != null) {
        Map<String, PropertyInfo> metricsMap = getComponentMetrics().get(getComponentName(resource));
        if (metricsMap != null) {
          for (String propertyId : propertyIdSet) {
            if (propertyId != null) {
              if (metricsMap.containsKey(propertyId)){
                if (containsArguments(propertyId)) {
                  int i = 1;
                  for (String param : parameterList) {
                    propertyId = substituteArgument(propertyId, "$" + i, param);
                    ++i;
                  }
                }
                Object value = getValue(gangliaMetric, temporalInfo != null);
                if (value != null) {
                  resource.setProperty(propertyId, value);
                }
              }
            }
          }
        }
      }
    }

    private Number convertToNumber(String s) {
      return s.contains(".") ? Double.parseDouble(s) : Long.parseLong(s);
    }
  }


  // ----- ResourceKey ---------------------------------------------------

  /**
   * Key used to associate information from a Ganglia metric to a resource.
   */
  private static class ResourceKey {
    private final String hostName;
    private final String gangliaClusterName;

    private ResourceKey(String hostName, String gangliaClusterName) {
      this.hostName           = hostName;
      this.gangliaClusterName = gangliaClusterName;
    }

    public String getHostName() {
      return hostName;
    }

    public String getClusterName() {
      return gangliaClusterName;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      ResourceKey that = (ResourceKey) o;

      return
          !(gangliaClusterName != null ? !gangliaClusterName.equals(that.gangliaClusterName) : that.gangliaClusterName != null) &&
          !(hostName != null ? !hostName.equals(that.hostName) : that.hostName != null);

    }

    @Override
    public int hashCode() {
      int result = hostName != null ? hostName.hashCode() : 0;
      result = 31 * result + (gangliaClusterName != null ? gangliaClusterName.hashCode() : 0);
      return result;
    }
  }
}
TOP

Related Classes of org.apache.ambari.server.controller.ganglia.GangliaPropertyProvider$ResourceKey

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.