Package org.mule.jbi.messaging

Source Code of org.mule.jbi.messaging.DeliveryChannelImpl

/*
* Copyright 2005 SymphonySoft Limited. All rights reserved.
* http://www.symphonysoft.com
*
* The software in this package is published under the terms of the BSD
* style license a copy of which has been included with this distribution in
* the LICENSE.txt file.
*
* ------------------------------------------------------------------------------------------------------
* $Id: DeliveryChannelImpl.java 2184 2006-06-04 23:36:19Z holger $
* $Revision: 2184 $
* $Date: 2006-06-05 07:36:19 +0800 (周一, 2006-06-05) $
*/
package org.mule.jbi.messaging;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.mule.jbi.JbiContainer;
import org.mule.jbi.registry.JbiRegistryComponent;
import org.mule.jbi.servicedesc.AbstractServiceEndpoint;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueSession;

import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;

/**
*
* @author <a href="mailto:gnt@codehaus.org">Guillaume Nodet</a>
*/
public class DeliveryChannelImpl implements DeliveryChannel {

    /**
     * logger used by this class
     */
    protected transient Log logger = LogFactory.getLog(getClass());

  private JbiContainer container;
    private String componentName;
  private boolean closed;
 
  public DeliveryChannelImpl(JbiContainer container, String component) {
    this.container = container;
        this.componentName = component;
  }
 
  public void close() throws MessagingException {
    this.container.getEndpoints().unregisterEndpoints(this.componentName);
    this.closed = true;
  }

  public MessageExchangeFactory createExchangeFactory() {
    MessageExchangeFactoryImpl mep = new MessageExchangeFactoryImpl(this);
    return mep;
  }

  public MessageExchangeFactory createExchangeFactory(QName interfaceName) {
    MessageExchangeFactoryImpl mep = new MessageExchangeFactoryImpl(this);
    mep.setInterfaceName(interfaceName);
    return mep;
  }

  public MessageExchangeFactory createExchangeFactoryForService(QName serviceName) {
    MessageExchangeFactoryImpl mep = new MessageExchangeFactoryImpl(this);
    mep.setService(serviceName);
    return mep;
  }

  public MessageExchangeFactory createExchangeFactory(ServiceEndpoint endpoint) {
    MessageExchangeFactoryImpl mep = new MessageExchangeFactoryImpl(this);
    mep.setEndpoint(endpoint);
    return mep;
  }

  public MessageExchange accept() throws MessagingException {
    if (this.closed) {
      throw new MessagingException("Channel is closed");
    }
    try {
      QueueSession qs = container.getQueueSession();
      Queue queue = qs.getQueue(componentName);
      MessageExchange me = (MessageExchange) queue.take();
      if (me != null) {
        handleReceive(me);
      }
      return me;
    } catch (InterruptedException e) {
      throw new MessagingException(e);
    }
  }

  public MessageExchange accept(long timeout) throws MessagingException {
    if (this.closed) {
      throw new MessagingException("Channel is closed");
    }
    try {
      QueueSession qs = container.getQueueSession();
      Queue queue = qs.getQueue(componentName);
      MessageExchange me = (MessageExchange) queue.poll(timeout);
      if (me != null) {
        handleReceive(me);
      }
      return me;
    } catch (InterruptedException e) {
      throw new MessagingException(e);
    }
  }

  public void send(MessageExchange exchange) throws MessagingException {
    if (this.closed) {
      throw new MessagingException("Channel is closed");
    }
    if (!(exchange instanceof MessageExchangeProxy)) {
      throw new MessagingException("exchange should be created with MessageExchangeFactory");
    }
    MessageExchangeProxy me = (MessageExchangeProxy) exchange;
    String target;
    if (me.getRole() == MessageExchange.Role.CONSUMER) {
      if (me.getConsumer() == null) {
        me.setConsumer(this.componentName);
        ServiceEndpoint se = this.container.getRouter().getTargetEndpoint(exchange);
        me.setEndpoint(se);
        target = ((AbstractServiceEndpoint) se).getComponent();
        me.setProvider(target);
      } else {
        target = me.getProvider();
      }
    } else {
      target = me.getConsumer();
    }
    me.handleSend(false);
        container.getRouter().send(me);

  }

  public boolean sendSync(MessageExchange exchange) throws MessagingException {
    return sendSync(exchange, 0L);
  }

  public boolean sendSync(MessageExchange exchange, long timeout) throws MessagingException {
    if (this.closed) {
      throw new MessagingException("Channel is closed");
    }
    if (!(exchange instanceof MessageExchangeProxy)) {
      throw new MessagingException("exchange should be created with MessageExchangeFactory");
    }
    MessageExchangeProxy me = (MessageExchangeProxy) exchange;
    String target;
    if (me.getRole() == MessageExchange.Role.CONSUMER) {
      if (me.getConsumer() == null) {
        me.setConsumer(this.componentName);
        ServiceEndpoint se = this.container.getRouter().getTargetEndpoint(exchange);
        me.setEndpoint(se);
        target = ((AbstractServiceEndpoint) se).getComponent();
        me.setProvider(target);
      } else {
        target = me.getProvider();
      }
    } else {
      target = me.getConsumer();
    }
    me.handleSend(true);
    DeliveryChannelImpl ch = (DeliveryChannelImpl) ((JbiRegistryComponent)container.getRegistry().getComponent(target)).getChannel();
    ch.enqueue(me.getTwin());
    try {
      synchronized (exchange) {
        exchange.wait(timeout);
        if (me.getSyncState() == MessageExchangeProxy.SYNC_STATE_SYNC_RECEIVED) {
          me.handleAccept();
          return true;
        } else {
          return false;
        }
      }
    } catch (InterruptedException e) {
      throw new MessagingException(e);
    }
  }
 
  private void handleReceive(MessageExchange exchange) throws MessagingException {
    if (!(exchange instanceof MessageExchangeProxy)) {
      throw new MessagingException("exchange should be created with MessageExchangeFactory");
    }
    MessageExchangeProxy me = (MessageExchangeProxy) exchange;
      me.handleAccept();
  }
 
  public void enqueue(MessageExchange exchange) throws MessagingException {
    if (!(exchange instanceof MessageExchangeProxy)) {
      throw new MessagingException("exchange should be created with MessageExchangeFactory");
    }
    MessageExchangeProxy me = (MessageExchangeProxy) exchange;
    if (me.getSyncState() == MessageExchangeProxy.SYNC_STATE_SYNC_SENT) {
      synchronized (me) {
        me.setSyncState(MessageExchangeProxy.SYNC_STATE_SYNC_RECEIVED);
        me.notify();
      }
    } else {
            try {
          QueueSession qs = container.getQueueSession();
          Queue queue = qs.getQueue(componentName);
                queue.put(me);
            } catch (InterruptedException e) {
                logger.error(e);
            }
        }
  }

  public boolean isClosed() {
    return closed;
  }

  public JbiContainer getContainer() {
    return container;
  }

}
TOP

Related Classes of org.mule.jbi.messaging.DeliveryChannelImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.