/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.ra;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;
import javax.jms.*;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.work.*;
import javax.transaction.xa.XAResource;
import java.util.ArrayList;
import java.util.LinkedList;
/**
* @version $Revision: 1.3 $ $Date: 2004/07/31 21:11:00 $
*/
public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
private static final int MAX_MSGS_PER_SESSION = 1;
private static final int MAX_SESSION = 10;
ConnectionConsumer consumer;
private ServerSessionPoolImpl serverSessionPool;
/**
* @param adapter
* @param key
* @throws ResourceException
*/
public ActiveMQAsfEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
super(adapter, key);
}
public void start() throws WorkException, ResourceException {
log.debug("Starting");
boolean ok = false;
try {
serverSessionPool = new ServerSessionPoolImpl();
ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
Destination dest = null;
if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
dest = new ActiveMQQueue(activationSpec.getDestinationName());
} else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
dest = new ActiveMQTopic(activationSpec.getDestinationName());
} else {
throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
}
if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
consumer = adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
} else {
consumer = adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
}
ok = true;
log.debug("Started");
} catch (JMSException e) {
throw new ResourceException("Could not start the endpoint.", e);
} finally {
// We don't want to leak sessions on errors. Keep them around only if
// there were no errors.
if (!ok) {
safeClose(consumer);
}
}
}
/**
*
*/
public void stop() throws InterruptedException {
safeClose(consumer);
serverSessionPool.close();
}
class ServerSessionPoolImpl implements ServerSessionPool {
ServerSessionImpl ss;
ArrayList idleSessions = new ArrayList();
LinkedList activeSessions = new LinkedList();
int sessionIds=0;
int nextUsedSession;
boolean closing=false;
public ServerSessionPoolImpl() {
}
public ServerSessionImpl createServerSessionImpl() throws JMSException {
Session session = adapter.getPhysicalConnection().createSession(true, Session.SESSION_TRANSACTED);
return new ServerSessionImpl(this, session);
}
/**
*/
synchronized public ServerSession getServerSession() throws JMSException {
log.debug("ServerSession requested.");
if( closing )
throw new JMSException("Session Pool Shutting Down.");
if( idleSessions.size()>0 ) {
ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size()-1);
activeSessions.addLast(ss);
log.debug("Using idle session: "+ss);
return ss;
} else {
// Are we at the upper limit?
if( activeSessions.size() >= MAX_SESSION ) {
// then reuse the allready created sessions..
// This is going to queue up messages into a session for processing.
ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
activeSessions.addLast(ss);
log.debug("Reusing an active session: "+ss);
return ss;
} else {
ServerSessionImpl ss = createServerSessionImpl();
activeSessions.addLast(ss);
log.debug("Created a new session: "+ss);
return ss;
}
}
}
synchronized public void returnToPool(ServerSessionImpl ss) {
log.debug("Session returned to pool: "+ss);
idleSessions.add(ss);
}
public void close() {
synchronized( this ) {
closing = true;
}
}
}
class ServerSessionImpl implements ServerSession, Work, MessageListener {
Session session;
private final ServerSessionPoolImpl pool;
private Object runControlMutex = new Object();
boolean workPendingFlag=false;
boolean runningFlag=false;
int runCounter=0;
XAResource xaResource;
public ServerSessionImpl(ServerSessionPoolImpl pool, Session session) throws JMSException {
this.pool = pool;
this.session=session;
this.session.setMessageListener(this);
if( session instanceof XASession ) {
xaResource = ((XASession)session).getXAResource();
}
}
/**
* @see javax.jms.ServerSession#getSession()
*/
public Session getSession() throws JMSException {
return session;
}
/**
* @see javax.jms.ServerSession#start()
*/
public void start() throws JMSException {
log.debug("ServerSession started.");
synchronized(runControlMutex) {
runCounter++;
// Is our thread allready running
if( runningFlag || workPendingFlag ) {
// let it know that it should do more work..
workPendingFlag=true;
log.debug("ServerSession allready running.");
return;
}
workPendingFlag=true;
}
// We get here because we need to start a async worker.
log.debug("ServerSession queuing request for a run.");
try {
workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
new WorkListener() {
//The work listener is useful only for debugging...
public void workAccepted(WorkEvent event) {
log.debug("Work accepted: " + event);
}
public void workRejected(WorkEvent event) {
log.debug("Work rejected: " + event);
}
public void workStarted(WorkEvent event) {
log.debug("Work started: " + event);
}
public void workCompleted(WorkEvent event) {
log.debug("Work completed: " + event);
}
});
} catch ( WorkException e ) {
throw (JMSException)new JMSException("Work could not be started: "+e).initCause(e);
}
}
/**
* @see java.lang.Runnable#run()
*/
public void run() {
while(true) {
synchronized(runControlMutex) {
workPendingFlag=false;
runningFlag=true;
}
log.debug("Running: " + this);
session.run();
synchronized(runControlMutex) {
runCounter--;
runningFlag=false;
if( !workPendingFlag ) {
if( runCounter==0 )
pool.returnToPool(this);
break;
}
}
}
}
/**
* @see javax.resource.spi.work.Work#release()
*/
public void release() {
log.debug("release called");
}
/**
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
try {
MessageEndpoint endpoint = endpointFactory.createEndpoint(xaResource);
MessageListener listener = (MessageListener) endpoint;
endpoint.beforeDelivery(ON_MESSAGE_METHOD);
try {
listener.onMessage(message);
} finally {
endpoint.afterDelivery();
}
} catch (NoSuchMethodException e) {
log.info(e);
} catch (ResourceException e) {
log.info(e);
}
}
/**
* @see java.lang.Object#toString()
*/
public String toString() {
return "ServerSessionImpl[session="+session+"]";
}
}
private String emptyToNull(String value) {
if ("".equals(value)) {
return null;
}
return value;
}
}