Package org.apache.qpid.server.queue

Source Code of org.apache.qpid.server.queue.IncomingMessage

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;

import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;

public class IncomingMessage implements Filterable<RuntimeException>
{

    /** Used for debugging purposes. */
    private static final Logger _logger = Logger.getLogger(IncomingMessage.class);

    private static final boolean SYNCHED_CLOCKS =
            ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);

    private final MessagePublishInfo _messagePublishInfo;
    private ContentHeaderBody _contentHeaderBody;
    private AMQMessageHandle _messageHandle;
    private final Long _messageId;
    private final TransactionalContext _txnContext;

    private static final boolean MSG_AUTH =
        ApplicationRegistry.getInstance().getConfiguration().getBoolean("security.msg-auth", false);


    /**
     * Keeps a track of how many bytes we have received in body frames
     */
    private long _bodyLengthReceived = 0;

    /**
     * This is stored during routing, to know the queues to which this message should immediately be
     * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
     * by the message handle.
     */
    private ArrayList<AMQQueue> _destinationQueues;

    private AMQProtocolSession _publisher;
    private MessageStore _messageStore;
    private long _expiration;
   
    private Exchange _exchange;


    public IncomingMessage(final Long messageId,
                           final MessagePublishInfo info,
                           final TransactionalContext txnContext,
                           final AMQProtocolSession publisher)
    {
        _messageId = messageId;
        _messagePublishInfo = info;
        _txnContext = txnContext;
        _publisher = publisher;

    }

    public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
    {
        _contentHeaderBody = contentHeaderBody;
    }

    public void setExpiration()
    {
            long expiration =
                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
            long timestamp =
                    ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();

            if (SYNCHED_CLOCKS)
            {
                _expiration = expiration;
            }
            else
            {
                // Update TTL to be in broker time.
                if (expiration != 0L)
                {
                    if (timestamp != 0L)
                    {
                        // todo perhaps use arrival time
                        long diff = (System.currentTimeMillis() - timestamp);

                        if ((diff > 1000L) || (diff < 1000L))
                        {
                            _expiration = expiration + diff;
                        }
                    }
                }
            }

    }

    public void routingComplete(final MessageStore store,
                                final MessageHandleFactory factory) throws AMQException
    {

        final boolean persistent = isPersistent();
        _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
        if (persistent)
        {
            _txnContext.beginTranIfNecessary();
             // enqueuing the messages ensure that if required the destinations are recorded to a
            // persistent store

            if(_destinationQueues != null)
            {
                for (int i = 0; i < _destinationQueues.size(); i++)
                {
                    store.enqueueMessage(_txnContext.getStoreContext(),
                            _destinationQueues.get(i), _messageId);
                }
            }
        }
    }

    public AMQMessage deliverToQueues()
            throws AMQException
    {

        // we get a reference to the destination queues now so that we can clear the
        // transient message data as quickly as possible
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
        }

        AMQMessage message = null;

        try
        {
            // first we allow the handle to know that the message has been fully received. This is useful if it is
            // maintaining any calculated values based on content chunks
            _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
                                                          _messagePublishInfo, getContentHeaderBody());

           
           
            message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);

            message.setExpiration(_expiration);
            message.setClientIdentifier(_publisher.getSessionIdentifier());

            // we then allow the transactional context to do something with the message content
            // now that it has all been received, before we attempt delivery
            _txnContext.messageFullyReceived(isPersistent());
           
            AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
                     ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
           
            if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
            {
                throw new UnauthorizedAccessException("Acccess Refused",message);
            }
           
            if ((_destinationQueues == null) || _destinationQueues.size() == 0)
            {

                if (isMandatory() || isImmediate())
                {
                    throw new NoRouteException("No Route for message", message);

                }
                else
                {
                    _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
                }
            }
            else
            {
                int offset;
                final int queueCount = _destinationQueues.size();
                message.incrementReference(queueCount);
                if(queueCount == 1)
                {
                    offset = 0;
                }
                else
                {
                    offset = ((int)(message.getMessageId().longValue())) % queueCount;
                    if(offset < 0)
                    {
                        offset = -offset;
                    }
                }
                for (int i = offset; i < queueCount; i++)
                {
                    // normal deliver so add this message at the end.
                    _txnContext.deliver(_destinationQueues.get(i), message);
                }
                for (int i = 0; i < offset; i++)
                {
                    // normal deliver so add this message at the end.
                    _txnContext.deliver(_destinationQueues.get(i), message);
                }
            }

            message.clearStoreContext();
            return message;
        }
        finally
        {
            // Remove refence for routing process . Reference count should now == delivered queue count
            if(message != null) message.decrementReference(_txnContext.getStoreContext());
        }

    }

    public void addContentBodyFrame(final ContentChunk contentChunk)
            throws AMQException
    {

        _bodyLengthReceived += contentChunk.getSize();

        _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());

    }

    public boolean allContentReceived()
    {
        return (_bodyLengthReceived == getContentHeaderBody().bodySize);
    }

    public AMQShortString getExchange() throws AMQException
    {
        return _messagePublishInfo.getExchange();
    }

    public AMQShortString getRoutingKey() throws AMQException
    {
        return _messagePublishInfo.getRoutingKey();
    }

    public boolean isMandatory() throws AMQException
    {
        return _messagePublishInfo.isMandatory();
    }


    public boolean isImmediate() throws AMQException
    {
        return _messagePublishInfo.isImmediate();
    }

    public ContentHeaderBody getContentHeaderBody()
    {
        return _contentHeaderBody;
    }


    public boolean isPersistent()
    {
        //todo remove literal values to a constant file such as AMQConstants in common
        return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
             ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == 2;
    }
   
    public boolean isRedelivered()
    {
        return false;
    }

    public void setMessageStore(final MessageStore messageStore)
    {
        _messageStore = messageStore;
    }

    public Long getMessageId()
    {
        return _messageId;
    }

    public void setExchange(final Exchange e)
    {
        _exchange = e;
    }

    public void route() throws AMQException
    {
        _exchange.route(this);
    }

    public void enqueue(final ArrayList<AMQQueue> queues)
    {
        _destinationQueues = queues;
    }
}
TOP

Related Classes of org.apache.qpid.server.queue.IncomingMessage

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc., owned by Oracle, Inc. Contact coftware#gmail.com.