Package org.apache.muse.ws.notification.impl

Source Code of org.apache.muse.ws.notification.impl.SimpleNotificationConsumer$NotifyThread

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.muse.ws.notification.impl;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

import javax.xml.namespace.QName;

import org.apache.muse.core.AbstractCapability;
import org.apache.muse.core.routing.MessageHandler;
import org.apache.muse.core.serializer.SerializerRegistry;
import org.apache.muse.util.LoggingUtils;
import org.apache.muse.util.MultiMap;
import org.apache.muse.ws.addressing.soap.SoapFault;
import org.apache.muse.ws.notification.NotificationConsumer;
import org.apache.muse.ws.notification.NotificationMessage;
import org.apache.muse.ws.notification.NotificationMessageListener;
import org.apache.muse.ws.notification.TopicListener;

/**
*
* SimpleNotificationConsumer is Muse's default implementation of the
* WS-Notification NotificationConsumer port type and the listener API
* that augments it.
*
* @author Dan Jemiolo (danj)
*
*/

public class SimpleNotificationConsumer
    extends AbstractCapability implements NotificationConsumer
{   
    //
    // listeners that can be applied to any message
    //
    private Collection _messageListeners = new ArrayList();
   
    //
    // listeners that can be applied to a message with a specific topic
    //
    private Map _topicListeners = new MultiMap();
   
    public void addMessageListener(NotificationMessageListener listener)
    {
        _messageListeners.add(listener);
    }
   
    public void addTopicListener(TopicListener listener)
    {
        _topicListeners.put(listener.getTopic(), listener);
    }
   
    protected MessageHandler createNotifyHandler()
    {
        MessageHandler handler = new NotifyHandler();
        Method method = null;
       
        try
        {
            //
            // can't use ReflectUtils.getFirstMethod() because it might
            // return Object.notify()
            //
            method = getClass().getMethod("notify", new Class[]{ NotificationMessage[].class });
        }
       
        catch (Throwable error)
        {
            throw new RuntimeException(error.getMessage(), error);
        }
       
        handler.setMethod(method);
        return handler;
    }
   
    public Collection getMessageListeners()
    {
        return Collections.unmodifiableCollection(_messageListeners);
    }
   
    public Collection getTopicListeners(QName topic)
    {
        Collection listeners = (Collection)_topicListeners.get(topic);
       
        if (listeners == null)
            return Collections.EMPTY_LIST;
       
        return Collections.unmodifiableCollection(listeners);
    }
   
    public void initialize()
        throws SoapFault
    {
        super.initialize();
       
        //
        // add NotificationMessage to the collection of serializable types in
        // case the user forgot to put it in muse.xml
        //
        SerializerRegistry registry = SerializerRegistry.getInstance();
        registry.registerSerializer(NotificationMessage.class, new NotificationMessageSerializer());
       
        //
        // add message handler for <Notify/>, which has array parameters
        // that are outside our WSDL conventions
        //
        setMessageHandler(createNotifyHandler());
    }
   
    /**
     *
     * This implementation spawns a thread to handle the passing of the message
     * to the message listeners so that the method can return immediately and
     * speed up the notification process for the reporter.
     *
     */
    public void notify(NotificationMessage[] messages)
    {
        NotifyThread thread = new NotifyThread(messages);
        thread.start();
    }
   
    public void removeMessageListener(NotificationMessageListener listener)
    {
        _messageListeners.remove(listener);
    }
   
    public void removeTopicListener(TopicListener listener)
    {
        QName topic = listener.getTopic();
        Collection listeners = (Collection)_topicListeners.get(topic);
        listeners.remove(listener);
    }
   
    /**
     *
     * NotifyThread is a simple thread that iterates over the collection of
     * message listeners and provides the latest message to each of them.
     *
     * @author Dan Jemiolo (danj)
     *
     */
    private class NotifyThread extends Thread
    {
        private NotificationMessage[] _messages = null;
       
        public NotifyThread(NotificationMessage[] messages)
        {
            _messages = messages;
        }
       
        private void processMessageListeners(NotificationMessage message)
        {
            Iterator i = getMessageListeners().iterator();
           
            while (i.hasNext())
            {
                NotificationMessageListener listener = (NotificationMessageListener)i.next();
               
                try
                {
                    if (listener.accepts(message))
                        listener.process(message);
                }
               
                catch (Throwable error)
                {
                    LoggingUtils.logError(getLog(), error);
                }
            }
        }
       
        private void processTopicListeners(NotificationMessage message)
        {
            QName topic = message.getTopic();
            Iterator i = getTopicListeners(topic).iterator();
           
            while (i.hasNext())
            {
                NotificationMessageListener listener = (NotificationMessageListener)i.next();
               
                try
                {
                    //
                    // don't call accepts() - we assume that all listeners
                    // added for the topic want the message and require no
                    // further analysis
                    //
                    listener.process(message);
                }
               
                catch (Throwable error)
                {
                    LoggingUtils.logError(getLog(), error);
                }
            }
        }
       
        public void run()
        {
            for (int n = 0; n < _messages.length; ++n)
            {
                //
                // if a topic is available, pass it along to the
                // topic listeners
                //
                QName topic = _messages[n].getTopic();
                Collection topicListeners = getTopicListeners(topic);
               
                //
                // our check is if there are zero topic listeners, not
                // whether a topic merely exists in the message. this
                // means that pre-2.2 users who created all listeners
                // as message listeners will not experience a change
                // in behavior for messages with topics in them
                //
                if (!topicListeners.isEmpty())
                    processTopicListeners(_messages[n]);
               
                else
                    processMessageListeners(_messages[n]);
            }
        }
    }
}
TOP

Related Classes of org.apache.muse.ws.notification.impl.SimpleNotificationConsumer$NotifyThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.