Package org.apache.crunch.impl.mr.plan

Source Code of org.apache.crunch.impl.mr.plan.MSCRPlanner$NodeVisitor

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.impl.mr.plan;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
import org.apache.crunch.impl.mr.collect.DoTableImpl;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import org.apache.crunch.impl.mr.collect.UnionCollection;
import org.apache.crunch.impl.mr.exec.MRExecutor;
import org.apache.hadoop.conf.Configuration;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

public class MSCRPlanner {

  // Used to ensure that we always build pipelines starting from the deepest
  // outputs, which
  // helps ensure that we handle intermediate outputs correctly.
  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
    @Override
    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
      int cmp = right.getDepth() - left.getDepth();
      if (cmp == 0) {
        // Ensure we don't throw away two output collections at the same depth.
        // Using the collection name would be nicer here, but names aren't
        // necessarily unique
        cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
      }
      return cmp;
    }
  };

  private final MRPipeline pipeline;
  private final Map<PCollectionImpl<?>, Set<Target>> outputs;

  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
    this.pipeline = pipeline;
    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
    this.outputs.putAll(outputs);
  }

  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
    // Constructs all of the node paths, which either start w/an input
    // or a GBK and terminate in an output collection of any type.
    NodeVisitor visitor = new NodeVisitor();
    for (PCollectionImpl<?> output : outputs.keySet()) {
      visitor.visitOutput(output);
    }

    // Pull out the node paths.
    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();

    // Keeps track of the dependencies from collections -> jobs and then
    // between different jobs.
    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies = new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();

    // Find the set of GBKs that DO NOT depend on any other GBK.
    Set<PGroupedTableImpl<?, ?>> workingGroupings = null;
    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {

      for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
        JobPrototype proto = JobPrototype.createMapReduceJob(grouping, mapInputPaths, pipeline.createTempPath());
        assignments.put(grouping, proto);
        if (jobDependencies.containsKey(grouping)) {
          for (JobPrototype dependency : jobDependencies.get(grouping)) {
            proto.addDependency(dependency);
          }
        }
      }

      Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = getDependencyPaths(workingGroupings, nodePaths);
      for (Map.Entry<PGroupedTableImpl<?, ?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
        PGroupedTableImpl<?, ?> grouping = entry.getKey();
        Set<NodePath> currentNodePaths = entry.getValue();

        JobPrototype proto = assignments.get(grouping);
        Set<NodePath> gbkPaths = Sets.newHashSet();
        for (NodePath nodePath : currentNodePaths) {
          PCollectionImpl<?> tail = nodePath.tail();
          if (tail instanceof PGroupedTableImpl) {
            gbkPaths.add(nodePath);
            if (!jobDependencies.containsKey(tail)) {
              jobDependencies.put(tail, Sets.<JobPrototype> newHashSet());
            }
            jobDependencies.get(tail).add(proto);
          }
        }

        if (!gbkPaths.isEmpty()) {
          handleGroupingDependencies(gbkPaths, currentNodePaths);
        }

        // At this point, all of the dependencies for the working groups will be
        // file outputs, and so we can add them all to the JobPrototype-- we now
        // have
        // a complete job.
        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
        for (NodePath nodePath : currentNodePaths) {
          assignments.put(nodePath.tail(), proto);
          for (Target target : outputs.get(nodePath.tail())) {
            reduceOutputs.put(target, nodePath);
          }
        }
        proto.addReducePaths(reduceOutputs);

        // We've processed this GBK-- remove it from the set of nodePaths we
        // need to process in the next step.
        nodePaths.remove(grouping);
      }
    }

    // Process any map-only jobs that are remaining.
    if (!nodePaths.isEmpty()) {
      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths.entrySet()) {
        PCollectionImpl<?> collect = entry.getKey();
        if (!assignments.containsKey(collect)) {
          HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
          for (NodePath nodePath : entry.getValue()) {
            for (Target target : outputs.get(nodePath.tail())) {
              mapOutputs.put(target, nodePath);
            }
          }
          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs, pipeline.createTempPath());

          if (jobDependencies.containsKey(collect)) {
            for (JobPrototype dependency : jobDependencies.get(collect)) {
              proto.addDependency(dependency);
            }
          }
          assignments.put(collect, proto);
        }
      }
    }

    MRExecutor exec = new MRExecutor(jarClass);
    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
    }
    return exec;
  }

  private Map<PGroupedTableImpl<?, ?>, Set<NodePath>> getDependencyPaths(Set<PGroupedTableImpl<?, ?>> workingGroupings,
      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
    Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
    for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
      dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
    }

    // Find the targets that depend on one of the elements of the current
    // working group.
    for (PCollectionImpl<?> target : nodePaths.keySet()) {
      if (!workingGroupings.contains(target)) {
        for (NodePath nodePath : nodePaths.get(target)) {
          if (workingGroupings.contains(nodePath.head())) {
            dependencyPaths.get(nodePath.head()).add(nodePath);
          }
        }
      }
    }
    return dependencyPaths;
  }

  private int getSplitIndex(Set<NodePath> currentNodePaths) {
    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
    for (NodePath nodePath : currentNodePaths) {
      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
      iter.next(); // prime this past the initial NGroupedTableImpl
      iters.add(iter);
    }

    // Find the lowest point w/the lowest cost to be the split point for
    // all of the dependent paths.
    boolean end = false;
    int splitIndex = -1;
    while (!end) {
      splitIndex++;
      PCollectionImpl<?> current = null;
      for (Iterator<PCollectionImpl<?>> iter : iters) {
        if (iter.hasNext()) {
          PCollectionImpl<?> next = iter.next();
          if (next instanceof PGroupedTableImpl) {
            end = true;
            break;
          } else if (current == null) {
            current = next;
          } else if (current != next) {
            end = true;
            break;
          }
        } else {
          end = true;
          break;
        }
      }
    }
    // TODO: Add costing calcs here.
    return splitIndex;
  }

  private void handleGroupingDependencies(Set<NodePath> gbkPaths, Set<NodePath> currentNodePaths) throws IOException {
    int splitIndex = getSplitIndex(currentNodePaths);
    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next().get(splitIndex);
    if (!outputs.containsKey(splitTarget)) {
      outputs.put(splitTarget, Sets.<Target> newHashSet());
    }

    SourceTarget srcTarget = null;
    Target targetToReplace = null;
    for (Target t : outputs.get(splitTarget)) {
      if (t instanceof SourceTarget) {
        srcTarget = (SourceTarget<?>) t;
        break;
      } else {
        srcTarget = t.asSourceTarget(splitTarget.getPType());
        if (srcTarget != null) {
          targetToReplace = t;
          break;
        }
      }
    }
    if (targetToReplace != null) {
      outputs.get(splitTarget).remove(targetToReplace);
    } else if (srcTarget == null) {
      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
    }
    outputs.get(splitTarget).add(srcTarget);
    splitTarget.materializeAt(srcTarget);

    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
    Set<NodePath> nextNodePaths = Sets.newHashSet();
    for (NodePath nodePath : currentNodePaths) {
      if (gbkPaths.contains(nodePath)) {
        nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
      } else {
        nextNodePaths.add(nodePath);
      }
    }
    currentNodePaths.clear();
    currentNodePaths.addAll(nextNodePaths);
  }

  private Set<PGroupedTableImpl<?, ?>> getWorkingGroupings(Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
    Set<PGroupedTableImpl<?, ?>> gbks = Sets.newHashSet();
    for (PCollectionImpl<?> target : nodePaths.keySet()) {
      if (target instanceof PGroupedTableImpl) {
        boolean hasGBKDependency = false;
        for (NodePath nodePath : nodePaths.get(target)) {
          if (nodePath.head() instanceof PGroupedTableImpl) {
            hasGBKDependency = true;
            break;
          }
        }
        if (!hasGBKDependency) {
          gbks.add((PGroupedTableImpl<?, ?>) target);
        }
      }
    }
    return gbks;
  }

  private static class NodeVisitor implements PCollectionImpl.Visitor {

    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
    private final Map<PCollectionImpl<?>, Source<?>> inputs;
    private PCollectionImpl<?> workingNode;
    private NodePath workingPath;

    public NodeVisitor() {
      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
    }

    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
      return nodePaths;
    }

    public void visitOutput(PCollectionImpl<?> output) {
      nodePaths.put(output, Sets.<NodePath> newHashSet());
      workingNode = output;
      workingPath = new NodePath();
      output.accept(this);
    }

    @Override
    public void visitInputCollection(InputCollection<?> collection) {
      workingPath.close(collection);
      inputs.put(collection, collection.getSource());
      nodePaths.get(workingNode).add(workingPath);
    }

    @Override
    public void visitUnionCollection(UnionCollection<?> collection) {
      PCollectionImpl<?> baseNode = workingNode;
      NodePath basePath = workingPath;
      for (PCollectionImpl<?> parent : collection.getParents()) {
        workingPath = new NodePath(basePath);
        workingNode = baseNode;
        processParent(parent);
      }
    }

    @Override
    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitDoTable(DoTableImpl<?, ?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
      workingPath.close(collection);
      nodePaths.get(workingNode).add(workingPath);
      workingNode = collection;
      nodePaths.put(workingNode, Sets.<NodePath> newHashSet());
      workingPath = new NodePath(collection);
      processParent(collection.getOnlyParent());
    }

    private void processParent(PCollectionImpl<?> parent) {
      if (!nodePaths.containsKey(parent)) {
        parent.accept(this);
      } else {
        workingPath.close(parent);
        nodePaths.get(workingNode).add(workingPath);
      }
    }
  }
}
TOP

Related Classes of org.apache.crunch.impl.mr.plan.MSCRPlanner$NodeVisitor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.