Package org.apache.flink.streaming.examples.ml

Source Code of org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton$Predictor

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.ml;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;

public class IncrementalLearningSkeleton {

  // Source for feeding new data for prediction
  public static class NewDataSource implements SourceFunction<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Collector<Integer> collector) throws Exception {
      while (true) {
        collector.collect(getNewData());
      }
    }

    // Method for pulling new data for prediction
    private Integer getNewData() throws InterruptedException {
      Thread.sleep(1000);
      return 1;
    }
  }

  // Source for feeding new training data for partial model building
  public static class TrainingDataSource implements SourceFunction<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(Collector<Integer> collector) throws Exception {

      while (true) {
        collector.collect(getTrainingData());
      }

    }

    // Method for pulling new training data
    private Integer getTrainingData() throws InterruptedException {
      Thread.sleep(1000);
      return 1;

    }
  }

  // Task for building up-to-date partial models on new training data
  public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
    private static final long serialVersionUID = 1L;

    // Method for building partial model on the grouped training data
    protected Double[] buildPartialModel(Iterable<Integer> values) {
      return new Double[] { 1. };
    }

    @Override
    public void reduce(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
      out.collect(buildPartialModel(values));
    }
  }

  // Task for performing prediction using the model produced in
  // batch-processing and the up-to-date partial model
  public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
    private static final long serialVersionUID = 1L;

    Double[] batchModel = null;
    Double[] partialModel = null;

    @Override
    public Integer map1(Integer value) {
      // Return prediction
      return predict(value);
    }

    @Override
    public Integer map2(Double[] value) {
      // Update model
      partialModel = value;
      batchModel = getBatchModel();
      return 1;
    }

    // Pulls model built with batch-job on the old training data
    protected Double[] getBatchModel() {
      return new Double[] { 0. };
    }

    // Performs prediction using the two models
    protected Integer predict(Integer inTuple) {
      return 0;
    }

  }

  private static final int PARALLELISM = 1;
  private static final int SOURCE_PARALLELISM = 1;

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
        PARALLELISM).setBufferTimeout(1000);

    // Build new model on every second of new data
    DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
        .window(5000).reduceGroup(new PartialModelBuilder());

    // Use partial model for prediction
    DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
        .connect(model).map(new Predictor());

    prediction.print();

    env.execute();
  }



}
TOP

Related Classes of org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton$Predictor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.