Package org.apache.hedwig.admin

Source Code of org.apache.hedwig.admin.HedwigAdmin

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hedwig.admin;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* Hedwig Admin
*/
public class HedwigAdmin {
    static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);

    // NOTE: now it is fixed passwd used in hedwig
    static byte[] passwd = "sillysecret".getBytes();

    protected ZooKeeper zk;
    protected BookKeeper bk;
    // hub configurations
    protected ServerConfiguration serverConf;
    // bookkeeper configurations
    protected ClientConfiguration bkClientConf;

    // Empty watcher
    private static class MyWatcher implements Watcher {
        public void process(WatchedEvent event) {
        }
    }

    /**
     * Hedwig Admin Constructor
     *
     * @param bkConf
     *          BookKeeper Client Configuration.
     * @param hubConf
     *          Hub Server Configuration.
     * @throws Exception
     */
    public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
        this.serverConf = hubConf;
        this.bkClientConf = bkConf;

        // connect to zookeeper
        zk = new ZooKeeper(hubConf.getZkHost(), hubConf.getZkTimeout(), new MyWatcher());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to zookeeper " + hubConf.getZkHost() + ", timeout = "
                    + hubConf.getZkTimeout());
        }

        // connect to bookkeeper
        bk = new BookKeeper(bkClientConf, zk);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to bookkeeper");
        }
    }

    /**
     * Close the hedwig admin.
     *
     * @throws Exception
     */
    public void close() throws Exception {
        bk.close();
        zk.close();
    }

    /**
     * Return zookeeper handle used in hedwig admin.
     *
     * @return zookeeper handle
     */
    public ZooKeeper getZkHandle() {
        return zk;
    }

    /**
     * Return bookkeeper handle used in hedwig admin.
     *
     * @return bookkeeper handle
     */
    public BookKeeper getBkHandle() {
        return bk;
    }

    /**
     * Return hub server configuration used in hedwig admin
     *
     * @return hub server configuration
     */
    public ServerConfiguration getHubServerConf() {
        return serverConf;
    }

    /**
     * Return bookeeper passwd used in hedwig admin
     *
     * @return bookeeper passwd
     */
    public byte[] getBkPasswd() {
        return Arrays.copyOf(passwd, passwd.length);
    }

    /**
     * Return digest type used in hedwig admin
     *
     * @return bookeeper digest type
     */
    public DigestType getBkDigestType() {
        return DigestType.CRC32;
    }

    /**
     * Dose topic exist?
     *
     * @param topic
     *            Topic name
     * @return whether topic exists or not?
     * @throws Exception
     */
    public boolean hasTopic(ByteString topic) throws Exception {
        String topicPath = serverConf.getZkTopicPath(new StringBuilder(), topic).toString();
        Stat stat = zk.exists(topicPath, false);
        return null != stat;
    }

    /**
     * Get available hubs.
     *
     * @return available hubs and their loads
     * @throws Exception
     */
    public Map<HedwigSocketAddress, Integer> getAvailableHubs() throws Exception {
        String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
        Map<HedwigSocketAddress, Integer> hubs =
            new HashMap<HedwigSocketAddress, Integer>();
        List<String> hosts = zk.getChildren(zkHubsPath, false);
        for (String host : hosts) {
            String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
                                         .append("/").append(host).toString();
            int load = 0;
            try {
                Stat stat = new Stat();
                byte[] data = zk.getData(zkHubPath, false, stat);
                if (data != null) {
                    load = Integer.parseInt(new String(data));
                }
            } catch (KeeperException ke) {
                LOG.warn("Couldn't read hub data from ZooKeeper", ke);
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted during read", ie);
            }
            hubs.put(new HedwigSocketAddress(host), load);
        }
        return hubs;
    }

    /**
     * Get list of topics
     *
     * @return list of topics
     * @throws Exception
     */
    public List<String> getTopics() throws Exception {
        return zk.getChildren(serverConf.getZkTopicsPrefix(new StringBuilder()).toString(), false);
    }

    /**
     * Return the znode path of owner of a topic
     *
     * @param topic
     *            Topic name
     * @return znode path of owner of a topic
     */
    String hubPath(ByteString topic) {
        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
    }

    /**
     * Return the topic owner of a topic
     *
     * @param topic
     *            Topic name
     * @return the address of the owner of a topic
     * @throws Exception
     */
    public HedwigSocketAddress getTopicOwner(ByteString topic) throws Exception {
        Stat stat = new Stat();
        byte[] owner = null;
        try {
            owner = zk.getData(hubPath(topic), false, stat);
        } catch (KeeperException.NoNodeException nne) {
        }
        if (null == owner) {
            return null;
        }
        return new HedwigSocketAddress(new String(owner));
    }

    /**
     * Return the znode path to store ledgers info of a topic
     *
     * @param topic
     *          Topic name
     * @return znode path to store ledgers info of a topic
     */
    String ledgersPath(ByteString topic) {
        return serverConf.getZkTopicPath(new StringBuilder(), topic).append("/ledgers").toString();
    }

    /**
     * Return the ledger range forming the topic
     *
     * @param topic
     *          Topic name
     * @return ledger ranges forming the topic
     * @throws Exception
     */
    public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
        LedgerRanges ranges = null;
        try {
            Stat stat = new Stat();
            byte[] ledgersData = zk.getData(ledgersPath(topic), false, stat);
            if (null != ledgersData) {
                ranges = LedgerRanges.parseFrom(ledgersData);
            }
        } catch (KeeperException.NoNodeException nne) {
        }
        if (null == ranges) {
            return null;
        }
        List<LedgerRange> lrs = ranges.getRangesList();
        if (lrs.isEmpty()) {
            return lrs;
        }
        // try to check last ledger (it may still open)
        LedgerRange lastRange = lrs.get(lrs.size() - 1);
        if (lastRange.hasEndSeqIdIncluded()) {
            return lrs;
        }
        // read last confirmed of the opened ledger
        try {
            List<LedgerRange> newLrs = new ArrayList<LedgerRange>();
            newLrs.addAll(lrs);
            lrs = newLrs;
            MessageSeqId lastSeqId;
            if (lrs.size() == 1) {
                lastSeqId = MessageSeqId.newBuilder().setLocalComponent(1).build();
            } else {
                lastSeqId = lrs.get(lrs.size() - 2).getEndSeqIdIncluded();
            }
            LedgerRange newLastRange = refreshLastLedgerRange(lastSeqId, lastRange);
            lrs.set(lrs.size() - 1, newLastRange);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return lrs;
    }

    /**
     * Refresh last ledger range to get lastConfirmed entry, which make it available to read
     *
     * @param lastSeqId
     *            Last sequence id of previous ledger
     * @param oldRange
     *            Ledger range to set lastConfirmed entry
     */
    LedgerRange refreshLastLedgerRange(MessageSeqId lastSeqId, LedgerRange oldRange)
        throws BKException, KeeperException, InterruptedException {
        LedgerHandle lh = bk.openLedgerNoRecovery(oldRange.getLedgerId(), DigestType.CRC32, passwd);
        long lastConfirmed = lh.readLastConfirmed();
        MessageSeqId newSeqId = MessageSeqId.newBuilder().mergeFrom(lastSeqId)
                                .setLocalComponent(lastSeqId.getLocalComponent() + lastConfirmed).build();
        return LedgerRange.newBuilder().mergeFrom(oldRange).setEndSeqIdIncluded(newSeqId).build();
    }

    /**
     * Return the znode path store all the subscribers of a topic.
     *
     * @param sb
     *          String builder to hold the znode path
     * @param topic
     *          Topic name
     */
    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
        return serverConf.getZkTopicPath(sb, topic).append("/subscribers");
    }

    /**
     * Return the znode path of a subscriber of a topic.
     *
     * @param topic
     *          Topic name
     * @param subscriber
     *          Subscriber name
     */

    private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
        return topicSubscribersPath(new StringBuilder(), topic).append("/")
               .append(subscriber.toStringUtf8()).toString();
    }

    /**
     * Return subscriptions of a topic
     *
     * @param topic
     *          Topic name
     * @return subscriptions of a topic
     * @throws Exception
     */
    public Map<ByteString, SubscriptionState> getTopicSubscriptions(ByteString topic)
        throws Exception {
        Map<ByteString, SubscriptionState> states =
            new HashMap<ByteString, SubscriptionState>();
        try {
            String subsPath = topicSubscribersPath(new StringBuilder(), topic).toString();
            List<String> children = zk.getChildren(subsPath, false);
            for (String child : children) {
                ByteString subscriberId = ByteString.copyFromUtf8(child);
                String subPath = topicSubscriberPath(topic, subscriberId);
                Stat stat = new Stat();
                byte[] subData = zk.getData(subPath, false, stat);
                if (null == subData) {
                    continue;
                }
                SubscriptionState state = SubscriptionState.parseFrom(subData);
                states.put(subscriberId, state);
            }
        } catch (KeeperException.NoNodeException nne) {
        }
        return states;
    }

    /**
     * Return subscription state of a subscriber of topic
     *
     * @param topic
     *          Topic name
     * @param subscriber
     *          Subscriber name
     * @return subscription state
     * @throws Exception
     */
    public SubscriptionState getSubscription(ByteString topic, ByteString subscriber) throws Exception {
        String subPath = topicSubscriberPath(topic, subscriber);
        Stat stat = new Stat();
        byte[] subData = null;
        try {
            subData = zk.getData(subPath, false, stat);
        } catch (KeeperException.NoNodeException nne) {
        }
        if (null == subData) {
            return null;
        }
        return SubscriptionState.parseFrom(subData);
    }
}
TOP

Related Classes of org.apache.hedwig.admin.HedwigAdmin

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.