Package org.springframework.integration.voldemort.store

Source Code of org.springframework.integration.voldemort.store.VoldemortMessageStore$AddKeyUpdateAction

/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.voldemort.store;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.store.AbstractKeyValueMessageStore;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageStore;
import org.springframework.integration.util.DefaultLockRegistry;
import org.springframework.integration.util.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import voldemort.client.StoreClient;
import voldemort.client.UpdateAction;
import voldemort.serialization.SerializationException;
import voldemort.versioning.Versioned;

/**
* Voldemort implementation of the key-value style {@link MessageStore} and {@link MessageGroupStore}.
* Implementation note: message identifiers are persisted as {@link String}s.
*
* @author Lukasz Antoniak
* @since 1.0
*/
@SuppressWarnings("unchecked")
public class VoldemortMessageStore extends AbstractKeyValueMessageStore implements InitializingBean {
  /**
   * Key under which message store tracks all currently saved message identifiers.
   */
  protected static final String MESSAGE_KEY_LIST = "MESSAGE_KEY_LIST";

  /**
   * Key under which message store tracks all currently saved message group identifiers.
   */
  protected static final String MESSAGE_GROUP_KEY_LIST = "MESSAGE_GROUP_KEY_LIST";

  private static final LockRegistry LOCK_REGISTRY = new DefaultLockRegistry();

  private final StoreClient client;

  public VoldemortMessageStore(StoreClient client) {
    this.client = client;
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    // Initialize empty set of message and message group identifiers if needed.
    synchronized ( getClass() ) {
      if ( client.get( MESSAGE_KEY_LIST ) == null ) {
        client.put( MESSAGE_KEY_LIST, new HashSet<Object>() );
      }
      if ( client.get( MESSAGE_GROUP_KEY_LIST ) == null ) {
        client.put( MESSAGE_GROUP_KEY_LIST, new HashSet<Object>() );
      }
    }
  }

  @Override
  protected Object doRetrieve(Object id) {
    final Versioned version = client.get( id );
    if ( version != null ) {
      return version.getValue();
    }
    return null;
  }

  @Override
  protected void doStore(final Object id, final Object objectToStore) {
    final Lock messageLock = doLock( id );
    try {
      client.put( id, objectToStore );
      // Keeping track of stored message identifiers for efficient implementation of #doListKeys(String) method.
      // Voldemort does not provide API to list all keys matching specified pattern.
      final String keyListKey = computeKeyListKey( id );
      final Lock keyListLock = doLock( keyListKey );
      try {
        client.applyUpdate( new AddKeyUpdateAction( id, keyListKey ) );
      }
      finally {
        keyListLock.unlock();
      }
    }
    catch ( SerializationException e ) {
      throw new IllegalArgumentException( "Voldemort failed to serialize message with id: " + id + ".", e );
    }
    finally {
      messageLock.unlock();
    }
  }

  @Override
  protected Object doRemove(final Object id) {
    final Object message = doRetrieve( id );
    final Lock messageLock = doLock( id );
    try {
      client.delete( id );
      // Keeping track of stored message identifiers for efficient implementation of #doListKeys(String) method.
      // Voldemort does not provide API to list all keys matching specified pattern.
      final String keyListKey = computeKeyListKey( id );
      final Lock keyListLock = doLock( keyListKey );
      try {
        client.applyUpdate( new RemoveKeyUpdateAction( id, keyListKey ) );
      }
      finally {
        keyListLock.unlock();
      }
      return message;
    }
    finally {
      messageLock.unlock();
    }
  }

  @Override
  protected Collection<?> doListKeys(String keyPattern) {
    return Collections.unmodifiableSet( (Set<Object>) client.get( computeKeyListKey( keyPattern ) ).getValue() );
  }

  /**
   * @param id Message or message group identifier. {@link String} type required.
   * @return Identifier under which set of all currently persisted message or message group keys is saved.
   */
  private String computeKeyListKey(Object id) {
    final String key = (String) id;
    if ( isMessageGroupKey( key ) ) {
      return MESSAGE_GROUP_KEY_LIST;
    }
    else if ( isMessageKey( key ) ) {
      return MESSAGE_KEY_LIST;
    }
    else {
      throw new IllegalArgumentException("Unsupported identifier: " + key + ".");
    }
  }

  /**
   * @param key Message or message group identifier.
   * @return {@code true} in case of message identifier, {@code false} otherwise.
   */
  private boolean isMessageKey(String key) {
    return key.startsWith( AbstractKeyValueMessageStore.MESSAGE_KEY_PREFIX );
  }

  /**
   * @param key Message or message group identifier.
   * @return {@code true} in case of message group identifier, {@code false} otherwise.
   */
  private boolean isMessageGroupKey(String key) {
    return key.startsWith( AbstractKeyValueMessageStore.MESSAGE_GROUP_KEY_PREFIX );
  }

  /**
   * Acquire JVM wide lock on the given object.
   * @param obj Object.
   * @return Lock.
   */
  private Lock doLock(Object obj) {
    final Lock lock = LOCK_REGISTRY.obtain( UUIDConverter.getUUID( obj ).toString() );
    lock.lock();
    return lock;
  }

  /**
   * Voldemort update action that adds given key to the list of currently saved identifiers.
   */
  private static final class AddKeyUpdateAction extends UpdateAction {
    private final Object id;
    private final String keyListKey;

    /**
     * The only constructor.
     *
     * @param id Message or message group identifier.
     * @param keyListKey Key under which Voldemort stores set of all currently persisted identifiers.
     */
    private AddKeyUpdateAction(Object id, String keyListKey) {
      this.id = id;
      this.keyListKey = keyListKey;
    }

    @Override
    public void update(StoreClient storeClient) {
      final Set<Object> keys = (Set<Object>) storeClient.get( keyListKey ).getValue();
      if ( keys.add( id ) ) {
        storeClient.put( keyListKey, keys );
      }
    }
  }

  /**
   * Voldemort update action that removes given key from the list of currently saved identifiers.
   */
  private static final class RemoveKeyUpdateAction extends UpdateAction {
    private final Object id;
    private final String keyListKey;

    /**
     * The only constructor.
     *
     * @param id Message or message group identifier.
     * @param keyListKey Key under which Voldemort stores set of all currently persisted identifiers.
     */
    private RemoveKeyUpdateAction(Object id, String keyListKey) {
      this.id = id;
      this.keyListKey = keyListKey;
    }

    @Override
    public void update(StoreClient storeClient) {
      final Set<Object> keys = (Set<Object>) storeClient.get( keyListKey ).getValue();
      if ( keys.remove( id ) ) {
        storeClient.put( keyListKey, keys );
      }
    }
  }
}
TOP

Related Classes of org.springframework.integration.voldemort.store.VoldemortMessageStore$AddKeyUpdateAction

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.