Package org.apache.hedwig.client.netty

Source Code of org.apache.hedwig.client.netty.HedwigSubscriber

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hedwig.client.netty;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.ByteString;

import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.SubscriptionListener;

/**
* This is the Hedwig Netty specific implementation of the Subscriber interface.
*
*/
public class HedwigSubscriber implements Subscriber {

    private static Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);

    protected final ClientConfiguration cfg;
    protected final HChannelManager channelManager;

    public HedwigSubscriber(HedwigClientImpl client) {
        this.cfg = client.getConfiguration();
        this.channelManager = client.getHChannelManager();
    }

    public void addSubscriptionListener(SubscriptionListener listener) {
        channelManager.getSubscriptionEventEmitter()
                      .addSubscriptionListener(listener);
    }

    public void removeSubscriptionListener(SubscriptionListener listener) {
        channelManager.getSubscriptionEventEmitter()
                      .removeSubscriptionListener(listener);
    }

    // Private method that holds the common logic for doing synchronous
    // Subscribe or Unsubscribe requests. This is for code reuse since these
    // two flows are very similar. The assumption is that the input
    // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
    private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
                          SubscriptionOptions options)
            throws CouldNotConnectException, ClientAlreadySubscribedException,
        ClientNotSubscribedException, ServiceDownException {
        if (logger.isDebugEnabled()) {
            StringBuilder debugMsg = new StringBuilder().append("Calling a sync subUnsub request for topic: ")
                                     .append(topic.toStringUtf8()).append(", subscriberId: ")
                                     .append(subscriberId.toStringUtf8()).append(", operationType: ")
                                     .append(operationType);
            if (null != options) {
                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
                        .append(", messageBound: ").append(options.getMessageBound());
            }
            logger.debug(debugMsg.toString());
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
            try {
                while (!pubSubData.isDone)
                    pubSubData.wait();
            } catch (InterruptedException e) {
                throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
            }
            // Check from the PubSubCallback if it was successful or not.
            if (!pubSubCallback.getIsCallSuccessful()) {
                // See what the exception was that was thrown when the operation
                // failed.
                PubSubException failureException = pubSubCallback.getFailureException();
                if (failureException == null) {
                    // This should not happen as the operation failed but a null
                    // PubSubException was passed. Log a warning message but
                    // throw a generic ServiceDownException.
                    logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                    throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
                }
                // For the expected exceptions that could occur, just rethrow
                // them.
                else if (failureException instanceof CouldNotConnectException)
                    throw (CouldNotConnectException) failureException;
                else if (failureException instanceof ClientAlreadySubscribedException)
                    throw (ClientAlreadySubscribedException) failureException;
                else if (failureException instanceof ClientNotSubscribedException)
                    throw (ClientNotSubscribedException) failureException;
                else if (failureException instanceof ServiceDownException)
                    throw (ServiceDownException) failureException;
                else {
                    logger.error("Unexpected PubSubException thrown: ", failureException);
                    // Throw a generic ServiceDownException but wrap the
                    // original PubSubException within it.
                    throw new ServiceDownException(failureException);
                }
            }
        }
    }

    // Private method that holds the common logic for doing asynchronous
    // Subscribe or Unsubscribe requests. This is for code reuse since these two
    // flows are very similar. The assumption is that the input OperationType is
    // either SUBSCRIBE or UNSUBSCRIBE.
    private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
                               Callback<ResponseBody> callback, Object context,
                               OperationType operationType, SubscriptionOptions options) {
        if (logger.isDebugEnabled()) {
            StringBuilder debugMsg = new StringBuilder().append("Calling a async subUnsub request for topic: ")
                                     .append(topic.toStringUtf8()).append(", subscriberId: ")
                                     .append(subscriberId.toStringUtf8()).append(", operationType: ")
                                     .append(operationType);
            if (null != options) {
                debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
                        .append(", messageBound: ").append(options.getMessageBound());
            }
            logger.debug(debugMsg.toString());
        }
        if (OperationType.SUBSCRIBE.equals(operationType)) {
            if (options.getMessageBound() <= 0 &&
                cfg.getSubscriptionMessageBound() > 0) {
                SubscriptionOptions.Builder soBuilder =
                    SubscriptionOptions.newBuilder(options).setMessageBound(
                        cfg.getSubscriptionMessageBound());
                options = soBuilder.build();
            }
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
                                               options, callback, context);
        channelManager.submitOp(pubSubData);
    }

    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        subscribe(topic, subscriberId, options, false);
    }

    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
         InvalidSubscriberIdException {
        subscribe(topic, subscriberId, options, false);
    }

    protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
                                                   + ", isHub: " + isHub);
        }
        try {
            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
        } catch (ClientNotSubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            // This exception should never be thrown here. But just in case,
            // throw a generic ServiceDownException but wrap the original
            // Exception within it.
            throw new ServiceDownException(e);
        }
    }

    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
                               Object context) {
        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
                               Callback<Void> callback, Object context) {
        asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
                                  SubscriptionOptions options,
                                  Callback<Void> callback, Object context, boolean isHub) {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        asyncSubUnsub(topic, subscriberId,
                      new VoidCallbackAdapter<ResponseBody>(callback), context,
                      OperationType.SUBSCRIBE, options);
    }

    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
        unsubscribe(topic, subscriberId, false);
    }

    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
            throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
                                                   + ", isHub: " + isHub);
        }
        // Synchronously close the subscription on the client side. Even
        // if the unsubscribe request to the server errors out, we won't be
        // delivering messages for this subscription to the client. The client
        // can later retry the unsubscribe request to the server so they are
        // "fully" unsubscribed from the given topic.
        closeSubscription(topic, subscriberId);
        try {
            subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
        } catch (ClientAlreadySubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            // This exception should never be thrown here. But just in case,
            // throw a generic ServiceDownException but wrap the original
            // Exception within it.
            throw new ServiceDownException(e);
        }
    }

    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                 final Callback<Void> callback, final Object context) {
        doAsyncUnsubscribe(topic, subscriberId,
                           new VoidCallbackAdapter<ResponseBody>(callback),
                           context, false);
    }

    protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                    final Callback<Void> callback, final Object context, boolean isHub) {
        doAsyncUnsubscribe(topic, subscriberId,
                           new VoidCallbackAdapter<ResponseBody>(callback),
                           context, isHub);
    }

    private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                    final Callback<ResponseBody> callback,
                                    final Object context, boolean isHub) {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        // Asynchronously close the subscription. On the callback to that
        // operation once it completes, post the async unsubscribe request.
        doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
            @Override
            public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
                asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                callback.operationFailed(context, exception);
            }
        }, null);
    }

    // This is a helper method to determine if a subscriberId is valid as either
    // a hub or local subscriber
    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
        if ((isHub && !SubscriptionStateUtils.isHubSubscriber(subscriberId))
                || (!isHub && SubscriptionStateUtils.isHubSubscriber(subscriberId)))
            return false;
        else
            return true;
    }

    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
            throws ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Calling consume for {}, messageSeqId: {}.",
                     topicSubscriber, messageSeqId);

        SubscribeResponseHandler subscribeResponseHandler =
            channelManager.getSubscribeResponseHandler(topicSubscriber);
        // Check that this topic subscription on the client side exists.
        if (null == subscribeResponseHandler ||
            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            throw new ClientNotSubscribedException(
                "Cannot send consume message since client is not subscribed to topic: "
                + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        // Send the consume message to the server using the same subscribe
        // channel that the topic subscription uses.
        subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
    }

    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
        ServiceDownException {
        // The subscription type of info should be stored on the server end, not
        // the client side. Eventually, the server will have the Subscription
        // Manager part that ties into Zookeeper to manage this info.
        // Commenting out these type of API's related to that here for now until
        // this data is available on the server. Will figure out what the
        // correct way to contact the server to get this info is then.
        // The client side just has soft memory state for client subscription
        // information.
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        SubscribeResponseHandler subscribeResponseHandler =
            channelManager.getSubscribeResponseHandler(topicSubscriber);
        return !(null == subscribeResponseHandler ||
                 !subscribeResponseHandler.hasSubscription(topicSubscriber));
    }

    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
        ServiceDownException {
        // Same as the previous hasSubscription method, this data should reside
        // on the server end, not the client side.
        return null;
    }

    public void startDelivery(final ByteString topic, final ByteString subscriberId,
                              MessageHandler messageHandler)
            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Starting delivery for {}.", topicSubscriber);
        channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
                                        MessageHandler messageHandler,
                                        ClientMessageFilter messageFilter)
            throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        if (null == messageHandler || null == messageFilter) {
            throw new NullPointerException("Null message handler or message filter is       provided.");
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
        channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    public void stopDelivery(final ByteString topic, final ByteString subscriberId)
    throws ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {}.", topicSubscriber);
        channelManager.stopDelivery(topicSubscriber);
    }

    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            doAsyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
            try {
                while (!pubSubData.isDone)
                    pubSubData.wait();
            } catch (InterruptedException e) {
                throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
            }
            // Check from the PubSubCallback if it was successful or not.
            if (!pubSubCallback.getIsCallSuccessful()) {
                throw new ServiceDownException("Exception while trying to close the subscription for topic: "
                                               + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            }
        }
    }

    public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
                                       final Callback<Void> callback, final Object context) {
        doAsyncCloseSubscription(topic, subscriberId,
                                 new VoidCallbackAdapter<ResponseBody>(callback), context);
    }

    private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
                                          final Callback<ResponseBody> callback, final Object context) {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
        // We only stop delivery here not in channel manager
        // Because channelManager#asyncCloseSubscription will called
        // when subscription channel disconnected to clear local subscription
        try {
            channelManager.stopDelivery(topicSubscriber);
        } catch (ClientNotSubscribedException cnse) {
            // it is OK to ignore the exception when closing subscription
        }
        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
        channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
    }
}
TOP

Related Classes of org.apache.hedwig.client.netty.HedwigSubscriber

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.