Decision Insight 20220523 (Latest) Save PDF Selected topic Selected topic and subtopics All content Custom Aggregator using bean This example shows how to build and use a custom aggregator used in Camel routes. Aggregator Java class MaxAggregatorStrategy.java import org.apache.camel.Exchange; import org.apache.camel.processor.aggregate.AggregationStrategy; /** * A custom aggregation strategy that keeps the max value of a specified header */ public class MaxAggregatorStrategy implements AggregationStrategy { private String m_headerName; private Object m_initValue; public String getHeaderName() { return m_headerName; } public void setHeaderName(String headerName) { m_headerName = headerName; } public void init(Exchange initExchange) { if (m_headerName == null || m_headerName.equals("")) { return; } Object initValue = initExchange.getIn().getHeader(m_headerName); if (initValue instanceof Comparable) { m_initValue = initValue; } } @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (m_headerName == null || m_headerName.equals("")) { // not configured, the new exchange wins return newExchange; } Object oldHeader; if (oldExchange == null) { if (m_initValue != null) { // the first time with init value, the init value wins oldHeader = m_initValue; oldExchange = newExchange; } else { // the first time we (without init value) only have the new exchange so it wins the first round return newExchange; } } else { oldHeader = oldExchange.getIn().getHeader(m_headerName); } Object newHeader = newExchange.getIn().getHeader(m_headerName); if (!(oldHeader instanceof Comparable<?>) || !(newHeader instanceof Comparable<?>)) { // make sure that the headers are comparable. if not, the old exchange wins unmodified. return oldExchange; } if (oldHeader instanceof String) { oldHeader = ((String) oldHeader).trim(); } if (newHeader instanceof String) { newHeader = ((String) newHeader).trim(); } if (((Comparable) oldHeader).compareTo(newHeader) < 0) { // keep the maximum value in the header oldExchange.getIn().setHeader(m_headerName, newHeader); } else { oldExchange.getIn().setHeader(m_headerName, oldHeader); } return oldExchange; } } Compile & package You can also use your favorite IDE or tool (Maven, Ant, etc.) to compile and package the aggregator. This method only requires a Java 10 JDK installer on your environment. Make sure you have the following directory structure in order for the following example to work: - classes - lib |- org.apache.camel.camel-core_2.16.2.jar - src |- MaxAggregatorStrategy.java - target Compile: javac -cp lib/org.apache.camel.camel-core_2.16.2.jar -d classes src/** Build jar: jar cvf target/camel-agg-max.jar -C classes/ . Camel library Create a library named maxagg and upload the jar file camel-agg-max.jar. Camel connector Create a connector, in order to configure the aggregator and specify the header to use to hold the aggregated value: The connector name will be used to reference the corresponding bean from routes: we will use maxagg here. Specify MaxAggregatorStrategy as class name, in library maxagg. Configure the header name property to be temp. Camel route Create the following test route, with name test: route.xml <routes xmlns:u="http://www.systar.com/aluminium/camel-util" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="timer://config?repeatCount=1" /> <!-- Initial header value --> <setHeader name="temp"> <constant>4</constant> </setHeader> <!-- Init the aggregator, for example with a value from a previous run. The aggregator will read the configured header for its initial value. --> <to uri="bean:maxagg?method=init"/> <!-- Initial hardcoded body, to be split --> <setBody> <simple>1, 5, 3</simple> </setBody> <to uri="log:maxagg.test?level=WARN"/> <!-- Split the body, specifying the 'maxagg' aggregation strategy --> <split strategyRef="maxagg"> <tokenize token=","/> <!-- Set split result into 'temp' header --> <setHeader name="temp"> <simple>body</simple> </setHeader> </split> <!-- Log the header from the split aggregator --> <setBody> <simple>header.temp</simple> </setBody> <to uri="log:maxagg.test?level=WARN"/> </route> </routes> Output sample Start the route test and check the logs to see the following output: 2013-01-24 22:34:36,671 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:1, 5, 3] 2013-01-24 22:34:36,695 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:5] Related Links
Custom Aggregator using bean This example shows how to build and use a custom aggregator used in Camel routes. Aggregator Java class MaxAggregatorStrategy.java import org.apache.camel.Exchange; import org.apache.camel.processor.aggregate.AggregationStrategy; /** * A custom aggregation strategy that keeps the max value of a specified header */ public class MaxAggregatorStrategy implements AggregationStrategy { private String m_headerName; private Object m_initValue; public String getHeaderName() { return m_headerName; } public void setHeaderName(String headerName) { m_headerName = headerName; } public void init(Exchange initExchange) { if (m_headerName == null || m_headerName.equals("")) { return; } Object initValue = initExchange.getIn().getHeader(m_headerName); if (initValue instanceof Comparable) { m_initValue = initValue; } } @Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (m_headerName == null || m_headerName.equals("")) { // not configured, the new exchange wins return newExchange; } Object oldHeader; if (oldExchange == null) { if (m_initValue != null) { // the first time with init value, the init value wins oldHeader = m_initValue; oldExchange = newExchange; } else { // the first time we (without init value) only have the new exchange so it wins the first round return newExchange; } } else { oldHeader = oldExchange.getIn().getHeader(m_headerName); } Object newHeader = newExchange.getIn().getHeader(m_headerName); if (!(oldHeader instanceof Comparable<?>) || !(newHeader instanceof Comparable<?>)) { // make sure that the headers are comparable. if not, the old exchange wins unmodified. return oldExchange; } if (oldHeader instanceof String) { oldHeader = ((String) oldHeader).trim(); } if (newHeader instanceof String) { newHeader = ((String) newHeader).trim(); } if (((Comparable) oldHeader).compareTo(newHeader) < 0) { // keep the maximum value in the header oldExchange.getIn().setHeader(m_headerName, newHeader); } else { oldExchange.getIn().setHeader(m_headerName, oldHeader); } return oldExchange; } } Compile & package You can also use your favorite IDE or tool (Maven, Ant, etc.) to compile and package the aggregator. This method only requires a Java 10 JDK installer on your environment. Make sure you have the following directory structure in order for the following example to work: - classes - lib |- org.apache.camel.camel-core_2.16.2.jar - src |- MaxAggregatorStrategy.java - target Compile: javac -cp lib/org.apache.camel.camel-core_2.16.2.jar -d classes src/** Build jar: jar cvf target/camel-agg-max.jar -C classes/ . Camel library Create a library named maxagg and upload the jar file camel-agg-max.jar. Camel connector Create a connector, in order to configure the aggregator and specify the header to use to hold the aggregated value: The connector name will be used to reference the corresponding bean from routes: we will use maxagg here. Specify MaxAggregatorStrategy as class name, in library maxagg. Configure the header name property to be temp. Camel route Create the following test route, with name test: route.xml <routes xmlns:u="http://www.systar.com/aluminium/camel-util" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="timer://config?repeatCount=1" /> <!-- Initial header value --> <setHeader name="temp"> <constant>4</constant> </setHeader> <!-- Init the aggregator, for example with a value from a previous run. The aggregator will read the configured header for its initial value. --> <to uri="bean:maxagg?method=init"/> <!-- Initial hardcoded body, to be split --> <setBody> <simple>1, 5, 3</simple> </setBody> <to uri="log:maxagg.test?level=WARN"/> <!-- Split the body, specifying the 'maxagg' aggregation strategy --> <split strategyRef="maxagg"> <tokenize token=","/> <!-- Set split result into 'temp' header --> <setHeader name="temp"> <simple>body</simple> </setHeader> </split> <!-- Log the header from the split aggregator --> <setBody> <simple>header.temp</simple> </setBody> <to uri="log:maxagg.test?level=WARN"/> </route> </routes> Output sample Start the route test and check the logs to see the following output: 2013-01-24 22:34:36,671 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:1, 5, 3] 2013-01-24 22:34:36,695 [Camel (41-camel-2) thread #0 - timer://config] WARN maxagg.test - Exchange[ExchangePattern:InOnly, BodyType:String, Body:5]