Package org.mule.processor.chain

Source Code of org.mule.processor.chain.DefaultMessageProcessorChain

/*
* Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/
package org.mule.processor.chain;

import org.mule.MessageExchangePattern;
import org.mule.OptimizedRequestContext;
import org.mule.VoidMuleEvent;
import org.mule.api.MessagingException;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.component.Component;
import org.mule.api.construct.FlowConstruct;
import org.mule.api.construct.Pipeline;
import org.mule.api.endpoint.OutboundEndpoint;
import org.mule.api.processor.MessageProcessor;
import org.mule.api.processor.MessageProcessorChain;
import org.mule.api.processor.RequestReplyReplierMessageProcessor;
import org.mule.api.service.Service;
import org.mule.api.transformer.Transformer;
import org.mule.construct.Flow;
import org.mule.execution.MessageProcessorExecutionTemplate;
import org.mule.routing.MessageFilter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DefaultMessageProcessorChain extends AbstractMessageProcessorChain
{
    protected MessageProcessorExecutionTemplate messageProcessorExecutionTemplate = MessageProcessorExecutionTemplate.createExecutionTemplate();

    protected DefaultMessageProcessorChain(List<MessageProcessor> processors)
    {
        super(null, processors);
    }

    protected DefaultMessageProcessorChain(MessageProcessor... processors)
    {
        super(null, new ArrayList(Arrays.asList(processors)));
    }

    protected DefaultMessageProcessorChain(String name, List<MessageProcessor> processors)
    {
        super(name, processors);
    }

    protected DefaultMessageProcessorChain(String name, MessageProcessor... processors)
    {
        super(name, Arrays.asList(processors));
    }

    public static MessageProcessorChain from(MessageProcessor messageProcessor)
    {
        return new DefaultMessageProcessorChain(messageProcessor);
    }

    public static MessageProcessorChain from(MessageProcessor... messageProcessors) throws MuleException
    {
        return new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
    }

    public static MessageProcessorChain from(List<MessageProcessor> messageProcessors) throws MuleException
    {
        return new DefaultMessageProcessorChainBuilder().chain(messageProcessors).build();
    }

    protected MuleEvent doProcess(MuleEvent event) throws MuleException
    {
        if (event.getMuleContext() != null
            && event.getMuleContext().getConfiguration().isFlowEndingWithOneWayEndpointReturnsNull())
        {
            return doProcessFlowEndingWithOneWayEndpointReturnsNull(event);
        }
        else
        {
            FlowConstruct flowConstruct = event.getFlowConstruct();
            boolean flowConstructIsNotAService = !(flowConstruct instanceof Service);
            MuleEvent copy = null;
           
            for (int i = 0; i < processors.size(); i++)
            {
                MessageProcessor processor = processors.get(i);
                if (flowConstructIsNotAService && processorMayReturnNull(processor))
                {
                    copy = OptimizedRequestContext.criticalSetEvent(event);
                }

                event = messageProcessorExecutionTemplate.execute(processor, event);

                if (VoidMuleEvent.getInstance().equals(event))
                {
                    if (flowConstructIsNotAService)
                    {
                        event = copy;
                    }
                    else
                    {
                        // But in a service we don't do any implicit branching.
                        return null;
                    }
                }
                else if (event == null)
                {
                    return null;
                }
            }
            return event;
        }
    }

    /*
     * Using old implementation 100% as is.
     */
    private MuleEvent doProcessFlowEndingWithOneWayEndpointReturnsNull(MuleEvent event)
        throws MessagingException
    {
        FlowConstruct flowConstruct = event.getFlowConstruct();
        MuleEvent currentEvent = event;
        MuleEvent resultEvent;
        MuleEvent copy = null;
        Iterator<MessageProcessor> processorIterator = processors.iterator();
        MessageProcessor processor = null;
        if (processorIterator.hasNext())
        {
            processor = processorIterator.next();
        }
        boolean resultWasNull = false;
        while (processor != null)
        {
            MessageProcessor nextProcessor = null;
            if (processorIterator.hasNext())
            {
                nextProcessor = processorIterator.next();
            }

            if (flowConstruct instanceof Flow && nextProcessor != null && processorMayReturnNull(processor))
            {
                copy = OptimizedRequestContext.criticalSetEvent(currentEvent);
            }

            resultEvent = messageProcessorExecutionTemplate.execute(processor, currentEvent);

            if (resultWasNull && processor instanceof RequestReplyReplierMessageProcessor)
            {
                // reply-to processing should not resurrect a dead event
                resultEvent = null;
            }

            if (resultEvent != null && !VoidMuleEvent.getInstance().equals(resultEvent))
            {
                resultWasNull = false;
                currentEvent = resultEvent;
            }
            else if (VoidMuleEvent.getInstance().equals(resultEvent))
            {
                if (flowConstruct instanceof Flow && nextProcessor != null)
                {
                    resultWasNull = true;
                    // // In a flow when a MessageProcessor returns null the next
                    // processor acts as an implicit
                    // // branch receiving a copy of the message used for previous
                    // MessageProcessor
                    if (copy != null)
                    {
                        currentEvent = copy;
                    }
                    else
                    {
                        // this should not happen
                        currentEvent = OptimizedRequestContext.criticalSetEvent(currentEvent);
                    }
                }
                else
                {
                    // But in a service we don't do any implicit branching.
                    return VoidMuleEvent.getInstance();
                }
            }
            else if (resultEvent == null)
            {
                return null;
            }
            processor = nextProcessor;
        }
        return currentEvent;
    }

    protected boolean processorMayReturnNull(MessageProcessor processor)
    {
        if (processor instanceof OutboundEndpoint)
        {
            MessageExchangePattern exchangePattern = ((OutboundEndpoint) processor).getExchangePattern();
            return exchangePattern == null ? true : !exchangePattern.hasResponse();
        }
        else if (processor instanceof Component || processor instanceof Transformer
                 || processor instanceof MessageFilter)
        {
            return false;
        }
        else
        {
            return true;
        }
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        super.setMuleContext(context);
    }
}
TOP

Related Classes of org.mule.processor.chain.DefaultMessageProcessorChain

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.