Custom Aggregator using bean

This example shows how to build a custom aggregator and use it in Camel routes.


Java class
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

/**
 * A custom aggregation strategy that keeps the max value of a specified header
 */
public class MaxAggregatorStrategy implements AggregationStrategy {
    private String m_headerName;
    private Object m_initValue;

    public String getHeaderName() {
        return m_headerName;
    }

    public void setHeaderName(String headerName) {
        m_headerName = headerName;
    }

    public void init(Exchange initExchange) {
        if (m_headerName == null || m_headerName.equals("")) {
            // not configured, nothing to initialize
            return;
        }
        // read the initial value from the configured header
        Object initValue = initExchange.getIn().getHeader(m_headerName);
        if (initValue instanceof Comparable) {
            m_initValue = initValue;
        }
    }

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (m_headerName == null || m_headerName.equals("")) {
            // not configured, the new exchange wins
            return newExchange;
        }

        Object oldHeader;
        if (oldExchange == null) {
            if (m_initValue != null) {
                // first exchange, with an init value: the init value is compared against the new header
                oldHeader = m_initValue;
                oldExchange = newExchange;
            } else {
                // first exchange, without an init value: only the new exchange exists, so it wins the first round
                return newExchange;
            }
        } else {
            oldHeader = oldExchange.getIn().getHeader(m_headerName);
        }

        Object newHeader = newExchange.getIn().getHeader(m_headerName);

        if (!(oldHeader instanceof Comparable<?>) || !(newHeader instanceof Comparable<?>)) {
            // make sure that the headers are comparable. if not, the old exchange wins unmodified.
            return oldExchange;
        }

        if (oldHeader instanceof String) {
            oldHeader = ((String) oldHeader).trim();
        }

        if (newHeader instanceof String) {
            newHeader = ((String) newHeader).trim();
        }

        if (((Comparable) oldHeader).compareTo(newHeader) < 0) {
            // keep the maximum value in the header
            oldExchange.getIn().setHeader(m_headerName, newHeader);
        } else {
            oldExchange.getIn().setHeader(m_headerName, oldHeader);
        }

        return oldExchange;
    }
}
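
The comparison rule used by aggregate can be tried without Camel. The following is a minimal stand-alone sketch, not part of the aggregator: the class name MaxHeaderDemo and the helper max are hypothetical, and the helper only mirrors the trim-and-compare step. It also shows a consequence of that step: header values that are strings are compared lexicographically, so "10" ranks below "9".

```java
final class MaxHeaderDemo {

    // Hypothetical helper: mirrors the trim-and-compare step of aggregate().
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object max(Object oldValue, Object newValue) {
        if (!(oldValue instanceof Comparable) || !(newValue instanceof Comparable)) {
            // values that cannot be compared: the old value wins
            return oldValue;
        }
        if (oldValue instanceof String) {
            oldValue = ((String) oldValue).trim();
        }
        if (newValue instanceof String) {
            newValue = ((String) newValue).trim();
        }
        return ((Comparable) oldValue).compareTo(newValue) < 0 ? newValue : oldValue;
    }

    public static void main(String[] args) {
        System.out.println(max("1", " 5"));  // 5
        System.out.println(max(10, 9));      // 10 (Integer comparison)
        System.out.println(max("10", "9"));  // 9 (String comparison is lexicographic)
    }
}
```

If numeric ordering is needed, one option is to set the header to a numeric type before the values reach the aggregator.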

Compile & package

The method below only requires a Java 8 JDK installed in your environment. You can also use your favorite IDE or build tool (Maven, Ant, etc.) to compile and package the aggregator.

The commands below assume the following directory structure, with the Java source file in src:

- classes
- lib
  |- org.apache.camel.camel-core_2.16.2.jar
- src
- target 


Compile:

javac -cp lib/org.apache.camel.camel-core_2.16.2.jar -d classes src/*.java

Build jar:

jar cvf target/camel-agg-max.jar -C classes/ .

Camel library

Create a library named maxagg and upload the jar file camel-agg-max.jar.

Camel connector

Create a connector to configure the aggregator and specify the header that holds the aggregated value:

  • The connector name will be used to reference the corresponding bean from routes: we will use maxagg here.
  • Specify MaxAggregatorStrategy as class name, in library maxagg.
  • Configure the headerName property to be temp.

Camel route

Create the following route and name it test:

<routes xmlns:u="" xmlns="">
    <route>
        <from uri="timer://config?repeatCount=1" />

        <!-- Initial header value (0 is only an example value) -->
        <setHeader headerName="temp">
            <constant>0</constant>
        </setHeader>
        <!-- Init the aggregator, for example with a value from a previous run. The aggregator will read the configured header for its initial value. -->
        <to uri="bean:maxagg?method=init"/>

        <!-- Initial hardcoded body, to be split -->
        <setBody>
            <simple>1, 5, 3</simple>
        </setBody>
        <to uri="log:maxagg.test?level=WARN"/>
        <!-- Split the body, specifying the 'maxagg' aggregation strategy -->
        <split strategyRef="maxagg">
            <tokenize token=","/>
            <!-- Set split result into 'temp' header -->
            <setHeader headerName="temp">
                <simple>${body}</simple>
            </setHeader>
        </split>
        <!-- Log the header from the split aggregator (copied into the body so that the log endpoint shows it) -->
        <setBody>
            <simple>${header.temp}</simple>
        </setBody>
        <to uri="log:maxagg.test?level=WARN"/>
    </route>
</routes>

Output sample

Start the route test and check the logs to see the following output:

2013-01-24 22:34:36,671 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:1, 5, 3]
2013-01-24 22:34:36,695 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:5]
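
To see why the second log line shows 5, the split and aggregation can be traced by hand. The following stand-alone sketch uses no Camel classes: SplitMaxTrace and foldMax are hypothetical names, and the initial value "0" is an assumed example. It tokenizes the body on "," as the route does, then keeps the larger trimmed value at each step, as the aggregator does for the temp header.

```java
final class SplitMaxTrace {

    // Hypothetical helper: folds the tokens of 'body' into their maximum,
    // starting from 'initValue' and comparing trimmed strings.
    static String foldMax(String initValue, String body) {
        String max = initValue.trim();
        for (String token : body.split(",")) {
            String value = token.trim();
            if (max.compareTo(value) < 0) {
                max = value;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // "0" is an assumed initial value, standing in for the header read by init()
        System.out.println(foldMax("0", "1, 5, 3")); // 5
    }
}
```

With an initial value larger than every token, for example "7", the same fold returns the initial value, which is the case the init method is meant to cover.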
