/*=============================================================================*
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*=============================================================================*/
package org.apache.muse.ws.notification.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.logging.Logger;
import org.apache.muse.core.AbstractCapability;
import org.apache.muse.core.serializer.SerializerRegistry;
import org.apache.muse.util.LoggingUtils;
import org.apache.muse.ws.addressing.soap.SoapFault;
import org.apache.muse.ws.notification.NotificationConsumer;
import org.apache.muse.ws.notification.NotificationMessage;
import org.apache.muse.ws.notification.NotificationMessageListener;
/**
*
* SimpleNotificationConsumer is Muse's default implementation of the
* WS-Notification NotificationConsumer port type and the listener API
* that augments it.
*
* @author Dan Jemiolo (danj)
*
*/
public class SimpleNotificationConsumer 
    extends AbstractCapability implements NotificationConsumer
{
    //
    // Registered message listeners, kept in the order they were added. 
    // Every read and write is guarded by a lock on this collection because 
    // notify() hands messages to background threads that read the listeners 
    // while other threads may still be adding or removing them.
    //
    private final Collection _messageListeners = new ArrayList();
    
    /**
     * 
     * Registers a listener that will be offered every message passed to 
     * notify() from this point on.
     * 
     * @param listener
     *        The listener to append to the collection of listeners.
     *
     */
    public void addMessageListener(NotificationMessageListener listener)
    {
        synchronized (_messageListeners)
        {
            _messageListeners.add(listener);
        }
    }
    
    /**
     * 
     * @return A read-only snapshot of the listeners registered at the time 
     *         of the call. The snapshot is a copy, so it is safe to iterate 
     *         over while listeners are added or removed, but it will not 
     *         reflect registrations made after it was taken.
     *
     */
    public Collection getMessageListeners()
    {
        synchronized (_messageListeners)
        {
            //
            // copy before wrapping - an unmodifiable *view* of the live 
            // list would still fail with ConcurrentModificationException 
            // if the list changed while a caller was iterating over it
            //
            return Collections.unmodifiableCollection(new ArrayList(_messageListeners));
        }
    }
    
    /**
     * 
     * Performs the base class initialization and then registers a 
     * serializer for NotificationMessage.
     * 
     * @throws SoapFault
     *         If the base class initialization fails.
     *
     */
    public void initialize() 
        throws SoapFault
    {
        super.initialize();
        
        //
        // add NotificationMessage to the collection of serializable types in 
        // case the user forgot to put it in muse.xml
        //
        SerializerRegistry registry = SerializerRegistry.getInstance();
        registry.registerSerializer(NotificationMessage.class, new NotificationMessageSerializer());
    }
    
    /**
     * 
     * This implementation spawns a thread to handle the passing of the message 
     * to the message listeners so that the method can return immediately and 
     * speed up the notification process for the reporter. A null or empty 
     * array is ignored (no thread is started).
     * 
     * @param messages
     *        The messages to offer to each registered listener.
     *
     */
    public void notify(NotificationMessage[] messages)
    {
        //
        // nothing to deliver - without this check, a null array would kill 
        // the worker thread with an uncaught NullPointerException
        //
        if (messages == null || messages.length == 0)
            return;
        
        NotifyThread thread = new NotifyThread(messages);
        thread.start();
    }
    
    /**
     * 
     * Unregisters a listener. Deliveries that already took their snapshot 
     * of the listeners may still reach it for the message in progress.
     * 
     * @param listener
     *        The listener to remove from the collection of listeners.
     *
     */
    public void removeMessageListener(NotificationMessageListener listener)
    {
        synchronized (_messageListeners)
        {
            _messageListeners.remove(listener);
        }
    }
    
    /**
     * 
     * NotifyThread is a simple thread that iterates over the collection of 
     * message listeners and provides the latest message to each of them.
     *
     * @author Dan Jemiolo (danj)
     *
     */
    private class NotifyThread extends Thread
    {
        private NotificationMessage[] _messages = null;
        
        public NotifyThread(NotificationMessage[] messages)
        {
            _messages = messages;
        }
        
        /**
         * 
         * Offers each message, in array order, to every listener; a listener 
         * only processes the messages it accepts. A failure in one listener 
         * is logged and does not stop delivery to the remaining listeners 
         * or of the remaining messages.
         *
         */
        public void run()
        {
            for (int n = 0; n < _messages.length; ++n)
            {
                //
                // getMessageListeners() returns a private snapshot, so a 
                // listener may add or remove listeners (including itself) 
                // from inside process() without breaking this iteration. 
                // The snapshot is re-taken per message so that such changes 
                // take effect for the next message in the batch.
                //
                Iterator i = getMessageListeners().iterator();
                
                while (i.hasNext())
                {
                    NotificationMessageListener listener = (NotificationMessageListener)i.next();
                    
                    try
                    {
                        if (listener.accepts(_messages[n]))
                            listener.process(_messages[n]);
                    }
                    
                    //
                    // deliberately broad: one misbehaving listener must not 
                    // prevent the others from seeing the message
                    //
                    catch (Throwable error)
                    {
                        Logger log = getLog();
                        LoggingUtils.logError(log, error);
                    }
                }
            }
        }
    }
}