Package com.netflix.astyanax.thrift

Source Code of com.netflix.astyanax.thrift.ThriftKeyspaceImplTest$UserInfo

package com.netflix.astyanax.thrift;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import junit.framework.Assert;

import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.ExceptionCallback;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.RowCallback;
import com.netflix.astyanax.Serializer;
import com.netflix.astyanax.SerializerPackage;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.TokenRange;
import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.cql.CqlStatementResult;
import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
import com.netflix.astyanax.ddl.FieldMetadata;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.impl.FilteringHostSupplier;
import com.netflix.astyanax.impl.RingDescribeHostSupplier;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ColumnSlice;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.model.Equality;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.query.AllRowsQuery;
import com.netflix.astyanax.query.ColumnQuery;
import com.netflix.astyanax.query.IndexQuery;
import com.netflix.astyanax.query.PreparedIndexExpression;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.retry.ExponentialBackoff;
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer;
import com.netflix.astyanax.serializers.ByteBufferSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.serializers.ObjectSerializer;
import com.netflix.astyanax.serializers.PrefixedSerializer;
import com.netflix.astyanax.serializers.SerializerPackageImpl;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.serializers.TimeUUIDSerializer;
import com.netflix.astyanax.serializers.UnknownComparatorException;
import com.netflix.astyanax.test.SessionEvent;
import com.netflix.astyanax.util.ColumnarRecordWriter;
import com.netflix.astyanax.util.CsvColumnReader;
import com.netflix.astyanax.util.CsvRecordReader;
import com.netflix.astyanax.util.JsonRowsWriter;
import com.netflix.astyanax.util.RangeBuilder;
import com.netflix.astyanax.util.RecordReader;
import com.netflix.astyanax.util.RecordWriter;
import com.netflix.astyanax.util.SingletonEmbeddedCassandra;
import com.netflix.astyanax.util.TimeUUIDUtils;

public class ThriftKeyspaceImplTest {

    private static Logger LOG = LoggerFactory.getLogger(ThriftKeyspaceImplTest.class);

    private static Keyspace                  keyspace;
    private static AstyanaxContext<Keyspace> keyspaceContext;

    private static ColumnFamily<String, String> CF_USER_INFO = ColumnFamily.newColumnFamily(
            "UserInfo", // Column Family Name
            StringSerializer.get(), // Key Serializer
            StringSerializer.get()); // Column Serializer

    private static ColumnFamily<Long, Long> CF_DELETE = ColumnFamily
            .newColumnFamily(
                    "delete",
                    LongSerializer.get(),
                    LongSerializer.get());
   
    private static ColumnFamily<Long, String> CF_USERS = ColumnFamily
            .newColumnFamily(
                    "users",
                    LongSerializer.get(),
                    StringSerializer.get());

    private static ColumnFamily<String, String> CF_TTL = ColumnFamily
            .newColumnFamily(
                    "ttl",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_CQL = ColumnFamily
            .newColumnFamily(
                    "cfcql",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_CALLBACK = ColumnFamily
            .newColumnFamily(
                    "cfcallback",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_STANDARD1 = ColumnFamily
            .newColumnFamily(
                    "Standard1",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, Long> CF_LONGCOLUMN = ColumnFamily
            .newColumnFamily(
                    "LongColumn1",
                    StringSerializer.get(),
                    LongSerializer.get());

    public static ColumnFamily<String, String> CF_STANDARD2 = ColumnFamily
            .newColumnFamily(
                    "Standard2",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_ALLROWS = ColumnFamily
            .newColumnFamily(
                    "AllRows",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_COUNTER1 = ColumnFamily
            .newColumnFamily(
                    "Counter1",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_NOT_DEFINED = ColumnFamily
            .newColumnFamily(
                    "NotDefined",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<String, String> CF_EMPTY = ColumnFamily
            .newColumnFamily(
                    "NotDefined",
                    StringSerializer.get(),
                    StringSerializer.get());

    public static ColumnFamily<Long, Long> ATOMIC_UPDATES = ColumnFamily
            .newColumnFamily(
                    "AtomicUpdates",
                    LongSerializer.get(),
                    LongSerializer.get());

    public static AnnotatedCompositeSerializer<MockCompositeType> M_SERIALIZER = new AnnotatedCompositeSerializer<MockCompositeType>(
            MockCompositeType.class);
   
    public static ColumnFamily<String, MockCompositeType> CF_COMPOSITE = ColumnFamily
            .newColumnFamily(
                    "CompositeColumn",
                    StringSerializer.get(),
                    M_SERIALIZER);

    public static ColumnFamily<ByteBuffer, ByteBuffer> CF_COMPOSITE_CSV = ColumnFamily
            .newColumnFamily(
                    "CompositeCsv",
                    ByteBufferSerializer.get(),
                    ByteBufferSerializer.get());

    public static ColumnFamily<MockCompositeType, String> CF_COMPOSITE_KEY = ColumnFamily
            .newColumnFamily(
                    "CompositeKey",
                    M_SERIALIZER,
                    StringSerializer.get());

    public static ColumnFamily<String, UUID> CF_TIME_UUID = ColumnFamily
            .newColumnFamily(
                    "TimeUUID1",
                    StringSerializer.get(),
                    TimeUUIDSerializer.get());

    public static AnnotatedCompositeSerializer<SessionEvent> SE_SERIALIZER = new AnnotatedCompositeSerializer<SessionEvent>(
            SessionEvent.class);

    public static ColumnFamily<String, SessionEvent> CF_CLICK_STREAM = ColumnFamily
            .newColumnFamily("ClickStream", StringSerializer.get(),
                    SE_SERIALIZER);

    private static final String SEEDS = "localhost:9160";
    private static final long   CASSANDRA_WAIT_TIME = 3000;
    private static String TEST_CLUSTER_NAME  = "cass_sandbox";
    private static String TEST_KEYSPACE_NAME = "AstyanaxUnitTests";

   
    @BeforeClass
    public static void setup() throws Exception {
        System.out.println("TESTING THRIFT KEYSPACE");

        SingletonEmbeddedCassandra.getInstance();
       
        Thread.sleep(CASSANDRA_WAIT_TIME);
       
        createKeyspace();
    }

    @AfterClass
    public static void teardown() throws Exception {
        if (keyspaceContext != null)
            keyspaceContext.shutdown();
       
        Thread.sleep(CASSANDRA_WAIT_TIME);
    }

    public static void createKeyspace() throws Exception {
        keyspaceContext = new AstyanaxContext.Builder()
                .forCluster(TEST_CLUSTER_NAME)
                .forKeyspace(TEST_KEYSPACE_NAME)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                                .setConnectionPoolType(ConnectionPoolType.ROUND_ROBIN)
                                .setDiscoveryDelayInSeconds(60000))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME
                                + "_" + TEST_KEYSPACE_NAME)
                                .setSocketTimeout(30000)
                                .setMaxTimeoutWhenExhausted(2000)
                                .setMaxConnsPerHost(20)
                                .setInitConnsPerHost(10)
                                .setSeeds(SEEDS)
                                )
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        keyspaceContext.start();
       
        keyspace = keyspaceContext.getClient();
       
        try {
            keyspace.dropKeyspace();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
       
        ImmutableMap<String, Object> ksOptions = ImmutableMap.<String, Object>builder()
                .put("strategy_options", ImmutableMap.<String, Object>builder()
                        .put("replication_factor", "1")
                        .build())
                .put("strategy_class",     "SimpleStrategy")
                .build();
       
        ImmutableMap<String, Object> NO_OPTIONS = ImmutableMap.of();
       
        Map<ColumnFamily, Map<String, Object>> cfs = ImmutableMap.<ColumnFamily, Map<String, Object>>builder()
                .put(CF_STANDARD1,
                     ImmutableMap.<String, Object>builder()
                         .put("column_metadata", ImmutableMap.<String, Object>builder()
                             .put("Index1", ImmutableMap.<String, Object>builder()
                                 .put("validation_class", "UTF8Type")
                                 .put("index_type",       "KEYS")
                                 .build())
                             .put("Index2", ImmutableMap.<String, Object>builder()
                                 .put("validation_class", "UTF8Type")
                                 .put("index_type",       "KEYS")
                                 .build())
                             .build())
                         .build())
                .put(CF_TTL,        NO_OPTIONS)
                .build();
        keyspace.createKeyspace(ksOptions, cfs);
       
        keyspace.createColumnFamily(CF_STANDARD2,  null);
        keyspace.createColumnFamily(CF_ALLROWS,  null);
        keyspace.createColumnFamily(CF_LONGCOLUMN, null);
        keyspace.createColumnFamily(CF_DELETE,     null);
        keyspace.createColumnFamily(ATOMIC_UPDATES,null);
        keyspace.createColumnFamily(CF_CQL,  null);
        keyspace.createColumnFamily(CF_CALLBACK,  null);
       
        keyspace.createColumnFamily(CF_COUNTER1, ImmutableMap.<String, Object>builder()
                .put("default_validation_class", "CounterColumnType")
                .build());
        keyspace.createColumnFamily(CF_CLICK_STREAM, ImmutableMap.<String, Object>builder()
                .put("comparator_type", "CompositeType(UTF8Type, TimeUUIDType)")
                .build());
        keyspace.createColumnFamily(CF_COMPOSITE_CSV, ImmutableMap.<String, Object>builder()
                .put("default_validation_class", "UTF8Type")
                .put("key_validation_class",     "UTF8Type")
                .put("comparator_type",          "CompositeType(UTF8Type, LongType)")
                .build());
        keyspace.createColumnFamily(CF_COMPOSITE, ImmutableMap.<String, Object>builder()
                .put("comparator_type", "CompositeType(AsciiType, IntegerType(reversed=true), IntegerType, BytesType, UTF8Type)")
                .build());
        keyspace.createColumnFamily(CF_COMPOSITE_KEY, ImmutableMap.<String, Object>builder()
                .put("key_validation_class", "BytesType")
                .build());
        keyspace.createColumnFamily(CF_TIME_UUID,         null);
        keyspace.createColumnFamily(CF_USER_INFO,         null);
        keyspace.createColumnFamily(CF_USERS, ImmutableMap.<String, Object>builder()
                .put("default_validation_class", "UTF8Type")
                .put("column_metadata", ImmutableMap.<String, Object>builder()
                        .put("firstname",  ImmutableMap.<String, Object>builder()
                                .put("validation_class", "UTF8Type")
                                .put("index_type",       "KEYS")
                                .build())
                        .put("lastname", ImmutableMap.<String, Object>builder()
                                .put("validation_class", "UTF8Type")
                                .put("index_type",       "KEYS")
                                .build())
                        .put("age", ImmutableMap.<String, Object>builder()
                                .put("validation_class", "LongType")
                                .put("index_type",       "KEYS")
                                .build())
                        .build())
                     .build());
       
        KeyspaceDefinition ki = keyspaceContext.getClient().describeKeyspace();
        System.out.println("Describe Keyspace: " + ki.getName());

        try {
            //
            // CF_Super :
            // 'A' :
            // 'a' :
            // 1 : 'Aa1',
            // 2 : 'Aa2',
            // 'b' :
            // ...
            // 'z' :
            // ...
            // 'B' :
            // ...
            //
            // CF_Standard :
            // 'A' :
            // 'a' : 1,
            // 'b' : 2,
            // ...
            // 'z' : 26,
            // 'B' :
            // ...
            //

            MutationBatch m;
            OperationResult<Void> result;
            m = keyspace.prepareMutationBatch();

            for (char keyName = 'A'; keyName <= 'Z'; keyName++) {
                String rowKey = Character.toString(keyName);
                ColumnListMutation<String> cfmStandard = m.withRow(
                        CF_STANDARD1, rowKey);
                for (char cName = 'a'; cName <= 'z'; cName++) {
                    cfmStandard.putColumn(Character.toString(cName),
                            (int) (cName - 'a') + 1, null);
                }
                cfmStandard
                        .putColumn("Index1", (int) (keyName - 'A') + 1, null);
                cfmStandard.putColumn("Index2", 42, null);
                m.execute();
            }

            m.withRow(CF_STANDARD1, "Prefixes").putColumn("Prefix1_a", 1, null)
                    .putColumn("Prefix1_b", 2, null)
                    .putColumn("prefix2_a", 3, null);

            result = m.execute();

            String rowKey = "A";
            ColumnListMutation<Long> cfmLong = m.withRow(CF_LONGCOLUMN, rowKey);
            for (Long l = -10L; l < 10L; l++) {
                cfmLong.putEmptyColumn(l, null);
            }
            cfmLong.putEmptyColumn(Long.MAX_VALUE, null);
            result = m.execute();

            m.withRow(CF_USER_INFO, "acct1234")
                .putColumn("firstname", "john", null)
                .putColumn("lastname", "smith", null)
                .putColumn("address", "555 Elm St", null)
                .putColumn("age", 30, null)
                .putEmptyColumn("empty");

            m.execute();
           
           
            // Inserts for CF_ALLROWS
            m = keyspace.prepareMutationBatch();

            for (char keyName = 'A'; keyName <= 'Z'; keyName++) {
                rowKey = Character.toString(keyName);
                ColumnListMutation<String> cfmStandard = m.withRow(
                        CF_ALLROWS, rowKey);
                for (char cName = 'a'; cName <= 'z'; cName++) {
                    cfmStandard.putColumn(Character.toString(cName),
                            (int) (cName - 'a') + 1, null);
                }
                m.execute();
            }
           

        } catch (Exception e) {
            System.out.println(e.getMessage());
            Assert.fail();
        }
    }

    @Test
    public void testMultiColumnDelete() throws Exception {
        MutationBatch mb = keyspace.prepareMutationBatch();
        mb.withRow(CF_DELETE, 1L)
            .setTimestamp(1).putEmptyColumn(1L, null)
            .setTimestamp(10).putEmptyColumn(2L, null)
            ;
        mb.execute();
       
        ColumnList<Long> result1 = keyspace.prepareQuery(CF_DELETE).getRow(1L).execute().getResult();
        Assert.assertEquals(2, result1.size());
        Assert.assertNotNull(result1.getColumnByName(1L));
        Assert.assertNotNull(result1.getColumnByName(2L));
       
        logColumnList("Insert", result1);
       
        mb = keyspace.prepareMutationBatch();
        mb.withRow(CF_DELETE,  1L)
            .setTimestamp(result1.getColumnByName(1L).getTimestamp()-1)
            .deleteColumn(1L)
            .setTimestamp(result1.getColumnByName(2L).getTimestamp()-1)
            .deleteColumn(2L)
            .putEmptyColumn(3L, null);
       
        mb.execute();
       
        result1 = keyspace.prepareQuery(CF_DELETE).getRow(1L).execute().getResult();
        logColumnList("Delete with older timestamp", result1);
        Assert.assertEquals(3, result1.size());
       
        LOG.info("Delete L2 with TS: " + (result1.getColumnByName(2L).getTimestamp()+1));
        mb.withRow(CF_DELETE,  1L)
            .setTimestamp(result1.getColumnByName(1L).getTimestamp()+1)
            .deleteColumn(1L)
            .setTimestamp(result1.getColumnByName(2L).getTimestamp()+1)
            .deleteColumn(2L);
        mb.execute();
       
        result1 = keyspace.prepareQuery(CF_DELETE).getRow(1L).execute().getResult();
        logColumnList("Delete with newer timestamp", result1);
        Assert.assertEquals(1, result1.size());
    }
   
    <T> void logColumnList(String label, ColumnList<T> cl) {
        LOG.info(">>>>>> " + label);
        for (Column<T> c : cl) {
            LOG.info(c.getName() + " " + c.getTimestamp());
        }
        LOG.info("<<<<<<");
    }
   
    @Test
    public void testCqlComposite() throws Exception {
        CqlStatementResult result = keyspace.prepareCqlStatement()
            .withCql("SELECT * FROM " + CF_COMPOSITE_CSV.getName())
            .execute()
            .getResult();
       
        result.getSchema();
        result.getRows(CF_COMPOSITE_CSV);
    }
   
    @Test
    public void testHasValue() throws Exception {
        ColumnList<String> response = keyspace.prepareQuery(CF_USER_INFO).getRow("acct1234").execute().getResult();
        Assert.assertEquals("firstname", response.getColumnByName("firstname").getName());
        Assert.assertEquals("firstname", response.getColumnByName("firstname").getName());
        Assert.assertEquals("john", response.getColumnByName("firstname").getStringValue());
        Assert.assertEquals("john", response.getColumnByName("firstname").getStringValue());
        Assert.assertEquals(true,  response.getColumnByName("firstname").hasValue());
        Assert.assertEquals(false, response.getColumnByName("empty").hasValue());
       
    }
   
    @Test
    public void getKeyspaceDefinition() throws Exception {
        KeyspaceDefinition def = keyspaceContext.getEntity().describeKeyspace();
        Collection<String> fieldNames = def.getFieldNames();
        LOG.info("Getting field names");
        for (String field : fieldNames) {
            LOG.info(field);
        }
        LOG.info(fieldNames.toString());
       
        for (FieldMetadata field : def.getFieldsMetadata()) {
            LOG.info(field.getName() + " = " + def.getFieldValue(field.getName()) + " (" + field.getType() + ")");
        }
       
        for (ColumnFamilyDefinition cfDef : def.getColumnFamilyList()) {
            LOG.info("----------" );
            for (FieldMetadata field : cfDef.getFieldsMetadata()) {
                LOG.info(field.getName() + " = " + cfDef.getFieldValue(field.getName()) + " (" + field.getType() + ")");
            }
        }
    }
   
    @Test
    public void testCopyKeyspace() throws Exception {
        KeyspaceDefinition def = keyspaceContext.getEntity().describeKeyspace();
        Properties props = def.getProperties();
       
        for (Entry<Object, Object> prop : props.entrySet()) {
            LOG.info(prop.getKey() + " : " + prop.getValue());
        }
       
        KsDef def2 = ThriftUtils.getThriftObjectFromProperties(KsDef.class, props);
        Properties props2 = ThriftUtils.getPropertiesFromThrift(def2);

        LOG.info("Props1:" + new TreeMap<Object, Object>(props));
        LOG.info("Props2:" + new TreeMap<Object, Object>(props2));
        MapDifference<Object, Object> diff = Maps.difference(props,  props2);
        LOG.info("Not copied : " + diff.entriesOnlyOnLeft());
        LOG.info("Added      : " + diff.entriesOnlyOnRight());
        LOG.info("Differing  : " + diff.entriesDiffering());
       
       
        Assert.assertTrue(diff.areEqual());
    }
   
    @Test
    public void testNonExistentKeyspace()  {
        AstyanaxContext<Keyspace> ctx = new AstyanaxContext.Builder()
            .forCluster(TEST_CLUSTER_NAME)
            .forKeyspace(TEST_KEYSPACE_NAME + "_NonExistent")
            .withAstyanaxConfiguration(
                    new AstyanaxConfigurationImpl()
                            .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                            .setConnectionPoolType(ConnectionPoolType.ROUND_ROBIN)
                            .setDiscoveryDelayInSeconds(60000))
            .withConnectionPoolConfiguration(
                    new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME
                            + "_" + TEST_KEYSPACE_NAME)
                            .setSocketTimeout(30000)
                            .setMaxTimeoutWhenExhausted(2000)
                            .setMaxConnsPerHost(20)
                            .setInitConnsPerHost(10)
                            .setSeeds(SEEDS))
            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
            .buildKeyspace(ThriftFamilyFactory.getInstance());       
       
        ctx.start();
       
        try {
            KeyspaceDefinition keyspaceDef = ctx.getEntity().describeKeyspace();
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.info(e.getMessage());
        }
       
    }
   
    @Test
    public void testDescribeRing() throws Exception {
        // [TokenRangeImpl [startToken=0, endToken=0, endpoints=[127.0.0.1]]]
        List<TokenRange> ring = keyspaceContext.getEntity().describeRing();
        LOG.info(ring.toString());
       
        // 127.0.0.1
        RingDescribeHostSupplier ringSupplier = new RingDescribeHostSupplier(keyspaceContext.getEntity(), 9160);
        List<Host> hosts = ringSupplier.get();
        Assert.assertEquals(1, hosts.get(0).getTokenRanges().size());
        LOG.info(hosts.toString());
       
        Supplier<List<Host>> sourceSupplier1 = Suppliers.ofInstance((List<Host>)Lists.newArrayList(new Host("127.0.0.1", 9160)));
        Supplier<List<Host>> sourceSupplier2 = Suppliers.ofInstance((List<Host>)Lists.newArrayList(new Host("127.0.0.2", 9160)));
       
        // 127.0.0.1
        LOG.info(sourceSupplier1.get().toString());
       
        // 127.0.0.2
        LOG.info(sourceSupplier2.get().toString());
       
        hosts = new FilteringHostSupplier(ringSupplier, sourceSupplier1).get();
        LOG.info(hosts.toString());
       
        Assert.assertEquals(1, hosts.size());
        Assert.assertEquals(1, hosts.get(0).getTokenRanges().size());
        hosts = new FilteringHostSupplier(ringSupplier, sourceSupplier2).get();
        LOG.info(hosts.toString());
        Assert.assertEquals(1, hosts.size());
    }
   
    @Test
    public void paginateColumns() throws Exception {
        String column = "";
        ColumnList<String> columns;
        int pageize = 10;
        RowQuery<String, String> query = keyspace
                .prepareQuery(CF_STANDARD1)
                .getKey("A")
                .autoPaginate(true)
                .withColumnRange(
                        new RangeBuilder().setStart(column)
                                .setLimit(pageize).build());

        while (!(columns = query.execute().getResult()).isEmpty()) {
            for (Column<String> c : columns) {
            }
            // column = Iterables.getLast(columns).getName() + "\u0000";
        }
    }

    @Test
    public void example() {
        AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
                .forCluster(TEST_CLUSTER_NAME)
                .forKeyspace(TEST_KEYSPACE_NAME)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.NONE))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl("MyConnectionPool")
                                .setMaxConnsPerHost(1).setSeeds(
                                        "127.0.0.1:9160"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        Keyspace keyspace = context.getEntity();

        MutationBatch m = keyspace.prepareMutationBatch();

        // m.withRow(CF_USER_STATS, "acct1234")
        // .incrementCounterColumn("loginCount", 1);

        try {
            OperationResult<Void> result = m.execute();
        } catch (ConnectionException e) {
            System.out.println(e);
        }

        try {
            OperationResult<ColumnList<String>> result = keyspace
                    .prepareQuery(CF_USER_INFO).getKey("acct1234").execute();
            ColumnList<String> columns = result.getResult();

            // Lookup columns in response by name
            int age = columns.getColumnByName("age").getIntegerValue();
            String address = columns.getColumnByName("address")
                    .getStringValue();

            // Or, iterate through the columns
            for (Column<String> c : result.getResult()) {
                System.out.println(c.getName());
            }
        } catch (ConnectionException e) {
            System.out.println(e);
        }

    }
   
    @Test
    public void paginateLongColumns() {
        Long column = Long.MIN_VALUE;
        ColumnList<Long> columns;
        int pageize = 10;
        try {
            RowQuery<String, Long> query = keyspace
                    .prepareQuery(CF_LONGCOLUMN)
                    .getKey("A")
                    .autoPaginate(true)
                    .withColumnRange(
                            new RangeBuilder().setStart(column)
                                    .setLimit(pageize).build());

            while (!(columns = query.execute().getResult()).isEmpty()) {
                LOG.info("-----");
                for (Column<Long> c : columns) {
                    LOG.info(Long.toString(c.getName()));
                }
                // column = Iterables.getLast(columns).getName() + "\u0000";
            }
        } catch (ConnectionException e) {
        }
    }

    @Test
    public void getAll() {
        AtomicLong counter = new AtomicLong(0);
        try {
            OperationResult<Rows<String, String>> rows = keyspace
                    .prepareQuery(CF_ALLROWS).getAllRows().setConcurrencyLevel(2).setRowLimit(10)
                    .setRepeatLastToken(false)
                    .withColumnRange(new RangeBuilder().setLimit(0).build())
                    .setExceptionCallback(new ExceptionCallback() {
                        @Override
                        public boolean onException(ConnectionException e) {
                            Assert.fail(e.getMessage());
                            return true;
                        }
                    }).execute();
            for (Row<String, String> row : rows.getResult()) {
                counter.incrementAndGet();
                LOG.info("ROW: " + row.getKey() + " " + row.getColumns().size());
            }
            Assert.assertEquals(26, counter.get());
        } catch (ConnectionException e) {
            Assert.fail();
        }
    }
   
    @Test
    public void getAllWithCallback() {
        try {
         
            MutationBatch m = keyspace.prepareMutationBatch();
            for (char keyName = 'A'; keyName <= 'Z'; keyName++) {
                String rowKey = Character.toString(keyName);
                ColumnListMutation<String> cfmStandard = m.withRow(CF_CALLBACK, rowKey);
                for (char cName = 'a'; cName <= 'z'; cName++) {
                    cfmStandard.putColumn(Character.toString(cName),
                            (int) (cName - 'a') + 1, null);
                }
                m.execute();
            }

            final AtomicLong counter = new AtomicLong();

            keyspace.prepareQuery(CF_CALLBACK).getAllRows().setRowLimit(3)
                    .setRepeatLastToken(false)
                    .setConcurrencyLevel(2)
                    .withColumnRange(new RangeBuilder().setLimit(2).build())
                    .executeWithCallback(new RowCallback<String, String>() {
                        @Override
                        public void success(Rows<String, String> rows) {
                            for (Row<String, String> row : rows) {
                                LOG.info("ROW: " + row.getKey() + " "
                                        + row.getColumns().size());
                                counter.incrementAndGet();
                            }
                        }

                        @Override
                        public boolean failure(ConnectionException e) {
                            LOG.error(e.getMessage(), e);
                            return false;
                        }
                    });
            LOG.info("Read " + counter.get() + " keys");
            Assert.assertEquals(26,  counter.get());
        } catch (ConnectionException e) {
            Assert.fail();
        }
    }

    static class UserInfo implements Serializable {
        private static final long serialVersionUID = 6366200973810770033L;

        private String firstName;
        private String lastName;

        public UserInfo() {

        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return this.firstName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public String getLastName() {
            return this.lastName;
        }

        public boolean equals(Object other) {
            UserInfo smo = (UserInfo) other;
            return firstName.equals(smo.firstName)
                    && lastName.equals(smo.lastName);
        }
    }

    @Test
    public void testSerializedClassValue() {
        UserInfo smo = new UserInfo();
        smo.setLastName("Landau");
        smo.setFirstName("Eran");

        try {
            ByteBuffer bb = ObjectSerializer.get().toByteBuffer(smo);
            keyspace.prepareColumnMutation(CF_STANDARD1, "Key_SerializeTest",
                    "Column1").putValue(bb, null).execute();

            UserInfo smo2 = (UserInfo) keyspace.prepareQuery(CF_STANDARD1)
                    .getKey("Key_SerializeTest").getColumn("Column1").execute()
                    .getResult().getValue(ObjectSerializer.get());

            Assert.assertEquals(smo, smo2);
        } catch (ConnectionException e) {
            Assert.fail();
        }
    }   
   
    @Test
    public void testSingleOps() throws Exception {
        String key = "SingleOpsTest";
        Random prng = new Random();

        // Set a string value
        {
            String column = "StringColumn";
            String value = RandomStringUtils.randomAlphanumeric(32);
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            ColumnQuery<String> query = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column);
           
            String v = query.execute().getResult().getStringValue();
            Assert.assertEquals(value, v);
           
            v = query.execute().getResult().getStringValue();
            Assert.assertEquals(value, v);

            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult()
                        .getStringValue();
                Assert.fail();
            } catch (NotFoundException e) {
            } catch (ConnectionException e) {
                Assert.fail();
            }
        }

        // Set a byte value
        {
            String column = "ByteColumn";
            byte value = (byte) prng.nextInt(Byte.MAX_VALUE);
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            byte v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getByteValue();
            Assert.assertEquals(value, v);
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            // verify column gone
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult().getByteValue();
                Assert.fail();
            } catch (NotFoundException e) {
              // expected
            }
        }
       
        // Set a short value
        {
            String column = "ShortColumn";
            short value = (short) prng.nextInt(Short.MAX_VALUE);
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            short v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getShortValue();
            Assert.assertEquals(value, v);
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            // verify column gone
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult().getShortValue();
                Assert.fail();
            } catch (NotFoundException e) {
              // expected
            }
        }
       
        // Set a int value
        {
            String column = "IntColumn";
            int value = prng.nextInt();
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            int v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getIntegerValue();
            Assert.assertEquals(value, v);
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            // verify column gone
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult().getIntegerValue();
                Assert.fail();
            } catch (NotFoundException e) {
              // expected
            }
        }
       
        // Set a long value
        {
            String column = "LongColumn";
            long value = prng.nextLong();
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            long v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getLongValue();
            Assert.assertEquals(value, v);
         // get as integer should fail
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult()
                        .getIntegerValue();
                Assert.fail();
            } catch (Exception e) {
              // expected
            }
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            // verify column gone
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult().getLongValue();
                Assert.fail();
            } catch (NotFoundException e) {
              // expected
            }
        }
       
        // Set a float value
        {
            String column = "FloatColumn";
            float value = prng.nextFloat();
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            float v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getFloatValue();
            Assert.assertEquals(value, v);
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            // verify column gone
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult().getFloatValue();
                Assert.fail();
            } catch (NotFoundException e) {
              // expected
            }
        }

        // Set a double value
        {
            String column = "IntColumn";
            double value = prng.nextDouble();
            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .putValue(value, null).execute();
            // Read
            double v = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult().getDoubleValue();
            Assert.assertEquals(value, v);
            // get as integer should fail
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult()
                        .getIntegerValue();
                Assert.fail();
            } catch (Exception e) {
              // expected
            }
            // Delete
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .deleteColumn().execute();
            try {
                keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                        .getColumn(column).execute().getResult()
                        .getDoubleValue();
                Assert.fail();
            } catch (NotFoundException e) {
            } catch (ConnectionException e) {
                Assert.fail();
            }
        }
       
        // Set long column with timestamp
        {
            String column = "TimestampColumn";
            long value = prng.nextLong();

            // Set
            keyspace.prepareColumnMutation(CF_STANDARD1, key, column)
                    .withTimestamp(100)
                    .putValue(value, null)
                    .execute();

            // Read
            Column<String> c = keyspace.prepareQuery(CF_STANDARD1).getKey(key)
                    .getColumn(column).execute().getResult();
            Assert.assertEquals(100,  c.getTimestamp());
        }
    }

    @Test
    public void testTimeUUIDUnique() {
        long now = System.currentTimeMillis();
        UUID uuid1 = TimeUUIDUtils.getTimeUUID(now);
        UUID uuid2 = TimeUUIDUtils.getTimeUUID(now);
        LOG.info(uuid1.toString());
        LOG.info(uuid2.toString());
        Assert.assertTrue(uuid1.equals(uuid2));
    }

    @Test
    public void testTimeUUID2() {
        MutationBatch m = keyspace.prepareMutationBatch();
        String rowKey = "Key2";
        m.withRow(CF_TIME_UUID, rowKey).delete();
        try {
            m.execute();
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        long now = System.currentTimeMillis();
        long msecPerDay = 86400000;
        for (int i = 0; i < 100; i++) {
            m.withRow(CF_TIME_UUID, rowKey).putColumn(
                    TimeUUIDUtils.getTimeUUID(now - i * msecPerDay), i, null);
        }
        try {
            m.execute();
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        try {
            OperationResult<ColumnList<UUID>> result = keyspace
                    .prepareQuery(CF_TIME_UUID)
                    .getKey(rowKey)
                    .withColumnRange(
                            new RangeBuilder()
                                    .setLimit(100)
                                    .setStart(
                                            TimeUUIDUtils.getTimeUUID(now - 20
                                                    * msecPerDay)).build())
                    .execute();
            for (Column<UUID> column : result.getResult()) {
                System.out.println((now - TimeUUIDUtils.getTimeFromUUID(column
                        .getName())) / msecPerDay);
            }
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTimeUUID() {
        MutationBatch m = keyspace.prepareMutationBatch();

        UUID columnName = TimeUUIDUtils.getUniqueTimeUUIDinMillis();
        long columnTime = TimeUUIDUtils.getTimeFromUUID(columnName);
        String rowKey = "Key1";

        m.withRow(CF_TIME_UUID, rowKey).delete();
        try {
            m.execute();
        } catch (ConnectionException e1) {
            Assert.fail();
        }

        int startTime = 100;
        int endTime = 200;

        m.withRow(CF_TIME_UUID, rowKey).putColumn(columnName, 42, null);
        for (int i = startTime; i < endTime; i++) {
            // UUID c = TimeUUIDUtils.getTimeUUID(i);
            LOG.info(TimeUUIDUtils.getTimeUUID(columnTime + i).toString());

            m.withRow(CF_TIME_UUID, rowKey).putColumn(
                    TimeUUIDUtils.getTimeUUID(columnTime + i), i, null);
        }

        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            OperationResult<Column<UUID>> result = keyspace
                    .prepareQuery(CF_TIME_UUID).getKey(rowKey)
                    .getColumn(columnName).execute();

            Assert.assertEquals(columnName, result.getResult().getName());
            Assert.assertTrue(result.getResult().getIntegerValue() == 42);

            OperationResult<ColumnList<UUID>> result2 = keyspace
                    .prepareQuery(CF_TIME_UUID).getKey(rowKey).execute();

            result2 = keyspace
                    .prepareQuery(CF_TIME_UUID)
                    .getKey(rowKey)
                    .withColumnRange(
                            new RangeBuilder()
                                    .setLimit(10)
                                    .setStart(TimeUUIDUtils.getTimeUUID(0))
                                    .setEnd(TimeUUIDUtils
                                            .getTimeUUID(Long.MAX_VALUE >> 8))
                                    .build()).execute();
            Assert.assertEquals(10, result2.getResult().size());

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        UUID currentUUID = TimeUUIDUtils.getUniqueTimeUUIDinMicros();

        SerializerPackage pkg = null;
        try {
            pkg = keyspace.getSerializerPackage(CF_TIME_UUID.getName(), false);
        } catch (ConnectionException e) {
            Assert.fail();
            e.printStackTrace();
        } catch (UnknownComparatorException e) {
            Assert.fail();
            e.printStackTrace();
        }
        Serializer<UUID> serializer = (Serializer<UUID>) pkg
                .getColumnNameSerializer();

        ByteBuffer buffer = serializer.toByteBuffer(currentUUID);
        String value = serializer.getString(buffer);
        LOG.info("UUID Time = " + value);

        // Test timeUUID pagination
        RowQuery<String, UUID> query = keyspace
                .prepareQuery(CF_TIME_UUID)
                .getKey(rowKey)
                .withColumnRange(
                        new RangeBuilder()
                                .setLimit(10)
                                .setStart(
                                        TimeUUIDUtils.getTimeUUID(columnTime
                                                + startTime))
                                .setEnd(TimeUUIDUtils.getTimeUUID(columnTime
                                        + endTime)).build()).autoPaginate(true);
        OperationResult<ColumnList<UUID>> result;
        int pageCount = 0;
        int rowCount = 0;
        try {
            LOG.info("starting pagination");
            while (!(result = query.execute()).getResult().isEmpty()) {
                pageCount++;
                rowCount += result.getResult().size();
                LOG.info("==== Block ====");
                for (Column<UUID> column : result.getResult()) {
                    LOG.info("Column is " + column.getName());
                }
            }
            LOG.info("pagination complete");
        } catch (ConnectionException e) {
            Assert.fail();
            LOG.info(e.getMessage());
            e.printStackTrace();
        }

    }

    @Test
    public void testCopy() {
        String keyName = "A";

        try {
            keyspace.prepareQuery(CF_STANDARD1).getKey(keyName)
                    .copyTo(CF_STANDARD2, keyName).execute();

            ColumnList<String> list1 = keyspace.prepareQuery(CF_STANDARD1)
                    .getKey(keyName).execute().getResult();

            ColumnList<String> list2 = keyspace.prepareQuery(CF_STANDARD2)
                    .getKey(keyName).execute().getResult();

            Iterator<Column<String>> iter1 = list1.iterator();
            Iterator<Column<String>> iter2 = list2.iterator();

            while (iter1.hasNext()) {
                Column<String> column1 = iter1.next();
                Column<String> column2 = iter2.next();
                Assert.assertEquals(column1.getName(), column2.getName());
                Assert.assertEquals(column1.getByteBufferValue(),
                        column2.getByteBufferValue());
            }
            Assert.assertFalse(iter2.hasNext());

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMutationBatchMultipleWithRow() throws Exception {
        MutationBatch mb = keyspace.prepareMutationBatch();
       
        Long key = 9L;
       
        mb.withRow(CF_USERS, key).delete();
        mb.withRow(CF_USERS, key).putEmptyColumn("test", null);
       
        mb.execute();
       
        ColumnList<String> result = keyspace.prepareQuery(CF_USERS).getRow(key).execute().getResult();
       
        Assert.assertEquals(1, result.size());
    }
   
    @Test
    public void testAtomicBatchMutation() throws Exception {
        MutationBatch mb = keyspace.prepareMutationBatch().withAtomicBatch(true);
       
        mb.withRow(ATOMIC_UPDATES, 1L)
            .putColumn(11L, 11L)
            .putColumn(12L, 12L);
        mb.withRow(ATOMIC_UPDATES, 2L)
            .putColumn(21L, 21L)
            .putColumn(22L, 22L);
       
        mb.execute();
       
        Rows<Long, Long> result =
                keyspace.prepareQuery(ATOMIC_UPDATES).getAllRows().execute().getResult();
       
        int size = 0;

        for (Row<Long, Long> row : result) {
            LOG.info("ROW: " + row.getKey() + " " + row.getColumns().size());
            size++;
            Assert.assertEquals(2, row.getColumns().size());
        }
        Assert.assertEquals(2, size);
       
        size = 0;
        mb = keyspace.prepareMutationBatch().withAtomicBatch(true);
       
        mb.withRow(ATOMIC_UPDATES, 3L)
            .putColumn(11L, 11L)
            .putColumn(12L, 12L);
        mb.withRow(ATOMIC_UPDATES, 1L).delete();
        mb.withRow(ATOMIC_UPDATES, 2L).delete();
       
        mb.execute();

        result = keyspace.prepareQuery(ATOMIC_UPDATES).getAllRows().execute().getResult();

        for (Row<Long, Long> row : result) {
            LOG.info("ROW: " + row.getKey() + " " + row.getColumns().size());
            size++;
            Assert.assertEquals(2, row.getColumns().size());
        }
        Assert.assertEquals(1, size);
       
       mb = keyspace.prepareMutationBatch().withAtomicBatch(true);
       mb.withRow(ATOMIC_UPDATES, 3L).delete();
       mb.execute();
    }
   
    @Test
    public void testClickStream() {
        MutationBatch m = keyspace.prepareMutationBatch();
        String userId = "UserId";

        long timeCounter = 0;
        for (int i = 0; i < 10; i++) {
            String sessionId = "Session" + i;

            for (int j = 0; j < 10; j++) {
                m.withRow(CF_CLICK_STREAM, userId).putColumn(
                        new SessionEvent(sessionId,
                                TimeUUIDUtils.getTimeUUID(j)),
                        Long.toString(timeCounter), null);
                timeCounter++;
            }
        }

        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            OperationResult<ColumnList<SessionEvent>> result;

            result = keyspace
                    .prepareQuery(CF_CLICK_STREAM)
                    .getKey(userId)
                    .withColumnRange(
                            SE_SERIALIZER.buildRange()
                                    .greaterThanEquals("Session3")
                                    .lessThanEquals("Session5").build())
                    .execute();
            // Assert.assertEquals(10, result.getResult().size());
            // LOG.info("*********************** INCLUSIVE - INCLUSIVE");
            // for (Column<SessionEvent> column : result.getResult()) {
            // LOG.info("####### " + column.getName() + " = " +
            // column.getLongValue());
            // }

            result = keyspace
                    .prepareQuery(CF_CLICK_STREAM)
                    .getKey(userId)
                    .withColumnRange(
                            SE_SERIALIZER.buildRange()
                                    .greaterThanEquals("Session3")
                                    .lessThan("Session5").build()).execute();
            // Assert.assertEquals(10, result.getResult().size());
            // LOG.info("XXXXXXXXXXXXXXXXXXXXXXXX INCLUSIVE - NON_INCLUSIVE");
            // for (Column<SessionEvent> column : result.getResult()) {
            // LOG.info("####### " + column.getName() + " = " +
            // column.getLongValue());
            // }

            result = keyspace
                    .prepareQuery(CF_CLICK_STREAM)
                    .getKey(userId)
                    .withColumnRange(
                            SE_SERIALIZER.buildRange().greaterThan("Session3")
                                    .lessThanEquals("Session5").build())
                    .execute();
            // LOG.info("XXXXXXXXXXXXXXXXXXXXXXXX NON_INCLUSIVE - INCLUSIVE");
            // Assert.assertEquals(10, result.getResult().size());
            // for (Column<SessionEvent> column : result.getResult()) {
            // LOG.info("####### " + column.getName() + " = " +
            // column.getLongValue());
            // }

            result = keyspace
                    .prepareQuery(CF_CLICK_STREAM)
                    .getKey(userId)
                    .withColumnRange(
                            SE_SERIALIZER.buildRange().greaterThan("Session3")
                                    .lessThan("Session5").build()).execute();
            // LOG.info("XXXXXXXXXXXXXXXXXXXXXXXX NON_INCLUSIVE - NON_INCLUSIVE");
            // for (Column<SessionEvent> column : result.getResult()) {
            // LOG.info("####### " + column.getName() + " = " +
            // column.getLongValue());
            // }

            result = keyspace
                    .prepareQuery(CF_CLICK_STREAM)
                    .getKey(userId)
                    .withColumnRange(
                            SE_SERIALIZER
                                    .buildRange()
                                    .withPrefix("Session3")
                                    .greaterThanEquals(
                                            TimeUUIDUtils.getTimeUUID(2))
                                    .lessThanEquals(
                                            TimeUUIDUtils.getTimeUUID(8))
                                    .build()).execute();

            // Assert.assertEquals(10, result.getResult().size());
            // LOG.info("XXXXXXXXXXXXXXXXXXXXXXXX EQUAL - EQUAL");
            // for (Column<SessionEvent> column : result.getResult()) {
            // LOG.info("####### " + column.getName() + " = " +
            // column.getLongValue());
            // }
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testChangeConsistencyLevel() {
        try {
            keyspace.prepareQuery(CF_STANDARD1)
                    .setConsistencyLevel(ConsistencyLevel.CL_ONE).getKey("A")
                    .execute();
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCompositeKey() {
        MockCompositeType key = new MockCompositeType("A", 1, 2, true, "B");
        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF_COMPOSITE_KEY, key).putColumn("Test", "Value", null);
        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            ColumnList<String> row = keyspace.prepareQuery(CF_COMPOSITE_KEY)
                    .getKey(key).execute().getResult();
            Assert.assertFalse(row.isEmpty());
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

    }

    @Test
    public void testComposite() {
        String rowKey = "Composite1";

        boolean bool = false;
        MutationBatch m = keyspace.prepareMutationBatch();
        ColumnListMutation<MockCompositeType> mRow = m.withRow(CF_COMPOSITE,
                rowKey);
        int columnCount = 0;
        for (char part1 = 'a'; part1 <= 'b'; part1++) {
            for (int part2 = 0; part2 < 10; part2++) {
                for (int part3 = 10; part3 < 11; part3++) {
                    bool = !bool;
                    columnCount++;
                    mRow.putEmptyColumn(
                            new MockCompositeType(Character.toString(part1),
                                    part2, part3, bool, "UTF"), null);
                }
            }
        }
        LOG.info("Created " + columnCount + " columns");
       
        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        OperationResult<ColumnList<MockCompositeType>> result;
        try {
            result = keyspace.prepareQuery(CF_COMPOSITE).getKey(rowKey)
                    .execute();
            Assert.assertEquals(columnCount,  result.getResult().size());
            for (Column<MockCompositeType> col : result.getResult()) {
                LOG.info("COLUMN: " + col.getName().toString());
            }
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            Column<MockCompositeType> column = keyspace
                    .prepareQuery(CF_COMPOSITE).getKey(rowKey)
                    .getColumn(new MockCompositeType("a", 0, 10, true, "UTF"))
                    .execute().getResult();
            LOG.info("Got single column: " + column.getName().toString());
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        LOG.info("Range builder");
        try {
            result = keyspace
                    .prepareQuery(CF_COMPOSITE)
                    .getKey(rowKey)
                    .withColumnRange(
                            M_SERIALIZER
                                    .buildRange()
                                    .withPrefix("a")
                                    .greaterThanEquals(1)
                                    .lessThanEquals(1)
                                    .build()).execute();
            for (Column<MockCompositeType> col : result.getResult()) {
                LOG.info("COLUMN: " + col.getName().toString());
            }
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

       
        /*
         * Composite c = new Composite(); c.addComponent("String1",
         * StringSerializer.get()) .addComponent(123, IntegerSerializer.get());
         *
         * MutationBatch m = keyspace.prepareMutationBatch();
         * m.withRow(CF_COMPOSITE, "Key1") .putColumn(c, 123, null);
         *
         * try { m.execute(); } catch (ConnectionException e) { Assert.fail(); }
         *
         * try { OperationResult<Column<Composite>> result =
         * keyspace.prepareQuery(CF_COMPOSITE) .getKey("Key1") .getColumn(c)
         * .execute();
         *
         * Assert.assertEquals(123, result.getResult().getIntegerValue()); }
         * catch (ConnectionException e) { Assert.fail(); }
         */
    }

    @Test
    public void testCompositeSlice() throws ConnectionException {
        AnnotatedCompositeSerializer<MockCompositeType> ser = new AnnotatedCompositeSerializer<MockCompositeType>(
                MockCompositeType.class);

        keyspace.prepareQuery(CF_COMPOSITE)
                .getKey("Key1")
                .withColumnRange(
                        ser.makeEndpoint("sessionid1", Equality.LESS_THAN)
                                .toBytes(),
                        ser.makeEndpoint("sessionid1", Equality.GREATER_THAN)
                                .toBytes(), false, 100).execute();
    }

    @Test
    public void testIndexQueryWithPagination() {
        OperationResult<Rows<String, String>> result;
        try {
            LOG.info("************************************************** testIndexQueryWithPagination: ");

            int rowCount = 0;
            int pageCount = 0;
            IndexQuery<String, String> query = keyspace
                    .prepareQuery(CF_STANDARD1).searchWithIndex()
                    .setRowLimit(10).autoPaginateRows(true).addExpression()
                    .whereColumn("Index2").equals().value(42);

            while (!(result = query.execute()).getResult().isEmpty()) {
                pageCount++;
                rowCount += result.getResult().size();
                LOG.info("==== Block ====");
                for (Row<String, String> row : result.getResult()) {
                    LOG.info("RowKey is " + row.getKey());
                }
            }

            Assert.assertEquals(pageCount, 3);
            Assert.assertEquals(rowCount, 26);
            LOG.info("************************************************** Index query: "
                    + result.getResult().size());

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        }
    }

    @Test
    public void testIndexQuery() {
        OperationResult<Rows<String, String>> result;
        try {
            LOG.info("************************************************** prepareGetMultiRowIndexQuery: ");

            result = keyspace.prepareQuery(CF_STANDARD1).searchWithIndex()
                    .setStartKey("").addExpression().whereColumn("Index1")
                    .equals().value(26).execute();
            Assert.assertEquals(1, result.getResult().size());
            Assert.assertEquals("Z", result.getResult().getRowByIndex(0)
                    .getKey());
            /*
             * for (Row<String, String> row : result.getResult()) {
             * LOG.info("RowKey is " + row.getKey()); for (Column<String> column
             * : row.getColumns()) { LOG.info("  Column: " + column.getName() +
             * "=" + column.getIntegerValue()); } }
             */

            LOG.info("************************************************** Index query: "
                    + result.getResult().size());

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        }
    }

    @Test
    public void testPreparedIndexQuery() {
        OperationResult<Rows<String, String>> result;
        try {
            LOG.info("************************************************** prepareGetMultiRowIndexQuery: ");

            PreparedIndexExpression<String, String> clause = CF_STANDARD1
                    .newIndexClause().whereColumn("Index1").equals().value(26);

            result = keyspace.prepareQuery(CF_STANDARD1).searchWithIndex()
                    .setStartKey("")
                    .addPreparedExpressions(Arrays.asList(clause)).execute();

            for (Row<String, String> row : result.getResult()) {
                LOG.info("RowKey is " + row.getKey() + " columnCount="
                        + row.getColumns().size());
                for (Column<String> column : row.getColumns()) {
                    LOG.info("  Column: " + column.getName() + "="
                            + column.getIntegerValue());
                }
            }
            Assert.assertEquals(1, result.getResult().size());
            Assert.assertEquals("Z", result.getResult().getRowByIndex(0)
                    .getKey());

            LOG.info("************************************************** Index query: "
                    + result.getResult().size());

        } catch (ConnectionException e) {
            e.printStackTrace();
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testIncrementCounter() {
        long baseAmount, incrAmount = 100;
        Column<String> column;

        try {
            column = getColumnValue(CF_COUNTER1, "CounterRow1", "MyCounter");
            baseAmount = column.getLongValue();
        } catch (Exception e) {
            baseAmount = 0;
        }

        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF_COUNTER1, "CounterRow1").incrementCounterColumn(
                "MyCounter", incrAmount);
        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        column = getColumnValue(CF_COUNTER1, "CounterRow1", "MyCounter");
        Assert.assertNotNull(column);
        Assert.assertEquals(column.getLongValue(), baseAmount + incrAmount);

        m = keyspace.prepareMutationBatch();
        m.withRow(CF_COUNTER1, "CounterRow1").incrementCounterColumn(
                "MyCounter", incrAmount);
        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        column = getColumnValue(CF_COUNTER1, "CounterRow1", "MyCounter");
        Assert.assertNotNull(column);
        Assert.assertEquals(column.getLongValue(), baseAmount + 2 * incrAmount);
    }

    @Test
    public void testDeleteCounter() {
        Column<String> column;
        String rowKey = "CounterRowDelete1";
        String counterName = "MyCounter";

        // Increment the column
        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF_COUNTER1, rowKey).incrementCounterColumn(counterName, 1);
        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        // Read back the value
        column = getColumnValue(CF_COUNTER1, rowKey, counterName);
        Assert.assertNotNull(column);
        Assert.assertEquals(column.getLongValue(), 1);

        // Delete the column
        try {
            // keyspace.prepareColumnMutation(CF_COUNTER1, rowKey, counterName)
            // .deleteCounterColumn().execute();
            keyspace.prepareColumnMutation(CF_COUNTER1, rowKey, counterName)
                    .deleteCounterColumn().execute();
            /*
             * m = keyspace.prepareMutationBatch(); m.withRow(CF_COUNTER1,
             * rowKey) .deleteColumn(counterName); m.execute();
             */
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        // Try to read back
        // This should be non-existent
        column = getColumnValue(CF_COUNTER1, rowKey, counterName);
        if (column != null) {
            LOG.error("Counter has value: " + column.getLongValue());
            Assert.fail();
        }
    }

    @Test
    public void testEmptyRowKey() {
        try {
            keyspace.prepareMutationBatch().withRow(CF_STANDARD1, "");
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
       
        try {
            keyspace.prepareMutationBatch().withRow(CF_STANDARD1, null);
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
    }
   
    @Test
    public void testEmptyColumn() {
        ColumnListMutation<String> mutation = keyspace.prepareMutationBatch().withRow(CF_STANDARD1, "ABC");
       
        try {
            mutation.putColumn(null,  1L);
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
       
        try {
            mutation.putColumn(""1L);
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }

        try {
            mutation.deleteColumn("");
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
       
        try {
            mutation.deleteColumn(null);
            Assert.fail();
        }
        catch (Exception e) {
            LOG.info(e.getMessage());
        }
       

    }
   
    @Test
    public void testCql() {
        try {
         
            System.out.println("testCQL");
            LOG.info("CQL Test");

            MutationBatch m = keyspace.prepareMutationBatch();
            for (char keyName = 'A'; keyName <= 'Z'; keyName++) {
                String rowKey = Character.toString(keyName);
                ColumnListMutation<String> cfmStandard = m.withRow(CF_CQL, rowKey);
                for (char cName = 'a'; cName <= 'z'; cName++) {
                    cfmStandard.putColumn(Character.toString(cName),
                            (int) (cName - 'a') + 1, null);
                }
                m.execute();
            }

            OperationResult<CqlResult<String, String>> result = keyspace
                    .prepareQuery(CF_STANDARD1)
                    .withCql("SELECT * FROM cfcql;").execute();
            Assert.assertTrue(result.getResult().hasRows());
            Assert.assertEquals(26, result.getResult().getRows().size());
            Assert.assertFalse(result.getResult().hasNumber());
           
            Row<String, String> row;
           
            row = result.getResult().getRows().getRow("A");
            Assert.assertEquals("A", row.getKey());
           
            row = result.getResult().getRows().getRow("B");
            Assert.assertEquals("B", row.getKey());
           
            row = result.getResult().getRows().getRow("NonExistent");
            Assert.assertNull(row);
           
            row = result.getResult().getRows().getRowByIndex(9);
            Assert.assertEquals("I", row.getKey());
           
            for (Row<String, String> row1 : result.getResult().getRows()) {
              LOG.info("KEY***: " + row1.getKey());
            }
           
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testCqlCount() {
        try {
            LOG.info("CQL Test");
            OperationResult<CqlResult<String, String>> result = keyspace
                    .prepareQuery(CF_STANDARD1)
                    .withCql("SELECT count(*) FROM Standard1 where KEY='A';")
                    .execute();

            long count = result.getResult().getRows().getRowByIndex(0).getColumns().getColumnByName("count").getLongValue();
            LOG.info("CQL Count: " + count);
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testGetSingleColumn() {
        Column<String> column = getColumnValue(CF_STANDARD1, "A", "a");
        Assert.assertNotNull(column);
        Assert.assertEquals(1, column.getIntegerValue());
    }

    @Test
    public void testColumnFamilyDoesntExist() {
        ColumnFamily<String, String> cf = new ColumnFamily<String, String>(
                "DoesntExist", StringSerializer.get(), StringSerializer.get());
        OperationResult<Void> result;
        try {
            MutationBatch m = keyspace.prepareMutationBatch();
            m.withRow(cf, "Key1").putColumn("Column2", "Value2", null);
            result = m.execute();
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.info(e.getMessage());
        }
    }

    @Test
    public void testKeyspaceDoesntExist() {
        AstyanaxContext<Keyspace> keyspaceContext = new AstyanaxContext.Builder()
                .forCluster(TEST_CLUSTER_NAME)
                .forKeyspace(TEST_KEYSPACE_NAME + "_DOESNT_EXIST")
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.NONE))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME
                                + "_" + TEST_KEYSPACE_NAME + "_DOESNT_EXIST")
                                .setMaxConnsPerHost(1).setSeeds(SEEDS))
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        try {
            keyspaceContext.start();

            Keyspace ks = keyspaceContext.getEntity();

            OperationResult<Void> result = null;
            try {
                MutationBatch m = ks.prepareMutationBatch();
                m.withRow(CF_STANDARD1, "Key1").putColumn("Column2", "Value2",
                        null);
                result = m.execute();
                Assert.fail();
            } catch (ConnectionException e) {
                LOG.info(e.getMessage());
            }
        } finally {
            keyspaceContext.shutdown();
        }
    }
   

    @Test
    public void testCreateKeyspaceThatAlreadyExists() {

      String keyspaceName = TEST_KEYSPACE_NAME + "_ksAlreadyExists";

      AstyanaxContext<Keyspace> keyspaceContext = new AstyanaxContext.Builder()
      .forCluster(TEST_CLUSTER_NAME)
      .forKeyspace(keyspaceName)
      .withAstyanaxConfiguration(
          new AstyanaxConfigurationImpl()
          .setDiscoveryType(NodeDiscoveryType.NONE))
          .withConnectionPoolConfiguration(
              new ConnectionPoolConfigurationImpl(keyspaceName)
              .setMaxConnsPerHost(1).setSeeds(SEEDS))
              .buildKeyspace(ThriftFamilyFactory.getInstance());

      Keyspace ks = null;
      try {
        keyspaceContext.start();
        ks = keyspaceContext.getClient();

        Properties props = new Properties();
        props.setProperty("name", keyspaceName);
        props.setProperty("strategy_class", "SimpleStrategy");
        props.setProperty("strategy_options.replication_factor", "1");

        try {
          ks.createKeyspaceIfNotExists(props);

          KeyspaceDefinition ksDef = ks.describeKeyspace();
          Assert.assertNotNull(ksDef);

        } catch (Exception e) {
          Assert.fail(e.getMessage());
        }


        // NOW create is again.
        try {
          ks.createKeyspaceIfNotExists(props);
        } catch (Exception e) {
          Assert.fail(e.getMessage());
        }
      } finally {

        try {
          if (ks != null) {
            ks.dropKeyspace();
          }
        } catch (Exception e) {
          LOG.info(e.getMessage());
        }

        keyspaceContext.shutdown();
      }
    }


    @Test
    public void testGetSingleColumnNotExists() {
        Column<String> column = getColumnValue(CF_STANDARD1, "A",
                "DoesNotExist");
        Assert.assertNull(column);
    }

    @Test
    public void testGetSingleColumnNotExistsAsync() {
        Future<OperationResult<Column<String>>> future = null;
        try {
            future = keyspace.prepareQuery(CF_STANDARD1).getKey("A")
                    .getColumn("DoesNotExist").executeAsync();
            future.get(1000, TimeUnit.MILLISECONDS);
        } catch (ConnectionException e) {
            LOG.info("ConnectionException: " + e.getMessage());
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.info(e.getMessage());
            Assert.fail();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof NotFoundException)
                LOG.info(e.getCause().getMessage());
            else {
                Assert.fail(e.getMessage());
            }
        } catch (TimeoutException e) {
            future.cancel(true);
            LOG.info(e.getMessage());
            Assert.fail();
        }
    }

    @Test
    public void testGetSingleKeyNotExists() {
        Column<String> column = getColumnValue(CF_STANDARD1, "AA", "ab");
        Assert.assertNull(column);
    }

    @Test
    public void testFunctionalQuery() throws ConnectionException {
        OperationResult<ColumnList<String>> r1 = keyspace
                .prepareQuery(CF_STANDARD1).getKey("A").execute();
        Assert.assertEquals(28, r1.getResult().size());

        /*
         * OperationResult<Rows<String, String>> r2 = keyspace.prepareQuery()
         * .fromColumnFamily(CF_STANDARD1) .selectKeyRange("A", "Z", null, null,
         * 5) .execute();
         */
    }
   
    @Test
    public void testNullKeyInMutation() throws ConnectionException {
        try {
            keyspace.prepareMutationBatch()
                .withRow(CF_STANDARD1,  null)
                .putColumn("abc", "def");
           
            Assert.fail();
        }
        catch (NullPointerException e) {
           
        }
    }
   

    @Test
    public void testColumnSlice() throws ConnectionException {
        OperationResult<ColumnList<String>> r1 = keyspace
                .prepareQuery(CF_STANDARD1).getKey("A")
                .withColumnSlice("a", "b").execute();
        Assert.assertEquals(2, r1.getResult().size());
    }

    @Test
    public void testColumnRangeSlice() throws ConnectionException {
        OperationResult<ColumnList<String>> r1 = keyspace
                .prepareQuery(CF_STANDARD1)
                .getKey("A")
                .withColumnRange(
                        new RangeBuilder().setStart("a").setEnd("b")
                                .setLimit(5).build()).execute();
        Assert.assertEquals(2, r1.getResult().size());

        OperationResult<ColumnList<String>> r2 = keyspace
                .prepareQuery(CF_STANDARD1).getKey("A")
                .withColumnRange("a", null, false, 5).execute();
        Assert.assertEquals(5, r2.getResult().size());
        Assert.assertEquals("a", r2.getResult().getColumnByIndex(0).getName());

        ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(new byte[0]);
        OperationResult<ColumnList<String>> r3 = keyspace
                .prepareQuery(CF_STANDARD1).getKey("A")
                .withColumnRange(EMPTY_BUFFER, EMPTY_BUFFER, true, 5).execute();
        Assert.assertEquals(5, r3.getResult().size());
        Assert.assertEquals("z", r3.getResult().getColumnByIndex(0).getName());
    }

    @Test
    public void testGetColumnsWithPrefix() throws ConnectionException {
        OperationResult<ColumnList<String>> r = keyspace
                .prepareQuery(CF_STANDARD1)
                .getKey("Prefixes")
                .withColumnRange("Prefix1_\u00000", "Prefix1_\uffff", false,
                        Integer.MAX_VALUE).execute();
        Assert.assertEquals(2, r.getResult().size());
        Assert.assertEquals("Prefix1_a", r.getResult().getColumnByIndex(0)
                .getName());
        Assert.assertEquals("Prefix1_b", r.getResult().getColumnByIndex(1)
                .getName());
    }

    @Test
    public void testGetCounters() throws ConnectionException {
        LOG.info("Starting testGetCounters...");

        try {
            OperationResult<Column<String>> result = keyspace
                    .prepareQuery(CF_COUNTER1).getKey("CounterRow1")
                    .getColumn("TestCounter").execute();

            Long count = result.getResult().getLongValue();
            Assert.assertNotNull(count);
            Assert.assertTrue(count > 0);
        } catch (NotFoundException e) {

        }

        LOG.info("... testGetCounters done");
    }

    @Test
    public void testGetSingleKey() {
        try {
            for (char key = 'A'; key <= 'Z'; key++) {
                String keyName = Character.toString(key);
                OperationResult<ColumnList<String>> result = keyspace
                        .prepareQuery(CF_STANDARD1).getKey(keyName).execute();

                Assert.assertNotNull(result.getResult());

                System.out.printf("%s executed on %s in %d msec size=%d\n",
                        keyName, result.getHost(), result.getLatency(), result
                                .getResult().size());
            }
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testGetSingleKeyAsync() {
        try {
            Future<OperationResult<ColumnList<String>>> result = keyspace
                    .prepareQuery(CF_STANDARD1).getKey("A").executeAsync();

            result.get(1000, TimeUnit.MILLISECONDS);

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        } catch (ExecutionException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        } catch (TimeoutException e) {
            LOG.error(e.getMessage(), e);
            e.printStackTrace();
            Assert.fail();
        }
    }

    @Test
    public void testGetAllKeysRoot() {
        LOG.info("Starting testGetAllKeysRoot...");

        try {
            List<String> keys = new ArrayList<String>();
            for (char key = 'A'; key <= 'Z'; key++) {
                String keyName = Character.toString(key);
                keys.add(keyName);
            }

            OperationResult<Rows<String, String>> result = keyspace
                    .prepareQuery(CF_STANDARD1)
                    .getKeySlice(keys.toArray(new String[keys.size()]))
                    .execute();

            Assert.assertEquals(26,  result.getResult().size());
           
            Row<String, String> row;
           
            row = result.getResult().getRow("A");
            Assert.assertEquals("A", row.getKey());
           
            row = result.getResult().getRow("B");
            Assert.assertEquals("B", row.getKey());
           
            row = result.getResult().getRow("NonExistent");
            Assert.assertNull(row);
           
            row = result.getResult().getRowByIndex(10);
            Assert.assertEquals("M", row.getKey());
            /*
             * LOG.info("Get " + result.getResult().size() + " keys"); for
             * (Row<String, String> row : result.getResult()) {
             * LOG.info(String.format("%s executed on %s in %d msec size=%d\n",
             * row.getKey(), result.getHost(), result.getLatency(),
             * row.getColumns().size())); for (Column<String> sc :
             * row.getColumns()) { LOG.info("  " + sc.getName());
             * ColumnList<Integer> subColumns =
             * sc.getSubColumns(IntegerSerializer.get()); for (Column<Integer>
             * sub : subColumns) { LOG.info("    " + sub.getName() + "=" +
             * sub.getStringValue()); } } }
             */

        } catch (ConnectionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            Assert.fail();
        }

        LOG.info("... testGetAllKeysRoot");
    }

    @Test
    public void testGetColumnSlice() {
        LOG.info("Starting testGetColumnSlice...");
        try {
            OperationResult<ColumnList<String>> result = keyspace
                    .prepareQuery(CF_STANDARD1)
                    .getKey("A")
                    .withColumnSlice(
                            new ColumnSlice<String>("c", "h").setLimit(5))
                    .execute();
            Assert.assertNotNull(result.getResult());
            Assert.assertEquals(5, result.getResult().size());
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

    }

    @Test
    public void testGetAllKeysPath() {
        LOG.info("Starting testGetAllKeysPath...");

        try {
            List<String> keys = new ArrayList<String>();
            for (char key = 'A'; key <= 'Z'; key++) {
                String keyName = Character.toString(key);
                keys.add(keyName);
            }

            OperationResult<Rows<String, String>> result = keyspace
                    .prepareQuery(CF_STANDARD1)
                    .getKeySlice(keys.toArray(new String[keys.size()]))
                    .execute();
            /*
             * System.out.printf("%s executed on %s in %d msec size=%d\n",
             * row.getKey(), result.getHost(), result.getLatency(),
             * row.getColumns().size());
             */

            // for (Row<String, String> row : result.getResult()) {
            // for (Column<Integer> column : row.getColumns()) {
            // System.out.println("  Column: " + column.getName());
            // }
            // }
           
            OperationResult<Map<String, Integer>> counts = keyspace
                .prepareQuery(CF_STANDARD1)
                .getKeySlice(keys.toArray(new String[keys.size()]))
                .getColumnCounts()
                .execute();
                       
            Assert.assertEquals(26, counts.getResult().size());
           
            for (Entry<String, Integer> count : counts.getResult().entrySet()) {
                Assert.assertEquals(new Integer(28), count.getValue());
            }
           
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        LOG.info("Starting testGetAllKeysPath...");
    }
   
    @Test
    public void testDeleteMultipleKeys() {
        LOG.info("Starting testDeleteMultipleKeys...");
        LOG.info("... testGetAllKeysPath");

    }

    @Test
    public void testMutationMerge() {
        MutationBatch m1 = keyspace.prepareMutationBatch();
        MutationBatch m2 = keyspace.prepareMutationBatch();
        MutationBatch m3 = keyspace.prepareMutationBatch();
        MutationBatch m4 = keyspace.prepareMutationBatch();
        MutationBatch m5 = keyspace.prepareMutationBatch();

        m1.withRow(CF_STANDARD1, "1").putColumn("1", "X", null);
        m2.withRow(CF_STANDARD1, "2").putColumn("2", "X", null)
                .putColumn("3", "X", null);
        m3.withRow(CF_STANDARD1, "3").putColumn("4", "X", null)
                .putColumn("5", "X", null).putColumn("6", "X", null);
        m4.withRow(CF_STANDARD1, "1").putColumn("7", "X", null)
                .putColumn("8", "X", null).putColumn("9", "X", null)
                .putColumn("10", "X", null);

        MutationBatch merged = keyspace.prepareMutationBatch();
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 0);

        merged.mergeShallow(m1);
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 1);

        merged.mergeShallow(m2);
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 2);

        merged.mergeShallow(m3);
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 3);

        merged.mergeShallow(m4);
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 3);

        merged.mergeShallow(m5);
        LOG.info(merged.toString());
        Assert.assertEquals(merged.getRowCount(), 3);
    }

    @Test
    public void testDelete() {
        LOG.info("Starting testDelete...");

        String rowKey = "DeleteMe_testDelete";

        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF_STANDARD1, rowKey).putColumn("Column1", "X", null)
                .putColumn("Column2", "X", null);

        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        Assert.assertEquals(getColumnValue(CF_STANDARD1, rowKey, "Column1")
                .getStringValue(), "X");
        Assert.assertTrue(deleteColumn(CF_STANDARD1, rowKey, "Column1"));
        Assert.assertNull(getColumnValue(CF_STANDARD1, rowKey, "Column1"));

        LOG.info("... testDelete");
    }

    @Test
    public void testDeleteLotsOfColumns() {
        LOG.info("Starting testDelete...");

        String rowKey = "DeleteMe_testDeleteLotsOfColumns";

        int nColumns = 100;
        int pageSize = 25;

        // Insert a bunch of rows
        MutationBatch m = keyspace.prepareMutationBatch();
        ColumnListMutation<String> rm = m.withRow(CF_STANDARD1, rowKey);

        for (int i = 0; i < nColumns; i++) {
            rm.putEmptyColumn("" + i, null);
        }

        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        // Verify count
        try {
            int count = keyspace.prepareQuery(CF_STANDARD1)
                    .setConsistencyLevel(ConsistencyLevel.CL_QUORUM)
                    .getKey(rowKey).getCount().execute().getResult();
            Assert.assertEquals(nColumns, count);
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        // Delete half of the columns
        m = keyspace.prepareMutationBatch().setConsistencyLevel(
                ConsistencyLevel.CL_QUORUM);
        rm = m.withRow(CF_STANDARD1, rowKey);

        for (int i = 0; i < nColumns / 2; i++) {
            rm.deleteColumn("" + i);
        }

        try {
            m.execute();
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        // Verify count
        try {
            int count = getRowColumnCount(CF_STANDARD1, rowKey);
            Assert.assertEquals(nColumns / 2, count);

            count = getRowColumnCountWithPagination(CF_STANDARD1, rowKey,
                    pageSize);
            Assert.assertEquals(nColumns / 2, count);

        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        // Delete all of the columns
        m = keyspace.prepareMutationBatch().setConsistencyLevel(
                ConsistencyLevel.CL_QUORUM);
        rm = m.withRow(CF_STANDARD1, rowKey);

        for (int i = 0; i < nColumns; i++) {
            rm.deleteColumn("" + i);
        }

        try {
            m.execute();
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        // Verify count
        try {
            int count = getRowColumnCount(CF_STANDARD1, rowKey);
            Assert.assertEquals(0, count);

            count = getRowColumnCountWithPagination(CF_STANDARD1, rowKey,
                    pageSize);
            Assert.assertEquals(0, count);
        } catch (ConnectionException e) {
            Assert.fail(e.getMessage());
        }

        LOG.info("... testDelete");
    }

    private <K, C> int getRowColumnCount(ColumnFamily<K, C> cf, K rowKey)
            throws ConnectionException {
        int count = keyspace.prepareQuery(cf)
                .setConsistencyLevel(ConsistencyLevel.CL_QUORUM).getKey(rowKey)
                .getCount().execute().getResult();

        return count;
    }

    private <K, C> int getRowColumnCountWithPagination(ColumnFamily<K, C> cf,
            K rowKey, int pageSize) throws ConnectionException {
        RowQuery<K, C> query = keyspace.prepareQuery(cf)
                .setConsistencyLevel(ConsistencyLevel.CL_QUORUM).getKey(rowKey)
                .withColumnRange(new RangeBuilder().setLimit(pageSize).build())
                .autoPaginate(true);

        ColumnList<C> result;
        int count = 0;
        while (!(result = query.execute().getResult()).isEmpty()) {
            count += result.size();
        }

        return count;
    }

    @Test
    public void testCsvLoader() {
        StringBuilder sb = new StringBuilder()
                .append("key, firstname, lastname, age, test\n")
                .append("1, eran, landau, 34, a\n")
                .append("2, netta, landau, 33, b\n")
                .append("3, arielle, landau, 6, c\n")
                .append("4, eyal, landau, 2, d\n");

        RecordReader reader = new CsvRecordReader(new StringReader(
                sb.toString()));
        RecordWriter writer = new ColumnarRecordWriter(keyspace,
                CF_USERS.getName());

        try {
            reader.start();
            writer.start();
            List<Pair<String, String>> record = null;
            while (null != (record = reader.next())) {
                writer.write(record);
            }
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } finally {
            reader.shutdown();
            writer.shutdown();
        }

        try {
            Rows<Long, String> rows = keyspace.prepareQuery(CF_USERS)
                    .getAllRows().execute().getResult();
            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(false).write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(true).setCountName("_count_")
                    .setRowsName("_rows_").setNamesName("_names_").write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(true).setDynamicColumnNames(true)
                    .write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(true).setIgnoreUndefinedColumns(true)
                    .write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(true)
                    .setFixedColumnNames("firstname", "lastname")
                    .setIgnoreUndefinedColumns(true).write(rows);

            LOG.info("******* COLUMNS AS ROWS ********");
            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(true).setColumnsAsRows(true).write(rows);

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testCsvLoaderWithCustomSerializers() {
        StringBuilder sb = new StringBuilder()
                .append("key, firstname, lastname, age, test\n")
                .append("1, eran, landau, 34, a\n")
                .append("2, netta, landau, 33, b\n")
                .append("3, arielle, landau, 6, c\n")
                .append("4, eyal, landau, 2, d\n");

        SerializerPackageImpl pkg = null;
        try {
            pkg = new SerializerPackageImpl().setKeyType("LongType")
                    .setColumnNameType("UTF8Type")
                    .setDefaultValueType("UTF8Type")
                    .setValueType("firstname", "UTF8Type")
                    .setValueType("lastname", "UTF8Type")
                    .setValueType("age", "LongType");
        } catch (UnknownComparatorException e) {
            Assert.fail();
        }

        RecordReader reader = new CsvRecordReader(new StringReader(
                sb.toString()));
        RecordWriter writer = new ColumnarRecordWriter(keyspace,
                CF_USERS.getName(), pkg);

        try {
            reader.start();
            writer.start();
            List<Pair<String, String>> record = null;
            while (null != (record = reader.next())) {
                writer.write(record);
            }
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } finally {
            reader.shutdown();
            writer.shutdown();
        }

        try {
            Rows<Long, String> rows = keyspace.prepareQuery(CF_USERS)
                    .getAllRows().execute().getResult();
            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_USERS.getName(), false))
                    .setRowsAsArray(false).write(rows);
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testCsvLoaderComposite() {
        StringBuilder sb = new StringBuilder().append("key, column, value\n")
                .append("1, a:1, 1a1\n").append("1, b:1, 2b1\n")
                .append("2, a:1, 3a1\n").append("3, a:1, 4a1\n");

        CsvColumnReader reader = new CsvColumnReader(new StringReader(
                sb.toString()));
        RecordWriter writer = new ColumnarRecordWriter(keyspace,
                CF_COMPOSITE_CSV.getName());

        try {
            reader.start();
            writer.start();
            List<Pair<String, String>> record = null;
            while (null != (record = reader.next())) {
                writer.write(record);
            }
        } catch (IOException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } finally {
            reader.shutdown();
            writer.shutdown();
        }

        try {
            Rows<ByteBuffer, ByteBuffer> rows = keyspace
                    .prepareQuery(CF_COMPOSITE_CSV).getAllRows().execute()
                    .getResult();
            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_COMPOSITE_CSV.getName(),
                            false)).setRowsAsArray(false).write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_COMPOSITE_CSV.getName(),
                            false)).setRowsAsArray(true)
                    .setCountName("_count_").setRowsName("_rows_")
                    .setNamesName("_names_").write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_COMPOSITE_CSV.getName(),
                            false)).setRowsAsArray(true)
                    .setDynamicColumnNames(true).write(rows);

            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_COMPOSITE_CSV.getName(),
                            false)).setRowsAsArray(true)
                    .setIgnoreUndefinedColumns(true).write(rows);

            LOG.info("******* COLUMNS AS ROWS ********");
            new JsonRowsWriter(new PrintWriter(System.out, true),
                    keyspace.getSerializerPackage(CF_COMPOSITE_CSV.getName(),
                            false)).setRowsAsArray(true).setColumnsAsRows(true)
                    .write(rows);

        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }
    }

    @Test
    public void testTtlValues() throws Exception {
        MutationBatch mb = keyspace.prepareMutationBatch();
        mb.withRow(CF_TTL, "row")
          .putColumn("TTL0", "TTL0", 0)
          .putColumn("TTLNULL", "TTLNULL", null)
          .putColumn("TTL1", "TTL1", 1);
       
        mb.execute();
       
        Thread.sleep(2000);
       
        ColumnList<String> result = keyspace.prepareQuery(CF_TTL)
            .getRow("row")
            .execute().getResult();
      
        Assert.assertEquals(2,  result.size());
        Assert.assertNotNull(result.getColumnByName("TTL0"));
        Assert.assertNotNull(result.getColumnByName("TTLNULL"));
    }
   
    @Test
    public void testCluster() {
        AstyanaxContext<Cluster> clusterContext = new AstyanaxContext.Builder()
                .forCluster(TEST_CLUSTER_NAME)
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl())
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME)
                                .setSeeds(SEEDS).setSocketTimeout(30000)
                                .setMaxTimeoutWhenExhausted(200)
                                .setMaxConnsPerHost(1))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildCluster(ThriftFamilyFactory.getInstance());

        clusterContext.start();
        Cluster cluster = clusterContext.getEntity();

        try {
            cluster.describeClusterName();
            List<KeyspaceDefinition> keyspaces = cluster.describeKeyspaces();
            LOG.info("Keyspace count:" + keyspaces.size());
            for (KeyspaceDefinition keyspace : keyspaces) {
                LOG.info("Keyspace: " + keyspace.getName());
            }
            Assert.assertNotNull(keyspaces);
            Assert.assertTrue(keyspaces.size() > 0);
        } catch (Exception e) {
            Assert.fail(e.getMessage());
        } finally {
            clusterContext.shutdown();
        }
    }

    @Test
    public void testPrefixedSerializer() {
        ColumnFamily<String, String> cf = new ColumnFamily<String, String>(
                "Standard1", StringSerializer.get(), StringSerializer.get());

        ColumnFamily<String, String> cf1 = new ColumnFamily<String, String>(
                "Standard1", new PrefixedSerializer<String, String>("Prefix1_",
                        StringSerializer.get(), StringSerializer.get()),
                StringSerializer.get());

        ColumnFamily<String, String> cf2 = new ColumnFamily<String, String>(
                "Standard1", new PrefixedSerializer<String, String>("Prefix2_",
                        StringSerializer.get(), StringSerializer.get()),
                StringSerializer.get());

        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(cf1, "A").putColumn("Column1", "Value1", null);
        m.withRow(cf2, "A").putColumn("Column1", "Value2", null);

        try {
            m.execute();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            OperationResult<ColumnList<String>> result = keyspace
                    .prepareQuery(cf).getKey("Prefix1_A").execute();
            Assert.assertEquals(1, result.getResult().size());
            Column<String> c = result.getResult().getColumnByName("Column1");
            Assert.assertEquals("Value1", c.getStringValue());
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

        try {
            OperationResult<ColumnList<String>> result = keyspace
                    .prepareQuery(cf).getKey("Prefix2_A").execute();
            Assert.assertEquals(1, result.getResult().size());
            Column<String> c = result.getResult().getColumnByName("Column1");
            Assert.assertEquals("Value2", c.getStringValue());
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
        }

    }

    @Test
    public void testWithRetry() {
        String clusterName = TEST_CLUSTER_NAME + "_DOESNT_EXIST";
        AstyanaxContext<Keyspace> keyspaceContext = new AstyanaxContext.Builder()
                .forCluster(clusterName)
                .forKeyspace(TEST_KEYSPACE_NAME)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.NONE))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(clusterName + "_"
                                + TEST_KEYSPACE_NAME).setMaxConnsPerHost(1)
                                .setSeeds(SEEDS))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        ColumnFamily<String, String> cf = new ColumnFamily<String, String>(
                "DoesntExist", StringSerializer.get(), StringSerializer.get());
        try {
            MutationBatch m = keyspaceContext.getEntity()
                    .prepareMutationBatch()
                    .withRetryPolicy(new ExponentialBackoff(10, 3));
            m.withRow(cf, "Key1").putColumn("Column2", "Value2", null);
            m.execute();
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.info(e.getMessage());
        }
    }
   
    // This test confirms the fix for https://github.com/Netflix/astyanax/issues/170
    @Test
    public void columnAutoPaginateTest() throws Exception {
        final ColumnFamily<String, UUID> CF1 = ColumnFamily.newColumnFamily("CF1", StringSerializer.get(),
                TimeUUIDSerializer.get());
        final ColumnFamily<String, String> CF2 = ColumnFamily.newColumnFamily("CF2", StringSerializer.get(),
                StringSerializer.get());
       
        keyspace.createColumnFamily(CF1, null);
        Thread.sleep(3000);
        keyspace.createColumnFamily(CF2, null);
        Thread.sleep(3000);
   
        // query on another column family with different column key type
        // does not seem to work after the first query
        keyspace.prepareQuery(CF2).getKey("anything").execute();

        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF1, "test").putColumn(TimeUUIDUtils.getUniqueTimeUUIDinMillis(), "value1", null);
        m.execute();
   
        RowQuery<String, UUID> query = keyspace.prepareQuery(CF1).getKey("test").autoPaginate(true);
   
        // Adding a column range removes the problem
        // query.withColumnRange(new RangeBuilder().build());
   
        ColumnList<UUID> columns = query.execute().getResult();
       
        keyspace.prepareQuery(CF2).getKey("anything").execute();
    }
   
    @Test
    public void testDDLWithProperties() throws Exception {
        String keyspaceName = "DDLPropertiesKeyspace";
       
        Properties props = new Properties();
        props.put("name", keyspaceName + "_wrong");
        props.put("strategy_class", "SimpleStrategy");
        props.put("strategy_options.replication_factor", "1");
       
        AstyanaxContext<Keyspace> kc = new AstyanaxContext.Builder()
            .forCluster(TEST_CLUSTER_NAME)
            .forKeyspace(keyspaceName)
            .withAstyanaxConfiguration(
                    new AstyanaxConfigurationImpl()
                            .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                            .setConnectionPoolType(ConnectionPoolType.ROUND_ROBIN)
                            .setDiscoveryDelayInSeconds(60000))
            .withConnectionPoolConfiguration(
                    new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME + "_" + keyspaceName)
                            .setSocketTimeout(30000)
                            .setMaxTimeoutWhenExhausted(2000)
                            .setMaxConnsPerHost(20)
                            .setInitConnsPerHost(10)
                            .setSeeds(SEEDS)
                            )
            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
            .buildKeyspace(ThriftFamilyFactory.getInstance());
       
        kc.start();

        Keyspace ks = kc.getClient();
       
        try {
            ks.createKeyspace(props);
            Assert.fail("Should have gotten name mismatch error");
        }
        catch (BadRequestException e) {
            LOG.info(e.getMessage());
        }
       
        props.put("name", keyspaceName);
        ks.createKeyspace(props);
       
        Properties props1 = ks.getKeyspaceProperties();
       
        LOG.info(props.toString());
        LOG.info(props1.toString());
    }
   
    private boolean deleteColumn(ColumnFamily<String, String> cf,
            String rowKey, String columnName) {
        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(cf, rowKey).deleteColumn(columnName);

        try {
            m.execute();
            return true;
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
            return false;
        }
    }

    private Column<String> getColumnValue(ColumnFamily<String, String> cf,
            String rowKey, String columnName) {
        OperationResult<Column<String>> result;
        try {
            result = keyspace.prepareQuery(cf).getKey(rowKey)
                    .getColumn(columnName).execute();
            return result.getResult();
        } catch (NotFoundException e) {
            LOG.info(e.getMessage());
            return null;
        } catch (ConnectionException e) {
            LOG.error(e.getMessage(), e);
            Assert.fail();
            return null;
        }
    }
}
TOP

Related Classes of com.netflix.astyanax.thrift.ThriftKeyspaceImplTest$UserInfo

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.