Package org.projectx.zookeeper

Source Code of org.projectx.zookeeper.ZkClientFactoryBean

package org.projectx.zookeeper;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

/**
* A {@link FactoryBean} implementation for {@link ZkClient} objects.
* <p>
* This class uses either {@link #setZkConnection(ZkConnection) zkConnection} or
* {@link #setEnsemble(String) ensemble} to construct a {@link ZkClient} (in
* that order of preference). If none is provided an exception will be thrown
* durinig initialization.
* <p>
* The {@link #setZkConnection(ZkConnection)} method allows for provisioning a
* different {@link IZkConnection} implementation (possibly for testing
* purposes).
* <p>
*
* @author Erez Mazor (erezmazor@gmail.com)
*/
public class ZkClientFactoryBean implements FactoryBean<ZkClient>, InitializingBean, DisposableBean {

  private String ensemble;
  private int connectionTimeout = 20000; // 20 seconds time default to connect
  private int sessionTimeout = Integer.MAX_VALUE;
  private ZkSerializer serializer = new BytesPushThroughSerializer();
  private ZkClient zkClient;
  private IZkConnection zkConnection;

  private List<IZkStateListener> stateListeners;
  private Map<String, IZkChildListener> childListeners;
  private Map<String, IZkDataListener> dataListeners;

  public void setEnsemble(final String ensemble) {
    Assert.hasText(ensemble, "ensemble must be in the form host:port");
    this.ensemble = ensemble;
  }

  /**
   * The connection timeout is used only when {@link #setEnsemble(String)} is
   * used, it is ignored when {@link #setZkConnection(ZkConnection)} is
   * provided.
   *
   * @param connectionTimeout
   */
  public void setConnectionTimeout(final int connectionTimeout) {
    this.connectionTimeout = connectionTimeout;
  }

  public void setSessionTimeout(final int sessionTimeout) {
    this.sessionTimeout = sessionTimeout;
  }

  public void setSerializer(final ZkSerializer serializer) {
    this.serializer = serializer;
  }

  /**
   * An optional property, when provided this will override the value provided
   * by the {@link #setEnsemble(String)} property.
   *
   * @param zkConnection
   */
  public void setZkConnection(final IZkConnection zkConnection) {
    this.zkConnection = zkConnection;
  }

  /**
   * Provide a collection of {@link IZkStateListener} to handle global state
   * changes in Zookeeper. These listeners will be registered after
   * {@link ZkClient} creation
   *
   * @param stateListeners
   *          a collection of {@link IZkStateListener}
   */
  public void setStateListeners(final List<IZkStateListener> stateListeners) {
    this.stateListeners = stateListeners;
  }

  /**
   * Provide a map of child paths wiht associated {@link IZkChildListener} to
   * handle child changes in Zookeeper. These listeners will be registered after
   * {@link ZkClient} creation.
   *
   * Note that child listeners can be set up on paths that do not exist yet in
   * Zookeeper as "future" path watches, theses will be invoked upon the
   * creation of the designated path in Zookeper
   *
   * @param childListeners
   *          a map of path to {@link IZkChildListener}
   */
  public void setChildListeners(final Map<String, IZkChildListener> childListeners) {
    this.childListeners = childListeners;
  }

  /**
   * Provide a map of child paths wiht associated {@link IZkDataListener} to
   * handle data changes in Zookeeper. These listeners will be registered after
   * {@link ZkClient} creation.
   *
   * Note that data listeners can be set up on paths that do not exist yet in
   * Zookeeper as "future" path watches, theses will be invoked upon the
   * creation of the designated path in Zookeper
   *
   * @param dataListeners
   *          a map of path to {@link IZkDataListener}
   */
  public void setDataListeners(final Map<String, IZkDataListener> dataListeners) {
    this.dataListeners = dataListeners;
  }

  @Override
  public ZkClient getObject() throws Exception {
    return zkClient;
  }

  @Override
  public Class<ZkClient> getObjectType() {
    return ZkClient.class;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }

  @Override
  public void afterPropertiesSet() throws Exception {

    if (null != zkConnection) {
      zkClient = new ZkClient(zkConnection, connectionTimeout, serializer);
    } else if (null != ensemble) {
      zkClient = new ZkClient(ensemble, sessionTimeout, connectionTimeout, serializer);
    } else {
      throw new IllegalArgumentException(
          "Either zkConnection or ensemble must be provided to create a ZkClient instance.");
    }

    registerListeners(zkClient);
  }

  /**
   * Register all supplied listeners if any.
   *
   * @param zkClient
   */
  protected void registerListeners(final ZkClient zkClient) {
    if (null != stateListeners) {
      for (final IZkStateListener stateListener : stateListeners) {
        zkClient.subscribeStateChanges(stateListener);

      }
    }

    if (null != childListeners) {
      for (final Entry<String, IZkChildListener> entry : childListeners.entrySet()) {
        zkClient.subscribeChildChanges(entry.getKey(), entry.getValue());
      }
    }

    if (null != dataListeners) {
      for (final Entry<String, IZkDataListener> entry : dataListeners.entrySet()) {
        zkClient.subscribeDataChanges(entry.getKey(), entry.getValue());
      }
    }

  }

  @Override
  public void destroy() throws Exception {
    zkClient.close();
  }
}
TOP

Related Classes of org.projectx.zookeeper.ZkClientFactoryBean

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.