Package org.apache.hedwig.server.subscriptions

Source Code of org.apache.hedwig.server.subscriptions.ZkSubscriptionManager

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.server.subscriptions;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hedwig.zookeeper.ZkUtils;

public class ZkSubscriptionManager extends AbstractSubscriptionManager {

    ZooKeeper zk;

    protected final static Logger logger = LoggerFactory.getLogger(ZkSubscriptionManager.class);

    public ZkSubscriptionManager(ZooKeeper zk, TopicManager topicMgr, PersistenceManager pm, ServerConfiguration cfg,
                                 ScheduledExecutorService scheduler) {
        super(cfg, topicMgr, pm, scheduler);
        this.zk = zk;
    }

    private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString topic) {
        return cfg.getZkTopicPath(sb, topic).append("/subscribers");
    }

    private String topicSubscriberPath(ByteString topic, ByteString subscriber) {
        return topicSubscribersPath(new StringBuilder(), topic).append("/").append(subscriber.toStringUtf8())
               .toString();
    }

    @Override
    protected void readSubscriptions(final ByteString topic,
                                     final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {

        String topicSubscribersPath = topicSubscribersPath(new StringBuilder(), topic).toString();
        zk.getChildren(topicSubscribersPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
            @Override
            public void safeProcessResult(int rc, String path, final Object ctx, final List<String> children) {

                if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic "
                                        + topic.toStringUtf8(), path, rc);
                    cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                    return;
                }

                final Map<ByteString, InMemorySubscriptionState> topicSubs = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();

                if (rc == Code.NONODE.intValue() || children.size() == 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("No subscriptions found while acquiring topic: " + topic.toStringUtf8());
                    }
                    cb.operationFinished(ctx, topicSubs);
                    return;
                }

                final AtomicBoolean failed = new AtomicBoolean();
                final AtomicInteger count = new AtomicInteger();

                for (final String child : children) {

                    final ByteString subscriberId = ByteString.copyFromUtf8(child);
                    final String childPath = path + "/" + child;

                    zk.getData(childPath, false, new SafeAsyncZKCallback.DataCallback() {
                        @Override
                        public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {

                            if (rc != Code.OK.intValue()) {
                                KeeperException e = ZkUtils.logErrorAndCreateZKException(
                                                        "Could not read subscription data for topic: " + topic.toStringUtf8()
                                                        + ", subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                                reportFailure(new PubSubException.ServiceDownException(e));
                                return;
                            }

                            if (failed.get()) {
                                return;
                            }

                            SubscriptionState state;

                            try {
                                state = SubscriptionState.parseFrom(data);
                            } catch (InvalidProtocolBufferException ex) {
                                String msg = "Failed to deserialize state for topic: " + topic.toStringUtf8()
                                             + " subscriberId: " + subscriberId.toStringUtf8();
                                logger.error(msg, ex);
                                reportFailure(new PubSubException.UnexpectedConditionException(msg));
                                return;
                            }

                            if (logger.isDebugEnabled()) {
                                logger.debug("Found subscription while acquiring topic: " + topic.toStringUtf8()
                                             + " subscriberId: " + child + "state: "
                                             + SubscriptionStateUtils.toString(state));
                            }

                            topicSubs.put(subscriberId, new InMemorySubscriptionState(state));
                            if (count.incrementAndGet() == children.size()) {
                                assert topicSubs.size() == count.get();
                                cb.operationFinished(ctx, topicSubs);
                            }
                        }

                        private void reportFailure(PubSubException e) {
                            if (failed.compareAndSet(false, true))
                                cb.operationFailed(ctx, e);
                        }
                    }, ctx);
                }
            }
        }, ctx);
    }

    @Override
    protected void createSubscriptionState(final ByteString topic, final ByteString subscriberId,
                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
        ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, subscriberId), state.toByteArray(),
        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() {

            @Override
            public void safeProcessResult(int rc, String path, Object ctx, String name) {
                if (rc == Code.OK.intValue()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Successfully recorded subscription for topic: " + topic.toStringUtf8()
                                     + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
                                     + SubscriptionStateUtils.toString(state));
                    }
                    callback.operationFinished(ctx, null);
                } else {
                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
                                             "Could not record new subscription for topic: " + topic.toStringUtf8()
                                             + " subscriberId: " + subscriberId.toStringUtf8(), path, rc);
                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
                }
            }
        }, ctx);
    }

    @Override
    protected void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
                                           final SubscriptionState state, final Callback<Void> callback, final Object ctx) {
        zk.setData(topicSubscriberPath(topic, subscriberId), state.toByteArray(), -1,
        new SafeAsyncZKCallback.StatCallback() {
            @Override
            public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
                if (rc != Code.OK.intValue()) {
                    KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                        + " subscriberId: " + subscriberId.toStringUtf8()
                                        + " could not set subscription state: " + SubscriptionStateUtils.toString(state),
                                        path, rc);
                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Successfully updated subscription for topic: " + topic.toStringUtf8()
                                     + " subscriberId: " + subscriberId.toStringUtf8() + " state: "
                                     + SubscriptionStateUtils.toString(state));
                    }

                    callback.operationFinished(ctx, null);
                }
            }
        }, ctx);
    }

    @Override
    protected void deleteSubscriptionState(final ByteString topic, final ByteString subscriberId,
                                           final Callback<Void> callback, final Object ctx) {
        zk.delete(topicSubscriberPath(topic, subscriberId), -1, new SafeAsyncZKCallback.VoidCallback() {
            @Override
            public void safeProcessResult(int rc, String path, Object ctx) {
                if (rc == Code.OK.intValue()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Successfully deleted subscription for topic: " + topic.toStringUtf8()
                                     + " subscriberId: " + subscriberId.toStringUtf8());
                    }

                    callback.operationFinished(ctx, null);
                    return;
                }

                KeeperException e = ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                    + " subscriberId: " + subscriberId.toStringUtf8() + " failed to delete subscription", path, rc);
                callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
            }
        }, ctx);
    }

}
TOP

Related Classes of org.apache.hedwig.server.subscriptions.ZkSubscriptionManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.