Package org.apache.qpid.client

Source Code of org.apache.qpid.client.BasicMessageConsumer_0_10

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.qpid.client;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Acquired;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;

/**
 * This is a 0.10 message consumer.
 */
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>

    /**
     * The logger for this class.
     */
    private final Logger _logger = LoggerFactory.getLogger(getClass());

    /**
     * The underlying QpidSession.
     */
    private AMQSession_0_10 _0_10session;

    /**
     * Indicates whether this consumer receives pre-acquired messages.
     */
    private final boolean _preAcquire;

    /**
     * Indicates whether this consumer is currently performing a synchronous receive.
     */
    private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
    /** String form of the consumer tag, written by {@code setConsumerTag(int)}. */
    private String _consumerTagString;
    /**
     * Capacity computed once in the constructor by {@code evaluateCapacity}. A value of 0 is
     * treated throughout this class as "messages are not pre-fetched".
     */
    private final long _capacity;

    /** Flag indicating if the server supports message selectors */
    private final boolean _serverJmsSelectorSupport;

    /**
     * Builds a 0-10 consumer. Every argument is forwarded unchanged to the superclass
     * constructor; this class then keeps the session as an {@code AMQSession_0_10} and computes
     * its own selector-support, pre-acquire and capacity settings.
     *
     * @param connection asked whether the broker supports
     *        {@code ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR}
     * @param destination input to {@code evaluatePreAcquire} and {@code evaluateCapacity}
     * @param session cast to {@code AMQSession_0_10}; any other session type fails the cast
     * @param browseOnly input to {@code evaluatePreAcquire}
     * @throws JMSException declared here; presumably raised by the superclass constructor - confirm
     */
    protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                        String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                        AMQSession<?,?> session, FieldTable rawSelector,
                                        int prefetchHigh, int prefetchLow, boolean exclusive,
                                        int acknowledgeMode, boolean browseOnly, boolean autoClose)
            throws JMSException
        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
                prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
        // Typed reference to the same session object that was handed to the superclass.
        _0_10session = (AMQSession_0_10) session;

        // When the broker lacks selector support, checkPreConditions evaluates the selector in this client.
        _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
        _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport);

        _capacity = evaluateCapacity(destination);

        // This is due to the Destination carrying the temporary subscription name which is incorrect.
        if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
            // A link that exists and carries a name counts as a "named queue".
            boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
            if (!namedQueue)
            // NOTE(review): the statement controlled by this condition is not present in this listing.

    /**
     * Records the string form of the given consumer tag in {@code _consumerTagString}.
     *
     * @param consumerTag the numeric consumer tag
     */
    @Override public void setConsumerTag(int consumerTag)
        _consumerTagString = String.valueOf(consumerTag);

    /**
     * @return the consumer tag as a string, as last stored by {@code setConsumerTag(int)}
     */
    public String getConsumerTagString()
        return _consumerTagString;

    /**
     * This is invoked by the session thread when emptying the session message queue.
     * We first check that the message is valid (i.e. it matches the selector) and then deliver
     * it to the message listener or to the sync consumer queue.
     *
     * @param jmsMessage this message has already been processed, so preDeliver cannot be redone
     */
    @Override public void notifyMessage(AbstractJMSMessage jmsMessage)
            // Everything nested below runs only when checkPreConditions accepts the message.
            if (checkPreConditions(jmsMessage))
                // A listener combined with zero capacity (no prefetch) is singled out here.
                // NOTE(review): the statement controlled by this condition is not present in this listing.
                if (isMessageListenerSet() && _capacity == 0)
                _logger.debug("messageOk, trying to notify");
                // if we are synchronously waiting for a message
                // and messages are not pre-fetched we then need to request another one
                if(_capacity == 0)
        // An AMQException raised by the checks above is logged at error level.
        // NOTE(review): the log text below contains a typo ("Receivecd").
        catch (AMQException e)
            _logger.error("Receivecd an Exception when receiving message",e);

    /**
     * This method is invoked when this consumer is stopped.
     * It tells the broker to stop delivering messages to this consumer.
     */
    @Override void sendCancel() throws AMQException
            // Tell the session that the consumer identified by this tag has been cancelled.
            getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
        // NOTE(review): the handling for SessionException is not present in this listing.
        catch (SessionException se)

        // Rethrow the exception currently recorded on the 0-10 session, if there is one.
        AMQException amqe = _0_10session.getCurrentException();
        if (amqe != null)
            throw amqe;

    /**
     * Overload of {@code notifyMessage} that takes the unprocessed 0-10 message.
     * NOTE(review): the method body is not present in this listing.
     *
     * @param messageFrame the unprocessed 0-10 message
     */
    @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)

    /**
     * Hook that, going by its name, runs before a message is delivered. The visible logic
     * singles out consumers whose acknowledge mode is {@code NO_ACKNOWLEDGE}.
     * NOTE(review): the statements that act on this condition are not present in this listing.
     *
     * @param jmsMsg the message being processed
     */
    protected void preDeliver(AbstractJMSMessage jmsMsg)

        if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
            // For 0-10 we need to ensure that all messages are indicated as processed in some way,
            // so that their AMQP command-id is marked completed. We must therefore send a completion
            // even for no-ack messages, although no actual 'acknowledgement' is occurring.
            // Add the message to the unacked message list to ensure we don't lose record of it
            // before sending a completion of some sort.

    /**
     * Converts an unprocessed 0-10 message into a JMS message. The exchange-type mapping is first
     * updated from the transfer's header using the underlying Qpid session; the message factory
     * then builds the JMS message from the transfer.
     *
     * @param delegateFactory not referenced by the visible body
     * @param msg the unprocessed message whose transfer is converted
     * @return the message produced by the message factory
     * @throws Exception as declared; the visible body does not show which call raises it
     */
    @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
            AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
        AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
        return getMessageFactory().createMessage(msg.getMessageTransfer());

    /**
     * Check whether a message can be delivered to this consumer.
     *
     * @param message The message to be checked.
     * @return true if the message matches the selector and can be acquired, false otherwise.
     * @throws AMQException If the message preconditions cannot be checked due to some internal error.
     */
    private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
        // A message is acceptable unless the client-side selector below rejects it.
        boolean messageOk = true;
            // The selector is evaluated in this client only when the broker does not support
            // JMS selectors and a selector filter is actually set.
            if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null)
                messageOk = getMessageSelectorFilter().matches(message);
        // Any failure while evaluating the selector is reported as an INTERNAL_ERROR AMQException.
        catch (Exception e)
            throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when evaluating message selector", e);

        if (_logger.isDebugEnabled())
            _logger.debug("messageOk " + messageOk);
            _logger.debug("_preAcquire " + _preAcquire);

        // Rejected message: what happens next depends on whether it was pre-acquired.
        // NOTE(review): the statements that ack or flush the message are not present in this listing.
        if (!messageOk)
            if (_preAcquire)
                // this is the case for topics
                // We need to ack this message
                if (_logger.isDebugEnabled())
                    _logger.debug("filterMessage - trying to ack message");
                if (_logger.isDebugEnabled())
                    _logger.debug("filterMessage - not ack'ing message as not acquired");
        // Accepted message that was neither pre-acquired nor browsed: acquire it now. The outcome
        // of the acquire becomes the return value.
        else if (!_preAcquire && !isBrowseOnly())
            // now we need to acquire this message if needed
            // this is the case of queue with a message selector set
            if (_logger.isDebugEnabled())
                _logger.debug("filterMessage - trying to acquire message");
            messageOk = acquireMessage(message);
            _logger.debug("filterMessage - message acquire status : " + messageOk);

        return messageOk;

    /**
     * Acknowledge a message.
     *
     * @param message The message to be acknowledged
     * @throws AMQException If the message cannot be acknowledged due to some internal error.
     */
    private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
            // NOTE(review): the call these arguments belong to is not present in this listing.
            // Arguments: a range holding only this message's delivery tag (narrowed to int), and a
            // flag that is true unless the acknowledge mode is NO_ACKNOWLEDGE.
            (Range.newInstance((int) message.getDeliveryTag()),
             getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);

        // Rethrow the exception currently recorded on the 0-10 session, if there is one.
        final AMQException amqe = _0_10session.getCurrentException();
        if (amqe != null)
            throw amqe;

    /**
     * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated as
     * processed, so that their AMQP command-id is marked completed.
     *
     * @param message The unwanted message to be flushed
     * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
     */
    private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
        // Flush a range holding only this message's delivery tag (narrowed to int); the second
        // argument is passed as false.
        _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false);

        // Rethrow the exception currently recorded on the 0-10 session, if there is one.
        final AMQException amqe = _0_10session.getCurrentException();
        if (amqe != null)
            throw amqe;

    /**
     * Acquire a message.
     *
     * @param message The message to be acquired
     * @return true if the message has been acquired, false otherwise.
     * @throws AMQException If the message cannot be acquired due to some internal error.
     */
    private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
        boolean result = false;

        // Request acquisition of just this delivery tag (narrowed to int) and fetch the result with
        // get(); presumably this blocks until the broker replies - confirm.
        final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get();

        // The acquire counts as successful only if the reply lists at least one transfer.
        final RangeSet acquired = acq.getTransfers();
        if (acquired != null && acquired.size() > 0)
            result = true;
        return result;

    /**
     * NOTE(review): most of this method's body is not present in this listing. The surviving
     * fragment passes {@code MessageCreditUnit.MESSAGE} with a value of 1, so it presumably
     * requests credit for a single message - confirm against the full source.
     */
    private void messageFlow()
                                                  MessageCreditUnit.MESSAGE, 1,

    /**
     * Sets the message listener for this consumer.
     * NOTE(review): the call that stores the listener is not present in this listing. The visible
     * logic covers two follow-up cases for a non-null listener.
     *
     * @param messageListener the listener to set; {@code null} skips both visible cases
     * @throws JMSException built by the session from a {@code TransportException} raised while
     *         setting the listener
     */
    public void setMessageListener(final MessageListener messageListener) throws JMSException
            // Case 1: a listener is set while nothing is pre-fetched (_capacity == 0).
            // NOTE(review): the statement controlled by this condition is not present in this listing.
            if (messageListener != null && _capacity == 0)
            // Case 2: messages already sit in the synchronous queue; each one is rejected with the
            // second argument set to true.
            if (messageListener != null && !getSynchronousQueue().isEmpty())
                Iterator messages= getSynchronousQueue().iterator();
                while (messages.hasNext())
                    // NOTE(review): the expression being cast is not present in this listing.
                    AbstractJMSMessage message=(AbstractJMSMessage);
                    getSession().rejectMessage(message, true);
        catch(TransportException e)
            throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);

    /**
     * Hook that, going by its name, runs after a failover. The visible condition requires the
     * 0-10 session to be started and a synchronous receive to be in progress.
     * NOTE(review): the action taken when the condition holds is not present in this listing.
     */
    public void failedOverPost()
        if (_0_10session.isStarted() && _syncReceive.get())

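    /**
     * When messages are not prefetched we need to request a message from the
     * broker.
     * Note that if the timeout is too short a message may be queued in _synchronousQueue until
     * this consumer closes or requests it.
     *
     * @param l the timeout value, passed on to the superclass implementation
     * @return the object obtained from the superclass queue lookup
     * @throws InterruptedException if the thread is interrupted while waiting for a message
     */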
    public Object getMessageFromQueue(long l) throws InterruptedException
        // NOTE(review): several statements of this method are not present in this listing: the
        // bodies of the _capacity checks and the prefixes of the session calls further down.
        if (_capacity == 0)
        // No prefetch, session started and nothing queued locally.
        if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
        // Ask the superclass queue, passing the caller's value through unchanged.
        Object o = super.getMessageFromQueue(l);
        // Nothing was obtained while the session is running. The fragments below pass the consumer
        // tag with the UNRELIABLE and SYNC options, then a BYTE credit of 0xFFFFFFFF, and the
        // superclass queue is queried once more with -1.
        if (o == null && _0_10session.isStarted())
                (getConsumerTagString(), Option.UNRELIABLE, Option.SYNC);
                (getConsumerTagString(), MessageCreditUnit.BYTE,
                 0xFFFFFFFF, Option.UNRELIABLE);
            if (_capacity > 0)
            o = super.getMessageFromQueue(-1);
        if (_capacity == 0)
        return o;

    /**
     * Hook that, going by its name, runs after a message has been delivered; it dispatches on the
     * consumer's acknowledge mode.
     * NOTE(review): no {@code break} statements or default case are present in this listing, so
     * whether cases fall through cannot be confirmed from here.
     *
     * @param msg the message being processed
     */
    void postDeliver(AbstractJMSMessage msg)

        switch (getAcknowledgeMode())
            case Session.SESSION_TRANSACTED:
            case Session.NO_ACKNOWLEDGE:
                // Outside recovery, acknowledge this delivery tag; the second argument is false.
                if (!getSession().isInRecovery())
                  getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
            case Session.AUTO_ACKNOWLEDGE:
                // Outside recovery, and only when the connection's sync-ack setting is on, sync
                // the underlying Qpid session.
                if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
                    ((AMQSession_0_10) getSession()).getQpidSession().sync();

    /**
     * Receive variant for browsing; it simply delegates to {@code receiveNoWait()}.
     *
     * @return whatever {@code receiveNoWait()} returns
     * @throws JMSException if {@code receiveNoWait()} throws it
     */
    Message receiveBrowse() throws JMSException
        return receiveNoWait();

    /**
     * Collects the delivery tags of the messages waiting in the synchronous queue and flushes
     * them on the 0-10 session in a single call. Nothing is done when the queue is empty.
     * NOTE(review): the listing does not show whether the queue itself is cleared afterwards.
     */
    @Override public void rollbackPendingMessages()
        if (getSynchronousQueue().size() > 0)
            RangeSet ranges = RangeSetFactory.createRangeSet();
            Iterator iterator = getSynchronousQueue().iterator();
            while (iterator.hasNext())

                // NOTE(review): the right-hand side of this assignment is not present in this listing.
                Object o =;
                // Only AbstractJMSMessage entries contribute a delivery tag (narrowed to int);
                // any other entry is reported at error level.
                if (o instanceof AbstractJMSMessage)
                    ranges.add((int) ((AbstractJMSMessage) o).getDeliveryTag());
                    _logger.error("Queue contained a :" + o.getClass()
                                  + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");

            // Flush every collected tag at once; the second argument is passed as false.
            _0_10session.flushProcessed(ranges, false);

    /**
     * Hook that, going by its name, runs after the subscription; it only acts on destinations
     * that use the ADDR syntax.
     *
     * @throws AMQException as declared; presumably raised by the session calls - confirm
     */
    void postSubscription() throws AMQException
        AMQDestination dest = this.getDestination();
        if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
            // Delete policy ALWAYS or RECEIVER.
            // NOTE(review): the action for this case is not present in this listing.
            if (dest.getDelete() == AddressOption.ALWAYS ||
                dest.getDelete() == AddressOption.RECEIVER )
            // Subscription queue is handled as part of linkDelete method.
            // A non-durable subscriber has its subscription queue deleted through the session.
            if (!isDurableSubscriber())
                ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);

    /**
     * @return the capacity computed at construction time by {@code evaluateCapacity}
     */
    long getCapacity()
        return _capacity;

    /**
     * @return the pre-acquire flag computed at construction time by {@code evaluatePreAcquire}
     */
    boolean isPreAcquire()
        return _preAcquire;

    /**
     * Decides whether this consumer receives messages in pre-acquired mode.
     * NOTE(review): the {@code else} keywords are not present in this listing. The reading below
     * assumes the usual if/else structure - confirm against the full source.
     *
     * @param browseOnly a browse-only consumer never pre-acquires
     * @param destination checked for being an {@code AMQQueue}
     * @param serverJmsSelectorSupport whether the broker supports JMS selectors
     * @return false for browsers, and for queue consumers that have a selector the broker cannot
     *         apply; true otherwise
     */
    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport)
        boolean preAcquire;
        if (browseOnly)
            preAcquire = false;
            // Queue detection uses the parameter for the instanceof test, but getDestination()
            // rather than the parameter for the address-type test.
            boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
            // A queue with a selector the broker cannot apply: messages are not pre-acquired.
            if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null)
                preAcquire = false;
                preAcquire = true;
        return preAcquire;

    /**
     * Computes the capacity for this consumer.
     *
     * @param destination destination whose link, if present, may carry a consumer capacity
     * @return the link's consumer capacity when the link exists and that capacity is positive;
     *         otherwise the session's prefetch value when the session's {@code prefetch()} check
     *         is true; otherwise 0
     */
    private long evaluateCapacity(AMQDestination destination)
        long capacity = 0;
        // A positive link-level capacity takes precedence over the session prefetch.
        if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
            capacity = destination.getLink().getConsumerCapacity();
        else if (getSession().prefetch())
            capacity = getSession().getPrefetch();
        return capacity;


Related Classes of org.apache.qpid.client.BasicMessageConsumer_0_10

Copyright © 2018 All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact