/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.muse.ws.notification.impl;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.xml.namespace.QName;
import org.apache.muse.core.AbstractCapability;
import org.apache.muse.core.routing.MessageHandler;
import org.apache.muse.core.serializer.SerializerRegistry;
import org.apache.muse.util.LoggingUtils;
import org.apache.muse.util.MultiMap;
import org.apache.muse.ws.addressing.soap.SoapFault;
import org.apache.muse.ws.notification.NotificationConsumer;
import org.apache.muse.ws.notification.NotificationMessage;
import org.apache.muse.ws.notification.NotificationMessageListener;
import org.apache.muse.ws.notification.TopicListener;
/**
*
* SimpleNotificationConsumer is Muse's default implementation of the
* WS-Notification NotificationConsumer port type and the listener API
* that augments it.
*
* @author Dan Jemiolo (danj)
*
*/
public class SimpleNotificationConsumer
extends AbstractCapability implements NotificationConsumer
{
//
// listeners that can be applied to any message
//
private Collection _messageListeners = new ArrayList();
//
// listeners that can be applied to a message with a specific topic
//
private Map _topicListeners = new MultiMap();
public void addMessageListener(NotificationMessageListener listener)
{
_messageListeners.add(listener);
}
public void addTopicListener(TopicListener listener)
{
_topicListeners.put(listener.getTopic(), listener);
}
protected MessageHandler createNotifyHandler()
{
MessageHandler handler = new NotifyHandler();
Method method = null;
try
{
//
// can't use ReflectUtils.getFirstMethod() because it might
// return Object.notify()
//
method = getClass().getMethod("notify", new Class[]{ NotificationMessage[].class });
}
catch (Throwable error)
{
throw new RuntimeException(error.getMessage(), error);
}
handler.setMethod(method);
return handler;
}
public Collection getMessageListeners()
{
return Collections.unmodifiableCollection(_messageListeners);
}
public Collection getTopicListeners(QName topic)
{
Collection listeners = (Collection)_topicListeners.get(topic);
if (listeners == null)
return Collections.EMPTY_LIST;
return Collections.unmodifiableCollection(listeners);
}
public void initialize()
throws SoapFault
{
super.initialize();
//
// add NotificationMessage to the collection of serializable types in
// case the user forgot to put it in muse.xml
//
SerializerRegistry registry = SerializerRegistry.getInstance();
registry.registerSerializer(NotificationMessage.class, new NotificationMessageSerializer());
//
// add message handler for <Notify/>, which has array parameters
// that are outside our WSDL conventions
//
setMessageHandler(createNotifyHandler());
}
/**
*
* This implementation spawns a thread to handle the passing of the message
* to the message listeners so that the method can return immediately and
* speed up the notification process for the reporter.
*
*/
public void notify(NotificationMessage[] messages)
{
NotifyThread thread = new NotifyThread(messages);
thread.start();
}
public void removeMessageListener(NotificationMessageListener listener)
{
_messageListeners.remove(listener);
}
public void removeTopicListener(TopicListener listener)
{
QName topic = listener.getTopic();
Collection listeners = (Collection)_topicListeners.get(topic);
listeners.remove(listener);
}
/**
*
* NotifyThread is a simple thread that iterates over the collection of
* message listeners and provides the latest message to each of them.
*
* @author Dan Jemiolo (danj)
*
*/
private class NotifyThread extends Thread
{
private NotificationMessage[] _messages = null;
public NotifyThread(NotificationMessage[] messages)
{
_messages = messages;
}
private void processMessageListeners(NotificationMessage message)
{
Iterator i = getMessageListeners().iterator();
while (i.hasNext())
{
NotificationMessageListener listener = (NotificationMessageListener)i.next();
try
{
if (listener.accepts(message))
listener.process(message);
}
catch (Throwable error)
{
LoggingUtils.logError(getLog(), error);
}
}
}
private void processTopicListeners(NotificationMessage message)
{
QName topic = message.getTopic();
Iterator i = getTopicListeners(topic).iterator();
while (i.hasNext())
{
NotificationMessageListener listener = (NotificationMessageListener)i.next();
try
{
//
// don't call accepts() - we assume that all listeners
// added for the topic want the message and require no
// further analysis
//
listener.process(message);
}
catch (Throwable error)
{
LoggingUtils.logError(getLog(), error);
}
}
}
public void run()
{
for (int n = 0; n < _messages.length; ++n)
{
//
// if a topic is available, pass it along to the
// topic listeners
//
QName topic = _messages[n].getTopic();
Collection topicListeners = getTopicListeners(topic);
//
// our check is if there are zero topic listeners, not
// whether a topic merely exists in the message. this
// means that pre-2.2 users who created all listeners
// as message listeners will not experience a change
// in behavior for messages with topics in them
//
if (!topicListeners.isEmpty())
processTopicListeners(_messages[n]);
else
processMessageListeners(_messages[n]);
}
}
}
}