Package org.apache.qpid.management.domain.services

Source Code of org.apache.qpid.management.domain.services.BrokerMessageListener$Log

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.management.domain.services;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.qpid.api.Message;
import org.apache.qpid.management.Messages;
import org.apache.qpid.management.Protocol;
import org.apache.qpid.management.domain.handler.base.IMessageHandler;
import org.apache.qpid.management.domain.model.DomainModel;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.util.Logger;

/**
* Message listener used for processing incoming messages.
* So it is installed as a consumer on a specific channel and when a new message arrives:
*
* 1) Performs a sanity check on the message (magic number, sequence number)
* 2) Extracts the opcode and looks for one message handler associated with that opcode.
* 3) If a message handler is found the delegates the message processing; otherwise a log message is written to indicate
* that the message will be skipped.
*
* @author Andrea Gazzarini
*/
class BrokerMessageListener implements MessageListener
{
    private  final static Logger LOGGER = Logger.get(BrokerMessageListener.class);
   
    private static class Log
    {           
        // Debugs the content of the incoming message.
        static void debugIncomingMessage(ByteBuffer message)
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug(Messages.QMAN_200001_INCOMING_MESSAGE_HAS_BEEN_RECEIVED, Arrays.toString(message.array()));
            }
        }

        // Debugs all the configured handlers.
        static void debugConfiguredHandlers (Map<Character, IMessageHandler> _handlers)
        {
            if (LOGGER.isDebugEnabled())
            {
                for (Entry<Character, IMessageHandler> entry : _handlers.entrySet())
                {
                  LOGGER.debug(Messages.QMAN_200002_OPCODE_HANDLER_ASSOCIATION,entry.getKey(),entry.getValue())
                }
            }
        }
    }
   
    Map<Character, IMessageHandler> _handlers = new HashMap<Character, IMessageHandler>();
    private DomainModel _domainModel;
       
    /**
     * Builds a new message listener with the given  broker domain model.
     *
     * @param model the managed broker domain model.
     */
    BrokerMessageListener(DomainModel model)
    {
        this._domainModel = model;
    }
   
    /**
     * When a new message arrives this method is called.
     * 1) Performs a sanity check on the message (magic number, sequence number)
     * 2) Extracts the opcode and looks for one message handler associated with that opcode.
     * 3) If a message handler is found the delegates the message processing; otherwise a log message is written to indicate
     * that the message will be skipped.
     *
     * @param message the incoming message.
     */
    public void onMessage (Message compoundMessage)
    {
    try
    {
      MessageTokenizer tokenizer = new MessageTokenizer(compoundMessage);
      while (tokenizer.hasMoreElements())
      {
          dispatch(tokenizer.nextElement());
      }
    } catch(IOException exception)
        {
      LOGGER.error(exception,Messages.QMAN_100002_MESSAGE_READ_FAILURE);                       
        } catch(Exception exception)
      {
          LOGGER.error(exception,Messages.QMAN_100003_MESSAGE_PROCESS_FAILURE);           
      }
    }
   
    /**
     * Configures a new handler with this listener.
     * After that, each time a message arrives with the specified opcode, this handler will be responsible for
     * processing.
     * Note that calling this method will switch this listener to a WORKING state.
     *
     * @param opcode the operation code.
     * @param handler the message handler.
     */
    void setHandlers(Map<Character, IMessageHandler> handlers)
    {
        for (Entry<Character, IMessageHandler> entry : handlers.entrySet())
        {
            char opcode = entry.getKey();
            IMessageHandler handler = entry.getValue();
            try
            {
                handler.setDomainModel(_domainModel);
                _handlers.put(opcode, handler);
            } catch(Exception exception) {
                LOGGER.error(exception,Messages.QMAN_100004_HANDLER_INITIALIZATION_FAILURE, opcode);
            }
        }       
    }
   
    /**
     * Dispatches the given message to the appropriate handler.
     *
     * @param message the incoming message.
     * @throws IOException when the message content cannot be read.
     */
    private void dispatch(Message message) throws IOException
    {
       ByteBuffer buffer = message.readData();

         String magicNumber = new String(new byte[] {buffer.get(),buffer.get(),buffer.get()});
         if (!Protocol.MAGIC_NUMBER.equals(magicNumber))
         {
             LOGGER.error(Messages.QMAN_100001_BAD_MAGIC_NUMBER_FAILURE,magicNumber);
             return;
         }
        
         char opcode = (char)buffer.get();
        
         IMessageHandler handler = _handlers.get(opcode);
         if (handler != null)
         {
             BBDecoder decoder = new BBDecoder();
             decoder.init(buffer);
            
             LOGGER.debug(Messages.QMAN_200003_MESSAGE_FORWARDING,opcode,handler);
            
             handler.process(decoder,decoder.readSequenceNo());
         } else
         {
             LOGGER.warn(Messages.QMAN_300001_MESSAGE_DISCARDED,opcode);
             Log.debugConfiguredHandlers(_handlers);
         }
    }
}
TOP

Related Classes of org.apache.qpid.management.domain.services.BrokerMessageListener$Log

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.