/*
* Copyright (c) 2014 Carman Consulting, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.carmanconsulting.cassidy.index.def;
import com.carmanconsulting.cassidy.index.IndexDao;
import com.carmanconsulting.cassidy.index.IndexValue;
import com.carmanconsulting.cassidy.index.Indexer;
import com.carmanconsulting.cassidy.util.CassidyUtils;
import com.carmanconsulting.cassidy.util.ColumnFamilyInitializer;
import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.AbstractComposite;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import java.util.LinkedList;
import java.util.List;
/**
* Implements the <a href="http://anuff.com/2010/07/secondary-indexes-in-cassandra/">indexing strategy</a> prescribed
* by Ed Anuff in his blog.
* <p/>
* The supplied {@link com.carmanconsulting.cassidy.index.Indexer} is used to extract "index components" from each
* object to be indexed.
* <p/>
* The indexing requires two column families to accommodate Cassandra's "eventual consistency." The "indexColumnFamily"
* has the following structure:
* <p/>
* <table>
* <tr>
* <th>Key</th>
* <td>indexName</td>
* </tr>
* <tr>
* <th>Column Names</th>
* <td>DynamicComposite(index components, item key, timestamp)</td>
* </tr>
* <tr>
* <th>Column Values</th>
* <td>item key</td>
* </tr>
* </table>
* <p/>
* The "entryColumnFamily" has the following structure:
* <p/>
* <table>
* <tr>
* <th>Key</th>
* <tr>DynamicComposite(item key, indexName)</tr>
* </tr>
* <tr>
* <th>Column Names</th>
* <td>timestamp</td>
* </tr>
* <tr>
* <th>Column Values</th>
* <td>index components</td>
* </tr>
* </table>
*
* @param <K> the key type
* @param <V> the value type
*/
public class DefaultIndexDao<K, V> implements ColumnFamilyInitializer, IndexDao<K,V> {
//----------------------------------------------------------------------------------------------------------------------
// Fields
//----------------------------------------------------------------------------------------------------------------------
public static final DynamicCompositeSerializer DYNAMIC_COMPOSITE_SERIALIZER = DynamicCompositeSerializer.get();
public static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
public static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
private final Keyspace keyspace;
private final String indexColumnFamily;
private final String entryColumnFamily;
private final String indexName;
private final Indexer<V> indexer;
private final Serializer<K> keySerializer;
//----------------------------------------------------------------------------------------------------------------------
// Constructors
//----------------------------------------------------------------------------------------------------------------------
public DefaultIndexDao(Keyspace keyspace, String indexColumnFamily, String entryColumnFamily, String indexName, Serializer<K> keySerializer, Indexer<V> indexer) {
this.keyspace = keyspace;
this.indexColumnFamily = indexColumnFamily;
this.entryColumnFamily = entryColumnFamily;
this.indexName = indexName;
this.keySerializer = keySerializer;
this.indexer = indexer;
}
public DefaultIndexDao(Keyspace keyspace, String indexColumnFamily, String entryColumnFamily, String indexName, Class<K> keyType, Indexer<V> indexer) {
this(keyspace, indexColumnFamily, entryColumnFamily, indexName, CassidyUtils.inferSerializer(keyType), indexer);
}
//----------------------------------------------------------------------------------------------------------------------
// ColumnFamilyInitializer Implementation
//----------------------------------------------------------------------------------------------------------------------
@Override
public void initializeColumnFamilies(Cluster cluster) {
ColumnFamilyDefinition indexColumnFamilyDef = HFactory.createColumnFamilyDefinition(keyspace.getKeyspaceName(), indexColumnFamily, ComparatorType.DYNAMICCOMPOSITETYPE);
indexColumnFamilyDef.setComparatorTypeAlias(DynamicComposite.DEFAULT_DYNAMIC_COMPOSITE_ALIASES);
CassidyUtils.createColumnFamily(cluster, indexColumnFamilyDef);
ColumnFamilyDefinition entryColumnFamilyDef = HFactory.createColumnFamilyDefinition(keyspace.getKeyspaceName(), entryColumnFamily, ComparatorType.INTEGERTYPE);
CassidyUtils.createColumnFamily(cluster, entryColumnFamilyDef);
}
//----------------------------------------------------------------------------------------------------------------------
// IndexDao Implementation
//----------------------------------------------------------------------------------------------------------------------
@Override
public void delete(K itemKey) {
final Mutator<String> indexMutator = indexMutator();
final Mutator<DynamicComposite> entryMutator = entryMutator();
performCleanup(itemKey, indexMutator, entryMutator);
indexMutator.execute();
entryMutator.execute();
}
@Override
public List<K> find(Object... values) {
DynamicComposite start = new DynamicComposite();
DynamicComposite end = new DynamicComposite();
for (int i = 0; i < values.length; i++) {
Object value = values[i];
start.addComponent(i, value, AbstractComposite.ComponentEquality.EQUAL);
end.addComponent(i, value, i == values.length - 1 ? AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL : AbstractComposite.ComponentEquality.EQUAL);
}
final SliceQuery<String, DynamicComposite, K> query = HFactory.createSliceQuery(keyspace, StringSerializer.get(), DynamicCompositeSerializer.get(), keySerializer);
query.setKey(indexName);
query.setColumnFamily(indexColumnFamily);
query.setRange(start, end, false, Integer.MAX_VALUE);
List<K> keys = new LinkedList<>();
for (HColumn<DynamicComposite, K> column : query.execute().get().getColumns()) {
keys.add(column.getValue());
}
return keys;
}
@Override
public void index(K itemKey, V item) {
final long timestamp = HFactory.createClock();
final Mutator<String> indexMutator = indexMutator();
final Mutator<DynamicComposite> entryMutator = entryMutator();
performCleanup(itemKey, indexMutator, entryMutator);
List<IndexValue<?>> indexValues = indexer.index(item);
insertIndexColumn(itemKey, timestamp, indexValues, indexMutator);
insertEntryColumn(itemKey, timestamp, indexValues, entryMutator);
indexMutator.execute();
entryMutator.execute();
}
//----------------------------------------------------------------------------------------------------------------------
// Other Methods
//----------------------------------------------------------------------------------------------------------------------
private Mutator<DynamicComposite> entryMutator() {
return HFactory.createMutator(keyspace, DYNAMIC_COMPOSITE_SERIALIZER);
}
private Mutator<String> indexMutator() {
return HFactory.createMutator(keyspace, STRING_SERIALIZER);
}
private void insertEntryColumn(K itemKey, long timestamp, List<IndexValue<?>> indexValues, Mutator<DynamicComposite> entryMutator) {
DynamicComposite entryValue = new DynamicComposite();
IndexValue.addAll(indexValues, entryValue);
final HColumn<Long, DynamicComposite> entryColumn = HFactory.createColumn(timestamp, entryValue, LONG_SERIALIZER, DYNAMIC_COMPOSITE_SERIALIZER);
entryMutator.addInsertion(entryKey(itemKey), entryColumnFamily, entryColumn);
}
private void insertIndexColumn(K itemKey, long timestamp, List<IndexValue<?>> indexValues, Mutator<String> indexMutator) {
DynamicComposite indexColumnName = new DynamicComposite();
IndexValue.addAll(indexValues, indexColumnName);
indexColumnName.addComponent(itemKey, keySerializer);
indexColumnName.addComponent(timestamp, LONG_SERIALIZER);
final HColumn<DynamicComposite, K> indexColumn = HFactory.createColumn(indexColumnName, itemKey, DYNAMIC_COMPOSITE_SERIALIZER, keySerializer);
indexMutator.addInsertion(indexName, indexColumnFamily, indexColumn);
}
private void performCleanup(K key, Mutator<String> indexMutator, Mutator<DynamicComposite> entryMutator) {
final DynamicComposite entryKey = entryKey(key);
final List<HColumn<Long, DynamicComposite>> entries = getEntries(entryKey);
for (HColumn<Long, DynamicComposite> entry : entries) {
final Long entryTimestamp = entry.getName();
DynamicComposite indexColumnName = new DynamicComposite();
appendComponents(entry.getValue(), indexColumnName);
indexColumnName.addComponent(key, keySerializer);
indexColumnName.addComponent(entryTimestamp, LONG_SERIALIZER);
indexMutator.addDeletion(indexName, indexColumnFamily, indexColumnName, DYNAMIC_COMPOSITE_SERIALIZER);
entryMutator.addDeletion(entryKey, entryColumnFamily, entryTimestamp, LONG_SERIALIZER);
}
}
private DynamicComposite entryKey(K itemKey) {
DynamicComposite entryKey = new DynamicComposite();
entryKey.addComponent(itemKey, keySerializer);
entryKey.addComponent(indexName, STRING_SERIALIZER);
return entryKey;
}
private List<HColumn<Long, DynamicComposite>> getEntries(DynamicComposite entryKey) {
final SliceQuery<DynamicComposite, Long, DynamicComposite> entryQuery = HFactory.createSliceQuery(keyspace, DYNAMIC_COMPOSITE_SERIALIZER, LONG_SERIALIZER, DYNAMIC_COMPOSITE_SERIALIZER);
entryQuery.setKey(entryKey);
entryQuery.setColumnFamily(entryColumnFamily);
entryQuery.setRange(null, null, false, Integer.MAX_VALUE);
final QueryResult<ColumnSlice<Long, DynamicComposite>> result = entryQuery.execute();
final ColumnSlice<Long, DynamicComposite> slice = result.get();
return slice.getColumns();
}
private void appendComponents(DynamicComposite src, DynamicComposite dest) {
for (AbstractComposite.Component<?> component : src.getComponents()) {
appendComponent(dest, component);
}
}
private <T> void appendComponent(DynamicComposite composite, AbstractComposite.Component<T> component) {
composite.addComponent(component.getValue(), component.getSerializer());
}
}