/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {@link AggregationStrategy}
* object.
*/
public class Enricher extends ServiceSupport implements Processor {

    private static final Log LOG = LogFactory.getLog(Enricher.class);

    // strategy used to merge the original exchange with the resource exchange;
    // replaceable after construction through the setters below
    private AggregationStrategy aggregationStrategy;

    // producer for the resource endpoint the additional data is obtained from
    private final Producer producer;

    /**
     * Creates a new {@link Enricher}. The default aggregation strategy is to
     * copy the additional data obtained from the enricher's resource over the
     * input data. When using the copy aggregation strategy the enricher
     * degenerates to a normal transformer.
     *
     * @param producer producer to resource endpoint.
     */
    public Enricher(Producer producer) {
        this(defaultAggregationStrategy(), producer);
    }

    /**
     * Creates a new {@link Enricher}.
     *
     * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
     * @param producer producer to resource endpoint.
     */
    public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
        this.aggregationStrategy = aggregationStrategy;
        this.producer = producer;
    }

    /**
     * Sets the aggregation strategy for this enricher, replacing the one
     * currently in use.
     *
     * @param aggregationStrategy the aggregationStrategy to set
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * Resets this enricher to the default aggregation strategy, which copies
     * the results of the resource exchange over the original exchange.
     */
    public void setDefaultAggregationStrategy() {
        this.aggregationStrategy = defaultAggregationStrategy();
    }

    /**
     * Enriches the input data (<code>exchange</code>) by first obtaining
     * additional data from an endpoint represented by an endpoint
     * <code>producer</code> and second by aggregating input data and additional
     * data. Aggregation of input data and additional data is delegated to the
     * {@link AggregationStrategy} currently configured on this enricher.
     * <p/>
     * If the message exchange with the resource endpoint fails then no
     * aggregation will be done and the failed exchange content is copied over
     * to the original message exchange. No aggregation is done either when the
     * resource exchange carries the {@link Exchange#FILTERED} property set to
     * <code>true</code>, or when the aggregation strategy returns
     * <code>null</code>; in both cases the original exchange is left as
     * prepared.
     *
     * @param exchange input data.
     * @throws Exception whatever the resource producer or the aggregation strategy throws.
     */
    public void process(Exchange exchange) throws Exception {
        Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
        producer.process(resourceExchange);

        if (resourceExchange.isFailed()) {
            // copy resource exchange onto original exchange (preserving pattern)
            copyResultsPreservePattern(exchange, resourceExchange);
            return;
        }

        prepareResult(exchange);

        // do not aggregate if the resource exchange was filtered
        Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED, Boolean.class);
        if (filtered != null && filtered) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Cannot aggregate exchange as its filtered: " + resourceExchange);
            }
            return;
        }

        // aggregate original exchange and resource exchange
        Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);

        // a strategy may return null when it has nothing to contribute; in that
        // case skip the copy instead of handing null to the copy helper
        if (aggregatedExchange != null) {
            // copy aggregation result onto original exchange (preserving pattern)
            copyResultsPreservePattern(exchange, aggregatedExchange);
        }
    }

    /**
     * Creates a new {@link DefaultExchange} instance from the given
     * <code>exchange</code>. The resulting exchange's pattern is defined by
     * <code>pattern</code>.
     *
     * @param source exchange to copy from.
     * @param pattern exchange pattern to set.
     * @return created exchange.
     */
    protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
        DefaultExchange target = new DefaultExchange(source.getContext());
        target.copyFrom(source);
        // set the pattern after copying so that it is the one in effect on the result
        target.setPattern(pattern);
        return target;
    }

    /**
     * Prepares the original exchange for aggregation: when its pattern is
     * out capable, the in message is copied onto the out message.
     *
     * @param exchange the original exchange.
     */
    private static void prepareResult(Exchange exchange) {
        if (exchange.getPattern().isOutCapable()) {
            exchange.getOut().copyFrom(exchange.getIn());
        }
    }

    /**
     * Creates the default aggregation strategy.
     *
     * @return a new {@link CopyAggregationStrategy}.
     */
    private static AggregationStrategy defaultAggregationStrategy() {
        return new CopyAggregationStrategy();
    }

    /**
     * Starts the resource producer.
     */
    protected void doStart() throws Exception {
        producer.start();
    }

    /**
     * Stops the resource producer.
     */
    protected void doStop() throws Exception {
        producer.stop();
    }

    /**
     * Default aggregation strategy: copies the results of the new (resource)
     * exchange onto the old (original) exchange and returns the old exchange.
     */
    private static class CopyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            copyResultsPreservePattern(oldExchange, newExchange);
            return oldExchange;
        }

    }

}