Package backtype.storm.contrib.signals

Source Code of backtype.storm.contrib.signals.StormSignalConnection

package backtype.storm.contrib.signals;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.utils.Utils;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;

public class StormSignalConnection extends AbstractSignalConnection {
    private static final Logger LOG = LoggerFactory.getLogger(StormSignalConnection.class);

    public StormSignalConnection(String name, SignalListener listener) {
        this.name = name;
        this.listener = listener;
    }

    @SuppressWarnings("rawtypes")
    public void init(Map conf) throws Exception {
        String connectString = zkHosts(conf);
        int retryCount = Utils.getInt(conf.get("storm.zookeeper.retry.times"));
        int retryInterval = Utils.getInt(conf.get("storm.zookeeper.retry.interval"));

        this.client = CuratorFrameworkFactory.builder().namespace(namespace).connectString(connectString)
                .retryPolicy(new RetryNTimes(retryCount, retryInterval)).build();
        this.client.start();

        super.initWatcher();
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    private static String zkHosts(Map conf) {
        int zkPort = Utils.getInt(conf.get("storm.zookeeper.port"));
        List<String> zkServers = (List<String>) conf.get("storm.zookeeper.servers");

        Iterator<String> it = zkServers.iterator();
        StringBuffer sb = new StringBuffer();
        while (it.hasNext()) {
            sb.append(it.next());
            sb.append(":");
            sb.append(zkPort);
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        return sb.toString();
    }
}
TOP

Related Classes of backtype.storm.contrib.signals.StormSignalConnection

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.