Package ch.uzh.ifi.ddis.ifp.esper.cassandra

Source Code of ch.uzh.ifi.ddis.ifp.esper.cassandra.CassandraVirtualDataWindowFactory

package ch.uzh.ifi.ddis.ifp.esper.cassandra;

/*
* #%L
* Cassandra for Esper
* %%
* Copyright (C) 2013 University of Zurich
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 2 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program.  If not, see
* <http://www.gnu.org/licenses/gpl-2.0.html>.
* #L%
*/

import java.util.Properties;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.Session;
import com.espertech.esper.client.hook.VirtualDataWindowContext;
import com.espertech.esper.client.hook.VirtualDataWindowFactory;
import com.espertech.esper.client.hook.VirtualDataWindowFactoryContext;

/**
* <p>
* Factory for creating {@link CassandraVirtualDataWindow} objects.
* </p>
*
* @author Thomas Scharrenbach
* @version 0.0.1
* @since 0.0.1
*
*/
public class CassandraVirtualDataWindowFactory implements
    VirtualDataWindowFactory {

  private static final Logger _log = LoggerFactory
      .getLogger(CassandraVirtualDataWindowFactory.class);

  //
  //
  //

  /**
   * Key for the system {@link Properties}, which stores the username for
   * authenticating to Cassandra.
   */
  public static final String CASSANDRA_USERNAME = "cassandra-username";

  /**
   * Key for the system {@link Properties}, which stores the password for
   * authenticating to Cassandra.
   */
  public static final String CASSANDRA_PASSWORD = "cassandra-password";

  //
  //
  //

  private Cluster _cluster;

  private CassandraConfiguration _configuration;

  //
  //
  //

  public CassandraVirtualDataWindowFactory() {
    return;
  }

  //
  //
  //

  /**
   * <p>
   * Initializes the Cassandra {@link Cluster}.
   * </p>
   *
   * @throws java.lang.RuntimeException
   *             In case the connection to Cassandra fails.
   */
  @Override
  public void initialize(VirtualDataWindowFactoryContext factoryContext) {
    final Object[] windowParameters = factoryContext.getParameters();

    if (windowParameters.length < 1) {
      throw new RuntimeException(String.format(
          "Cassandra window has at least one parameter. "
              + "Found %s", windowParameters.length));
    }

    final String parameter = windowParameters[0].toString().trim();

    _log.info("Started parsing Cassandra configuration...");
    try {
      _configuration = CassandraConfiguration.create(parameter);
    } catch (Exception pe) {
      _log.error("Error parsing Cassandra configuration!");
      throw new RuntimeException(pe);
    }
    _log.info("Finished parsing Cassandra configuration.");

    final String node = _configuration.getHost();
    final String username = System.getProperty(CASSANDRA_USERNAME, null);
    final String password = System.getProperty(CASSANDRA_PASSWORD, null);
    Builder clusterBuilder = Cluster.builder();

    try {
      _log.info("Started connecting to cluster: {} ...", node);
      _log.debug("Adding contact point...");
      clusterBuilder = clusterBuilder.addContactPoint(node);

      if (username != null && password != null) {
        _log.debug("Adding credentials...");
        clusterBuilder = clusterBuilder.withCredentials(username,
            password);
      }

      _log.debug("Building cluster...");
      _cluster = clusterBuilder.build();

      _log.info("Finished connecting to cluster: {} .", node);

    } catch (Exception e) {
      _log.error("Error connecting to cluster: {} !", node);

      final String errorMessage = String.format(
          "Could not connect to Cassandra cluster!", node);
      throw new RuntimeException(errorMessage, e);
    }

  }

  /**
   * <p>
   * Creates a new {@link CassandraVirtualDataWindow} by creating a Cassandra
   * {@link Session}.
   * </p>
   * <p>
   * <strong>Note: This method is thread-safe.</strong>
   * </p>
   *
   * @throws RuntimeException
   *             In case the cluster is null or the creation of the session
   *             fails.
   */
  @Override
  public CassandraVirtualDataWindow create(VirtualDataWindowContext context) {
    if (_cluster == null) {
      throw new RuntimeException(
          "Cassandra cluster has not yet been initialized!");
    }
    CassandraVirtualDataWindow result = null;
    synchronized (_cluster) {
      try {
        _log.info("Started connecting to cluster...");
        final Session session = _cluster.connect(_configuration
            .getKeyspace());
        _log.info("Finished connecting to cluster...");

        result = new CassandraVirtualDataWindow(session,
            _configuration.getTable(), context);
      } catch (Exception e) {
        _log.error("Error connecting to cluster!");
        throw new RuntimeException(e);
      }
    }
    return result;
  }

  /**
   * <p>
   * Shuts down the connection to the Cassandra {@link Cluster}.
   * </p>
   */
  @Override
  public void destroyAllContextPartitions() {
    close();
  }

  /**
   * @return null.
   */
  @Override
  public Set<String> getUniqueKeyPropertyNames() {
    return null;
  }

  //
  //
  //

  /**
   * <p>
   * Disconnect from Cassandra {@link Cluster}.
   * </p>
   */
  protected void close() {
    try {
      _log.info("Started shutting down connection to Cassandra cluster...");
      _cluster.shutdown();
      _log.info("Finished shutting down connection to Cassandra cluster.");
    }
    //
    catch (Exception e) {
      _log.error("Error shutting down connection to Cassandra cluster!",
          e);
    }
  }

}
TOP

Related Classes of ch.uzh.ifi.ddis.ifp.esper.cassandra.CassandraVirtualDataWindowFactory

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.