/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.ws.notification.impl;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.xml.namespace.QName;
import org.activemq.ActiveMQConnection;
import org.activemq.message.ActiveMQTopic;
import org.activemq.ws.xmlbeans.addressing.v2003_03.EndpointReferenceType;
import org.activemq.ws.xmlbeans.notification.base.NotificationMessageHolderType;
import org.activemq.ws.xmlbeans.notification.base.NotifyDocument;
import org.activemq.ws.xmlbeans.notification.base.PauseSubscriptionDocument;
import org.activemq.ws.xmlbeans.notification.base.ResumeSubscriptionDocument;
import org.activemq.ws.xmlbeans.notification.base.NotifyDocument.Notify;
import org.activemq.ws.xmlbeans.resource.properties.GetResourcePropertyDocument;
import org.activemq.ws.xmlbeans.resource.properties.GetResourcePropertyResponseDocument;
import org.apache.xmlbeans.XmlCursor;
import org.apache.xmlbeans.XmlObject;
import EDU.oswego.cs.dl.util.concurrent.Slot;
/**
* @version $Revision: 1.1.1.1 $
*/
public class ActiveMQNotificationBrokerTest extends TestSupport {
public void testGetRequiresRegistrationProperty() throws Exception {
ActiveMQNotificationBroker broker = new ActiveMQNotificationBroker();
GetResourcePropertyDocument request = GetResourcePropertyDocument.Factory.newInstance();
QName property = new QName("http://docs.oasis-open.org/wsn/2004/06/wsn-WS-BrokeredNotification-1.2-draft-01.xsd", "RequiresRegistration");
request.setGetResourceProperty( property );
System.out.println(request);
GetResourcePropertyResponseDocument response = broker.getResourceProperty(null, request);
System.out.println(response);
assertNotNull(response);
XmlCursor cursor = response.newCursor();
cursor.toChild(property);
cursor.toFirstContentToken();
assertEquals("false", cursor.getTextValue());
}
public void XtestSendNotify() throws Exception {
ActiveMQNotificationBroker broker = new ActiveMQNotificationBroker();
ActiveMQConnection connection = broker.getConnection();
Session session = connection.createSession(false, 0);
ActiveMQTopic topic = new ActiveMQTopic("Test");
MessageConsumer consumer = session.createConsumer(topic);
NotifyDocument request = NotifyDocument.Factory.newInstance();
Notify notify = request.addNewNotify();
NotificationMessageHolderType messageHolder = notify.addNewNotificationMessage();
messageHolder.setTopic( TopicExpressionConverter.toTopicExpression(topic) );
XmlObject o = createMessage();
messageHolder.setMessage(o);
System.out.println(request);
broker.notify(request);
Message message = consumer.receive(3000);
assertNotNull(message);
}
public void testSubscribe() throws JMSException, InterruptedException {
final Slot result = new Slot();
ActiveMQNotificationBroker broker = new ActiveMQNotificationBroker() {
protected org.activemq.ws.notification.NotificationConsumer createNotificationConsumer(EndpointReferenceType consumerReference) {
return new StubNotificationConsumer(result);
}
};
addSubscription(broker);
sendNotification(broker);
NotifyDocument subNotifyDoc = (NotifyDocument) result.poll(2000);
System.out.println("Got Notify: "+subNotifyDoc);
assertValidMessage(subNotifyDoc);
}
public void testSubscriptionPauseResume() throws JMSException, InterruptedException {
final Slot result = new Slot();
ActiveMQNotificationBroker broker = new ActiveMQNotificationBroker() {
protected org.activemq.ws.notification.NotificationConsumer createNotificationConsumer(EndpointReferenceType consumerReference) {
return new StubNotificationConsumer(result);
}
};
EndpointReferenceType subRef = addSubscription(broker);
// The sub should be running and we should be getting notifed now.
sendNotification(broker);
NotifyDocument subNotifyDoc = (NotifyDocument) result.poll(2000);
assertNotNull(subNotifyDoc);
// Pause the subscription.
PauseSubscriptionDocument pauseRequest = PauseSubscriptionDocument.Factory.newInstance();
pauseRequest.addNewPauseSubscription();
broker.getSubscriptionManager().pauseSubcription(pauseRequest, subRef);
// The sub should be stopped and we should not be getting notifed now.
sendNotification(broker);
subNotifyDoc = (NotifyDocument) result.poll(2000);
assertNull(subNotifyDoc);
// Resume the subscription.
ResumeSubscriptionDocument resumeRequest = ResumeSubscriptionDocument.Factory.newInstance();
resumeRequest.addNewResumeSubscription();
broker.getSubscriptionManager().resumeSubscription(resumeRequest, subRef);
// We should now get the message that was previously sent since the sub is now running.
subNotifyDoc = (NotifyDocument) result.poll(2000);
assertNotNull(subNotifyDoc);
}
}