Package parquet.io

Source Code of parquet.io.FilteredRecordReader

/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet.io;

import parquet.column.ColumnReader;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.filter.RecordFilter;
import parquet.filter.UnboundRecordFilter;
import parquet.io.api.RecordMaterializer;

/**
* Extends the
* @author Jacob Metcalf
*
*/
class FilteredRecordReader<T> extends RecordReaderImplementation<T> {

  private final RecordFilter recordFilter;
  private final long recordCount;
  private long recordsRead = 0;

  /**
   * @param root          the root of the schema
   * @param validating
   * @param columnStore
   * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered.
   */
  public FilteredRecordReader(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating,
                              ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter, long recordCount) {
    super(root, recordMaterializer, validating, columnStore);
    this.recordCount = recordCount;
    if ( unboundFilter != null ) {
      recordFilter = unboundFilter.bind(getColumnReaders());
    } else {
      recordFilter = null;
    }
  }

  /**
   * Override read() method to provide skip.
   */
  @Override
  public T read() {
    skipToMatch();
    if (recordsRead == recordCount) {
      return null;
    }
    ++ recordsRead;
    return super.read();
  }


  /**
   * Skips forwards until the filter finds the first match. Returns false
   * if none found.
   */
  private void skipToMatch() {
    while (recordsRead < recordCount && !recordFilter.isMatch()) {
      State currentState = getState(0);
      do {
        ColumnReader columnReader = currentState.column;

        // currentLevel = depth + 1 at this point
        // set the current value
        if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) {
          columnReader.skip();
        }
        columnReader.consume();

        // Based on repetition level work out next state to go to
        int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
        currentState = currentState.getNextState(nextR);
      } while (currentState != null);
      ++ recordsRead;
    }
  }
}
TOP

Related Classes of parquet.io.FilteredRecordReader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.