/*=============================================================================*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*=============================================================================*/
package org.apache.ws.notification.topics.impl;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ws.XmlObjectWrapper;
import org.apache.ws.addressing.EndpointReference;
import org.apache.ws.addressing.XmlBeansEndpointReference;
import org.apache.ws.addressing.v2003_03.AddressingConstants;
import org.apache.ws.notification.base.Subscription;
import org.apache.ws.notification.base.v2004_06.BaseNotificationConstants;
import org.apache.ws.notification.topics.Topic;
import org.apache.ws.notification.topics.TopicListener;
import org.apache.ws.pubsub.emitter.EmitterTask;
import org.apache.ws.resource.properties.ResourcePropertySet;
import org.apache.ws.resource.properties.query.InvalidQueryExpressionException;
import org.apache.ws.resource.properties.query.QueryEngine;
import org.apache.ws.resource.properties.query.QueryEvaluationErrorException;
import org.apache.ws.resource.properties.query.QueryExpression;
import org.apache.ws.resource.properties.query.UnknownQueryExpressionDialectException;
import org.apache.ws.resource.properties.query.impl.QueryEngineImpl;
import org.apache.ws.util.JaxpUtils;
import org.apache.ws.util.XmlBeanUtils;
import org.apache.ws.util.thread.NamedThread;
import org.apache.xmlbeans.XmlCursor;
import org.apache.xmlbeans.XmlObject;
import org.apache.xmlbeans.XmlOptions;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.ReferencePropertiesType;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPFactory;
import javax.xml.soap.SOAPHeader;
import javax.xml.soap.SOAPHeaderElement;
import javax.xml.soap.SOAPMessage;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
/**
* Topic listener implementation that is associated with a particular {@link Subscription}. Upon a message being
* published to the topic being listened to, a notification will be emitted to the consumer associated with the
* subscription.
*/
public class SubscriptionTopicListener
implements TopicListener,
Serializable
{
private static Log LOG = LogFactory.getLog( SubscriptionTopicListener.class.getName( ) );
// the thread pool used to emit notifications
private static final PooledExecutor EMITTER_POOL;
static
{
EMITTER_POOL = new PooledExecutor( 100 );
// make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
}
private static final QueryEngine QUERY_ENGINE = new QueryEngineImpl( );
private Subscription m_subscription;
/**
* Construct a listener instance.
*
* @param subscription the subscription which is being wrapped in a listener
*/
public SubscriptionTopicListener( Subscription subscription )
{
m_subscription = subscription;
}
/**
* @return Subscription
*/
public Subscription getSubscription( )
{
return m_subscription;
}
/**
* DOCUMENT_ME
*
* @param topic DOCUMENT_ME
*/
public void topicChanged( Topic topic )
{
if ( m_subscription != null )
{
try
{
notify( topic.getCurrentMessage( ) );
}
catch ( Exception e )
{
LOG.debug( "Notification for topic " + topic + " failed for subscription: " + m_subscription, e );
}
}
}
private void addReferencePropertyHeaders( SOAPHeader header,
EndpointReferenceType consumerEPR )
throws Exception
{
SOAPHeaderElement headerElem;
ReferencePropertiesType props = consumerEPR.getReferenceProperties( );
if ( props != null )
{
XmlCursor cursor = props.newCursor( );
boolean hasAnotherChild = cursor.toFirstChild( );
while ( hasAnotherChild )
{
// TODO: *SJC* the below logic should handle refProps that are complexTypes..
// Best way to do may be to build SOAPMessage as XmlBean and use MessageFactory.createMessage(..) to construct using the InputStream
SOAPElement soapElem = XmlBeanUtils.toSOAPElement( cursor.getObject( ) );
headerElem = header.addHeaderElement( soapElem.getElementName( ) );
headerElem.addTextNode( soapElem.getValue( ) );
hasAnotherChild = cursor.toNextSibling( );
}
cursor.dispose( );
}
}
/*
* Add WS-Addressing headers to a notification.
*
* @param header - The header to which to add the WS-Addressing headers.
* @param consumerEPR - An endpointReferece to the consumer of this notification.
* @throws Exception
*/
private void addWSAHeaders( SOAPHeader header,
EndpointReferenceType consumerEPR )
throws Exception
{
SOAPFactory factory = SOAPFactory.newInstance( );
// TODO: *SJC* this should not be hard-coded to use WSA 2003/03. Once a new version of WSN is implemented we will need to support multiple versions
SOAPHeaderElement headerElem =
header.addHeaderElement( factory.createName( org.apache.ws.addressing.v2003_03.AddressingConstants.TO,
org.apache.ws.addressing.v2003_03.AddressingConstants.NSPREFIX_ADDRESSING_SCHEMA,
org.apache.ws.addressing.v2003_03.AddressingConstants.NSURI_ADDRESSING_SCHEMA ) );
headerElem.addTextNode( consumerEPR.getAddress( ).getStringValue( ) );
headerElem =
header.addHeaderElement( factory.createName( AddressingConstants.ACTION,
AddressingConstants.NSPREFIX_ADDRESSING_SCHEMA,
AddressingConstants.NSURI_ADDRESSING_SCHEMA ) );
headerElem.addTextNode( BaseNotificationConstants.NOTIFY_ACTION_URL );
addReferencePropertyHeaders( header, consumerEPR );
}
private SOAPMessage buildSOAPMessage( Document fullMsgBodyElem,
EndpointReferenceType consumerEPR )
throws Exception
{
SOAPMessage msg = MessageFactory.newInstance( ).createMessage( );
SOAPEnvelope envelope = msg.getSOAPPart( ).getEnvelope( );
SOAPBody body = envelope.getBody( );
body.addDocument( fullMsgBodyElem );
SOAPHeader header = msg.getSOAPHeader( );
addWSAHeaders( header, consumerEPR );
return msg;
}
private boolean evaluatePrecondition( QueryExpression precondition,
ResourcePropertySet propSet )
throws Exception
{
boolean result;
if ( precondition == null )
{
result = true;
}
else
{
Object queryResult = QUERY_ENGINE.executeQuery( precondition, propSet );
try
{
result = ( (Boolean) queryResult ).booleanValue( );
LOG.debug( "Notification precondition '" + precondition + "' evaluated to " + result
+ " for subscription with id " + m_subscription.getID( ) + "." );
}
catch ( RuntimeException re )
{
result = false;
LOG.error( "Notification precondition '" + precondition
+ "' did not evaluate to a Boolean at notification time." );
}
}
return result;
}
private boolean evaluateSelector( QueryExpression selector,
XmlObject msg )
throws UnknownQueryExpressionDialectException,
QueryEvaluationErrorException,
InvalidQueryExpressionException
{
boolean result;
if ( selector == null )
{
result = true;
}
else
{
Object queryResult = QUERY_ENGINE.executeQuery( selector, msg );
try
{
result = ( (Boolean) queryResult ).booleanValue( );
}
catch ( RuntimeException re )
{
result = false;
LOG.error( "Notification selector '" + selector
+ "' did not evaluate to a Boolean at notification time." );
}
LOG.debug( "Notification selector '" + selector + "' evaluated to " + result
+ " for subscription with id " + m_subscription.getID( ) + "." );
}
return result;
}
private void notify( Object rawMsg )
throws Exception
{
synchronized ( m_subscription )
{
if ( !m_subscription.isPaused( ) )
{
LOG.debug( "Notification being sent for subscription with id " + m_subscription.getID( )
+ "; message value: " + rawMsg );
XmlObject msg = XmlBeanUtils.toXmlObject( rawMsg );
if ( evaluateSelector( m_subscription.getSelector( ),
msg )
&& evaluatePrecondition( m_subscription.getPrecondition( ),
m_subscription.getProducerResource( ).getResourcePropertySet( ) ) )
{
if ( m_subscription.getUseNotify( ) )
{
msg = wrapMessageWithNotify( msg );
}
Document dom = toDomDocument( msg );
EndpointReference consumerEPR = m_subscription.getConsumerReference( );
SOAPMessage soapMsg =
buildSOAPMessage( dom,
(EndpointReferenceType) ( (XmlObjectWrapper) consumerEPR ).getXmlObject( ) );
EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMsg,
new URL( consumerEPR.getAddress( ).toString( ) ) ) );
}
}
}
}
private Document toDomDocument( XmlObject notifyDoc )
throws ParserConfigurationException,
SAXException,
IOException
{
Document dom;
if ( XmlBeanUtils.isDocument( notifyDoc ) )
{
dom = (Document) notifyDoc.newDomNode( );
}
else
{
String notifyDocAsString = notifyDoc.xmlText( new XmlOptions( ).setSaveOuter( ) );
dom = JaxpUtils.toDocument( notifyDocAsString );
}
return dom;
}
private XmlObject wrapMessageWithNotify( XmlObject msg )
{
NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance( );
NotifyDocument.Notify notify = notifyDoc.addNewNotify( );
NotificationMessageHolderType notifMsgHolder = notify.addNewNotificationMessage( );
notifMsgHolder.setMessage( msg );
EndpointReference producerEPR =
m_subscription.getProducerResource( ).getEndpointReference( );
XmlBeansEndpointReference xBeansProducerEPR = ( (XmlBeansEndpointReference) producerEPR );
notifMsgHolder.setProducerReference( (EndpointReferenceType) xBeansProducerEPR.getXmlObject( org.apache.ws.addressing.v2003_03.AddressingConstants.NSURI_ADDRESSING_SCHEMA ) );
TopicExpressionType topicExpr =
(TopicExpressionType) ( (XmlObjectWrapper) m_subscription.getTopicExpression( ) ).getXmlObject( );
notifMsgHolder.setTopic( topicExpr );
return notifyDoc;
}
}