Custom Aggregator using bean

This example shows how to build a custom aggregator and use it in Camel routes.

Aggregator

Java class

MaxAggregatorStrategy.java
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/**
 * A custom aggregation strategy that keeps the max value of a specified header
 */
public class MaxAggregatorStrategy implements AggregationStrategy {
    private String m_headerName;
    private Object m_initValue;

    public String getHeaderName() {
        return m_headerName;
    }

    public void setHeaderName(String headerName) {
        m_headerName = headerName;
    }

    public void init(Exchange initExchange) {
        if (m_headerName == null || m_headerName.equals("")) {
            return;
        }
        Object initValue = initExchange.getIn().getHeader(m_headerName);
        if (initValue instanceof Comparable) {
            m_initValue = initValue;
        }
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (m_headerName == null || m_headerName.equals("")) {
            // not configured, the new exchange wins
            return newExchange;
        }

        Object oldHeader;
        if (oldExchange == null) {
            if (m_initValue != null) {
                // the first time with init value, the init value wins
                oldHeader = m_initValue;
                oldExchange = newExchange;
            } else {
                // the first time we (without init value) only have the new exchange so it wins the first round
                return newExchange;
            }
        } else {
            oldHeader = oldExchange.getIn().getHeader(m_headerName);
        }

        Object newHeader = newExchange.getIn().getHeader(m_headerName);

        if (!(oldHeader instanceof Comparable<?>) || !(newHeader instanceof Comparable<?>)) {
            // make sure that the headers are comparable. if not, the old exchange wins unmodified.
            return oldExchange;
        }

        if (oldHeader instanceof String) {
            oldHeader = ((String) oldHeader).trim();
        }

        if (newHeader instanceof String) {
            newHeader = ((String) newHeader).trim();
        }

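        // raw compareTo: headers of different types throw ClassCastException, and strings compare lexicographically
        if (((Comparable) oldHeader).compareTo(newHeader) < 0) {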
            // keep the maximum value in the header
            oldExchange.getIn().setHeader(m_headerName, newHeader);
        } else {
            oldExchange.getIn().setHeader(m_headerName, oldHeader);
        }

        return oldExchange;
    }
}
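
The comparison step of aggregate can be tried outside Camel. The following is a minimal sketch, not part of the aggregator itself: the class HeaderMaxSketch and the method maxOf are hypothetical names, and plain objects stand in for header values. It illustrates that string headers are trimmed and then compared lexicographically, so "10" ranks below "5".

```java
final class HeaderMaxSketch {
    // Hypothetical helper mirroring the comparison in aggregate():
    // trim string values, then keep the larger of the two.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object maxOf(Object oldHeader, Object newHeader) {
        if (!(oldHeader instanceof Comparable) || !(newHeader instanceof Comparable)) {
            // not comparable: the old value wins unmodified
            return oldHeader;
        }
        if (oldHeader instanceof String) {
            oldHeader = ((String) oldHeader).trim();
        }
        if (newHeader instanceof String) {
            newHeader = ((String) newHeader).trim();
        }
        return ((Comparable) oldHeader).compareTo(newHeader) < 0 ? newHeader : oldHeader;
    }

    public static void main(String[] args) {
        System.out.println(maxOf("4", " 5"));  // string comparison after trimming
        System.out.println(maxOf("5", "10"));  // lexicographic: "10" sorts before "5"
        System.out.println(maxOf(5, 10));      // numeric comparison for Integer values
    }
}
```

If numeric ordering matters, one option is to store the header as a number rather than a string before it reaches the aggregator. Mixing types (for example an Integer init value with String tokens) makes the raw compareTo call throw a ClassCastException.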

Compile & package

The steps below only require a Java 8 JDK installed in your environment. You can also use your favorite IDE or build tool (Maven, Ant, etc.) to compile and package the aggregator.

The commands below assume the following directory structure:

- classes
- lib
  |- org.apache.camel.camel-core_2.16.2.jar
- src
  |- MaxAggregatorStrategy.java
- target 

Compile:

javac -cp lib/org.apache.camel.camel-core_2.16.2.jar -d classes src/*.java

Build jar:

jar cvf target/camel-agg-max.jar -C classes/ .

Camel library

Create a library named maxagg and upload the jar file camel-agg-max.jar.

Camel connector

Create a connector to configure the aggregator and to specify which header holds the aggregated value:

  • The connector name will be used to reference the corresponding bean from routes: we will use maxagg here.
  • Specify MaxAggregatorStrategy as class name, in library maxagg.
  • Configure the headerName property to be temp.

Camel route

Create the following route, named test:

route.xml
<routes xmlns:u="http://www.systar.com/aluminium/camel-util" xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="timer://config?repeatCount=1" />

        <!-- Initial header value -->
        <setHeader headerName="temp">
            <constant>4</constant>        
        </setHeader>
        <!-- Init the aggregator, for example with a value from a previous run. The aggregator will read the configured header for its initial value. -->
        <to uri="bean:maxagg?method=init"/>

        <!-- Initial hardcoded body, to be split -->
        <setBody>
            <simple>1, 5, 3</simple>
        </setBody>
        <to uri="log:maxagg.test?level=WARN"/>
        
        <!-- Split the body, specifying the 'maxagg' aggregation strategy -->
        <split strategyRef="maxagg">
            <tokenize token=","/>
            <!-- Set split result into 'temp' header -->
            <setHeader headerName="temp">
                <simple>${body}</simple>
            </setHeader>
        </split>
        
        <!-- Log the header from the split aggregator -->
        <setBody> 
            <simple>${header.temp}</simple>
        </setBody>
        <to uri="log:maxagg.test?level=WARN"/>
    </route>
</routes>

Output sample

Start the route test and check the logs. The first line shows the body before the split, the second the maximum value kept by the aggregator:

2013-01-24 22:34:36,671 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:1, 5, 3]
2013-01-24 22:34:36,695 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:5]
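
To see why the second log line shows 5, the route can be replayed by hand. The following is a rough sketch in plain Java, without Camel: the class RouteTraceSketch and the method replay are hypothetical, and it assumes the header values are strings, as they would be when set from the constant and from the tokenized body. It starts from the init value and folds in each token the same way the aggregator does.

```java
final class RouteTraceSketch {
    // Hypothetical replay of the test route: start from the init value,
    // then compare it against each comma-separated token of the body.
    static String replay(String initValue, String body) {
        String max = initValue.trim();
        for (String token : body.split(",")) {
            String candidate = token.trim();
            if (max.compareTo(candidate) < 0) {
                // the new token is larger, so it becomes the kept value
                max = candidate;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // init value "4", body "1, 5, 3": "4" beats "1", "5" beats "4", "3" loses to "5"
        System.out.println(replay("4", "1, 5, 3"));
    }
}
```

With a larger init value such as 9, none of the tokens would win and the header would keep the init value.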
