Package org.apache.hedwig.admin

Source Code of org.apache.hedwig.admin.HedwigAdmin$HubStats

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hedwig.admin;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRange;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
import org.apache.hedwig.server.meta.FactoryLayout;
import org.apache.hedwig.server.meta.SubscriptionDataManager;
import org.apache.hedwig.server.meta.TopicOwnershipManager;
import org.apache.hedwig.server.meta.TopicPersistenceManager;
import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.server.topics.HubLoad;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* Hedwig Admin
*/
public class HedwigAdmin {
    static final Logger LOG = LoggerFactory.getLogger(HedwigAdmin.class);

    // NOTE: now it is fixed passwd used in hedwig
    static byte[] passwd = "sillysecret".getBytes();

    protected final ZooKeeper zk;
    protected final BookKeeper bk;
    protected final MetadataManagerFactory mmFactory;
    protected final SubscriptionDataManager sdm;
    protected final TopicOwnershipManager tom;
    protected final TopicPersistenceManager tpm;

    // hub configurations
    protected final ServerConfiguration serverConf;
    // bookkeeper configurations
    protected final ClientConfiguration bkClientConf;

    protected final CountDownLatch zkReadyLatch = new CountDownLatch(1);

    // Empty watcher
    private class MyWatcher implements Watcher {
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                zkReadyLatch.countDown();
            }
        }
    }

    static class SyncObj<T> {
        boolean finished = false;
        boolean success = false;
        T value = null;
        PubSubException exception = null;

        synchronized void success(T v) {
            finished = true;
            success = true;
            value = v;
            notify();
        }

        synchronized void fail(PubSubException pse) {
            finished = true;
            success = false;
            exception = pse;
            notify();
        }

        synchronized void block() {
            try {
                while (!finished) {
                    wait();
                }
            } catch (InterruptedException ie) {
            }
        }

        synchronized boolean isSuccess() {
            return success;
        }
    }

    /**
     * Stats of a hub
     */
    public static class HubStats {
        HubInfo hubInfo;
        HubLoad hubLoad;

        public HubStats(HubInfo info, HubLoad load) {
            this.hubInfo = info;
            this.hubLoad = load;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("info : [").append(hubInfo.toString().trim().replaceAll("\n", ", "))
              .append("], load : [").append(hubLoad.toString().trim().replaceAll("\n", ", "))
              .append("]");
            return sb.toString();
        }
    }

    /**
     * Hedwig Admin Constructor
     *
     * @param bkConf
     *          BookKeeper Client Configuration.
     * @param hubConf
     *          Hub Server Configuration.
     * @throws Exception
     */
    public HedwigAdmin(ClientConfiguration bkConf, ServerConfiguration hubConf) throws Exception {
        this.serverConf = hubConf;
        this.bkClientConf = bkConf;

        // connect to zookeeper
        zk = new ZooKeeper(hubConf.getZkHost(), hubConf.getZkTimeout(), new MyWatcher());
        LOG.debug("Connecting to zookeeper {}, timeout = {}",
                hubConf.getZkHost(), hubConf.getZkTimeout());
        // wait until connection is ready
        if (!zkReadyLatch.await(hubConf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
            throw new Exception("Count not establish connection with ZooKeeper after " + hubConf.getZkTimeout() * 2 + " ms.");
        }

        // construct the metadata manager factory
        mmFactory = MetadataManagerFactory.newMetadataManagerFactory(hubConf, zk);
        tpm = mmFactory.newTopicPersistenceManager();
        tom = mmFactory.newTopicOwnershipManager();
        sdm = mmFactory.newSubscriptionDataManager();

        // connect to bookkeeper
        bk = new BookKeeper(bkClientConf, zk);
        LOG.debug("Connecting to bookkeeper");
    }

    /**
     * Close the hedwig admin.
     *
     * @throws Exception
     */
    public void close() throws Exception {
        tpm.close();
        tom.close();
        sdm.close();
        mmFactory.shutdown();
        bk.close();
        zk.close();
    }

    /**
     * Return zookeeper handle used in hedwig admin.
     *
     * @return zookeeper handle
     */
    public ZooKeeper getZkHandle() {
        return zk;
    }

    /**
     * Return bookkeeper handle used in hedwig admin.
     *
     * @return bookkeeper handle
     */
    public BookKeeper getBkHandle() {
        return bk;
    }

    /**
     * Return hub server configuration used in hedwig admin
     *
     * @return hub server configuration
     */
    public ServerConfiguration getHubServerConf() {
        return serverConf;
    }

    /**
     * Return metadata manager factory.
     *
     * @return metadata manager factory instance.
     */
    public MetadataManagerFactory getMetadataManagerFactory() {
        return mmFactory;
    }

    /**
     * Return bookeeper passwd used in hedwig admin
     *
     * @return bookeeper passwd
     */
    public byte[] getBkPasswd() {
        return Arrays.copyOf(passwd, passwd.length);
    }

    /**
     * Return digest type used in hedwig admin
     *
     * @return bookeeper digest type
     */
    public DigestType getBkDigestType() {
        return DigestType.CRC32;
    }

    /**
     * Dose topic exist?
     *
     * @param topic
     *            Topic name
     * @return whether topic exists or not?
     * @throws Exception
     */
    public boolean hasTopic(ByteString topic) throws Exception {
        // current persistence info is bound with a topic, so if there is persistence info
        // there is topic.
        final SyncObj<Boolean> syncObj = new SyncObj<Boolean>();
        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
                if (null == result) {
                    syncObj.success(false);
                } else {
                    syncObj.success(true);
                }
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        syncObj.block();

        if (!syncObj.isSuccess()) {
            throw syncObj.exception;
        }

        return syncObj.value;
    }

    /**
     * Get available hubs.
     *
     * @return available hubs and their loads
     * @throws Exception
     */
    public Map<HedwigSocketAddress, HubStats> getAvailableHubs() throws Exception {
        String zkHubsPath = serverConf.getZkHostsPrefix(new StringBuilder()).toString();
        Map<HedwigSocketAddress, HubStats> hubs =
            new HashMap<HedwigSocketAddress, HubStats>();
        List<String> hosts = zk.getChildren(zkHubsPath, false);
        for (String host : hosts) {
            String zkHubPath = serverConf.getZkHostsPrefix(new StringBuilder())
                                         .append("/").append(host).toString();
            HedwigSocketAddress addr = new HedwigSocketAddress(host);
            try {
                Stat stat = new Stat();
                byte[] data = zk.getData(zkHubPath, false, stat);
                if (data == null) {
                    continue;
                }
                HubLoad load = HubLoad.parse(new String(data));
                HubInfo info = new HubInfo(addr, stat.getCzxid());
                hubs.put(addr, new HubStats(info, load));
            } catch (KeeperException ke) {
                LOG.warn("Couldn't read hub data from ZooKeeper", ke);
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted during read", ie);
            }
        }
        return hubs;
    }

    /**
     * Get list of topics
     *
     * @return list of topics
     * @throws Exception
     */
    public Iterator<ByteString> getTopics() throws Exception {
        return mmFactory.getTopics();
    }

    /**
     * Return the topic owner of a topic
     *
     * @param topic
     *            Topic name
     * @return the address of the owner of a topic
     * @throws Exception
     */
    public HubInfo getTopicOwner(ByteString topic) throws Exception {
        final SyncObj<HubInfo> syncObj = new SyncObj<HubInfo>();
        tom.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<HubInfo> result) {
                if (null == result) {
                    syncObj.success(null);
                } else {
                    syncObj.success(result.getValue());
                }
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        syncObj.block();

        if (!syncObj.isSuccess()) {
            throw syncObj.exception;
        }

        return syncObj.value;
    }

    private static LedgerRange buildLedgerRange(long ledgerId, long startOfLedger, MessageSeqId endOfLedger) {
        LedgerRange.Builder builder =
            LedgerRange.newBuilder().setLedgerId(ledgerId).setStartSeqIdIncluded(startOfLedger)
                       .setEndSeqIdIncluded(endOfLedger);
        return builder.build();
    }

    /**
     * Return the ledger range forming the topic
     *
     * @param topic
     *          Topic name
     * @return ledger ranges forming the topic
     * @throws Exception
     */
    public List<LedgerRange> getTopicLedgers(ByteString topic) throws Exception {
        final SyncObj<LedgerRanges> syncObj = new SyncObj<LedgerRanges>();
        tpm.readTopicPersistenceInfo(topic, new Callback<Versioned<LedgerRanges>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<LedgerRanges> result) {
                if (null == result) {
                    syncObj.success(null);
                } else {
                    syncObj.success(result.getValue());
                }
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        syncObj.block();

        if (!syncObj.isSuccess()) {
            throw syncObj.exception;
        }

        LedgerRanges ranges = syncObj.value;
        if (null == ranges) {
            return null;
        }
        List<LedgerRange> results = new ArrayList<LedgerRange>();
        List<LedgerRange> lrs = ranges.getRangesList();
        long startSeqId = 1L;
        if (!lrs.isEmpty()) {
            LedgerRange range = lrs.get(0);
            if (!range.hasStartSeqIdIncluded() && range.hasEndSeqIdIncluded()) {
                long ledgerId = range.getLedgerId();
                try {
                    LedgerHandle lh = bk.openLedgerNoRecovery(ledgerId, DigestType.CRC32, passwd);
                    long numEntries = lh.readLastConfirmed() + 1;
                    long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
                    startSeqId = endOfLedger - numEntries + 1;
                } catch (BKException.BKNoSuchLedgerExistsException be) {
                    // ignore it
                }
            }
        }
        Iterator<LedgerRange> lrIter = lrs.iterator();
        while (lrIter.hasNext()) {
            LedgerRange range = lrIter.next();
            if (range.hasEndSeqIdIncluded()) {
                long endOfLedger = range.getEndSeqIdIncluded().getLocalComponent();
                if (range.hasStartSeqIdIncluded()) {
                    startSeqId = range.getStartSeqIdIncluded();
                } else {
                    range = buildLedgerRange(range.getLedgerId(), startSeqId, range.getEndSeqIdIncluded());
                }
                results.add(range);
                if (startSeqId < endOfLedger + 1) {
                    startSeqId = endOfLedger + 1;
                }
                continue;
            }
            if (lrIter.hasNext()) {
                throw new IllegalStateException("Ledger " + range.getLedgerId() + " for topic " + topic.toString()
                                                + " is not the last one but still does not have an end seq-id");
            }

            if (range.hasStartSeqIdIncluded()) {
                startSeqId = range.getStartSeqIdIncluded();
            }

            LedgerHandle lh = bk.openLedgerNoRecovery(range.getLedgerId(), DigestType.CRC32, passwd);
            long endOfLedger = startSeqId + lh.readLastConfirmed();
            MessageSeqId endSeqId = MessageSeqId.newBuilder().setLocalComponent(endOfLedger).build();
            results.add(buildLedgerRange(range.getLedgerId(), startSeqId, endSeqId));
        }
        return results;
    }

    /**
     * Return subscriptions of a topic
     *
     * @param topic
     *          Topic name
     * @return subscriptions of a topic
     * @throws Exception
     */
    public Map<ByteString, SubscriptionData> getTopicSubscriptions(ByteString topic)
        throws Exception {

        final SyncObj<Map<ByteString, SubscriptionData>> syncObj =
            new SyncObj<Map<ByteString, SubscriptionData>>();
        sdm.readSubscriptions(topic, new Callback<Map<ByteString, Versioned<SubscriptionData>>>() {
            @Override
            public void operationFinished(Object ctx, Map<ByteString, Versioned<SubscriptionData>> result) {
                // It was just used to console tool to print some information, so don't need to return version for it
                // just keep the getTopicSubscriptions interface as before
                Map<ByteString, SubscriptionData> subs = new ConcurrentHashMap<ByteString, SubscriptionData>();
                for (Map.Entry<ByteString, Versioned<SubscriptionData>> subEntry : result.entrySet()) {
                    subs.put(subEntry.getKey(), subEntry.getValue().getValue());
                }
                syncObj.success(subs);
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        syncObj.block();

        if (!syncObj.isSuccess()) {
            throw syncObj.exception;
        }

        return syncObj.value;
    }

    /**
     * Return subscription state of a subscriber of topic
     *
     * @param topic
     *          Topic name
     * @param subscriber
     *          Subscriber name
     * @return subscription state
     * @throws Exception
     */
    public SubscriptionData getSubscription(ByteString topic, ByteString subscriber) throws Exception {
        final SyncObj<SubscriptionData> syncObj = new SyncObj<SubscriptionData>();
        sdm.readSubscriptionData(topic, subscriber, new Callback<Versioned<SubscriptionData>>() {
            @Override
            public void operationFinished(Object ctx, Versioned<SubscriptionData> result) {
                if (null == result) {
                    syncObj.success(null);
                } else {
                    syncObj.success(result.getValue());
                }
            }
            @Override
            public void operationFailed(Object ctx, PubSubException pse) {
                syncObj.fail(pse);
            }
        }, syncObj);

        syncObj.block();

        if (!syncObj.isSuccess()) {
            throw syncObj.exception;
        }

        return syncObj.value;
    }

    /**
     * Format metadata for Hedwig.
     */
    public void format() throws Exception {
        // format metadata first
        mmFactory.format(serverConf, zk);
        LOG.info("Formatted Hedwig metadata successfully.");
        // remove metadata layout
        FactoryLayout.deleteLayout(zk, serverConf);
        LOG.info("Removed old factory layout.");
        // create new metadata manager factory and write new metadata layout
        MetadataManagerFactory.createMetadataManagerFactory(serverConf, zk,
            serverConf.getMetadataManagerFactoryClass());
        LOG.info("Created new factory layout.");
    }
}
TOP

Related Classes of org.apache.hedwig.admin.HedwigAdmin$HubStats

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.