Package org.kiji.schema.impl.hbase

Source Code of org.kiji.schema.impl.hbase.HBaseMaterializedKijiResult

/**
* (c) Copyright 2014 WibiData, Inc.
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kiji.schema.impl.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;

import org.kiji.annotations.ApiAudience;
import org.kiji.schema.EntityId;
import org.kiji.schema.KijiCell;
import org.kiji.schema.KijiColumnName;
import org.kiji.schema.KijiDataRequest;
import org.kiji.schema.KijiDataRequest.Column;
import org.kiji.schema.KijiResult;
import org.kiji.schema.NoSuchColumnException;
import org.kiji.schema.hbase.HBaseColumnName;
import org.kiji.schema.impl.DefaultKijiResult;
import org.kiji.schema.layout.HBaseColumnNameTranslator;
import org.kiji.schema.layout.KijiTableLayout;
import org.kiji.schema.layout.impl.CellDecoderProvider;

/**
* A {@link KijiResult} backed by an HBase {@link Result}.
*
* @param <T> The type of {@code KijiCell} values in the view.
*/
@ApiAudience.Private
public final class HBaseMaterializedKijiResult<T> implements KijiResult<T> {
  private final EntityId mEntityId;
  private final KijiDataRequest mDataRequest;
  private final KijiTableLayout mLayout;
  private final HBaseColumnNameTranslator mColumnTranslator;
  private final CellDecoderProvider mDecoderProvider;
  private final SortedMap<KijiColumnName, List<KeyValue>> mColumnResults;

  /**
   * Create a {@code KijiResult} backed by an HBase {@link Result}.
   *
   * @param entityId The entity ID of the row to which the {@code Result} belongs.
   * @param dataRequest The data request which defines the columns in this {@code KijiResult}.
   * @param layout The Kiji table layout of the table.
   * @param columnTranslator The Kiji column name translator of the table.
   * @param decoderProvider The Kiji cell decoder provider of the table.
   * @param columnResults The materialized HBase results.
   */
  private HBaseMaterializedKijiResult(
      final EntityId entityId,
      final KijiDataRequest dataRequest,
      final KijiTableLayout layout,
      final HBaseColumnNameTranslator columnTranslator,
      final CellDecoderProvider decoderProvider,
      final SortedMap<KijiColumnName, List<KeyValue>> columnResults
  ) {
    mEntityId = entityId;
    mDataRequest = dataRequest;
    mLayout = layout;
    mColumnTranslator = columnTranslator;
    mDecoderProvider = decoderProvider;
    mColumnResults = columnResults;
  }

  /**
   * Create a {@code KijiResult} backed by an HBase {@link Result}.
   *
   * @param entityId The entity ID of the row to which the {@code Result} belongs.
   * @param dataRequest The data request which defines the columns in this {@code KijiResult}.
   * @param result The backing HBase result.
   * @param layout The Kiji table layout of the table.
   * @param columnTranslator The Kiji column name translator of the table.
   * @param decoderProvider The Kiji cell decoder provider of the table.
   * @param <T> The type of {@code KijiCell} values in the view.
   * @return A {@code KijiResult} backed by an HBase {@code Result}.
   */
  public static <T> HBaseMaterializedKijiResult<T> create(
      final EntityId entityId,
      final KijiDataRequest dataRequest,
      final Result result,
      final KijiTableLayout layout,
      final HBaseColumnNameTranslator columnTranslator,
      final CellDecoderProvider decoderProvider
  ) {
    final ImmutableSortedMap.Builder<KijiColumnName, List<KeyValue>> columnResults =
        ImmutableSortedMap.naturalOrder();

    for (Column columnRequest : dataRequest.getColumns()) {
      // TODO: determine via benchmarks whether it would be faster to make a copy of the
      // columnResult list so that the underlying Result may be garbage collected.
      List<KeyValue> columnResult = getColumnKeyValues(columnRequest, columnTranslator, result);
      columnResults.put(columnRequest.getColumnName(), columnResult);
    }

    return new HBaseMaterializedKijiResult<T>(
        entityId,
        dataRequest,
        layout,
        columnTranslator,
        decoderProvider,
        columnResults.build());
  }

  /** {@inheritDoc} */
  @Override
  public EntityId getEntityId() {
    return mEntityId;
  }

  /** {@inheritDoc} */
  @Override
  public KijiDataRequest getDataRequest() {
    return mDataRequest;
  }

  /** {@inheritDoc} */
  @Override
  public Iterator<KijiCell<T>> iterator() {
    final List<Iterator<KijiCell<T>>> columnIterators =
        Lists.newArrayListWithCapacity(mColumnResults.size());

    for (Map.Entry<KijiColumnName, List<KeyValue>> entry : mColumnResults.entrySet()) {
      final Function<KeyValue, KijiCell<T>> decoder =
          ResultDecoders.getDecoderFunction(
              entry.getKey(),
              mLayout,
              mColumnTranslator,
              mDecoderProvider);

      columnIterators.add(Iterators.transform(entry.getValue().iterator(), decoder));
    }
    return Iterators.concat(columnIterators.iterator());
  }

  /** {@inheritDoc} */
  @Override
  @SuppressWarnings("unchecked")
  public <U extends T> HBaseMaterializedKijiResult<U> narrowView(final KijiColumnName column) {
    final KijiDataRequest narrowRequest = DefaultKijiResult.narrowRequest(column, mDataRequest);
    if (narrowRequest.equals(mDataRequest)) {
      return (HBaseMaterializedKijiResult<U>) this;
    }

    final ImmutableSortedMap.Builder<KijiColumnName, List<KeyValue>> narrowedResults =
        ImmutableSortedMap.naturalOrder();

    for (Column columnRequest : narrowRequest.getColumns()) {
      final KijiColumnName requestColumnName = columnRequest.getColumnName();

      // We get here IF

      // `column` is a family, and `mDataRequest` contains a column request for the entire family.

      // OR

      // `column` is a family, and `mDataRequest` contains a column request for a qualified column
      // in the family.

      // OR

      // `column` is a qualified-column, and `mDataRequest` contains a request for the qualified
      // column.

      final List<KeyValue> exactColumn = mColumnResults.get(requestColumnName);
      if (exactColumn != null) {
        narrowedResults.put(requestColumnName, exactColumn);
      } else {

        // `column` is a qualified-column, and `mDataRequest` contains a column request for the
        // column's family.

        final List<KeyValue> familyResults =
            mColumnResults.get(KijiColumnName.create(requestColumnName.getFamily(), null));
        final List<KeyValue> qualifiedColumnResults =
            getQualifiedColumnKeyValues(columnRequest, mColumnTranslator, familyResults);

        narrowedResults.put(requestColumnName, qualifiedColumnResults);
      }
    }

    return new HBaseMaterializedKijiResult<U>(
        mEntityId,
        mDataRequest,
        mLayout,
        mColumnTranslator,
        mDecoderProvider,
        narrowedResults.build());
  }

  /** {@inheritDoc} */
  @Override
  public void close() throws IOException {
    // No-op
  }

  // -----------------------------------------------------------------------------------------------
  // Static classes and helper methods
  // -----------------------------------------------------------------------------------------------

  private static final byte[] EMPTY_BYTES = new byte[0];

  /**
   * Get the list of {@code KeyValue}s in a {@code Result} belonging to a column request.
   *
   * <p>
   *   This method will filter extra version from the result if necessary.
   * </p>
   *
   * @param columnRequest of the column whose {@code KeyValues} to return.
   * @param translator for the table.
   * @param result the scan results.
   * @return the {@code KeyValue}s for the column.
   */
  private static List<KeyValue> getColumnKeyValues(
      final Column columnRequest,
      final HBaseColumnNameTranslator translator,
      final Result result
  ) {
    final KijiColumnName column = columnRequest.getColumnName();
    final List<KeyValue> keyValues = Arrays.asList(result.raw());

    if (column.isFullyQualified()) {
      return getQualifiedColumnKeyValues(columnRequest, translator, keyValues);
    } else {
      return getFamilyKeyValues(columnRequest, translator, keyValues);
    }
  }

  /**
   * Get the list of {@code KeyValue}s in the {@code Result} belonging to a fully-qualified column
   * request.
   *
   * <p>
   *   This method will filter extra versions from the result if the number of versions in the
   *   result is greater than the column's requested max versions.
   * </p>
   *
   * @param columnRequest of the column whose {@code KeyValues} to return.
   * @param translator for the table.
   * @param result the scan results.
   * @return the {@code KeyValue}s for the qualified column.
   */
  private static List<KeyValue> getQualifiedColumnKeyValues(
      final Column columnRequest,
      final HBaseColumnNameTranslator translator,
      final List<KeyValue> result
  ) {
    if (result.size() == 0) {
      return ImmutableList.of();
    }
    final byte[] rowkey = result.get(0).getRow();
    final byte[] family;
    final byte[] qualifier;
    try {
      final HBaseColumnName hbaseColumn =
          translator.toHBaseColumnName(columnRequest.getColumnName());
      family = hbaseColumn.getFamily();
      qualifier = hbaseColumn.getQualifier();
    } catch (NoSuchColumnException e) {
      throw new IllegalArgumentException(e);
    }

    final KeyValue start = new KeyValue(rowkey, family, qualifier, Long.MAX_VALUE, EMPTY_BYTES);
    // HBase will never return a KeyValue with a negative timestamp, so -1 is fine for exclusive end
    final KeyValue end = new KeyValue(rowkey, family, qualifier, -1, EMPTY_BYTES);

    final List<KeyValue> columnKeyValues = getSublist(result, KeyValue.COMPARATOR, start, end);

    final int maxVersions = columnRequest.getMaxVersions();
    if (columnKeyValues.size() > maxVersions) {
      return columnKeyValues.subList(0, maxVersions);
    } else {
      return columnKeyValues;
    }
  }

  /**
   * Get the list of {@code KeyValue}s in a {@code Result} belonging to a familyRequest request.
   *
   * <p>
   *   This method will filter extra versions from each column if necessary.
   * </p>
   *
   * @param familyRequest The familyRequest whose {@code KeyValues} to return.
   * @param translator The column name translator for the table.
   * @param result The scan results.
   * @return the {@code KeyValue}s for the specified familyRequest.
   */
  private static List<KeyValue> getFamilyKeyValues(
      final Column familyRequest,
      final HBaseColumnNameTranslator translator,
      final List<KeyValue> result
  ) {
    final KijiColumnName column = familyRequest.getColumnName();
    if (result.size() == 0) {
      return ImmutableList.of();
    }
    final byte[] rowkey = result.get(0).getRow();
    final byte[] family;
    final byte[] qualifier;
    try {
      final HBaseColumnName hbaseColumn =
          translator.toHBaseColumnName(column);
      family = hbaseColumn.getFamily();
      qualifier = hbaseColumn.getQualifier();
    } catch (NoSuchColumnException e) {
      throw new IllegalArgumentException(e);
    }

    List<KeyValue> keyValues = Lists.newArrayList();

    final KeyValue familyStartKV =
        new KeyValue(rowkey, family, qualifier, Integer.MAX_VALUE, EMPTY_BYTES);

    // Index of the current qualified column index
    int columnStart = getElementIndex(result, 0, familyStartKV);
    while (columnStart < result.size()
        && column.getFamily().equals(getKeyValueColumnName(result.get(columnStart), translator)
            .getFamily())) {

      final KeyValue start = result.get(columnStart);
      final KeyValue end = new KeyValue(
          rowkey,
          start.getFamily(),
          start.getQualifier(),
          -1,
          EMPTY_BYTES);

      final int columnEnd = getElementIndex(result, columnStart, end);

      final int length = Math.min(columnEnd - columnStart, familyRequest.getMaxVersions());

      keyValues.addAll(result.subList(columnStart, columnStart + length));
      columnStart = columnEnd;
    }

    return keyValues;
  }

  /**
   * Get the KijiColumnName encoded in the Key of a given KeyValue.
   *
   * @param kv KeyValue from which to get the encoded KijiColumnName.
   * @param translator for table.
   * @return the KijiColumnName encoded in the Key of a given KeyValue.
   */
  private static KijiColumnName getKeyValueColumnName(
      final KeyValue kv,
      final HBaseColumnNameTranslator translator
  ) {
    final HBaseColumnName hBaseColumnName = new HBaseColumnName(kv.getFamily(), kv.getQualifier());
    try {
      return translator.toKijiColumnName(hBaseColumnName);
    } catch (NoSuchColumnException nsce) {
      // This should not happen since it's only called on data returned by HBase.
      throw new IllegalStateException(
          String.format("Unknown column name in KeyValue: %s.", kv));
    }
  }

  /**
   * Get the index of a {@code KeyValue} in an array.
   *
   * @param result to search for the {@code KeyValue}. Must be sorted.
   * @param start index to start search for the {@code KeyValue}.
   * @param element to search for.
   * @return the index that the element resides, or would reside if it were to be inserted into the
   *     sorted array.
   */
  private static int getElementIndex(
      final List<KeyValue> result,
      final int start,
      final KeyValue element
  ) {
    final List<KeyValue> sublist = result.subList(start, result.size());
    final int index = Collections.binarySearch(sublist, element, KeyValue.COMPARATOR);
    if (index < 0) {
      return (-1 - index) + start;
    } else {
      return index + start;
    }
  }

  /**
   * Get a sublist of a list starting at the {@code start} element (inclusive), and extending to the
   * {@code end} element (exclusive).
   *
   * @param list from which to take the sublist. Must be sorted.
   * @param comparator which the list is sorted with.
   * @param start element for the sublist (inclusive).
   * @param end element for the sublist (exclusive).
   * @param <T> The type of list element.
   * @return the sublist from the provided sorted list which contains the {@code start} element and
   *    excludes the {@code end} element.
   */
  private static <T> List<T> getSublist(
      final List<T> list,
      final Comparator<? super T> comparator,
      final T start,
      final T end
  ) {
    int startIndex = Collections.binarySearch(list, start, comparator);
    if (startIndex < 0) {
      startIndex = -1 - startIndex;
    }
    int endIndex = Collections.binarySearch(list, end, comparator);
    if (endIndex < 0) {
      endIndex = -1 - endIndex;
    }

    return list.subList(startIndex, endIndex);
  }
}
TOP

Related Classes of org.kiji.schema.impl.hbase.HBaseMaterializedKijiResult

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.