// Package org.apache.accumulo.core.iterators

// Source Code of org.apache.accumulo.core.iterators.AggregatingIterator

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.iterators;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.aggregation.Aggregator;
import org.apache.accumulo.core.iterators.conf.ColumnToClassMapping;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
import org.apache.log4j.Logger;

/**
* This iterator wraps another iterator. It automatically aggregates.
*
* @deprecated since 1.4, replaced by {@link org.apache.accumulo.core.iterators.Combiner}
*/

@Deprecated
public class AggregatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
 
  private SortedKeyValueIterator<Key,Value> iterator;
  private ColumnToClassMapping<Aggregator> aggregators;
 
  private Key workKey = new Key();
 
  private Key aggrKey;
  private Value aggrValue;
  // private boolean propogateDeletes;
  private static final Logger log = Logger.getLogger(AggregatingIterator.class);
 
  public AggregatingIterator deepCopy(IteratorEnvironment env) {
    return new AggregatingIterator(this, env);
  }
 
  private AggregatingIterator(AggregatingIterator other, IteratorEnvironment env) {
    iterator = other.iterator.deepCopy(env);
    aggregators = other.aggregators;
  }
 
  public AggregatingIterator() {}
 
  private void aggregateRowColumn(Aggregator aggr) throws IOException {
    // this function assumes that first value is not delete
   
    if (iterator.getTopKey().isDeleted())
      return;
   
    workKey.set(iterator.getTopKey());
   
    Key keyToAggregate = workKey;
   
    aggr.reset();
   
    aggr.collect(iterator.getTopValue());
    iterator.next();
   
    while (iterator.hasTop() && !iterator.getTopKey().isDeleted() && iterator.getTopKey().equals(keyToAggregate, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) {
      aggr.collect(iterator.getTopValue());
      iterator.next();
    }
   
    aggrKey = workKey;
    aggrValue = aggr.aggregate();
   
  }
 
  private void findTop() throws IOException {
    // check if aggregation is needed
    if (iterator.hasTop()) {
      Aggregator aggr = aggregators.getObject(iterator.getTopKey());
      if (aggr != null) {
        aggregateRowColumn(aggr);
      }
    }
  }
 
  public AggregatingIterator(SortedKeyValueIterator<Key,Value> iterator, ColumnToClassMapping<Aggregator> aggregators) throws IOException {
    this.iterator = iterator;
    this.aggregators = aggregators;
  }
 
  @Override
  public Key getTopKey() {
    if (aggrKey != null) {
      return aggrKey;
    }
    return iterator.getTopKey();
  }
 
  @Override
  public Value getTopValue() {
    if (aggrKey != null) {
      return aggrValue;
    }
    return iterator.getTopValue();
  }
 
  @Override
  public boolean hasTop() {
    return aggrKey != null || iterator.hasTop();
  }
 
  @Override
  public void next() throws IOException {
    if (aggrKey != null) {
      aggrKey = null;
      aggrValue = null;
    } else {
      iterator.next();
    }
   
    findTop();
  }
 
  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    // do not want to seek to the middle of a value that should be
    // aggregated...
   
    Range seekRange = IteratorUtil.maximizeStartKeyTimeStamp(range);
   
    iterator.seek(seekRange, columnFamilies, inclusive);
    findTop();
   
    if (range.getStartKey() != null) {
      while (hasTop() && getTopKey().equals(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)
          && getTopKey().getTimestamp() > range.getStartKey().getTimestamp()) {
        // the value has a more recent time stamp, so
        // pass it up
        // log.debug("skipping "+getTopKey());
        next();
      }
     
      while (hasTop() && range.beforeStartKey(getTopKey())) {
        next();
      }
    }
   
  }
 
  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
   
    this.iterator = source;
   
    try {
      this.aggregators = new ColumnToClassMapping<Aggregator>(options, Aggregator.class);
    } catch (ClassNotFoundException e) {
      log.error(e.toString());
      throw new IllegalArgumentException(e);
    } catch (InstantiationException e) {
      log.error(e.toString());
      throw new IllegalArgumentException(e);
    } catch (IllegalAccessException e) {
      log.error(e.toString());
      throw new IllegalArgumentException(e);
    }
  }
 
  @Override
  public IteratorOptions describeOptions() {
    return new IteratorOptions("agg", "Aggregators apply aggregating functions to values with identical keys", null,
        Collections.singletonList("<columnName> <aggregatorClass>"));
  }
 
  @Override
  public boolean validateOptions(Map<String,String> options) {
    for (Entry<String,String> entry : options.entrySet()) {
      String classname = entry.getValue();
      if (classname == null)
        return false;
      Class<? extends Aggregator> clazz;
      try {
        clazz = AccumuloClassLoader.loadClass(classname, Aggregator.class);
        clazz.newInstance();
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("class not found: " + classname);
      } catch (InstantiationException e) {
        throw new IllegalArgumentException("instantiation exception: " + classname);
      } catch (IllegalAccessException e) {
        throw new IllegalArgumentException("illegal access exception: " + classname);
      }
    }
    return true;
  }
}
// TOP
//
// Related Classes of org.apache.accumulo.core.iterators.AggregatingIterator
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.