Package edu.illinois.nchen.guavaListenableFutures

Source Code of edu.illinois.nchen.guavaListenableFutures.GuavaListenableFuturesAnalysisEngine

package edu.illinois.nchen.guavaListenableFutures;

import akka.jsr166y.ForkJoinPool;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.*;
import edu.illinois.nchen.base.IAnalysisEngine;
import edu.illinois.nchen.base.SequentialAnalysisEngine;
import edu.illinois.nchen.base.businessModels.MarketModel;
import edu.illinois.nchen.base.businessModels.MarketRecommendation;
import edu.illinois.nchen.base.businessModels.StockAnalysisCollection;
import edu.illinois.nchen.base.businessModels.StockDataCollection;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class GuavaListenableFuturesAnalysisEngine extends SequentialAnalysisEngine {

    private ListeningExecutorService executor;

    public GuavaListenableFuturesAnalysisEngine() {
        this.executor = MoreExecutors.listeningDecorator(new ForkJoinPool());
    }

    // NOTES - Current problems/limitations/inconveniences with the current way:
    // 1) Need to remember to create a ListeningExecutorService
    // 2) Slightly longer method signatures for Function but the input/output types are much clearer
    // 3) Seems to better mimic the original C# example (although lacking in syntax)
    @Override
    public void doAnalysisParallel() throws ExecutionException, InterruptedException {

        final ListenableFuture<StockDataCollection> nyseData = executor.submit(new Callable<StockDataCollection>() {
            @Override
            public StockDataCollection call() throws Exception {
                return loadNyseData();
            }
        });

        final ListenableFuture<StockDataCollection> nasdaqData = executor.submit(new Callable<StockDataCollection>() {
            @Override
            public StockDataCollection call() throws Exception {
                return loadNasdaqData();
            }
        });

        final ListenableFuture<StockDataCollection> mergedMarketData = Futures.transform(Futures.successfulAsList(nyseData, nasdaqData), new Function<List<StockDataCollection>, StockDataCollection>() {
            @Override
            public StockDataCollection apply(final List<StockDataCollection> input) {
                return mergeMarketData(input);
            }
        });

        ListenableFuture<StockDataCollection> normalizedMarketData = Futures.transform(mergedMarketData, new Function<StockDataCollection, StockDataCollection>() {
            @Override
            public StockDataCollection apply(final StockDataCollection input) {
                return normalizeData(input);
            }
        });

        final ListenableFuture<StockDataCollection> fedHistoricalData = executor.submit(new Callable<StockDataCollection>() {
            @Override
            public StockDataCollection call() throws Exception {
                return loadFedHistoricalData();
            }
        });

        final ListenableFuture<StockDataCollection> normalizedHistoricalData = Futures.transform(fedHistoricalData, new Function<StockDataCollection, StockDataCollection>() {
            @Override
            public StockDataCollection apply(final StockDataCollection input) {
                return normalizeData(input);
            }
        });

        final ListenableFuture<StockAnalysisCollection> analyzedStockData = Futures.transform(normalizedMarketData, new Function<StockDataCollection, StockAnalysisCollection>() {
            @Override
            public StockAnalysisCollection apply(final StockDataCollection input) {
                return analyzeData(input);
            }
        });

        ListenableFuture<MarketModel> modeledMarketData = Futures.transform(analyzedStockData, new Function<StockAnalysisCollection, MarketModel>() {
            @Override
            public MarketModel apply(final StockAnalysisCollection input) {
                return runModel(input);
            }
        });

        final ListenableFuture<StockAnalysisCollection> analyzedHistoricalData = Futures.transform(normalizedHistoricalData, new Function<StockDataCollection, StockAnalysisCollection>() {
            @Override
            public StockAnalysisCollection apply(final StockDataCollection input) {
                return analyzeData(input);
            }
        });

        ListenableFuture<MarketModel> modeledHistoricalData = Futures.transform(analyzedHistoricalData, new Function<StockAnalysisCollection, MarketModel>() {
            @Override
            public MarketModel apply(final StockAnalysisCollection input) {
                return runModel(input);
            }
        });

        ListenableFuture<MarketRecommendation> results = Futures.transform(Futures.successfulAsList(modeledMarketData, modeledHistoricalData), new Function<List<MarketModel>, MarketRecommendation>() {
            @Override
            public MarketRecommendation apply(final List<MarketModel> input) {
                return compareModels(input);
            }
        });

        results.get();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        IAnalysisEngine engine = new GuavaListenableFuturesAnalysisEngine();

        Stopwatch watch;

        System.out.println("==SEQUENTIAL==");
        watch = new Stopwatch().start();
        engine.doAnalysisSequential();
        watch.stop();
        System.out.println(watch.elapsedMillis() + "ms taken.");

        System.out.println("==PARALLEL==");
        watch = new Stopwatch().start();
        engine.doAnalysisParallel();
        watch.stop();
        System.out.println(watch.elapsedMillis() + "ms taken.");
    }
}
TOP

Related Classes of edu.illinois.nchen.guavaListenableFutures.GuavaListenableFuturesAnalysisEngine

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.