Package com.carmanconsulting.cassidy.index.def

Source Code of com.carmanconsulting.cassidy.index.def.DefaultIndexDao

/*
* Copyright (c) 2014 Carman Consulting, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.carmanconsulting.cassidy.index.def;

import com.carmanconsulting.cassidy.index.IndexDao;
import com.carmanconsulting.cassidy.index.IndexValue;
import com.carmanconsulting.cassidy.index.Indexer;
import com.carmanconsulting.cassidy.util.CassidyUtils;
import com.carmanconsulting.cassidy.util.ColumnFamilyInitializer;
import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.AbstractComposite;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
* Implements the <a href="http://anuff.com/2010/07/secondary-indexes-in-cassandra/">indexing strategy</a> prescribed
* by Ed Anuff in his blog.
* <p/>
* The supplied {@link com.carmanconsulting.cassidy.index.Indexer} is used to extract "index components" from each
* object to be indexed.
* <p/>
* The indexing requires two column families to accommodate Cassandra's "eventual consistency."  The "indexColumnFamily"
* has the following structure:
* <p/>
* <table>
* <tr>
* <th>Key</th>
* <td>indexName</td>
* </tr>
* <tr>
* <th>Column Names</th>
* <td>DynamicComposite(index components, item key, timestamp)</td>
* </tr>
* <tr>
* <th>Column Values</th>
* <td>item key</td>
* </tr>
* </table>
* <p/>
* The "entryColumnFamily" has the following structure:
* <p/>
* <table>
* <tr>
* <th>Key</th>
* <td>DynamicComposite(item key, indexName)</td>
* </tr>
* <tr>
* <th>Column Names</th>
* <td>timestamp</td>
* </tr>
* <tr>
* <th>Column Values</th>
* <td>index components</td>
* </tr>
* </table>
*
* @param <K> the key type
* @param <V> the value type
*/
public class DefaultIndexDao<K, V> implements ColumnFamilyInitializer, IndexDao<K,V> {
//----------------------------------------------------------------------------------------------------------------------
// Fields
//----------------------------------------------------------------------------------------------------------------------

    public static final DynamicCompositeSerializer DYNAMIC_COMPOSITE_SERIALIZER = DynamicCompositeSerializer.get();
    public static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
    public static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
    private final Keyspace keyspace;
    private final String indexColumnFamily;
    private final String entryColumnFamily;
    private final String indexName;
    private final Indexer<V> indexer;
    private final Serializer<K> keySerializer;

//----------------------------------------------------------------------------------------------------------------------
// Constructors
//----------------------------------------------------------------------------------------------------------------------

    public DefaultIndexDao(Keyspace keyspace, String indexColumnFamily, String entryColumnFamily, String indexName, Serializer<K> keySerializer, Indexer<V> indexer) {
        this.keyspace = keyspace;
        this.indexColumnFamily = indexColumnFamily;
        this.entryColumnFamily = entryColumnFamily;
        this.indexName = indexName;
        this.keySerializer = keySerializer;
        this.indexer = indexer;
    }

    public DefaultIndexDao(Keyspace keyspace, String indexColumnFamily, String entryColumnFamily, String indexName, Class<K> keyType, Indexer<V> indexer) {
        this(keyspace, indexColumnFamily, entryColumnFamily, indexName, CassidyUtils.inferSerializer(keyType), indexer);
    }

//----------------------------------------------------------------------------------------------------------------------
// ColumnFamilyInitializer Implementation
//----------------------------------------------------------------------------------------------------------------------

    @Override
    public void initializeColumnFamilies(Cluster cluster) {
        ColumnFamilyDefinition indexColumnFamilyDef = HFactory.createColumnFamilyDefinition(keyspace.getKeyspaceName(), indexColumnFamily, ComparatorType.DYNAMICCOMPOSITETYPE);
        indexColumnFamilyDef.setComparatorTypeAlias(DynamicComposite.DEFAULT_DYNAMIC_COMPOSITE_ALIASES);
        CassidyUtils.createColumnFamily(cluster, indexColumnFamilyDef);

        ColumnFamilyDefinition entryColumnFamilyDef = HFactory.createColumnFamilyDefinition(keyspace.getKeyspaceName(), entryColumnFamily, ComparatorType.INTEGERTYPE);
        CassidyUtils.createColumnFamily(cluster, entryColumnFamilyDef);
    }

//----------------------------------------------------------------------------------------------------------------------
// IndexDao Implementation
//----------------------------------------------------------------------------------------------------------------------


    @Override
    public void delete(K itemKey) {
        final Mutator<String> indexMutator = indexMutator();
        final Mutator<DynamicComposite> entryMutator = entryMutator();

        performCleanup(itemKey, indexMutator, entryMutator);

        indexMutator.execute();
        entryMutator.execute();
    }

    @Override
    public List<K> find(Object... values) {
        DynamicComposite start = new DynamicComposite();
        DynamicComposite end = new DynamicComposite();
        for (int i = 0; i < values.length; i++) {
            Object value = values[i];
            start.addComponent(i, value, AbstractComposite.ComponentEquality.EQUAL);
            end.addComponent(i, value, i == values.length - 1 ? AbstractComposite.ComponentEquality.GREATER_THAN_EQUAL : AbstractComposite.ComponentEquality.EQUAL);
        }
        final SliceQuery<String, DynamicComposite, K> query = HFactory.createSliceQuery(keyspace, StringSerializer.get(), DynamicCompositeSerializer.get(), keySerializer);
        query.setKey(indexName);
        query.setColumnFamily(indexColumnFamily);
        query.setRange(start, end, false, Integer.MAX_VALUE);
        List<K> keys = new LinkedList<>();
        for (HColumn<DynamicComposite, K> column : query.execute().get().getColumns()) {
            keys.add(column.getValue());
        }
        return keys;
    }

    @Override
    public void index(K itemKey, V item) {
        final long timestamp = HFactory.createClock();

        final Mutator<String> indexMutator = indexMutator();
        final Mutator<DynamicComposite> entryMutator = entryMutator();

        performCleanup(itemKey, indexMutator, entryMutator);

        List<IndexValue<?>> indexValues = indexer.index(item);

        insertIndexColumn(itemKey, timestamp, indexValues, indexMutator);
        insertEntryColumn(itemKey, timestamp, indexValues, entryMutator);

        indexMutator.execute();
        entryMutator.execute();
    }

//----------------------------------------------------------------------------------------------------------------------
// Other Methods
//----------------------------------------------------------------------------------------------------------------------

    private Mutator<DynamicComposite> entryMutator() {
        return HFactory.createMutator(keyspace, DYNAMIC_COMPOSITE_SERIALIZER);
    }

    private Mutator<String> indexMutator() {
        return HFactory.createMutator(keyspace, STRING_SERIALIZER);
    }

    private void insertEntryColumn(K itemKey, long timestamp, List<IndexValue<?>> indexValues, Mutator<DynamicComposite> entryMutator) {
        DynamicComposite entryValue = new DynamicComposite();
        IndexValue.addAll(indexValues, entryValue);
        final HColumn<Long, DynamicComposite> entryColumn = HFactory.createColumn(timestamp, entryValue, LONG_SERIALIZER, DYNAMIC_COMPOSITE_SERIALIZER);
        entryMutator.addInsertion(entryKey(itemKey), entryColumnFamily, entryColumn);
    }

    private void insertIndexColumn(K itemKey, long timestamp, List<IndexValue<?>> indexValues, Mutator<String> indexMutator) {
        DynamicComposite indexColumnName = new DynamicComposite();
        IndexValue.addAll(indexValues, indexColumnName);
        indexColumnName.addComponent(itemKey, keySerializer);
        indexColumnName.addComponent(timestamp, LONG_SERIALIZER);
        final HColumn<DynamicComposite, K> indexColumn = HFactory.createColumn(indexColumnName, itemKey, DYNAMIC_COMPOSITE_SERIALIZER, keySerializer);
        indexMutator.addInsertion(indexName, indexColumnFamily, indexColumn);
    }

    private void performCleanup(K key, Mutator<String> indexMutator, Mutator<DynamicComposite> entryMutator) {
        final DynamicComposite entryKey = entryKey(key);
        final List<HColumn<Long, DynamicComposite>> entries = getEntries(entryKey);
        for (HColumn<Long, DynamicComposite> entry : entries) {
            final Long entryTimestamp = entry.getName();

            DynamicComposite indexColumnName = new DynamicComposite();
            appendComponents(entry.getValue(), indexColumnName);
            indexColumnName.addComponent(key, keySerializer);
            indexColumnName.addComponent(entryTimestamp, LONG_SERIALIZER);

            indexMutator.addDeletion(indexName, indexColumnFamily, indexColumnName, DYNAMIC_COMPOSITE_SERIALIZER);
            entryMutator.addDeletion(entryKey, entryColumnFamily, entryTimestamp, LONG_SERIALIZER);
        }
    }

    private DynamicComposite entryKey(K itemKey) {
        DynamicComposite entryKey = new DynamicComposite();
        entryKey.addComponent(itemKey, keySerializer);
        entryKey.addComponent(indexName, STRING_SERIALIZER);
        return entryKey;
    }

    private List<HColumn<Long, DynamicComposite>> getEntries(DynamicComposite entryKey) {
        final SliceQuery<DynamicComposite, Long, DynamicComposite> entryQuery = HFactory.createSliceQuery(keyspace, DYNAMIC_COMPOSITE_SERIALIZER, LONG_SERIALIZER, DYNAMIC_COMPOSITE_SERIALIZER);
        entryQuery.setKey(entryKey);
        entryQuery.setColumnFamily(entryColumnFamily);
        entryQuery.setRange(null, null, false, Integer.MAX_VALUE);

        final QueryResult<ColumnSlice<Long, DynamicComposite>> result = entryQuery.execute();
        final ColumnSlice<Long, DynamicComposite> slice = result.get();
        return slice.getColumns();
    }

    private void appendComponents(DynamicComposite src, DynamicComposite dest) {
        for (AbstractComposite.Component<?> component : src.getComponents()) {
            appendComponent(dest, component);
        }
    }

    private <T> void appendComponent(DynamicComposite composite, AbstractComposite.Component<T> component) {
        composite.addComponent(component.getValue(), component.getSerializer());
    }
}
TOP

Related Classes of com.carmanconsulting.cassidy.index.def.DefaultIndexDao

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.