Package com.google.appengine.demos.mapreduce.randomcollisions

Source Code of com.google.appengine.demos.mapreduce.randomcollisions.UsingPipelineServlet

package com.google.appengine.demos.mapreduce.randomcollisions;

import static com.google.appengine.demos.mapreduce.randomcollisions.CollisionFindingServlet.createMapReduceSpec;
import static com.google.appengine.demos.mapreduce.randomcollisions.CollisionFindingServlet.getBucketParam;
import static com.google.appengine.demos.mapreduce.randomcollisions.CollisionFindingServlet.getLongParam;
import static com.google.appengine.demos.mapreduce.randomcollisions.CollisionFindingServlet.getSettings;
import static com.google.appengine.demos.mapreduce.randomcollisions.CollisionFindingServlet.getStringParam;

import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.appengine.tools.mapreduce.GoogleCloudStorageFileSet;
import com.google.appengine.tools.mapreduce.MapReduceJob;
import com.google.appengine.tools.mapreduce.MapReduceResult;
import com.google.appengine.tools.mapreduce.MapReduceSettings;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.pipeline.FutureValue;
import com.google.appengine.tools.pipeline.Job0;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.appengine.tools.pipeline.PipelineServiceFactory;
import com.google.appengine.tools.pipeline.Value;
import com.google.common.primitives.Ints;

import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Logger;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* This is an alternative to {@link CollisionFindingServlet} that uses Pipelines to start the
* MapReduce.
*
* This example shows running a follow up task after the MapReduce has run.
*/
public class UsingPipelineServlet extends HttpServlet {
  private static final long serialVersionUID = -8159877366348539461L;

  private static final Logger LOG = Logger.getLogger(UsingPipelineServlet.class.getName());

  /**
   * A two step pipeline: 1. Run a MapReduce 2. Run LogFileNamesJob with the output of the
   * MapReduce.
   */
  private static class MyPipelineJob extends Job0<Void> {
    private static final long serialVersionUID = 1954542676168374323L;

    private final String bucket;
    private final long start;
    private final long limit;
    private final int shards;

    MyPipelineJob(String bucket, long start, long limit, int shards) {
      this.bucket = bucket;
      this.start = start;
      this.limit = limit;
      this.shards = shards;
    }

    @Override
    public Value<Void> run() throws Exception {
      MapReduceSpecification<Long, Integer, Integer, ArrayList<Integer>, GoogleCloudStorageFileSet>
      spec = createMapReduceSpec(bucket, start, limit, shards);
      MapReduceSettings settings = getSettings(bucket, null, null);
      // [START start_as_pipeline]
      FutureValue<MapReduceResult<GoogleCloudStorageFileSet>> mapReduceResult =
          futureCall(new MapReduceJob<>(spec, settings));
      // [END start_as_pipeline]
      return futureCall(new LogFileNamesJob(), mapReduceResult);
    }
  }

  private static class LogFileNamesJob extends
      Job1<Void, MapReduceResult<GoogleCloudStorageFileSet>> {
    private static final long serialVersionUID = 6277239748168293296L;

    @Override
    public Value<Void> run(MapReduceResult<GoogleCloudStorageFileSet> result) {
      for (GcsFilename name : result.getOutputResult().getFiles()) {
        LOG.info("Output stored to file: " + name);
      }
      return null;
    }
  }

  @Override
  public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    String queue = getStringParam(req, "queue", "mapreduce-workers");
    String module = getStringParam(req, "module", "mapreduce");
    String bucket = getBucketParam(req);
    long start = getLongParam(req, "start", 0);
    long limit = getLongParam(req, "limit", 100 * 1000 * 1000);
    int shards = Math.max(1, Math.min(100, Ints.saturatedCast(getLongParam(req, "shards", 30))));
    PipelineService service = PipelineServiceFactory.newPipelineService();
    String pipelineId = service.startNewPipeline(new MyPipelineJob(bucket, start, limit, shards),
        new JobSetting.OnQueue(queue), new JobSetting.OnModule(module));
    resp.sendRedirect("/_ah/pipeline/status.html?root=" + pipelineId);
  }
}
TOP

Related Classes of com.google.appengine.demos.mapreduce.randomcollisions.UsingPipelineServlet

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.