/*=============================================================================*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*=============================================================================*/
package org.apache.ws.notification.base.impl;
import org.apache.commons.id.IdentifierUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ws.Soap1_1Constants;
import org.apache.ws.addressing.EndpointReference;
import org.apache.ws.notification.base.NotificationProducerResource;
import org.apache.ws.notification.base.PauseFailedException;
import org.apache.ws.notification.base.ResumeFailedException;
import org.apache.ws.notification.base.Subscription;
import org.apache.ws.notification.base.SubscriptionManager;
import org.apache.ws.notification.topics.Topic;
import org.apache.ws.notification.topics.expression.TopicExpression;
import org.apache.ws.notification.topics.expression.TopicExpressionException;
import org.apache.ws.pubsub.Filter;
import org.apache.ws.pubsub.NotificationConsumer;
import org.apache.ws.pubsub.NotificationProducer;
import org.apache.ws.resource.ResourceHome;
import org.apache.ws.resource.WsrfRuntime;
import org.apache.ws.resource.faults.FaultException;
import org.apache.ws.resource.lifetime.ResourceTerminationEvent;
import org.apache.ws.resource.lifetime.ResourceTerminationListener;
import org.apache.ws.resource.lifetime.impl.ResourceTerminationEventImpl;
import org.apache.ws.resource.properties.ResourcePropertySet;
import org.apache.ws.resource.properties.query.QueryExpression;
import java.net.URI;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
/**
* An abstract base class for WSN Subscription implementations.
*/
public abstract class AbstractSubscription
implements Subscription
{
private static final Log LOG = LogFactory.getLog( AbstractSubscription.class );
private Calendar m_creationTime = Calendar.getInstance( );
private String m_id = createUuid( );
private EndpointReference m_producerRef;
private EndpointReference m_consumerRef;
private TopicExpression m_topicExpr;
private Calendar m_terminationTime;
private boolean m_useNotify = true;
private QueryExpression m_precondition;
private QueryExpression m_selector;
private Object m_policy;
private boolean m_isPaused;
private EndpointReference m_epr;
private transient ResourcePropertySet m_propSet;
private NotificationConsumer m_notificationConsumer;
private NotificationProducer m_notificationProducer;
private List m_terminationListeners = new ArrayList( );
/**
* Creates a new {@link AbstractSubscription} object.
*
* @param producerRef DOCUMENT_ME
* @param consumerRef DOCUMENT_ME
* @param topicExpr DOCUMENT_ME
*/
protected AbstractSubscription( EndpointReference producerRef,
EndpointReference consumerRef,
TopicExpression topicExpr )
{
m_producerRef = producerRef;
m_consumerRef = consumerRef;
m_topicExpr = topicExpr;
}
/**
* DOCUMENT_ME
*
* @param consumerRef DOCUMENT_ME
*/
public void setConsumerReference( EndpointReference consumerRef )
{
m_consumerRef = consumerRef;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public EndpointReference getConsumerReference( )
{
return m_consumerRef;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Calendar getCreationTime( )
{
return m_creationTime;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Calendar getCurrentTime( )
{
return Calendar.getInstance( );
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public URI getDeliveryMode( )
{
// TODO. *SJC* This is provided for pubsub abstraction layer
return null;
}
/**
* DOCUMENT_ME
*
* @param epr DOCUMENT_ME
*/
public void setEndpointReference( EndpointReference epr )
{
m_epr = epr;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public EndpointReference getEndpointReference( )
{
return m_epr;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Filter getFilters( )
{
// TODO. *SJC* This is provided for pubsub abstraction layer
return null;
}
/**
* DOCUMENT_ME
*
* @param id DOCUMENT_ME
*/
public void setID( Object id )
{
m_id = (String) id;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Object getID( )
{
return m_id;
}
/**
* DOCUMENT_ME
*
* @param notificationConsumer DOCUMENT_ME
*/
public void setNotificationConsumer( NotificationConsumer notificationConsumer )
{
m_notificationConsumer = notificationConsumer;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public NotificationConsumer getNotificationConsumer( )
{
return m_notificationConsumer;
}
/**
* DOCUMENT_ME
*
* @param notificationProducer DOCUMENT_ME
*/
public void setNotificationProducer( NotificationProducer notificationProducer )
{
m_notificationProducer = notificationProducer;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public NotificationProducer getNotificationProducer( )
{
return m_notificationProducer;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public boolean isPaused( )
{
return m_isPaused;
}
/**
* DOCUMENT_ME
*
* @param policy DOCUMENT_ME
*/
public void setPolicy( Object policy )
{
m_policy = policy;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Object getPolicy( )
{
return m_policy;
}
/**
* DOCUMENT_ME
*
* @param precondition DOCUMENT_ME
*/
public void setPrecondition( QueryExpression precondition )
{
m_precondition = precondition;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public QueryExpression getPrecondition( )
{
return m_precondition;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public EndpointReference getProducerReference( )
{
return m_producerRef;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public NotificationProducerResource getProducerResource( )
{
String producerAddress = m_producerRef.getAddress( );
String producerServiceName = producerAddress.substring( producerAddress.lastIndexOf( '/' ) + 1 );
try
{
ResourceHome producerHome = WsrfRuntime.getRuntime( ).getResourceHome( producerServiceName );
Object producerId = producerHome.extractResourceIdentifier( m_producerRef );
return (NotificationProducerResource) producerHome.find( producerId );
}
catch ( Exception e )
{
throw new RuntimeException( "Failed to lookup NotificationProducer resource due to internal error.", e );
}
}
/**
* DOCUMENT_ME
*
* @param resourcePropertySet DOCUMENT_ME
*/
public void setResourcePropertySet( ResourcePropertySet resourcePropertySet )
{
m_propSet = resourcePropertySet;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public ResourcePropertySet getResourcePropertySet( )
{
return m_propSet;
}
/**
* DOCUMENT_ME
*
* @param selector DOCUMENT_ME
*/
public void setSelector( QueryExpression selector )
{
m_selector = selector;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public QueryExpression getSelector( )
{
return m_selector;
}
/**
* DOCUMENT_ME
*
* @param time DOCUMENT_ME
*/
public void setTerminationTime( Calendar time )
{
m_terminationTime = time;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public Calendar getTerminationTime( )
{
return m_terminationTime;
}
/**
* DOCUMENT_ME
*
* @param topicExpr DOCUMENT_ME
*/
public void setTopicExpression( TopicExpression topicExpr )
{
m_topicExpr = topicExpr;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public TopicExpression getTopicExpression( )
{
return m_topicExpr;
}
/**
* DOCUMENT_ME
*
* @param useNotify DOCUMENT_ME
*/
public void setUseNotify( boolean useNotify )
{
m_useNotify = useNotify;
}
/**
* DOCUMENT_ME
*
* @return DOCUMENT_ME
*/
public boolean getUseNotify( )
{
return m_useNotify;
}
/**
* DOCUMENT_ME
*
* @param resourceTerminationListener DOCUMENT_ME
*/
public void addTerminationListener( ResourceTerminationListener resourceTerminationListener )
{
m_terminationListeners.add( resourceTerminationListener );
}
/**
* DOCUMENT_ME
*/
public void destroy( )
{
SubscriptionManager.getInstance( ).removeSubscription( this,
evaluateTopicExpression( ) );
notifyResourceTerminationListeners( );
}
/**
* @see org.apache.ws.notification.base.Subscription#pause()
*/
public void pause( )
throws PauseFailedException
{
m_isPaused = true;
}
/**
* @see org.apache.ws.notification.base.Subscription#resume()
*/
public void resume( )
throws ResumeFailedException
{
m_isPaused = false;
}
/**
* @see org.apache.ws.notification.base.Subscription#unsubscribe()
*/
public void unsubscribe( )
{
destroy( );
}
private String createUuid( )
{
return IdentifierUtils.UUID_VERSION_FOUR_GENERATOR.nextIdentifier( ).toString( );
}
private Topic[] evaluateTopicExpression( )
{
try
{
return getProducerResource( ).getTopicSet( ).evaluateTopicExpression( m_topicExpr );
}
catch ( TopicExpressionException tee )
{
LOG.error( tee ); // should never happen since the TopicExpression
throw new FaultException( Soap1_1Constants.FAULT_CLIENT,
"An exception occurred evaluating the topic expression. " );
}
}
private void notifyResourceTerminationListeners( )
{
ResourceTerminationEvent rte =
new ResourceTerminationEventImpl( getID( ), "Subscription with id " + getID( ) + " destroyed." );
for ( int i = 0; i < m_terminationListeners.size( ); i++ )
{
ResourceTerminationListener resourceTerminationListener =
(ResourceTerminationListener) m_terminationListeners.get( i );
resourceTerminationListener.terminationOccurred( rte );
}
}
}