this.serializer = serializer;
this.eventCFName = eventCFName;
this.snapshotCFName = eventCFName + "Snapshot";
//connect to cluster
Cluster cluster = HFactory.getOrCreateCluster(clusterName, hosts);
//dynamically create keyspace
KeyspaceDefinition keyspaceDefinition = cluster.describeKeyspace(keyspaceName);
if (keyspaceDefinition == null) {
ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition(
keyspaceName, eventCFName, ComparatorType.COMPOSITETYPE);
cfDef.setComparatorTypeAlias("(LongType, UTF8Type)");
ColumnFamilyDefinition snapshotCFDef = HFactory.createColumnFamilyDefinition(
keyspaceName, snapshotCFName, ComparatorType.COMPOSITETYPE);
snapshotCFDef.setComparatorTypeAlias("(LongType, UTF8Type)");
keyspaceDefinition = HFactory.createKeyspaceDefinition(
keyspaceName, ThriftKsDef.DEF_STRATEGY_CLASS, 1, Arrays.asList(cfDef, snapshotCFDef));
cluster.addKeyspace(keyspaceDefinition, true);
}
keyspace = HFactory.createKeyspace(keyspaceName, cluster);
eventTemplate = new ThriftColumnFamilyTemplate<String, Composite>(
keyspace, eventCFName, StringSerializer.get(), CompositeSerializer.get());