Package io.druid.segment.incremental

Source Code of io.druid.segment.incremental.IncrementalIndexStorageAdapterTest

/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013  Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
*/

package io.druid.segment.incremental;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryEngine;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;

/**
*/
public class IncrementalIndexStorageAdapterTest
{
  @Test
  public void testSanity() throws Exception
  {
    IncrementalIndex index = new IncrementalIndex(
        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
        TestQueryRunners.pool
    );

    index.add(
        new MapBasedInputRow(
            new DateTime().minus(1).getMillis(),
            Lists.newArrayList("billy"),
            ImmutableMap.<String, Object>of("billy", "hi")
        )
    );
    index.add(
        new MapBasedInputRow(
            new DateTime().minus(1).getMillis(),
            Lists.newArrayList("sally"),
            ImmutableMap.<String, Object>of("sally", "bo")
        )
    );

    GroupByQueryEngine engine = makeGroupByQueryEngine();

    final Sequence<Row> rows = engine.process(
        GroupByQuery.builder()
                    .setDataSource("test")
                    .setGranularity(QueryGranularity.ALL)
                    .setInterval(new Interval(0, new DateTime().getMillis()))
                    .addDimension("billy")
                    .addDimension("sally")
                    .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
                    .build(),
        new IncrementalIndexStorageAdapter(index)
    );

    final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());

    Assert.assertEquals(2, results.size());

    MapBasedRow row = (MapBasedRow) results.get(0);
    Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent());

    row = (MapBasedRow) results.get(1);
    Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
  }

  @Test
  public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
  {
    IncrementalIndex index = new IncrementalIndex(
        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, TestQueryRunners.pool
    );

    index.add(
        new MapBasedInputRow(
            new DateTime("2014-09-01T00:00:00"),
            Lists.newArrayList("billy"),
            ImmutableMap.<String, Object>of("billy", "hi")
        )
    );
    index.add(
        new MapBasedInputRow(
            new DateTime("2014-09-01T01:00:00"),
            Lists.newArrayList("billy", "sally"),
            ImmutableMap.<String, Object>of(
                "billy", "hip",
                "sally", "hop"
            )
        )
    );

    GroupByQueryEngine engine = makeGroupByQueryEngine();

    final Sequence<Row> rows = engine.process(
        GroupByQuery.builder()
                    .setDataSource("test")
                    .setGranularity(QueryGranularity.ALL)
                    .setInterval(new Interval(0, new DateTime().getMillis()))
                    .addDimension("billy")
                    .addDimension("sally")
                    .addAggregator(
                        new LongSumAggregatorFactory("cnt", "cnt")
                    )
                    .addAggregator(
                        new JavaScriptAggregatorFactory(
                            "fieldLength",
                            Arrays.asList("sally", "billy"),
                            "function(current, s, b) { return current + (s == null ? 0 : s.length) + (b == null ? 0 : b.length); }",
                            "function() { return 0; }",
                            "function(a,b) { return a + b; }"
                        )
                    )
                    .build(),
        new IncrementalIndexStorageAdapter(index)
    );

    final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());

    Assert.assertEquals(2, results.size());

    MapBasedRow row = (MapBasedRow) results.get(0);
    Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l, "fieldLength", 2.0), row.getEvent());

    row = (MapBasedRow) results.get(1);
    Assert.assertEquals(ImmutableMap.of("billy", "hip", "sally", "hop", "cnt", 1l, "fieldLength", 6.0), row.getEvent());
  }

  private static GroupByQueryEngine makeGroupByQueryEngine()
  {
    return new GroupByQueryEngine(
          Suppliers.<GroupByQueryConfig>ofInstance(
              new GroupByQueryConfig()
              {
                @Override
                public int getMaxIntermediateRows()
                {
                  return 5;
                }
              }
          ),
          new StupidPool(
              new Supplier<ByteBuffer>()
              {
                @Override
                public ByteBuffer get()
                {
                  return ByteBuffer.allocate(50000);
                }
              }
          )
      );
  }

  @Test
  public void testResetSanity() {
    IncrementalIndex index = new IncrementalIndex(
        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
        TestQueryRunners.pool
    );


    DateTime t = DateTime.now();
    Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1));

    index.add(
        new MapBasedInputRow(
            t.minus(1).getMillis(),
            Lists.newArrayList("billy"),
            ImmutableMap.<String, Object>of("billy", "hi")
        )
    );
    index.add(
        new MapBasedInputRow(
            t.minus(1).getMillis(),
            Lists.newArrayList("sally"),
            ImmutableMap.<String, Object>of("sally", "bo")
        )
    );

    IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index);
    Sequence<Cursor> cursorSequence = adapter.makeCursors(new SelectorFilter("sally", "bo"),
                                                          interval,
                                                          QueryGranularity.NONE);

    Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.<Cursor>newArrayList()).get(0);
    DimensionSelector dimSelector;

    dimSelector = cursor.makeDimensionSelector("sally");
    Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));

    index.add(
        new MapBasedInputRow(
            t.minus(1).getMillis(),
            Lists.newArrayList("sally"),
            ImmutableMap.<String, Object>of("sally", "ah")
        )
    );

    // Cursor reset should not be affected by out of order values
    cursor.reset();

    dimSelector = cursor.makeDimensionSelector("sally");
    Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0)));
  }

  @Test
  public void testSingleValueTopN()
  {
    IncrementalIndex index = new IncrementalIndex(
        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
        TestQueryRunners.pool
    );

    DateTime t = DateTime.now();
    index.add(
        new MapBasedInputRow(
            t.minus(1).getMillis(),
            Lists.newArrayList("sally"),
            ImmutableMap.<String, Object>of("sally", "bo")
        )
    );

    TopNQueryEngine engine = new TopNQueryEngine(
        new StupidPool<ByteBuffer>(
            new Supplier<ByteBuffer>()
            {
              @Override
              public ByteBuffer get()
              {
                return ByteBuffer.allocate(50000);
              }
            }
        )
    );

    final Iterable<Result<TopNResultValue>> results = Sequences.toList(
        engine.query(
            new TopNQueryBuilder().dataSource("test")
                                  .granularity(QueryGranularity.ALL)
                                  .intervals(Lists.newArrayList(new Interval(0, new DateTime().getMillis())))
                                  .dimension("sally")
                                  .metric("cnt")
                                  .threshold(10)
                                  .aggregators(
                                      Lists.<AggregatorFactory>newArrayList(
                                          new LongSumAggregatorFactory(
                                              "cnt",
                                              "cnt"
                                          )
                                      )
                                  )
                                  .build(),
            new IncrementalIndexStorageAdapter(index)
        ),
        Lists.<Result<TopNResultValue>>newLinkedList()
    );

    Assert.assertEquals(1, Iterables.size(results));
    Assert.assertEquals(1, results.iterator().next().getValue().getValue().size());
  }

  @Test
  public void testFilterByNull() throws Exception
  {
    IncrementalIndex index = new IncrementalIndex(
         0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
         TestQueryRunners.pool
     );

    index.add(
        new MapBasedInputRow(
            new DateTime().minus(1).getMillis(),
            Lists.newArrayList("billy"),
            ImmutableMap.<String, Object>of("billy", "hi")
        )
    );
    index.add(
        new MapBasedInputRow(
            new DateTime().minus(1).getMillis(),
            Lists.newArrayList("sally"),
            ImmutableMap.<String, Object>of("sally", "bo")
        )
    );

    GroupByQueryEngine engine = makeGroupByQueryEngine();

    final Sequence<Row> rows = engine.process(
        GroupByQuery.builder()
                    .setDataSource("test")
                    .setGranularity(QueryGranularity.ALL)
                    .setInterval(new Interval(0, new DateTime().getMillis()))
                    .addDimension("billy")
                    .addDimension("sally")
                    .addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
                    .setDimFilter(DimFilters.dimEquals("sally", (String) null))
                    .build(),
        new IncrementalIndexStorageAdapter(index)
    );

    final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());

    Assert.assertEquals(1, results.size());

    MapBasedRow row = (MapBasedRow) results.get(0);
    Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent());
  }
}
TOP

Related Classes of io.druid.segment.incremental.IncrementalIndexStorageAdapterTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.