Package com.google.appengine.tools.mapreduce.inputs

Source Code of com.google.appengine.tools.mapreduce.inputs.DatastoreInputTest

/*
* Copyright 2010 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.appengine.tools.mapreduce.inputs;

import com.google.appengine.api.NamespaceManager;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.tools.development.testing.LocalDatastoreServiceTestConfig;
import com.google.appengine.tools.development.testing.LocalServiceTestHelper;
import com.google.appengine.tools.mapreduce.InputReader;

import junit.framework.TestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

/**
*/
public class DatastoreInputTest extends TestCase {

  static final String ENTITY_KIND_NAME = "Bob";

  private final LocalServiceTestHelper helper
      = new LocalServiceTestHelper(new LocalDatastoreServiceTestConfig());
  private DatastoreService ds;

  @Override
  public void setUp() throws Exception {
    super.setUp();
    helper.setUp();
    ds = DatastoreServiceFactory.getDatastoreService();
  }

  @Override
  public void tearDown() throws Exception {
    helper.tearDown();
    super.tearDown();
  }

  private List<Key> populateData(int size, String namespace) {
    return populateData(size, namespace, null);
  }

  private List<Key> populateData(int size, String namespace, Key ancestor) {
    Collection<Entity> entities = new ArrayList<>(size);
    String ns = NamespaceManager.get();
    try {
      if (namespace != null) {
        NamespaceManager.set(namespace);
      }
      for (int i = 0; i < size; i++) {
        entities.add(new Entity(ENTITY_KIND_NAME, ancestor));
      }
    } finally {
      NamespaceManager.set(ns);
    }
    List<Key> keys = ds.put(entities);
    Collections.sort(keys);
    return keys;
  }

  public void testCreateQuery() {
    checkQuery("namespace", "kind");
    checkQuery(null, "kind");
  }

  private void checkQuery(String namespace, String kind) {
    Query query = BaseDatastoreInput.createQuery(namespace, kind);
    assertEquals(query.getKind(), kind);
    assertEquals(namespace == null ? "" : namespace, query.getNamespace());
  }

  public void testCreateReadersWithAncestor() throws IOException {
    List<Key> data = populateData(2, null);
    Key parrent = data.get(0);
    Key notInResult = data.get(1);
    List<Key> keys = populateData(100, null, parrent);
    DatastoreInput input = new DatastoreInput(new Query(ENTITY_KIND_NAME, parrent), 5);
    ArrayList<Key> read = new ArrayList<>();
    for (InputReader<Entity> reader : input.createReaders() ) {
      read.addAll(getEntities(reader));
    }
    Collections.sort(read);
    assertFalse(read.contains(notInResult));
    assertEquals(keys.size() + 1, read.size());
    assertEquals(parrent, read.get(0));
    for (int i = 0; i < keys.size(); i++) {
      assertEquals(keys.get(i), read.get(i+1));
    }
  }

  public void testCreateReadersWithNamespace() throws IOException {
    List<Key> keys = populateData(100, "namespace1");
    DatastoreInput input = new DatastoreInput(ENTITY_KIND_NAME, 2, "namespace1");
    InputReader<Entity> reader = input.createReaders().get(0);
    ArrayList<Key> read = new ArrayList<>();
    read.addAll(getEntities(reader));
    reader = input.createReaders().get(1);
    read.addAll(getEntities(reader));
    Collections.sort(read);
    assertEquals(keys.size(), read.size());
    for (int i = 0; i < keys.size(); i++) {
      assertEquals(keys.get(i), read.get(i));
    }
  }

  public void testCreateReadersMediumData() throws Exception {
    assertSplitting(5, (BaseDatastoreInput.RANGE_SEGMENTS_PER_SHARD / 2) * 5);
  }

  public void testCreateReadersLotsOfData() throws Exception {
    assertSplitting(5, BaseDatastoreInput.RANGE_SEGMENTS_PER_SHARD * 10 * 5);
  }

  private void assertSplitting(int numReaders, int numEntities) throws IOException {
    populateData(numEntities, null);
    List<InputReader<Entity>> splits = createReaders(numReaders);
    assertEquals(Math.max(1, Math.min(numReaders, numEntities)), splits.size());
    // Sizes are not expected to be equal due to scatter sampling
    int average = Math.round(numEntities / (float) numReaders);
    int min = (int) (average * 0.80) - 1;
    int max = (int) (average * 1.20) + 1;
    int[] counts = countEntities(splits);
    assertEquals(numEntities, total(counts));
    for (int i = 0; i < splits.size(); i++) {
      float ratio = (counts[i] / (float) average);
      assertTrue("Ratio was: " + ratio , counts[i] >= min);
      assertTrue("Ratio was: " + ratio ,counts[i] <= max);
    }
  }

  private static int total(int[] ints) {
    int result = 0;
    for (int i : ints) {
      result += i;
    }
    return result;
  }

  private int[] countEntities(List<InputReader<Entity>> splits) throws IOException {
    int[] result = new int[splits.size()];
    int i = 0;
    for (InputReader<Entity> reader : splits) {
      result[i] = getEntities(reader).size();
      i++;
    }
    return result;
  }

  private List<Key> getEntities(InputReader<Entity> reader) throws IOException {
    List<Key> result = new ArrayList<>();
    reader.beginShard();
    reader.beginSlice();
    try {
      while (true) {
        result.add(reader.next().getKey());
      }
    } catch (NoSuchElementException e) {
      // Ignore
    }
    reader.endSlice();
    reader.endShard();
    return result;
  }

  public void testCreateReadersNotEnoughData() throws Exception {
    populateData(9, null);
    List<InputReader<Entity>> splits = createReaders(5);
    int[] counts = countEntities(splits);
    assertEquals(9, total(counts));
    for (int i=0;i<counts.length;i++) {
      assertTrue(counts[i] >= 1 && counts[i] <= 6);
    }
  }

  public void testCreateReadersWithNoData() throws Exception {
    List<InputReader<Entity>> splits = createReaders(10);
    assertEquals(1, splits.size());
    InputReader<Entity> reader = splits.get(0);
    reader.beginShard();
    reader.beginSlice();
    try {
      reader.next();
      fail();
    } catch (NoSuchElementException e) {
      // Expected
    }
    reader.endSlice();
    reader.endShard();
  }

  public void testCreateReadersWithSingleKey() throws Exception {
    ds.put(new Entity(ENTITY_KIND_NAME));
    List<InputReader<Entity>> readers = createReaders(10);

    assertEquals(1, readers.size());
    List<Key> entities = getEntities(readers.get(0));
    assertEquals(1, entities.size());
  }

  public void test500() throws IOException {
    assertSplitting(2, 1000);
  }

  public void test100() throws IOException {
    assertSplitting(10, 1000);
  }

  private static List<InputReader<Entity>> createReaders(int shardCount) {
    DatastoreInput input = new DatastoreInput(ENTITY_KIND_NAME, shardCount);
    return input.createReaders();
  }
}
TOP

Related Classes of com.google.appengine.tools.mapreduce.inputs.DatastoreInputTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.