Package org.springframework.xd.dirt.integration.rabbit

Source Code of org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus

/*
* Copyright 2013-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.xd.dirt.integration.rabbit;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.expression.Expression;
import org.springframework.http.MediaType;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.xd.dirt.integration.bus.AbstractBusPropertiesAccessor;
import org.springframework.xd.dirt.integration.bus.Binding;
import org.springframework.xd.dirt.integration.bus.BusProperties;
import org.springframework.xd.dirt.integration.bus.MessageBus;
import org.springframework.xd.dirt.integration.bus.MessageBusSupport;
import org.springframework.xd.dirt.integration.bus.serializer.MultiTypeCodec;

import com.rabbitmq.client.Channel;

/**
* A {@link MessageBus} implementation backed by RabbitMQ.
*
* @author Mark Fisher
* @author Gary Russell
* @author Jennifer Hickey
*/
public class RabbitMessageBus extends MessageBusSupport implements DisposableBean {

  private static final AcknowledgeMode DEFAULT_ACKNOWLEDGE_MODE = AcknowledgeMode.AUTO;

  private static final MessageDeliveryMode DEFAULT_DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

  private static final boolean DEFAULT_DEFAULT_REQUEUE_REJECTED = true;

  private static final int DEFAULT_MAX_CONCURRENCY = 1;

  private static final int DEFAULT_PREFETCH_COUNT = 1;

  private static final String DEFAULT_RABBIT_PREFIX = "xdbus.";

  private static final int DEFAULT_TX_SIZE = 1;

  private static final String[] DEFAULT_REQUEST_HEADER_PATTERNS = new String[] { "STANDARD_REQUEST_HEADERS", "*" };

  private static final String[] DEFAULT_REPLY_HEADER_PATTERNS = new String[] { "STANDARD_REPLY_HEADERS", "*" };

  private static final Set<Object> RABBIT_CONSUMER_PROPERTIES = new HashSet<Object>(Arrays.asList(new String[] {
    BusProperties.MAX_CONCURRENCY,
    RabbitPropertiesAccessor.ACK_MODE,
    RabbitPropertiesAccessor.PREFETCH,
    RabbitPropertiesAccessor.PREFIX,
    RabbitPropertiesAccessor.REQUEST_HEADER_PATTERNS,
    RabbitPropertiesAccessor.REQUEUE,
    RabbitPropertiesAccessor.TRANSACTED,
    RabbitPropertiesAccessor.TX_SIZE,
    RabbitPropertiesAccessor.AUTO_BIND_DLQ
  }));

  /**
   * Retry + rabbit consumer properties.
   */
  private static final Set<Object> SUPPORTED_BASIC_CONSUMER_PROPERTIES = new SetBuilder()
      .addAll(CONSUMER_RETRY_PROPERTIES)
      .addAll(RABBIT_CONSUMER_PROPERTIES)
      .build();

  private static final Set<Object> SUPPORTED_PUBSUB_CONSUMER_PROPERTIES = SUPPORTED_BASIC_CONSUMER_PROPERTIES;

  /**
   * Basic + concurrency.
   */
  private static final Set<Object> SUPPORTED_NAMED_CONSUMER_PROPERTIES = new SetBuilder()
      .addAll(SUPPORTED_BASIC_CONSUMER_PROPERTIES)
      .add(BusProperties.CONCURRENCY)
      .build();

  /**
   * Basic + concurrency + partitioning.
   */
  private static final Set<Object> SUPPORTED_CONSUMER_PROPERTIES = new SetBuilder()
      .addAll(SUPPORTED_BASIC_CONSUMER_PROPERTIES)
      .add(BusProperties.CONCURRENCY)
      .add(BusProperties.PARTITION_INDEX)
      .build();

  /**
   * Basic + concurrency + reply headers + delivery mode (reply).
   */
  private static final Set<Object> SUPPORTED_REPLYING_CONSUMER_PROPERTIES = new SetBuilder()
      // request
      .addAll(SUPPORTED_BASIC_CONSUMER_PROPERTIES)
      .add(BusProperties.CONCURRENCY)
      // reply
      .add(RabbitPropertiesAccessor.REPLY_HEADER_PATTERNS)
      .add(RabbitPropertiesAccessor.DELIVERY_MODE)
      .build();

  /**
   * Rabbit producer properties.
   */
  private static final Set<Object> SUPPORTED_BASIC_PRODUCER_PROPERTIES = new SetBuilder()
      .addAll(PRODUCER_STANDARD_PROPERTIES)
      .add(RabbitPropertiesAccessor.DELIVERY_MODE)
      .add(RabbitPropertiesAccessor.PREFIX)
      .add(RabbitPropertiesAccessor.REQUEST_HEADER_PATTERNS)
      .build();

  private static final Set<Object> SUPPORTED_PUBSUB_PRODUCER_PROPERTIES = SUPPORTED_BASIC_PRODUCER_PROPERTIES;

  private static final Set<Object> SUPPORTED_NAMED_PRODUCER_PROPERTIES = SUPPORTED_BASIC_PRODUCER_PROPERTIES;

  /**
   * Partitioning + rabbit producer properties.
   */
  private static final Set<Object> SUPPORTED_PRODUCER_PROPERTIES = new SetBuilder()
      .addAll(PRODUCER_PARTITIONING_PROPERTIES)
      .addAll(SUPPORTED_BASIC_PRODUCER_PROPERTIES)
      .add(BusProperties.DIRECT_BINDING_ALLOWED)
      .build();

  /**
   * Basic producer + basic consumer + concurrency + reply headers.
   */
  private static final Set<Object> SUPPORTED_REQUESTING_PRODUCER_PROPERTIES = new SetBuilder()
      // request
      .addAll(SUPPORTED_BASIC_PRODUCER_PROPERTIES)
      // reply
      .addAll(SUPPORTED_BASIC_CONSUMER_PROPERTIES)
      .add(BusProperties.CONCURRENCY)
      .add(RabbitPropertiesAccessor.REPLY_HEADER_PATTERNS)
      .build();

  private final Log logger = LogFactory.getLog(this.getClass());

  private final RabbitAdmin rabbitAdmin;

  private final RabbitTemplate rabbitTemplate = new RabbitTemplate();

  private final ConnectionFactory connectionFactory;

  private final GenericApplicationContext autoDeclareContext = new GenericApplicationContext();

  // Default RabbitMQ Container properties

  private volatile AcknowledgeMode defaultAcknowledgeMode = DEFAULT_ACKNOWLEDGE_MODE;

  private volatile boolean defaultChannelTransacted;

  private volatile MessageDeliveryMode defaultDefaultDeliveryMode = DEFAULT_DEFAULT_DELIVERY_MODE;

  private volatile boolean defaultDefaultRequeueRejected = DEFAULT_DEFAULT_REQUEUE_REJECTED;

  private volatile int defaultMaxConcurrency = DEFAULT_MAX_CONCURRENCY;

  private volatile int defaultPrefetchCount = DEFAULT_PREFETCH_COUNT;

  private volatile int defaultTxSize = DEFAULT_TX_SIZE;

  private volatile String defaultPrefix = DEFAULT_RABBIT_PREFIX;

  private volatile String[] defaultRequestHeaderPatterns = DEFAULT_REQUEST_HEADER_PATTERNS;

  private volatile String[] defaultReplyHeaderPatterns = DEFAULT_REPLY_HEADER_PATTERNS;

  private volatile boolean defaultAutoBindDLQ = false;


  public RabbitMessageBus(ConnectionFactory connectionFactory, MultiTypeCodec<Object> codec) {
    Assert.notNull(connectionFactory, "connectionFactory must not be null");
    Assert.notNull(codec, "codec must not be null");
    this.connectionFactory = connectionFactory;
    this.rabbitTemplate.setConnectionFactory(connectionFactory);
    this.rabbitTemplate.afterPropertiesSet();
    this.rabbitAdmin = new RabbitAdmin(connectionFactory);
    this.autoDeclareContext.refresh();
    this.rabbitAdmin.setApplicationContext(this.autoDeclareContext);
    this.rabbitAdmin.afterPropertiesSet();
    this.setCodec(codec);
  }

  public void setDefaultAcknowledgeMode(AcknowledgeMode defaultAcknowledgeMode) {
    Assert.notNull(defaultAcknowledgeMode, "'defaultAcknowledgeMode' cannot be null");
    this.defaultAcknowledgeMode = defaultAcknowledgeMode;
  }

  public void setDefaultChannelTransacted(boolean defaultChannelTransacted) {
    this.defaultChannelTransacted = defaultChannelTransacted;
  }

  public void setDefaultDefaultDeliveryMode(MessageDeliveryMode defaultDefaultDeliveryMode) {
    Assert.notNull(defaultDefaultDeliveryMode, "'defaultDeliveryMode' cannot be null");
    this.defaultDefaultDeliveryMode = defaultDefaultDeliveryMode;
  }

  public void setDefaultDefaultRequeueRejected(boolean defaultDefaultRequeueRejected) {
    this.defaultDefaultRequeueRejected = defaultDefaultRequeueRejected;
  }

  /**
   * Set the bus's default max consumers; can be overridden by consumer.maxConcurrency. Values
   * less than 'concurrency' will be coerced to be equal to concurrency.
   * @param defaultMaxConcurrency The default max concurrency.
   */
  public void setDefaultMaxConcurrency(int defaultMaxConcurrency) {
    this.defaultMaxConcurrency = defaultMaxConcurrency;
  }

  public void setDefaultPrefetchCount(int defaultPrefetchCount) {
    this.defaultPrefetchCount = defaultPrefetchCount;
  }

  public void setDefaultTxSize(int defaultTxSize) {
    this.defaultTxSize = defaultTxSize;
  }

  public void setDefaultPrefix(String defaultPrefix) {
    Assert.notNull(defaultPrefix, "'defaultPrefix' cannot be null");
    this.defaultPrefix = defaultPrefix.trim();
  }

  public void setDefaultRequestHeaderPatterns(String[] defaultRequestHeaderPatterns) {
    this.defaultRequestHeaderPatterns = defaultRequestHeaderPatterns;
  }

  public void setDefaultReplyHeaderPatterns(String[] defaultReplyHeaderPatterns) {
    this.defaultReplyHeaderPatterns = defaultReplyHeaderPatterns;
  }

  public void setDefaultAutoBindDLQ(boolean defaultAutoBindDLQ) {
    this.defaultAutoBindDLQ = defaultAutoBindDLQ;
  }

  @Override
  public void bindConsumer(final String name, MessageChannel moduleInputChannel, Properties properties) {
    if (logger.isInfoEnabled()) {
      logger.info("declaring queue for inbound: " + name);
    }
    if (name.startsWith(P2P_NAMED_CHANNEL_TYPE_PREFIX)) {
      validateConsumerProperties(name, properties, SUPPORTED_NAMED_CONSUMER_PROPERTIES);
    }
    else {
      validateConsumerProperties(name, properties, SUPPORTED_CONSUMER_PROPERTIES);
    }
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    String queueName = accessor.getPrefix(this.defaultPrefix) + name;
    int partitionIndex = accessor.getPartitionIndex();
    if (partitionIndex >= 0) {
      queueName += "-" + partitionIndex;
    }
    Queue queue = new Queue(queueName);
    declareQueueIfNotPresent(queue);
    autoBindDLQ(name, accessor);
    doRegisterConsumer(name, moduleInputChannel, queue, accessor, false);
    bindExistingProducerDirectlyIfPossible(name, moduleInputChannel);
  }

  @Override
  public void bindPubSubConsumer(String name, MessageChannel moduleInputChannel, Properties properties) {
    if (logger.isInfoEnabled()) {
      logger.info("declaring pubsub for inbound: " + name);
    }
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    validateConsumerProperties(name, properties, SUPPORTED_PUBSUB_CONSUMER_PROPERTIES);
    String prefix = accessor.getPrefix(this.defaultPrefix);
    FanoutExchange exchange = new FanoutExchange(prefix + "topic." + name);
    declareExchangeIfNotPresent(exchange);
    String uniqueName = name + "." + UUID.randomUUID().toString();
    Queue queue = new Queue(prefix + uniqueName, false, true, true);
    declareQueueIfNotPresent(queue);
    org.springframework.amqp.core.Binding binding = BindingBuilder.bind(queue).to(exchange);
    this.rabbitAdmin.declareBinding(binding);
    // register with context so they will be redeclared after a connection failure
    this.autoDeclareContext.getBeanFactory().registerSingleton(queue.getName(), queue);
    String bindingBeanName = exchange.getName() + "." + queue.getName() + ".binding";
    if (!autoDeclareContext.containsBean(bindingBeanName)) {
      this.autoDeclareContext.getBeanFactory().registerSingleton(bindingBeanName, binding);
    }
    doRegisterConsumer(name, moduleInputChannel, queue, accessor, true);
    autoBindDLQ(uniqueName, accessor);
  }

  private void doRegisterConsumer(String name, MessageChannel moduleInputChannel, Queue queue,
      RabbitPropertiesAccessor properties, boolean isPubSub) {
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(this.connectionFactory);
    listenerContainer.setAcknowledgeMode(properties.getAcknowledgeMode(this.defaultAcknowledgeMode));
    listenerContainer.setChannelTransacted(properties.getTransacted(this.defaultChannelTransacted));
    listenerContainer.setDefaultRequeueRejected(properties.getRequeueRejected(this.defaultDefaultRequeueRejected));
    if (!isPubSub) {
      int concurrency = properties.getConcurrency(this.defaultConcurrency);
      concurrency = concurrency > 0 ? concurrency : 1;
      listenerContainer.setConcurrentConsumers(concurrency);
      int maxConcurrency = properties.getMaxConcurrency(this.defaultMaxConcurrency);
      if (maxConcurrency > concurrency) {
        listenerContainer.setMaxConcurrentConsumers(maxConcurrency);
      }
    }
    listenerContainer.setPrefetchCount(properties.getPrefetchCount(this.defaultPrefetchCount));
    listenerContainer.setTxSize(properties.getTxSize(this.defaultTxSize));
    listenerContainer.setTaskExecutor(new SimpleAsyncTaskExecutor(queue.getName() + "-"));
    listenerContainer.setQueues(queue);
    int maxAttempts = properties.getMaxAttempts(this.defaultMaxAttempts);
    if (maxAttempts > 1) {
      RetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateless()
          .maxAttempts(maxAttempts)
          .backOffOptions(properties.getBackOffInitialInterval(this.defaultBackOffInitialInterval),
              properties.getBackOffMultiplier(this.defaultBackOffMultiplier),
              properties.getBackOffMaxInterval(this.defaultBackOffMaxInterval))
          .recoverer(new RejectAndDontRequeueRecoverer())
          .build();
      listenerContainer.setAdviceChain(new Advice[] { retryInterceptor });
    }
    listenerContainer.afterPropertiesSet();
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setBeanFactory(this.getBeanFactory());
    DirectChannel bridgeToModuleChannel = new DirectChannel();
    bridgeToModuleChannel.setBeanFactory(this.getBeanFactory());
    bridgeToModuleChannel.setBeanName(name + ".bridge");
    adapter.setOutputChannel(bridgeToModuleChannel);
    adapter.setBeanName("inbound." + name);
    DefaultAmqpHeaderMapper mapper = new DefaultAmqpHeaderMapper();
    mapper.setRequestHeaderNames(properties.getRequestHeaderPattens(this.defaultRequestHeaderPatterns));
    mapper.setReplyHeaderNames(properties.getReplyHeaderPattens(this.defaultReplyHeaderPatterns));
    adapter.setHeaderMapper(mapper);
    adapter.afterPropertiesSet();
    Binding consumerBinding = Binding.forConsumer(name, adapter, moduleInputChannel, properties);
    addBinding(consumerBinding);
    ReceivingHandler convertingBridge = new ReceivingHandler();
    convertingBridge.setOutputChannel(moduleInputChannel);
    convertingBridge.setBeanName(name + ".convert.bridge");
    convertingBridge.afterPropertiesSet();
    bridgeToModuleChannel.subscribe(convertingBridge);
    consumerBinding.start();
  }

  @Override
  public void bindProducer(final String name, MessageChannel moduleOutputChannel,
      Properties properties) {
    Assert.isInstanceOf(SubscribableChannel.class, moduleOutputChannel);
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    if (name.startsWith(P2P_NAMED_CHANNEL_TYPE_PREFIX)) {
      validateProducerProperties(name, properties, SUPPORTED_NAMED_PRODUCER_PROPERTIES);
    }
    else {
      validateProducerProperties(name, properties, SUPPORTED_PRODUCER_PROPERTIES);
    }
    if (!bindNewProducerDirectlyIfPossible(name, (SubscribableChannel) moduleOutputChannel, accessor)) {
      if (logger.isInfoEnabled()) {
        logger.info("declaring queue for outbound: " + name);
      }
      AmqpOutboundEndpoint queue = this.buildOutboundEndpoint(name, accessor);
      doRegisterProducer(name, moduleOutputChannel, queue, accessor);
    }
  }

  private AmqpOutboundEndpoint buildOutboundEndpoint(final String name, RabbitPropertiesAccessor properties) {
    String queueName = properties.getPrefix(this.defaultPrefix) + name;
    String partitionKeyExtractorClass = properties.getPartitionKeyExtractorClass();
    Expression partitionKeyExpression = properties.getPartitionKeyExpression();
    AmqpOutboundEndpoint queue = new AmqpOutboundEndpoint(rabbitTemplate);
    if (partitionKeyExpression == null && !StringUtils.hasText(partitionKeyExtractorClass)) {
      declareQueueIfNotPresent(new Queue(queueName));
      queue.setRoutingKey(queueName); // uses default exchange
    }
    else {
      queue.setRoutingKeyExpression(buildPartitionRoutingExpression(queueName));
      for (int i = 0; i < properties.getPartitionCount(); i++) {
        this.rabbitAdmin.declareQueue(new Queue(queueName + "-" + i));
      }
    }
    configureOutboundHandler(queue, properties);
    return queue;
  }

  private void configureOutboundHandler(AmqpOutboundEndpoint handler, RabbitPropertiesAccessor properties) {
    DefaultAmqpHeaderMapper mapper = new DefaultAmqpHeaderMapper();
    mapper.setRequestHeaderNames(properties.getRequestHeaderPattens(this.defaultRequestHeaderPatterns));
    mapper.setReplyHeaderNames(properties.getReplyHeaderPattens(this.defaultReplyHeaderPatterns));
    handler.setHeaderMapper(mapper);
    handler.setDefaultDeliveryMode(properties.getDeliveryMode(this.defaultDefaultDeliveryMode));
    handler.setBeanFactory(this.getBeanFactory());
    handler.afterPropertiesSet();
  }

  @Override
  public void bindPubSubProducer(String name, MessageChannel moduleOutputChannel,
      Properties properties) {
    validateProducerProperties(name, properties, SUPPORTED_PUBSUB_PRODUCER_PROPERTIES);
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    String exchangeName = accessor.getPrefix(this.defaultPrefix) + "topic." + name;
    declareExchangeIfNotPresent(new FanoutExchange(exchangeName));
    AmqpOutboundEndpoint fanout = new AmqpOutboundEndpoint(rabbitTemplate);
    fanout.setExchangeName(exchangeName);
    configureOutboundHandler(fanout, accessor);
    doRegisterProducer(name, moduleOutputChannel, fanout, accessor);
  }

  private void doRegisterProducer(final String name, MessageChannel moduleOutputChannel,
      AmqpOutboundEndpoint delegate, RabbitPropertiesAccessor properties) {
    this.doRegisterProducer(name, moduleOutputChannel, delegate, null, properties);
  }

  private void doRegisterProducer(final String name, MessageChannel moduleOutputChannel,
      AmqpOutboundEndpoint delegate, String replyTo, RabbitPropertiesAccessor properties) {
    Assert.isInstanceOf(SubscribableChannel.class, moduleOutputChannel);
    MessageHandler handler = new SendingHandler(delegate, replyTo, properties);
    EventDrivenConsumer consumer = new EventDrivenConsumer((SubscribableChannel) moduleOutputChannel, handler);
    consumer.setBeanFactory(getBeanFactory());
    consumer.setBeanName("outbound." + name);
    consumer.afterPropertiesSet();
    Binding producerBinding = Binding.forProducer(name, moduleOutputChannel, consumer, properties);
    addBinding(producerBinding);
    producerBinding.start();
  }

  @Override
  public void bindRequestor(String name, MessageChannel requests, MessageChannel replies,
      Properties properties) {
    if (logger.isInfoEnabled()) {
      logger.info("binding requestor: " + name);
    }
    validateProducerProperties(name, properties, SUPPORTED_REQUESTING_PRODUCER_PROPERTIES);
    Assert.isInstanceOf(SubscribableChannel.class, requests);
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    String queueName = name + ".requests";
    AmqpOutboundEndpoint queue = this.buildOutboundEndpoint(queueName, accessor);
    queue.setBeanFactory(this.getBeanFactory());

    String replyQueueName = accessor.getPrefix(this.defaultPrefix) + name + ".replies."
        + this.getIdGenerator().generateId();
    this.doRegisterProducer(name, requests, queue, replyQueueName, accessor);
    Queue replyQueue = new Queue(replyQueueName, false, false, true); // auto-delete
    declareQueueIfNotPresent(replyQueue);
    // register with context so it will be redeclared after a connection failure
    this.autoDeclareContext.getBeanFactory().registerSingleton(replyQueueName, replyQueue);
    this.doRegisterConsumer(name, replies, replyQueue, accessor, false);
  }

  @Override
  public void bindReplier(String name, MessageChannel requests, MessageChannel replies,
      Properties properties) {
    if (logger.isInfoEnabled()) {
      logger.info("binding replier: " + name);
    }
    validateConsumerProperties(name, properties, SUPPORTED_REPLYING_CONSUMER_PROPERTIES);
    RabbitPropertiesAccessor accessor = new RabbitPropertiesAccessor(properties);
    Queue requestQueue = new Queue(accessor.getPrefix(this.defaultPrefix) + name + ".requests");
    declareQueueIfNotPresent(requestQueue);
    this.doRegisterConsumer(name, requests, requestQueue, accessor, false);

    AmqpOutboundEndpoint replyQueue = new AmqpOutboundEndpoint(rabbitTemplate);
    replyQueue.setRoutingKeyExpression("headers['" + AmqpHeaders.REPLY_TO + "']");
    configureOutboundHandler(replyQueue, accessor);
    doRegisterProducer(name, replies, replyQueue, accessor);
  }

  /**
   * Try passive declaration first, in case the user has pre-configured the queue with
   * incompatible arguments.
   * @param queue The queue.
   */
  private void declareQueueIfNotPresent(Queue queue) {
    if (this.rabbitAdmin.getQueueProperties(queue.getName()) == null) {
      this.rabbitAdmin.declareQueue(queue);
    }
  }

  /**
   * Try passive declaration first, in case the user has pre-configured the exchange with
   * incompatible arguments.
   * @param exchange
   */
  private void declareExchangeIfNotPresent(final Exchange exchange) {
    this.rabbitTemplate.execute(new ChannelCallback<Void>() {

      @Override
      public Void doInRabbit(Channel channel) throws Exception {
        try {
          channel.exchangeDeclarePassive(exchange.getName());
        }
        catch (IOException e) {
          RabbitMessageBus.this.rabbitAdmin.declareExchange(exchange);
        }
        return null;
      }

    });
  }

  /**
   * If so requested, declare the DLX/DLQ and bind it. The DLQ is bound
   * to the DLX with a routing key of the original queue name because we
   * use default exchange routing by queue name for the original message.
   * @param name The name.
   * @param properties The properties accessor.
   */
  private void autoBindDLQ(final String name, RabbitPropertiesAccessor properties) {
    if (properties.getAutoBindDLQ(this.defaultAutoBindDLQ)) {
      String prefix = properties.getPrefix(this.defaultPrefix);
      String dlqName = prefix + name + ".dlq";
      Queue dlq = new Queue(dlqName);
      declareQueueIfNotPresent(dlq);
      final String dlxName = properties.getPrefix(this.defaultPrefix) + "DLX";
      final DirectExchange dlx = new DirectExchange(dlxName);
      declareExchangeIfNotPresent(dlx);
      this.rabbitAdmin.declareBinding(BindingBuilder.bind(dlq).to(dlx).with(prefix + name));
    }
  }

  @Override
  public void destroy() {
    stopBindings();
  }

  private class SendingHandler extends AbstractMessageHandler {

    private final MessageHandler delegate;

    private final String replyTo;

    private final PartitioningMetadata partitioningMetadata;

    private SendingHandler(MessageHandler delegate, String replyTo, RabbitPropertiesAccessor properties) {
      this.delegate = delegate;
      this.replyTo = replyTo;
      this.partitioningMetadata = new PartitioningMetadata(properties);
      this.setBeanFactory(RabbitMessageBus.this.getBeanFactory());
    }

    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
      Message<?> messageToSend = serializePayloadIfNecessary(message,
          MediaType.APPLICATION_OCTET_STREAM);
      Map<String, Object> additionalHeaders = null;
      if (replyTo != null) {
        additionalHeaders = new HashMap<String, Object>();
        additionalHeaders.put(AmqpHeaders.REPLY_TO, this.replyTo);
      }
      if (this.partitioningMetadata.isPartitionedModule()) {
        if (additionalHeaders == null) {
          additionalHeaders = new HashMap<String, Object>();
        }
        additionalHeaders.put(PARTITION_HEADER, determinePartition(message, this.partitioningMetadata));
      }
      if (additionalHeaders != null) {
        messageToSend = getMessageBuilderFactory().fromMessage(messageToSend)
            .copyHeaders(additionalHeaders)
            .build();
      }
      this.delegate.handleMessage(messageToSend);
    }
  }

  private class ReceivingHandler extends AbstractReplyProducingMessageHandler {

    public ReceivingHandler() {
      super();
      this.setBeanFactory(RabbitMessageBus.this.getBeanFactory());
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
      return deserializePayloadIfNecessary(requestMessage);
    }

    @Override
    protected boolean shouldCopyRequestHeaders() {
      /*
       * we've already copied the headers so no need for the ARPMH to do it, and we don't want the content-type
       * restored if absent.
       */
      return false;
    }

  };

  /**
   * Property accessor for the RabbitMessageBus. Refer to the Spring-AMQP
   * documentation for information on the specific properties.
   */
  private static class RabbitPropertiesAccessor extends AbstractBusPropertiesAccessor {

    /**
     * The acknowledge mode.
     */
    private static final String ACK_MODE = "ackMode";

    /**
     * The delivery mode.
     */
    private static final String DELIVERY_MODE = "deliveryMode";

    /**
     * The prefetch count (basic qos).
     */
    private static final String PREFETCH = "prefetch";

    /**
     * The prefix for queues, exchanges.
     */
    private static final String PREFIX = "prefix";

    /**
     * The reply header patterns.
     */
    private static final String REPLY_HEADER_PATTERNS = "replyHeaderPatterns";

    /**
     * The request header patterns.
     */
    private static final String REQUEST_HEADER_PATTERNS = "requestHeaderPatterns";

    /**
     * Whether delivery failures should be requeued.
     */
    private static final String REQUEUE = "requeue";

    /**
     * Whether to use transacted channels.
     */
    private static final String TRANSACTED = "transacted";

    /**
     * The number of deliveries between acks.
     */
    private static final String TX_SIZE = "txSize";

    /**
     * Whether to automatically declare the DLQ and bind it to the bus DLX.
     */
    private static final String AUTO_BIND_DLQ = "autoBindDLQ";

    public RabbitPropertiesAccessor(Properties properties) {
      super(properties);
    }

    public AcknowledgeMode getAcknowledgeMode(AcknowledgeMode defaultValue) {
      String ackknowledgeMode = getProperty(ACK_MODE);
      if (StringUtils.hasText(ackknowledgeMode)) {
        return AcknowledgeMode.valueOf(ackknowledgeMode);
      }
      else {
        return defaultValue;
      }
    }

    public MessageDeliveryMode getDeliveryMode(MessageDeliveryMode defaultValue) {
      String deliveryMode = getProperty(DELIVERY_MODE);
      if (StringUtils.hasText(deliveryMode)) {
        return MessageDeliveryMode.valueOf(deliveryMode);
      }
      else {
        return defaultValue;
      }
    }

    public int getPrefetchCount(int defaultValue) {
      return getProperty(PREFETCH, defaultValue);
    }

    public String getPrefix(String defaultValue) {
      return getProperty(PREFIX, defaultValue);
    }

    public String[] getReplyHeaderPattens(String[] defaultValue) {
      return asStringArray(getProperty(REPLY_HEADER_PATTERNS), defaultValue);
    }

    public String[] getRequestHeaderPattens(String[] defaultValue) {
      return asStringArray(getProperty(REQUEST_HEADER_PATTERNS), defaultValue);
    }

    public boolean getRequeueRejected(boolean defaultValue) {
      return getProperty(REQUEUE, defaultValue);
    }

    public boolean getTransacted(boolean defaultValue) {
      return getProperty(TRANSACTED, defaultValue);
    }

    public int getTxSize(int defaultValue) {
      return getProperty(TX_SIZE, defaultValue);
    }

    public boolean getAutoBindDLQ(boolean defaultValue) {
      return getProperty(AUTO_BIND_DLQ, defaultValue);
    }

  }

}
TOP

Related Classes of org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.