package org.activemq.ws.notification.impl;
import java.util.Calendar;
import javax.jms.JMSException;
import org.activemq.util.IdGenerator;
import org.activemq.ws.notification.SubscriptionManager;
import org.activemq.ws.xmlbeans.addressing.v2003_03.EndpointReferenceType;
import org.activemq.ws.xmlbeans.addressing.v2003_03.ReferencePropertiesType;
import org.activemq.ws.xmlbeans.notification.base.PauseSubscriptionDocument;
import org.activemq.ws.xmlbeans.notification.base.PauseSubscriptionResponseDocument;
import org.activemq.ws.xmlbeans.notification.base.ResumeSubscriptionDocument;
import org.activemq.ws.xmlbeans.notification.base.ResumeSubscriptionResponseDocument;
import org.activemq.ws.xmlbeans.resource.lifetime.DestroyDocument;
import org.activemq.ws.xmlbeans.resource.lifetime.DestroyResponseDocument;
import org.activemq.ws.xmlbeans.resource.lifetime.SetTerminationTimeDocument;
import org.activemq.ws.xmlbeans.resource.lifetime.SetTerminationTimeResponseDocument;
import org.activemq.ws.xmlbeans.resource.lifetime.SetTerminationTimeResponseDocument.SetTerminationTimeResponse;
import org.activemq.ws.xmlbeans.resource.properties.GetResourcePropertyDocument;
import org.activemq.ws.xmlbeans.resource.properties.GetResourcePropertyResponseDocument;
import org.apache.xmlbeans.XmlCursor;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
/**
 * {@link SubscriptionManager} implementation that keeps registered
 * {@link ActiveMQSubscription} instances in an in-memory map keyed by a
 * generated reference id. The reference id is carried to and from clients as
 * the text content of the endpoint reference's reference properties.
 */
public class ActiveMQSubscriptionManager implements SubscriptionManager {

    /** Error message used whenever an endpoint reference cannot be resolved. */
    private static final String INVALID_REFERENCE = "Invalid endpoint reference.";

    /** Registered subscriptions, keyed by generated reference id (String). */
    private final ConcurrentHashMap subscriptions = new ConcurrentHashMap();

    /** Produces the reference ids handed out by {@link #register}. */
    private final IdGenerator referenceGenerator = new IdGenerator();

    /** Address written into every endpoint reference created by this manager. */
    private String address = "http://";

    /**
     * Registers a subscription under a newly generated reference id.
     *
     * @param subscription the subscription to register
     * @return an endpoint reference carrying this manager's address and the
     *         generated reference id
     */
    public EndpointReferenceType register(ActiveMQSubscription subscription) {
        String ref = referenceGenerator.generateId();
        subscriptions.put(ref, subscription);
        return toEndpointReference(ref);
    }

    /**
     * Looks up a registered subscription without removing it.
     *
     * @param subscriptionReference endpoint reference previously returned by {@link #register}
     * @return the registered subscription, or null when the reference is
     *         null, carries no reference properties, or is not registered
     */
    public ActiveMQSubscription getSubscription(EndpointReferenceType subscriptionReference) {
        String ref = toReferenceId(subscriptionReference);
        if (ref == null) {
            // Never hand a null key to the map; report "not found" instead.
            return null;
        }
        return (ActiveMQSubscription) subscriptions.get(ref);
    }

    /**
     * Removes a registered subscription from this manager.
     *
     * @param subscriptionReference endpoint reference previously returned by {@link #register}
     * @return the subscription that was removed, or null when the reference is
     *         null, carries no reference properties, or is not registered
     */
    public ActiveMQSubscription removeSubscription(EndpointReferenceType subscriptionReference) {
        String ref = toReferenceId(subscriptionReference);
        if (ref == null) {
            // Never hand a null key to the map; report "not found" instead.
            return null;
        }
        return (ActiveMQSubscription) subscriptions.remove(ref);
    }

    /**
     * Builds an endpoint reference whose address is this manager's address and
     * whose reference properties contain the given reference id as text.
     */
    private EndpointReferenceType toEndpointReference(String ref) {
        EndpointReferenceType rc = EndpointReferenceType.Factory.newInstance();
        rc.addNewAddress().setStringValue(address);
        ReferencePropertiesType props = rc.addNewReferenceProperties();
        XmlCursor cursor = props.newCursor();
        try {
            cursor.setTextValue(ref);
        } finally {
            // Cursors hold resources until explicitly disposed.
            cursor.dispose();
        }
        return rc;
    }

    /**
     * Extracts the reference id stored by {@link #toEndpointReference}.
     *
     * @return the text found at the first content token of the reference
     *         properties, or null when the reference or its reference
     *         properties are missing
     */
    private String toReferenceId(EndpointReferenceType subscriptionReference) {
        if (subscriptionReference == null) {
            return null;
        }
        ReferencePropertiesType props = subscriptionReference.getReferenceProperties();
        if (props == null) {
            return null;
        }
        XmlCursor cursor = props.newCursor();
        try {
            cursor.toFirstContentToken();
            return cursor.getTextValue();
        } finally {
            // Cursors hold resources until explicitly disposed.
            cursor.dispose();
        }
    }

    /**
     * Resolves a subscription that must exist, leaving it registered.
     *
     * @throws RuntimeException if the reference does not resolve to a registered subscription
     */
    private ActiveMQSubscription requireSubscription(EndpointReferenceType resource) {
        ActiveMQSubscription subscription = getSubscription(resource);
        if (subscription == null) {
            throw new RuntimeException(INVALID_REFERENCE);
        }
        return subscription;
    }

    /**
     * Stops the subscription identified by the given reference.
     *
     * @param request  the pause request (not inspected)
     * @param resource endpoint reference identifying the subscription
     * @return a response document containing an empty pause response
     * @throws RuntimeException if the reference is not registered, or wrapping
     *         the JMSException raised while stopping the subscription
     */
    public PauseSubscriptionResponseDocument pauseSubcription(PauseSubscriptionDocument request, EndpointReferenceType resource) {
        ActiveMQSubscription subscription = requireSubscription(resource);
        try {
            subscription.stop();
        } catch (JMSException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        PauseSubscriptionResponseDocument response = PauseSubscriptionResponseDocument.Factory.newInstance();
        response.addNewPauseSubscriptionResponse();
        return response;
    }

    /**
     * Starts the subscription identified by the given reference.
     *
     * @param request  the resume request (not inspected)
     * @param resource endpoint reference identifying the subscription
     * @return a response document containing an empty resume response
     * @throws RuntimeException if the reference is not registered, or wrapping
     *         the JMSException raised while starting the subscription
     */
    public ResumeSubscriptionResponseDocument resumeSubscription(ResumeSubscriptionDocument request, EndpointReferenceType resource) {
        ActiveMQSubscription subscription = requireSubscription(resource);
        try {
            subscription.start();
        } catch (JMSException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        ResumeSubscriptionResponseDocument response = ResumeSubscriptionResponseDocument.Factory.newInstance();
        response.addNewResumeSubscriptionResponse();
        return response;
    }

    /**
     * Applies the requested termination time to the subscription identified by
     * the given reference.
     *
     * @param request  carries the requested termination time
     * @param resource endpoint reference identifying the subscription
     * @return a response whose new termination time is the requested time
     * @throws RuntimeException if the reference is not registered
     */
    public SetTerminationTimeResponseDocument setTerminationTime(SetTerminationTimeDocument request, EndpointReferenceType resource) {
        ActiveMQSubscription subscription = requireSubscription(resource);
        Calendar newTime = request.getSetTerminationTime().getRequestedTerminationTime();
        Calendar oldTime = subscription.setTerminationTime(newTime);
        SetTerminationTimeResponseDocument response = SetTerminationTimeResponseDocument.Factory.newInstance();
        SetTerminationTimeResponse ttr = response.addNewSetTerminationTimeResponse();
        // NOTE(review): CurrentTime is filled with whatever
        // ActiveMQSubscription.setTerminationTime returned; confirm that this is
        // the value intended for the CurrentTime field.
        ttr.setCurrentTime(oldTime);
        ttr.setNewTerminationTime(newTime);
        return response;
    }

    /**
     * Unregisters and closes the subscription identified by the given
     * reference. The subscription is removed from this manager before it is
     * closed, so it stays unregistered even when closing fails.
     *
     * @param resource endpoint reference identifying the subscription
     * @param request  the destroy request (not inspected)
     * @return a response document containing an empty destroy response
     * @throws RuntimeException if the reference is not registered, or wrapping
     *         the JMSException raised while closing the subscription
     */
    public DestroyResponseDocument destroy(EndpointReferenceType resource, DestroyDocument request) {
        ActiveMQSubscription subscription = removeSubscription(resource);
        if (subscription == null) {
            throw new RuntimeException(INVALID_REFERENCE);
        }
        try {
            subscription.close();
        } catch (JMSException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        DestroyResponseDocument response = DestroyResponseDocument.Factory.newInstance();
        response.addNewDestroyResponse();
        return response;
    }

    /**
     * Delegates a resource property query to the subscription identified by
     * the given reference.
     *
     * @param resource endpoint reference identifying the subscription
     * @param request  the property query, passed through to the subscription
     * @return the subscription's response to the query
     * @throws RuntimeException if the reference is not registered
     */
    public GetResourcePropertyResponseDocument getResourceProperty(EndpointReferenceType resource, GetResourcePropertyDocument request) {
        // A property query is read-only: look the subscription up without
        // unregistering it, so the reference stays valid for later operations.
        ActiveMQSubscription subscription = requireSubscription(resource);
        return subscription.getResourceProperty(resource, request);
    }

    /**
     * @return the address written into endpoint references created by this manager
     */
    public String getAddress() {
        return address;
    }

    /**
     * @param address the address to write into endpoint references created
     *                after this call
     */
    public void setAddress(String address) {
        this.address = address;
    }
}