Package avrobase.file

Source Code of avrobase.file.FABTest

package avrobase.file;

import avrobase.AvroFormat;
import avrobase.ReversableFunction;
import avrobase.Row;
import bagcheck.Beacon;
import bagcheck.User;
import com.google.common.base.Charsets;
import com.google.common.base.Supplier;
import com.google.common.primitives.Longs;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;
import org.jets3t.service.security.ProviderCredentials;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPInputStream;

import static junit.framework.Assert.assertEquals;

/**
* TODO: Edit this
* <p/>
* User: sam
* Date: 10/10/10
* Time: 10:09 PM
*/
public class FABTest {
  @Test
  public void putGet() {
    FAB<User, String> userRAB = getFAB("");
    User user = getUser();
    userRAB.put("test", user);
    Row<User, String> test = userRAB.get("test");
    assertEquals(user, test.value);
  }

  @Test
  public void putGet2() {
    FAB<User, String> userRAB = getFAB("");
    User user = getUser();
    Row<User, String> test = userRAB.get("test");
    assertEquals(user, test.value);
  }

  private FAB<User, String> getFAB(String base) {
    return new FAB<User, String>(base + "/tmp/users", base + "/tmp/schemas", new Supplier<String>() {
      Random random = new SecureRandom();

      @Override
      public String get() {
        return String.valueOf(random.nextLong());
      }
    }, User.SCHEMA$, AvroFormat.BINARY, new ReversableFunction<String, byte[]>() {

      @Override
      public byte[] apply(String s) {
        return s.getBytes(Charsets.UTF_8);
      }

      @Override
      public String unapply(byte[] bytes) {
        return new String(bytes, Charsets.UTF_8);
      }
    });
  }

  @Test
  public void multithreadedContention() throws InterruptedException, IOException {
    final FAB<User, String> userRAB = getFAB("");
    multithreadedtest(userRAB);
  }

  @Test
  public void multithreadedContention2() throws InterruptedException, IOException {
    final FAB<User, String> userRAB = getFAB("/Volumes/Data");
    multithreadedtest(userRAB);
  }

  private void multithreadedtest(final FAB<User, String> userRAB) throws InterruptedException {
    User user = getUser();
    final List<String> keys = new ArrayList<String>();
    for (int i = 0; i < 100; i++) {
      keys.add(userRAB.create(user));
    }
    final Random r = new SecureRandom();
    ExecutorService es = Executors.newCachedThreadPool();
    final AtomicInteger failures = new AtomicInteger(0);
    final AtomicInteger total = new AtomicInteger(0);
    final Semaphore s = new Semaphore(100);
    long start = System.currentTimeMillis();
    for (int i = 0; i < 20; i++) {
      s.acquireUninterruptibly();
      es.submit(new Runnable() {
        @Override
        public void run() {
          try {
            for (int i = 0; i < 500; i++) {
              total.incrementAndGet();
              String key = keys.get(r.nextInt(keys.size()));
              Row<User, String> userStringRow = userRAB.get(key);
              if (!userRAB.put(key, userStringRow.value, userStringRow.version)) {
                failures.incrementAndGet();
              }
            }
          } catch (Exception e) {
            e.printStackTrace();
          } finally {
            s.release();
          }
        }
      });
    }
    s.acquireUninterruptibly(100);
    es.shutdown();
    es.awaitTermination(1000, TimeUnit.SECONDS);
    System.out.println(failures + " out of " + total + " in " + (System.currentTimeMillis() - start) + "ms");
  }

  @Test
  public void scantest() {
    final FAB<User, String> userRAB = getFAB("/Volumes/Data");
    int total = 0;
    for (Row<User, String> userStringRow : userRAB.scan(null, null)) {
      total++;
    }
    System.out.println(total);
  }

  @Test
  public void halfscantest() {
    final FAB<User, String> userRAB = getFAB("/Volumes/Data");
    int total = 0;
    for (Row<User, String> userStringRow : userRAB.scan("0", null)) {
      total++;
    }
    System.out.println(total);
  }

  private User getUser() {
    User user = new User();
    user.email = $("spullara@yahoo.com");
    user.firstName = $("Sam");
    user.lastName = $("Pullara");
    user.image = $("");
    user.password = ByteBuffer.allocate(0);
    return user;
  }

  Utf8 $(String s) {
    return new Utf8(s);
  }

  public void testBeaconLoadAndScan() throws IOException, S3ServiceException, InterruptedException {
    final FAB<Beacon, byte[]> beaconFAB = new FAB<Beacon, byte[]>("/Volumes/Data/tmp/beacons", "/Volumes/Data/tmp/schemas", new Supplier<byte[]>() {
      Random r = new SecureRandom();
      @Override
      public byte[] get() {
        return Longs.toByteArray(r.nextLong());
      }
    }, Beacon.SCHEMA$, AvroFormat.BINARY, null);
    Properties p = new Properties();
    p.load(getClass().getResourceAsStream("creds.properties"));
    ProviderCredentials pc = new AWSCredentials(p.getProperty("AWS_ACCESS_KEY_ID"), p.getProperty("AWS_SECRET_ACCESS_KEY"));
    final RestS3Service s3 = new RestS3Service(pc);
    S3Object[] s3Objects = s3.listObjects("com.bagcheck.archive", "beacons/", null);
    ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
    for (final S3Object s3Object : s3Objects) {
      if (!s3Object.getName().equals("beacons/")) {
        callables.add(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            S3Object object = s3.getObject(s3Object.getBucketName(), s3Object.getName());
            DataInputStream dis = new DataInputStream(new GZIPInputStream(object.getDataInputStream()));
            byte[] bytes = new byte[dis.readInt()];
            dis.readFully(bytes);
            byte[] valuebytes = new byte[1024];
            Schema writerSchema = Schema.parse(new ByteArrayInputStream(bytes));
            while(dis.readBoolean()) {
              byte[] row = new byte[dis.readInt()];
              dis.readFully(row);
              int len = dis.readInt();
              if (len > valuebytes.length) {
                valuebytes = new byte[len];
              }
              dis.readFully(valuebytes, 0, len);
              DecoderFactory decoderFactory = new DecoderFactory();
              Decoder d = decoderFactory.binaryDecoder(new ByteArrayInputStream(valuebytes, 0, len), null);
              // Read the data
              SpecificDatumReader<Beacon> sdr = new SpecificDatumReader<Beacon>(writerSchema, Beacon.SCHEMA$);
              Beacon read = sdr.read(null, d);
              beaconFAB.put(row, read);
            }
            return null;
          }
        });
      }
    }
    es.invokeAll(callables);
    es.shutdownNow();
  }

  @Test
  public void testBeaconScan() {
    final FAB<Beacon, byte[]> beaconFAB = new FAB<Beacon, byte[]>("/Volumes/Data/tmp/beacons", "/Volumes/Data/tmp/schemas", new Supplier<byte[]>() {
      Random r = new SecureRandom();
      @Override
      public byte[] get() {
        return Longs.toByteArray(r.nextLong());
      }
    }, Beacon.SCHEMA$, AvroFormat.BINARY, null);
    int total = 0;
    for (Row<Beacon, byte[]> beaconRow : beaconFAB.scan(null, null)) {
      total++;
    }
    System.out.println(total);
  }
}
TOP

Related Classes of avrobase.file.FABTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.