/* */ package org.jboss.ejb.plugins.inflow;
/* */
/* */ import java.lang.reflect.Method;
/* */ import java.util.concurrent.atomic.AtomicBoolean;
/* */ import javax.resource.ResourceException;
/* */ import javax.transaction.Transaction;
/* */ import javax.transaction.TransactionManager;
/* */ import javax.transaction.xa.XAResource;
/* */ import org.jboss.ejb.MessageDrivenContainer;
/* */ import org.jboss.invocation.Invocation;
/* */ import org.jboss.invocation.InvocationContext;
/* */ import org.jboss.logging.Logger;
/* */ import org.jboss.proxy.Interceptor;
/* */
/* */ public class MessageEndpointInterceptor extends Interceptor
/* */ {
/** Serialization version identifier. */
private static final long serialVersionUID = -8740717288847385688L;

/** Logger shared by all instances of this interceptor. */
private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class);

/** Invocation-context key; same string that getMessageEndpointFactory() uses to look up the endpoint factory. */
public static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory";

/** Invocation-context key; same string that startTransaction() uses to look up the XAResource. */
public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource";

/** Whether trace logging was enabled when this instance was created (sampled once, never refreshed). */
private boolean trace = log.isTraceEnabled();

/** Lazily computed string form of the invocation context's cache id; see getProxyString(). */
private String cachedProxyString = null;

/** Set to true by release(); once set, invoke() rejects every further call. */
protected AtomicBoolean released = new AtomicBoolean(false);

/** Set by delivery() when a beforeDelivery is outstanding; cleared by finish(). */
protected AtomicBoolean delivered = new AtomicBoolean(false);

/** Thread currently using this endpoint, or null when it is free; written under the monitor of this. */
protected Thread inUseThread = null;

/** Context class loader saved by before(); non-null means a beforeDelivery is outstanding. */
protected ClassLoader oldClassLoader = null;

/** Transaction begun by startTransaction(), or null when none was started. */
protected Transaction transaction = null;

/** Caller transaction suspended by startTransaction() and still awaiting resumption, or null. */
protected Transaction suspended = null;

/** True between a successful before() and the matching finish(). */
protected boolean beforeDeliveryInvoked = false;

/** Lazily resolved endpoint factory; see getMessageEndpointFactory(). */
private JBossMessageEndpointFactory endpointFactory;
/* */
/**
 * Entry point of the interceptor: claims the endpoint for the calling thread and
 * dispatches on the invoked method's name.
 *
 * <p>{@code release}, {@code beforeDelivery} and {@code afterDelivery} are handled
 * locally and return {@code null}; every other method is treated as a message
 * delivery and forwarded through {@link #delivery(Invocation)}.
 *
 * @param mi the invocation being intercepted
 * @return {@code null} for the three lifecycle methods, otherwise the result of the delivery
 * @throws IllegalStateException if the endpoint has been released or is in use by a different thread
 * @throws Throwable anything raised by the dispatched operation
 */
public Object invoke(Invocation mi)
    throws Throwable
{
    if (this.released.get())
    {
        throw new IllegalStateException("This message endpoint " + getProxyString(mi) + " has been released");
    }

    final Thread currentThread = Thread.currentThread();
    synchronized (this)
    {
        // Only one thread may use the endpoint at a time; re-entry by the owner is allowed.
        if ((this.inUseThread != null) && (!this.inUseThread.equals(currentThread)))
        {
            throw new IllegalStateException("This message endpoint " + getProxyString(mi) + " is already in use by another thread " + this.inUseThread);
        }
        this.inUseThread = currentThread;
    }

    final String method = mi.getMethod().getName();
    if (this.trace)
    {
        // Use the local reference: it is the value just stored, and avoids reading the field outside the monitor.
        log.trace("MessageEndpoint " + getProxyString(mi) + " in use by " + method + " " + currentThread);
    }

    if (method.equals("release"))
    {
        release(mi);
        return null;
    }
    if (method.equals("beforeDelivery"))
    {
        before(mi);
        return null;
    }
    if (method.equals("afterDelivery"))
    {
        after(mi);
        return null;
    }

    return delivery(mi);
}
/* */
/**
 * Marks the endpoint as released and, when a beforeDelivery is still outstanding
 * (a saved class loader exists), unwinds it without committing.
 *
 * <p>Failures during that unwinding are logged as warnings and not propagated.
 *
 * @param mi the invocation being intercepted
 * @throws Throwable declared for subclasses; errors from the unwinding are caught and logged here
 */
protected void release(Invocation mi)
    throws Throwable
{
    this.released.set(true);

    if (this.trace)
    {
        log.trace("MessageEndpoint " + getProxyString(mi) + " released");
    }

    // No saved class loader means no beforeDelivery is pending, so nothing to unwind.
    if (getOldClassLoader() == null)
    {
        return;
    }

    try
    {
        finish("release", mi, false);
    }
    catch (Throwable cleanupFailure)
    {
        log.warn("Error in release ", cleanupFailure);
    }
}
/* */
/**
 * Handles {@code beforeDelivery}: switches the in-use thread's context class loader
 * to the container's class loader and starts the delivery transaction.
 *
 * @param mi the invocation being intercepted
 * @throws IllegalStateException if a previous beforeDelivery has not been completed
 * @throws ResourceException wrapping any failure while starting the transaction; the
 *         context class loader is restored before it is thrown
 * @throws Throwable anything raised while resolving the container
 */
protected void before(Invocation mi)
    throws Throwable
{
    final boolean alreadyPending = getBeforeDeliveryInvoke();
    if (alreadyPending)
    {
        throw new IllegalStateException("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(mi));
    }

    final MessageDrivenContainer mdc = getContainer(mi);
    synchronized (this)
    {
        // Remember the caller's loader so that it can be put back later.
        this.oldClassLoader = GetTCLAction.getContextClassLoader(this.inUseThread);
        SetTCLAction.setContextClassLoader(this.inUseThread, mdc.getClassLoader());
    }
    if (this.trace)
    {
        log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + mdc.getClassLoader());
    }

    try
    {
        startTransaction("beforeDelivery", mi, mdc);
        setBeforeDeliveryInvoke(true);
    }
    catch (Throwable startFailure)
    {
        setBeforeDeliveryInvoke(false);
        resetContextClassLoader(mi);
        throw new ResourceException(startFailure);
    }
}
/* */
/**
 * Handles {@code afterDelivery}: completes the work opened by the matching
 * beforeDelivery by calling {@link #finish(String, Invocation, boolean)} with commit requested.
 *
 * @param mi the invocation being intercepted
 * @throws IllegalStateException if no beforeDelivery preceded this call
 * @throws ResourceException wrapping any failure raised while finishing
 * @throws Throwable declared for subclasses
 */
protected void after(Invocation mi)
    throws Throwable
{
    final boolean beforeWasInvoked = getBeforeDeliveryInvoke();
    if (!beforeWasInvoked)
    {
        throw new IllegalStateException("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(mi));
    }

    try
    {
        finish("afterDelivery", mi, true);
    }
    catch (Throwable finishFailure)
    {
        throw new ResourceException(finishFailure);
    }
}
/* */
/* */ protected Object delivery(Invocation mi)
/* */ throws Throwable
/* */ {
/* 232 */ if (this.delivered.get()) {
/* 233 */ throw new IllegalStateException("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi));
/* */ }
/* 235 */ if (this.trace) {
/* 236 */ log.trace("MessageEndpoint " + getProxyString(mi) + " delivering");
/* */ }
/* */
/* 239 */ if (getOldClassLoader() != null) {
/* 240 */ this.delivered.set(true);
/* */ }
/* 242 */ MessageDrivenContainer container = getContainer(mi);
/* 243 */ boolean commit = true;
/* */ try
/* */ {
/* 247 */ if (getOldClassLoader() == null)
/* 248 */ startTransaction("delivery", mi, container);
/* 249 */ localObject1 = getNext().invoke(mi);
/* */ }
/* */ catch (Throwable t)
/* */ {
/* */ Object localObject1;
/* 253 */ if (this.trace)
/* 254 */ log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t);
/* 255 */ if (((t instanceof Error)) || ((t instanceof RuntimeException)))
/* */ {
/* 257 */ Transaction transaction = getTransaction();
/* 258 */ if (transaction != null)
/* 259 */ transaction.setRollbackOnly();
/* 260 */ commit = false;
/* */ }
/* 262 */ throw t;
/* */ }
/* */ finally
/* */ {
/* 267 */ if (getOldClassLoader() != null);
/* */ }
/* */
/* 276 */ ret;
/* */ }
/* */
/**
 * Ends the current transaction and then unconditionally resets the per-delivery state:
 * the beforeDelivery flag, the delivered flag, the context class loader and the thread lock.
 *
 * <p>The reset steps run in a {@code finally} clause, so they happen even when ending the
 * transaction fails. The class loader is restored before the thread lock is released
 * because the restore uses the in-use thread.
 *
 * @param context label of the calling operation; not used by this implementation
 * @param mi the invocation being intercepted
 * @param commit {@code true} to request a commit, {@code false} to request a rollback
 * @throws Throwable anything raised while ending the transaction or resetting state
 */
protected void finish(String context, Invocation mi, boolean commit)
    throws Throwable
{
    try
    {
        endTransaction(mi, commit);
    }
    finally
    {
        setBeforeDeliveryInvoke(false);
        this.delivered.set(false);
        resetContextClassLoader(mi);
        releaseThreadLock(mi);
    }
}
/* */
/**
 * Prepares the transactional context for a delivery.
 *
 * <p>Any transaction associated with the calling thread is suspended and remembered.
 * If the target method is transacted:
 * <ul>
 *   <li>with no caller transaction, a new transaction is begun, remembered, and the
 *       XAResource from the invocation context (if present) is enlisted;</li>
 *   <li>with a caller transaction, that transaction is resumed and used as is; the
 *       XAResource is ignored.</li>
 * </ul>
 * If the method is not transacted, the caller transaction stays suspended until
 * {@link #endTransaction(Invocation, boolean)} resumes it.
 *
 * @param context {@code "delivery"} when called for a direct delivery; any other value means
 *        the target method is taken from the invocation's first argument
 * @param mi the invocation being intercepted
 * @param container container supplying the transaction manager
 * @throws Throwable anything raised by the transaction manager or the factory lookup
 */
protected void startTransaction(String context, Invocation mi, MessageDrivenContainer container)
    throws Throwable
{
    XAResource resource = (XAResource) mi.getInvocationContext().getValue("MessageEndpoint.XAResource");

    // For a direct delivery the invoked method is the target; otherwise the
    // target method is passed as the first invocation argument.
    final Method method;
    if ("delivery".equals(context))
    {
        method = mi.getMethod();
    }
    else
    {
        method = (Method) mi.getArguments()[0];
    }

    boolean isTransacted = getMessageEndpointFactory(mi).isDeliveryTransacted(method);

    if (this.trace)
    {
        log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted);
    }

    TransactionManager tm = container.getTransactionManager();
    final Transaction callerTx = tm.suspend();
    synchronized (this)
    {
        this.suspended = callerTx;
    }

    if (this.trace)
    {
        log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + callerTx);
    }

    if (!isTransacted)
    {
        return;
    }

    if (callerTx == null)
    {
        tm.begin();
        final Transaction startedTx = tm.getTransaction();
        synchronized (this)
        {
            this.transaction = startedTx;
        }
        if (this.trace)
        {
            log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + startedTx);
        }

        if (resource != null)
        {
            startedTx.enlistResource(resource);
            if (this.trace)
            {
                log.trace("MessageEndpoint " + getProxyString(mi) + " enlisted=" + resource);
            }
        }
    }
    else
    {
        try
        {
            tm.resume(callerTx);
        }
        finally
        {
            synchronized (this)
            {
                this.suspended = null;
            }
            if (this.trace)
            {
                // Log the local reference: the field was just cleared and would always print null.
                log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + callerTx + " already active, IGNORED=" + resource);
            }
        }
    }
}
/* */
/**
 * Completes the transaction begun by startTransaction (if any) and resumes the
 * caller transaction that was suspended (if any).
 *
 * <p>If the thread is associated with a different transaction than the one this
 * interceptor started, a warning is logged, that transaction is swapped out for the
 * duration of the commit/rollback, and resumed again afterwards on a best-effort basis.
 *
 * <p>The started transaction is rolled back when {@code commit} is {@code false} or its
 * status is marked rollback-only; otherwise it is committed. The remembered transaction
 * reference is always cleared.
 *
 * @param mi the invocation being intercepted
 * @param commit {@code true} to request a commit, {@code false} to force a rollback
 * @throws Throwable anything raised by the transaction manager during commit, rollback or resume
 */
protected void endTransaction(Invocation mi, boolean commit)
    throws Throwable
{
    TransactionManager tm = null;
    Transaction currentTx = null;
    try
    {
        Transaction transaction = getTransaction();
        if (transaction != null)
        {
            tm = getContainer(mi).getTransactionManager();
            currentTx = tm.getTransaction();

            if ((currentTx != null) && (!currentTx.equals(transaction)))
            {
                // Someone left a different transaction on the thread; park it and finish ours.
                log.warn("Current transaction " + currentTx + " is not the expected transaction.");
                tm.suspend();
                tm.resume(transaction);
            }
            else
            {
                // Nothing foreign to restore in the finally clause.
                currentTx = null;
            }

            // 1 is javax.transaction.Status.STATUS_MARKED_ROLLBACK
            if ((!commit) || (transaction.getStatus() == 1))
            {
                if (this.trace)
                {
                    log.trace("MessageEndpoint " + getProxyString(mi) + " rollback");
                }
                tm.rollback();
            }
            else
            {
                if (this.trace)
                {
                    log.trace("MessageEndpoint " + getProxyString(mi) + " commit");
                }
                tm.commit();
            }
        }

        Transaction suspended = getSuspended();
        if (suspended != null)
        {
            try
            {
                tm = getContainer(mi).getTransactionManager();
                tm.resume(suspended);
            }
            finally
            {
                synchronized (this)
                {
                    this.suspended = null;
                }
            }
        }
    }
    finally
    {
        synchronized (this)
        {
            this.transaction = null;
        }

        if (currentTx != null)
        {
            try
            {
                tm.resume(currentTx);
            }
            catch (Throwable t)
            {
                // Best effort only, but keep the cause in the log so the failure can be diagnosed.
                log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx, t);
            }
        }
    }
}
/* */
/**
 * Puts the saved class loader back as the in-use thread's context class loader and
 * forgets it. Runs entirely under the monitor of this interceptor.
 *
 * @param mi the invocation being intercepted; used only for the trace message
 */
protected void resetContextClassLoader(Invocation mi)
{
    synchronized (this)
    {
        if (this.trace)
        {
            log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + this.oldClassLoader);
        }
        final ClassLoader saved = this.oldClassLoader;
        SetTCLAction.setContextClassLoader(this.inUseThread, saved);
        this.oldClassLoader = null;
    }
}
/* */
/**
 * Records whether a beforeDelivery is currently outstanding.
 * Plain field write; unlike the class loader and transaction accessors, this method is not synchronized.
 *
 * @param bdi {@code true} after a successful beforeDelivery, {@code false} once it has been finished or failed
 */
protected void setBeforeDeliveryInvoke(boolean bdi)
{
    this.beforeDeliveryInvoked = bdi;
}
/* */
/**
 * Reports whether a beforeDelivery is currently outstanding.
 * Plain field read; unlike the class loader and transaction accessors, this method is not synchronized.
 *
 * @return the value last stored by setBeforeDeliveryInvoke
 */
protected boolean getBeforeDeliveryInvoke()
{
    return this.beforeDeliveryInvoked;
}
/* */
/**
 * Frees the endpoint for use by another thread by clearing the in-use thread
 * under the monitor of this interceptor.
 *
 * @param mi the invocation being intercepted; used only for the trace message
 */
protected void releaseThreadLock(Invocation mi)
{
    synchronized (this)
    {
        if (this.trace)
        {
            final Thread owner = this.inUseThread;
            log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + owner);
        }
        this.inUseThread = null;
    }
}
/* */
/**
 * Returns a printable identity for this endpoint, taken from the invocation
 * context's cache id on first use and cached afterwards.
 *
 * @param mi the invocation whose context supplies the cache id on the first call
 * @return the cached string form of the cache id
 */
protected String getProxyString(Invocation mi)
{
    String proxyId = this.cachedProxyString;
    if (proxyId == null)
    {
        proxyId = mi.getInvocationContext().getCacheId().toString();
        this.cachedProxyString = proxyId;
    }
    return proxyId;
}
/* */
/**
 * Returns the message endpoint factory, reading it from the invocation context
 * under the key {@code "MessageEndpoint.Factory"} on first use and caching it.
 *
 * @param mi the invocation whose context holds the factory
 * @return the endpoint factory, never {@code null}
 * @throws IllegalStateException if the invocation context holds no factory
 */
protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi)
{
    JBossMessageEndpointFactory factory = this.endpointFactory;
    if (factory != null)
    {
        return factory;
    }

    factory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue("MessageEndpoint.Factory");
    this.endpointFactory = factory;
    if (factory == null)
    {
        throw new IllegalStateException("No message endpoint factory in " + mi.getInvocationContext().context);
    }
    return factory;
}
/* */
/**
 * Resolves the message-driven container through the endpoint factory.
 *
 * @param mi the invocation used to locate the endpoint factory
 * @return the container, never {@code null}
 * @throws IllegalStateException if no factory is available or the factory has no container
 */
protected MessageDrivenContainer getContainer(Invocation mi)
{
    final JBossMessageEndpointFactory factory = getMessageEndpointFactory(mi);
    final MessageDrivenContainer mdc = factory.getContainer();
    if (mdc != null)
    {
        return mdc;
    }
    throw new IllegalStateException("No container associated with message endpoint factory: " + factory.getServiceName());
}
/* */
/**
 * Reads the saved context class loader under the monitor of this interceptor.
 *
 * @return the class loader saved by before(), or {@code null} when no beforeDelivery is outstanding
 */
protected ClassLoader getOldClassLoader()
{
    synchronized (this)
    {
        return this.oldClassLoader;
    }
}
/* */
/**
 * Reads the transaction started by this interceptor under the monitor of this interceptor.
 *
 * @return the transaction recorded by startTransaction(), or {@code null} when none is recorded
 */
protected Transaction getTransaction()
{
    synchronized (this)
    {
        return this.transaction;
    }
}
/* */
/**
 * Reads the suspended caller transaction under the monitor of this interceptor.
 *
 * @return the transaction suspended by startTransaction() and not yet resumed, or {@code null}
 */
protected Transaction getSuspended()
{
    synchronized (this)
    {
        return this.suspended;
    }
}
/* */ }
/* Location: /home/mnovotny/projects/EMBEDDED_JBOSS_BETA3_COMMUNITY/embedded/output/lib/embedded-jboss/lib/jboss-embedded-all.jar
* Qualified Name: org.jboss.ejb.plugins.inflow.MessageEndpointInterceptor
* JD-Core Version: 0.6.0
*/