Package org.apache.camel.processor

Source Code of org.apache.camel.processor.Enricher$CopyAggregationStrategy

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;

/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {@link AggregationStrategy}
* object.
*/
public class Enricher extends ServiceSupport implements Processor {

    private static final transient Log LOG = LogFactory.getLog(Enricher.class);
    private AggregationStrategy aggregationStrategy;
    private Producer producer;

    /**
     * Creates a new {@link Enricher}. The default aggregation strategy is to
     * copy the additional data obtained from the enricher's resource over the
     * input data. When using the copy aggregation strategy the enricher
     * degenerates to a normal transformer.
     *
     * @param producer producer to resource endpoint.
     */
    public Enricher(Producer producer) {
        this(defaultAggregationStrategy(), producer);
    }

    /**
     * Creates a new {@link Enricher}.
     *
     * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
     * @param producer producer to resource endpoint.
     */
    public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
        this.aggregationStrategy = aggregationStrategy;
        this.producer = producer;
    }

    /**
     * Sets the aggregation strategy for this enricher.
     *
     * @param aggregationStrategy the aggregationStrategy to set
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * Sets the default aggregation strategy for this enricher.
     */
    public void setDefaultAggregationStrategy() {
        this.aggregationStrategy = defaultAggregationStrategy();
    }

    /**
     * Enriches the input data (<code>exchange</code>) by first obtaining
     * additional data from an endpoint represented by an endpoint
     * <code>producer</code> and second by aggregating input data and additional
     * data. Aggregation of input data and additional data is delegated to an
     * {@link AggregationStrategy} object set at construction time. If the
     * message exchange with the resource endpoint fails then no aggregation
     * will be done and the failed exchange content is copied over to the
     * original message exchange.
     *
     * @param exchange input data.
     */
    public void process(Exchange exchange) throws Exception {
        Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
        producer.process(resourceExchange);

        if (resourceExchange.isFailed()) {
            // copy resource exchange onto original exchange (preserving pattern)
            copyResultsPreservePattern(exchange, resourceExchange);
        } else {
            prepareResult(exchange);

            // aggregate original exchange and resource exchange
            // but do not aggregate if the resource exchange was filtered
            Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
            if (filtered == null || !filtered) {
                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                // copy aggregation result onto original exchange (preserving pattern)
                copyResultsPreservePattern(exchange, aggregatedExchange);
            } else {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange);
                }
            }
        }
    }

    /**
     * Creates a new {@link DefaultExchange} instance from the given
     * <code>exchange</code>. The resulting exchange's pattern is defined by
     * <code>pattern</code>.
     *
     * @param source  exchange to copy from.
     * @param pattern exchange pattern to set.
     * @return created exchange.
     */
    protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
        DefaultExchange target = new DefaultExchange(source.getContext());
        target.copyFrom(source);
        target.setPattern(pattern);
        return target;
    }

    private static void prepareResult(Exchange exchange) {
        if (exchange.getPattern().isOutCapable()) {
            exchange.getOut().copyFrom(exchange.getIn());
        }
    }

    private static AggregationStrategy defaultAggregationStrategy() {
        return new CopyAggregationStrategy();
    }

    protected void doStart() throws Exception {
        producer.start();
    }

    protected void doStop() throws Exception {
        producer.stop();
    }

    private static class CopyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            copyResultsPreservePattern(oldExchange, newExchange);
            return oldExchange;
        }

    }

}
TOP

Related Classes of org.apache.camel.processor.Enricher$CopyAggregationStrategy

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.