Package com.tdunning.plume.local.lazy

Source Code of com.tdunning.plume.local.lazy.LocalExecutor

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.tdunning.plume.local.lazy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.tdunning.plume.DoFn;
import com.tdunning.plume.EmitFn;
import com.tdunning.plume.PCollection;
import com.tdunning.plume.Pair;
import com.tdunning.plume.local.lazy.op.DeferredOp;
import com.tdunning.plume.local.lazy.op.Flatten;
import com.tdunning.plume.local.lazy.op.GroupByKey;
import com.tdunning.plume.local.lazy.op.MultipleParallelDo;
import com.tdunning.plume.local.lazy.op.ParallelDo;

/**
* Dummy local executor that goes down-top by using recursive formulas and stores all intermediate results in-memory.
*/
public class LocalExecutor {

  /**
   * Execute one-output flow
   *
   * @param <T>
   * @param output
   * @return
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public <T> Iterable<T> execute(LazyCollection<T> output) {
    if (output.isMaterialized()) {
      return output.getData(); // nothing else to execute
    } else {
      DeferredOp op = output.getDeferredOp();
      final List<T> result = Lists.newArrayList();
      // Flatten op
      if(op instanceof Flatten) {
        Flatten<T> flatten = (Flatten<T>)op;
        for(PCollection<T> col: flatten.getOrigins()) {
          Iterable<T> res = execute((LazyCollection<T>) col );
          result.addAll(Lists.newArrayList(res));
        }
        return result; // done with it
      }
      Iterable parent;
      EmitFn<T> emitter = new EmitFn<T>() {
        @Override
        public void emit(T v) {
          result.add(v);
        }
      };
      // ParallelDo
      if (op instanceof ParallelDo) {
        ParallelDo pDo = (ParallelDo) op;
        parent = execute((LazyCollection)pDo.getOrigin());
        for (Object obj : parent) {
          pDo.getFunction().process(obj, emitter);
        }
      // MultipleParallelDo -> parallel operations that read the same collection
      // In this version of executor, we will only compute the current collection, not its neighbors
      } else if(op instanceof MultipleParallelDo) {
        MultipleParallelDo mPDo = (MultipleParallelDo) op;
        parent = execute((LazyCollection)mPDo.getOrigin());
        DoFn function = (DoFn)mPDo.getDests().get(output); // get the function that corresponds to this collection
        for (Object obj : parent) {
          function.process(obj, emitter);
        }
      // GroupByKey
      } else if(op instanceof GroupByKey) {
        GroupByKey gBK = (GroupByKey) op;
        parent = execute(gBK.getOrigin());
        Map<Object, List> groupMap = Maps.newHashMap();
        // Perform in-memory group by operation
        for (Object obj : parent) {
          Pair p = (Pair)obj;
          List list = groupMap.get(p.getKey());
          if(list == null) {
            list = new ArrayList();
          }
          list.add(p.getValue());
          groupMap.put(p.getKey(), list);
        }
        for (Map.Entry<Object, List> entry: groupMap.entrySet()) {
          result.add((T)new Pair(entry.getKey(), entry.getValue()));
        }
      }
      return result;
    }
  }
}
TOP

Related Classes of com.tdunning.plume.local.lazy.LocalExecutor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.