Package org.apache.qpid.client.protocol

Source Code of org.apache.qpid.client.protocol.AMQProtocolHandler

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.protocol;

import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.ssl.BogusSSLContextFactory;

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;

public class AMQProtocolHandler extends IoHandlerAdapter
{
    private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);

    /**
     * The connection that this protocol handler is associated with. There is a 1-1
     * mapping between connection instances and protocol handler instances.
     */
    private AMQConnection _connection;

    /**
     * Used only when determining whether to add the SSL filter or not. This should be made more
     * generic in future since we will potentially have many transport layer options
     */
    private boolean _useSSL;

    /**
     * Our wrapper for a protocol session that provides access to session values
     * in a typesafe manner.
     */
    private volatile AMQProtocolSession _protocolSession;

    private AMQStateManager _stateManager = new AMQStateManager();

    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();

    /**
     * We create the failover handler when the session is created since it needs a reference to the IoSession in order
     * to be able to send errors during failover back to the client application. The session won't be available in the
     * case where we failing over due to a Connection.Redirect message from the broker.
     */
    private FailoverHandler _failoverHandler;

    /**
     * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
     * attempting failover where it is failing.
     */
    private FailoverState _failoverState = FailoverState.NOT_STARTED;

    private CountDownLatch _failoverLatch;

    public AMQProtocolHandler(AMQConnection con)
    {
        _connection = con;

        // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
        // We substitute the state manager when performing failover
        _frameListeners.add(new AMQMethodListener()
        {
            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
            {
                return _stateManager.methodReceived(evt);
            }

            public void error(Exception e)
            {
                _stateManager.error(e);
            }
        });
    }

    public boolean isUseSSL()
    {
        return _useSSL;
    }

    public void setUseSSL(boolean useSSL)
    {
        _useSSL = useSSL;
    }

    public void sessionCreated(IoSession session) throws Exception
    {
        _logger.debug("Protocol session created for session " + System.identityHashCode(session));
        _failoverHandler = new FailoverHandler(this, session);

        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));

        if (Boolean.getBoolean("amqj.shared_read_write_pool"))
        {
            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
        }
        else
        {
            session.getFilterChain().addLast("protocolFilter", pcf);
        }
        // we only add the SSL filter where we have an SSL connection
        if (_useSSL)
        {
            //FIXME: Bogus context cannot be used in production.
            SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance(false));
            sslFilter.setUseClientMode(true);
            session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
        }

        _protocolSession = new AMQProtocolSession(this, session, _connection);
        _protocolSession.init();
    }

    public void sessionOpened(IoSession session) throws Exception
    {
        System.setProperty("foo", "bar");
    }

    /**
     * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
     * sessionClosed() depending on whether we were trying to send data at the time of failure.
     *
     * @param session
     * @throws Exception
     */
    public void sessionClosed(IoSession session) throws Exception
    {
        if (_connection.isClosed())
        {
            _logger.info("Session closed called by client");
        }
        else
        {
            _logger.info("Session closed called with failover state currently " + _failoverState);

            //reconnetablility was introduced here so as not to disturb the client as they have made their intentions
            // known through the policy settings.

            if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
            {
                _logger.info("FAILOVER STARTING");
                if (_failoverState == FailoverState.NOT_STARTED)
                {
                    _failoverState = FailoverState.IN_PROGRESS;
                    startFailoverThread();
                }
                else
                {
                    _logger.info("Not starting failover as state currently " + _failoverState);
                }
            }
            else
            {
                _logger.info("Failover not allowed by policy.");

                if (_logger.isDebugEnabled())
                {
                    _logger.debug(_connection.getFailoverPolicy().toString());
                }

                if (_failoverState != FailoverState.IN_PROGRESS)
                {
                    _logger.info("sessionClose() not allowed to failover");
                    _connection.exceptionReceived(
                            new AMQDisconnectedException("Server closed connection and reconnection " +
                                                         "not permitted."));
                }
                else
                {
                    _logger.info("sessionClose() failover in progress");
                }
            }
        }

        _logger.info("Protocol Session [" + this + "] closed");
    }

    /**
     * See {@link FailoverHandler} to see rationale for separate thread.
     */
    private void startFailoverThread()
    {
        Thread failoverThread = new Thread(_failoverHandler);
        failoverThread.setName("Failover");
        // Do not inherit daemon-ness from current thread as this can be a daemon
        // thread such as a AnonymousIoService thread.
        failoverThread.setDaemon(false);
        failoverThread.start();
    }

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
    {
        _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
        if (IdleStatus.WRITER_IDLE.equals(status))
        {
            //write heartbeat frame:
            _logger.debug("Sent heartbeat");
            session.write(HeartbeatBody.FRAME);
            HeartbeatDiagnostics.sent();
        }
        else if (IdleStatus.READER_IDLE.equals(status))
        {
            //failover:
            HeartbeatDiagnostics.timeout();
            _logger.warn("Timed out while waiting for heartbeat from peer.");
            session.close();
        }
    }

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception
    {
        if (_failoverState == FailoverState.NOT_STARTED)
        {
            //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
            if (cause instanceof AMQConnectionClosedException)
            {
                _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
                // this will attemp failover

                sessionClosed(session);
            }
        }
        // we reach this point if failover was attempted and failed therefore we need to let the calling app
        // know since we cannot recover the situation
        else if (_failoverState == FailoverState.FAILED)
        {
            _logger.error("Exception caught by protocol handler: " + cause, cause);
            // we notify the state manager of the error in case we have any clients waiting on a state
            // change. Those "waiters" will be interrupted and can handle the exception
            AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
            propagateExceptionToWaiters(amqe);
            _connection.exceptionReceived(cause);
        }
    }

    /**
     * There are two cases where we have other threads potentially blocking for events to be handled by this
     * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
     * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
     * react appropriately.
     *
     * @param e the exception to propagate
     */
    public void propagateExceptionToWaiters(Exception e)
    {
        _stateManager.error(e);
        final Iterator it = _frameListeners.iterator();
        while (it.hasNext())
        {
            final AMQMethodListener ml = (AMQMethodListener) it.next();
            ml.error(e);
        }
    }

    private static int _messageReceivedCount;

    public void messageReceived(IoSession session, Object message) throws Exception
    {

        if (_messageReceivedCount++ % 1000 == 0)
        {
            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
        }
        Iterator it = _frameListeners.iterator();
        AMQFrame frame = (AMQFrame) message;

        HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);

        if (frame.bodyFrame instanceof AMQMethodBody)
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Method frame received: " + frame);
            }

            final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody) frame.bodyFrame, _protocolSession);
            try
            {
                boolean wasAnyoneInterested = false;
                while (it.hasNext())
                {
                    final AMQMethodListener listener = (AMQMethodListener) it.next();
                    wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                }
                if (!wasAnyoneInterested)
                {
                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
                }
            }
            catch (AMQException e)
            {
                it = _frameListeners.iterator();
                while (it.hasNext())
                {
                    final AMQMethodListener listener = (AMQMethodListener) it.next();
                    listener.error(e);
                }
                exceptionCaught(session, e);
            }
        }
        else if (frame.bodyFrame instanceof ContentHeaderBody)
        {
            _protocolSession.messageContentHeaderReceived(frame.channel,
                                                          (ContentHeaderBody) frame.bodyFrame);
        }
        else if (frame.bodyFrame instanceof ContentBody)
        {
            _protocolSession.messageContentBodyReceived(frame.channel,
                                                        (ContentBody) frame.bodyFrame);
        }
        else if (frame.bodyFrame instanceof HeartbeatBody)
        {
            _logger.debug("Received heartbeat");
        }
        _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
    }

    private static int _messagesOut;

    public void messageSent(IoSession session, Object message) throws Exception
    {
        if (_messagesOut++ % 1000 == 0)
        {
            _logger.debug("Sent " + _messagesOut + " protocol messages");
        }
        _connection.bytesSent(session.getWrittenBytes());
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Sent frame " + message);
        }
    }

    public void addFrameListener(AMQMethodListener listener)
    {
        _frameListeners.add(listener);
    }

    public void removeFrameListener(AMQMethodListener listener)
    {
        _frameListeners.remove(listener);
    }

    public void attainState(AMQState s) throws AMQException
    {
        _stateManager.attainState(s);
    }

    /**
     * Convenience method that writes a frame to the protocol session. Equivalent
     * to calling getProtocolSession().write().
     *
     * @param frame the frame to write
     */
    public void writeFrame(AMQDataBlock frame)
    {
        _protocolSession.writeFrame(frame);
    }

    public void writeFrame(AMQDataBlock frame, boolean wait)
    {
        _protocolSession.writeFrame(frame, wait);
    }

    /**
     * Convenience method that writes a frame to the protocol session and waits for
     * a particular response. Equivalent to calling getProtocolSession().write() then
     * waiting for the response.
     *
     * @param frame
     * @param listener the blocking listener. Note the calling thread will block.
     */
    private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
                                                            BlockingMethodFrameListener listener)
            throws AMQException
    {
        try
        {
            _frameListeners.add(listener);
            _protocolSession.writeFrame(frame);

            AMQMethodEvent e = listener.blockForFrame();
            return e;
            // When control resumes before this line, a reply will have been received
            // that matches the criteria defined in the blocking listener
        }
        finally
        {
            // If we don't remove the listener then no-one will
            _frameListeners.remove(listener);
        }

    }

    /**
     * More convenient method to write a frame and wait for it's response.
     */
    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
    {
        return writeCommandFrameAndWaitForReply(frame,
                                                new SpecificMethodFrameListener(frame.channel, responseClass));
    }

    /**
     * Convenience method to register an AMQSession with the protocol handler. Registering
     * a session with the protocol handler will ensure that messages are delivered to the
     * consumer(s) on that session.
     *
     * @param channelId the channel id of the session
     * @param session   the session instance.
     */
    public void addSessionByChannel(int channelId, AMQSession session)
    {
        _protocolSession.addSessionByChannel(channelId, session);
    }

    /**
     * Convenience method to deregister an AMQSession with the protocol handler.
     *
     * @param channelId then channel id of the session
     */
    public void removeSessionByChannel(int channelId)
    {
        _protocolSession.removeSessionByChannel(channelId);
    }

    public void closeSession(AMQSession session) throws AMQException
    {
        _protocolSession.closeSession(session);
    }

    public void closeConnection() throws AMQException
    {
        _stateManager.changeState(AMQState.CONNECTION_CLOSING);

        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
                0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
        syncWrite(frame, ConnectionCloseOkBody.class);

        _protocolSession.closeProtocolSession();
    }

    /**
     * @return the number of bytes read from this protocol session
     */
    public long getReadBytes()
    {
        return _protocolSession.getIoSession().getReadBytes();
    }

    /**
     * @return the number of bytes written to this protocol session
     */
    public long getWrittenBytes()
    {
        return _protocolSession.getIoSession().getWrittenBytes();
    }

    public void failover(String host, int port)
    {
        _failoverHandler.setHost(host);
        _failoverHandler.setPort(port);
        // see javadoc for FailoverHandler to see rationale for separate thread
        startFailoverThread();
    }

    public void blockUntilNotFailingOver() throws InterruptedException
    {
        if (_failoverLatch != null)
        {
            _failoverLatch.await();
        }
    }

    public String generateQueueName()
    {
        return _protocolSession.generateQueueName();
    }

    public CountDownLatch getFailoverLatch()
    {
        return _failoverLatch;
    }

    public void setFailoverLatch(CountDownLatch failoverLatch)
    {
        _failoverLatch = failoverLatch;
    }

    public AMQConnection getConnection()
    {
        return _connection;
    }

    public AMQStateManager getStateManager()
    {
        return _stateManager;
    }

    public void setStateManager(AMQStateManager stateManager)
    {
        _stateManager = stateManager;
    }

    FailoverState getFailoverState()
    {
        return _failoverState;
    }

    public void setFailoverState(FailoverState failoverState)
    {
        _failoverState = failoverState;
    }
}
TOP

Related Classes of org.apache.qpid.client.protocol.AMQProtocolHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.