package org.servicemix.ws.notification.impl.invoke;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQMessageConsumer;
import org.activemq.message.ActiveMQTopic;
import org.activemq.service.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.xmlbeans.XmlException;
import org.apache.xmlbeans.XmlObject;
import org.servicemix.ws.notification.NotificationConsumer;
import org.servicemix.ws.notification.impl.invoke.InvokerSupport;
import org.servicemix.ws.resource.ResourceProperties;
import org.servicemix.ws.xmlbeans.addressing.v2003_03.EndpointReferenceType;
import org.servicemix.ws.xmlbeans.notification.base.NotificationMessageHolderType;
import org.servicemix.ws.xmlbeans.notification.base.NotifyDocument;
import org.servicemix.ws.xmlbeans.notification.base.NotifyDocument.Notify;
import org.servicemix.ws.xmlbeans.notification.base.TopicExpressionType;
import org.servicemix.ws.xmlbeans.resource.properties.GetResourcePropertyDocument;
import org.servicemix.ws.xmlbeans.resource.properties.GetResourcePropertyResponseDocument;
import org.servicemix.ws.xmlbeans.resource.properties.QueryExpressionType;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.Calendar;
/**
 * Uses an instance of {@link NotificationConsumer} to invoke the notification.
 * <p>
 * The body of an incoming JMS message is parsed as XML, wrapped in a
 * {@link NotifyDocument} together with the topic (and the producer reference,
 * when one is available) and handed to the configured consumer.
 */
public class NotificationConsumerInvoker extends InvokerSupport implements MessageListener {

    /** Target that receives every notification built by this invoker. */
    private final NotificationConsumer notificationConsumer;

    /**
     * Creates an invoker that forwards notifications to the given consumer.
     *
     * @param notificationConsumer the consumer whose {@code notify} method is called
     *                             for each dispatched message
     */
    public NotificationConsumerInvoker(NotificationConsumer notificationConsumer) {
        this.notificationConsumer = notificationConsumer;
    }

    /**
     * Parses the body of a JMS message as XML and forwards it as a notification.
     * <p>
     * Only {@link BytesMessage} and {@link TextMessage} are handled; any other
     * message type is ignored without an error.
     *
     * @param topic   the topic expression placed on the outgoing notification
     * @param message the JMS message whose body carries the XML payload
     * @throws JMSException if the message body cannot be read, or a bytes body is
     *                      too large to fit in a single array
     * @throws XmlException if the body is missing or is not well-formed XML
     * @throws IOException  if reading the body stream fails
     */
    protected void dispatchMessage(TopicExpressionType topic, Message message) throws JMSException, XmlException, IOException {
        XmlObject xml;
        if (message instanceof BytesMessage) {
            BytesMessage bm = (BytesMessage) message;
            long bodyLength = bm.getBodyLength();
            // Guard the long -> int narrowing: a blind cast would silently wrap
            // to a negative or truncated array size.
            if (bodyLength > Integer.MAX_VALUE) {
                throw new JMSException("BytesMessage body is too large to buffer: " + bodyLength + " bytes");
            }
            byte[] data = new byte[(int) bodyLength];
            bm.readBytes(data);
            xml = XmlObject.Factory.parse(new ByteArrayInputStream(data));
        }
        else if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            // A TextMessage may legitimately carry no text; StringReader would
            // fail with a NullPointerException, so report it as an XML problem
            // through the declared exception instead.
            if (text == null) {
                throw new XmlException("TextMessage has no text body to parse as XML");
            }
            xml = XmlObject.Factory.parse(new StringReader(text));
        }
        else {
            // Unsupported message types are deliberately skipped.
            return;
        }
        dispatch(topic, xml);
    }

    /**
     * Builds a {@link NotifyDocument} around the payload and passes it to the consumer.
     *
     * @param topic the topic expression set on the notification message
     * @param xml   the parsed payload set as the notification message content
     */
    private void dispatch(TopicExpressionType topic, XmlObject xml) {
        NotifyDocument request = NotifyDocument.Factory.newInstance();
        Notify notify = request.addNewNotify();
        NotificationMessageHolderType messageHolder = notify.addNewNotificationMessage();

        // getProducerReference() is not defined in this class (presumably inherited
        // from InvokerSupport); a copy of the reference is attached when one exists.
        EndpointReferenceType producerReference = getProducerReference();
        if (producerReference != null) {
            messageHolder.setProducerReference((EndpointReferenceType) producerReference.copy());
        }

        messageHolder.setTopic(topic);
        messageHolder.setMessage(xml);
        notificationConsumer.notify(request);
    }

    /**
     * Converts a query expression into a JMS selector string.
     * <p>
     * NOTE(review): this currently always returns {@code null} and is not called
     * anywhere in this class; it looks like an unimplemented placeholder.
     *
     * @param selector the query expression to convert (currently unused)
     * @return always {@code null}
     */
    private static String toJMSSelector(QueryExpressionType selector) {
        return null;
    }
}