Package org.apache.mahout.h2obindings

Source Code of org.apache.mahout.h2obindings.MRTaskNonZero

/*
*  Licensed to the Apache Software Foundation (ASF) under one or more
*  contributor license agreements.  See the NOTICE file distributed with
*  this work for additional information regarding copyright ownership.
*  The ASF licenses this file to You under the Apache License, Version 2.0
*  (the "License"); you may not use this file except in compliance with
*  the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

package org.apache.mahout.h2obindings;

import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.SparseMatrix;
import org.apache.mahout.math.DenseVector;

import water.MRTask;
import water.Futures;
import water.fvec.Frame;
import water.fvec.Vec;
import water.fvec.Chunk;
import water.parser.ValueString;
import water.util.ArrayUtils;

import java.util.Map;
import java.util.HashMap;

import org.apache.mahout.h2obindings.drm.H2ODrm;

/**
* Collection of helper methods for H2O backend.
*/
public class H2OHelper {
  /**
   * Predicate to check if data is sparse in Frame.
   *
   * If the number of missing elements is 32x times the number of present
   * elements, consider it as sparse.
   *
   * @param frame Frame storing matrix data.
   * @return True if data is sparse in Frame.
   */
  public static boolean isSparse(Frame frame) {
    long rows = frame.numRows();
    long cols = frame.numCols();

    /**
     * MRTask to aggregate precalculated per-chunk sparse lengths
     */
    class MRTaskNZ extends MRTask<MRTaskNZ> {
      long sparselen;
      @Override
      public void map(Chunk chks[]) {
        for (Chunk chk : chks) {
          sparselen += chk.sparseLen();
        }
      }
      @Override
      public void reduce(MRTaskNZ other) {
        sparselen += other.sparselen;
      }
    }

    long sparselen = new MRTaskNZ().doAll(frame).sparselen;

    return (((rows * cols) / (sparselen + 1)) > 32);
  }

  /**
   * Create a Mahout Matrix from a DRM.
   *
   * Create either Sparse or Dense Matrix depending on number of missing
   * elements in DRM.
   *
   * @param drm DRM object to create Matrix from.
   * @return created Matrix.
   */
  public static Matrix matrixFromDrm(H2ODrm drm) {
    Frame frame = drm.frame;
    Vec labels = drm.keys;
    Matrix m;

    if (isSparse(frame)) {
      m = new SparseMatrix((int)frame.numRows(), frame.numCols());
    } else {
      m = new DenseMatrix((int)frame.numRows(), frame.numCols());
    }

    int c = 0;
    // Fill matrix, column at a time.
    for (Vec v : frame.vecs()) {
      for (int r = 0; r < frame.numRows(); r++) {
        double d = 0.0;
        if (!v.isNA(r) && ((d = v.at(r)) != 0.0)) {
          m.setQuick(r, c, d);
        }
      }
      c++;
    }

    // If string keyed, set the stings as rowlabels.
    if (labels != null) {
      HashMap<String,Integer> map = new HashMap<String,Integer>();
      ValueString vstr = new ValueString();
      for (long i = 0; i < labels.length(); i++) {
        map.put(labels.atStr(vstr, i).toString(), (int)i);
      }
      m.setRowLabelBindings(map);
    }
    return m;
  }

  /**
   * Calculate Means of elements in a column, and return as a Vector.
   *
   * H2O precalculates means in a Vec, and a Vec corresponds to a column.
   *
   * @param frame Frame backing the H2O DRM.
   * @return Vector of pre-calculated means.
   */
  public static Vector colMeans(Frame frame) {
    double means[] = new double[frame.numCols()];
    for (int i = 0; i < frame.numCols(); i++) {
      means[i] = frame.vecs()[i].mean();
    }
    return new DenseVector(means);
  }

  /**
   * Calculate Sums of elements in a column, and return as a Vector.
   *
   * Run an MRTask Job to add up sums.
   * WARNING: Vulnerable to overflow. No way around it.
   *
   * @param frame Frame backing the H2O DRM.
   * @return Vector of calculated sums.
   */
  public static Vector colSums(Frame frame) {
    /**
     * MRTask to calculate sums of elements in all columns.
     */
    class MRTaskSum extends MRTask<MRTaskSum> {
      public double sums[];
      @Override
      public void map(Chunk chks[]) {
        sums = new double[chks.length];

        for (int c = 0; c < chks.length; c++) {
          for (int r = 0; r < chks[c].len(); r++) {
            sums[c] += chks[c].at0(r);
          }
        }
      }
      @Override
      public void reduce(MRTaskSum other) {
        ArrayUtils.add(sums, other.sums);
      }
    }
    return new DenseVector(new MRTaskSum().doAll(frame).sums);
  }

  /**
   * Calculate Sum of squares of all elements in the DRM.
   *
   * Run an MRTask Job to add up sums of squares.
   * WARNING: Vulnerable to overflow. No way around it.
   *
   * @param frame Frame backing the H2O DRM.
   * @return Sum of squares of all elements in the DRM.
   */
  public static double sumSqr(Frame frame) {
    /**
     * MRTask to calculate sums of squares of all elements.
     */
    class MRTaskSumSqr extends MRTask<MRTaskSumSqr> {
      public double sumSqr;
      @Override
      public void map(Chunk chks[]) {
        for (int c = 0; c < chks.length; c++) {
          for (int r = 0; r < chks[c].len(); r++) {
            sumSqr += (chks[c].at0(r) * chks[c].at0(r));
          }
        }
      }
      @Override
      public void reduce(MRTaskSumSqr other) {
        sumSqr += other.sumSqr;
      }
    }
    return new MRTaskSumSqr().doAll(frame).sumSqr;
  }

  /**
   * Count non-zero elements in all columns, and return as a Vector.
   *
   * Run an MRTask Job to count non-zero elements per column.
   *
   * @param frame Frame backing the H2O DRM.
   * @return Vector of counted non-zero elements.
   */
  public static Vector nonZeroCnt(Frame frame) {
    /**
     * MRTask to count all non-zero elements.
     */
    class MRTaskNonZero extends MRTask<MRTaskNonZero> {
      public double sums[];
      @Override
      public void map(Chunk chks[]) {
        sums = new double[chks.length];

        for (int c = 0; c < chks.length; c++) {
          for (int r = 0; r < chks[c].len(); r++) {
            if ((long)chks[c].at0(r) != 0) {
              sums[c] ++;
            }
          }
        }
      }
      @Override
      public void reduce(MRTaskNonZero other) {
        ArrayUtils.add(sums, other.sums);
      }
    }
    return new DenseVector(new MRTaskNonZero().doAll(frame).sums);
  }

  /** Convert String->Integer map to Integer->String map */
  private static Map<Integer,String> reverseMap(Map<String, Integer> map) {
    if (map == null) {
      return null;
    }

    Map<Integer,String> rmap = new HashMap<Integer,String>();

    for(Map.Entry<String,Integer> entry : map.entrySet()) {
      rmap.put(entry.getValue(),entry.getKey());
    }

    return rmap;
  }

  /**
   * Calculate optimum chunk size for given parameters.
   *
   * Chunk size is the number of elements stored per partition per column.
   *
   * @param nrow Number of rows in the DRM.
   * @param ncol Number of columns in the DRM.
   * @param minHint Minimum number of partitions to create, if passed value is not -1.
   * @param exactHint Exact number of partitions to create, if passed value is not -1.
   * @return Calculated optimum chunk size.
   */
  private static int chunkSize(long nrow, int ncol, int minHint, int exactHint) {
    int chunkSz;
    int partsHint = Math.max(minHint, exactHint);

    if (partsHint < 1) {
      /* XXX: calculate based on cloud size and # of cpu */
      partsHint = 4;
    }

    chunkSz = (int)(((nrow - 1) / partsHint) + 1);
    if (exactHint > 0) {
      return chunkSz;
    }

    if (chunkSz > 1e6) {
      chunkSz = (int)1e6;
    }

    if (minHint > 0) {
      return chunkSz;
    }

    if (chunkSz < 1e3) {
      chunkSz = (int)1e3;
    }

    return chunkSz;
  }

  /**
   * Ingest a Mahout Matrix into an H2O DRM.
   *
   * Frame is the backing data structure behind CheckpointedDrm.
   *
   * @param m Mahout Matrix to ingest data from.
   * @param minHint Hint for minimum number of partitions in created DRM.
   * @param exactHint Hint for exact number of partitions in created DRM.
   * @return Created H2O backed DRM.
   */
  public static H2ODrm drmFromMatrix(Matrix m, int minHint, int exactHint) {
    // First create an empty (0-filled) frame of the required dimensions
    Frame frame = emptyFrame(m.rowSize(), m.columnSize(), minHint, exactHint);
    Vec labels = null;
    Vec.Writer writers[] = new Vec.Writer[m.columnSize()];
    Futures closer = new Futures();

    // "open" vectors for writing efficiently in bulk
    for (int i = 0; i < writers.length; i++) {
      writers[i] = frame.vecs()[i].open();
    }

    for (int r = 0; r < m.rowSize(); r++) {
      for (int c = 0; c < m.columnSize(); c++) {
        writers[c].set(r, m.getQuick(r, c));
      }
    }

    for (int c = 0; c < m.columnSize(); c++) {
      writers[c].close(closer);
    }

    // If string labeled matrix, create aux Vec
    Map<String,Integer> map = m.getRowLabelBindings();
    if (map != null) {
      // label vector must be similarly partitioned like the Frame
      labels = frame.anyVec().makeZero();
      Vec.Writer writer = labels.open();
      Map<Integer,String> rmap = reverseMap(map);

      for (long r = 0; r < m.rowSize(); r++) {
        writer.set(r, rmap.get(r));
      }

      writer.close(closer);
    }

    closer.blockForPending();

    return new H2ODrm(frame, labels);
  }

  /**
   * Create an empty (zero-filled) H2O Frame efficiently.
   *
   * Create a zero filled Frame with specified cardinality.
   * Do not actually fill zeroes in each cell, create pre-compressed chunks.
   * Time taken per column asymptotically at O(nChunks), not O(nrow).
   *
   * @param nrow Number of rows in the Frame.
   * @param ncol Number of columns in the Frame.
   * @param minHint Hint for minimum number of chunks per column in created Frame.
   * @param exactHint Hint for exact number of chunks per column in created Frame.
   * @return Created Frame.
   */
  public static Frame emptyFrame(long nrow, int ncol, int minHint, int exactHint) {
    Vec.VectorGroup vg = new Vec.VectorGroup();

    return emptyFrame(nrow, ncol, minHint, exactHint, vg);
  }

  /**
   * Create an empty (zero-filled) H2O Frame efficiently.
   *
   * Create a zero filled Frame with specified cardinality.
   * Do not actually fill zeroes in each cell, create pre-compressed chunks.
   * Time taken per column asymptotically at O(nChunks), not O(nrow).
   *
   * @param nrow Number of rows in the Frame.
   * @param ncol Number of columns in the Frame.
   * @param minHint Hint for minimum number of chunks per column in created Frame.
   * @param exactHint Hint for exact number of chunks per column in created Frame.
   * @param vg Shared VectorGroup so that all columns are similarly partitioned.
   * @return Created Frame.
   */
  public static Frame emptyFrame(long nrow, int ncol, int minHint, int exactHint, Vec.VectorGroup vg) {
    int chunkSz = chunkSize(nrow, ncol, minHint, exactHint);
    int nchunks = (int)((nrow - 1) / chunkSz) + 1; // Final number of Chunks per Vec
    long espc[] = new long[nchunks + 1];
    final Vec[] vecs = new Vec[ncol];

    for (int i = 0; i < nchunks; i++) {
      espc[i] = i * chunkSz;
    }
    espc[nchunks] = nrow;

    for (int i = 0; i < vecs.length; i++) {
      vecs[i] = Vec.makeCon(0, null, vg, espc);
    }

    return new Frame(vecs);
  }

  /**
   * Create an empty (zero-filled) H2O DRM.
   *
   * Create a zero filled DRM with specified cardinality.
   * Use the efficient emptyFrame() method internally.
   *
   * @param nrow Number of rows in the Frame.
   * @param ncol Number of columns in the Frame.
   * @param minHint Hint for minimum number of chunks per column in created Frame.
   * @param exactHint Hint for exact number of chunks per column in created Frame.
   * @return Created DRM.
   */
  public static H2ODrm emptyDrm(long nrow, int ncol, int minHint, int exactHint) {
    return new H2ODrm(emptyFrame(nrow, ncol, minHint, exactHint));
  }
}
TOP

Related Classes of org.apache.mahout.h2obindings.MRTaskNonZero

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.