How to group absorptions

Learn how to group absorptions.

 

Camel aggregation

See the Camel Aggregator documentation for the list of completion and correlation options. The examples below use a constant correlation key so that one and only one group is created.

To use aggregation, your route needs a reference to an aggregation strategy implementation.
To provide one, define a connector with:

  • A name – Used in the strategyRef of your aggregate
  • A class name – The implementation of your strategy. You can use GroupedExchangeAggregationStrategy.

The current implementation of tnd-absorption reads the exchange property named CamelGroupedExchange (the list property filled by GroupedExchangeAggregationStrategy).

If you want to use another strategy, you need to copy the result of your strategy into a property named CamelGroupedExchange before the tnd-absorption call.
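
The copy step could look like the sketch below. It is only an illustration: the strategy name myStrategy and the property myGroupedResult are hypothetical, and it assumes the custom strategy leaves a list of exchanges in that property, since the fallback example on this page shows that the entries of CamelGroupedExchange are exchanges.

```xml
<!-- Sketch only: "myStrategy" and "myGroupedResult" are hypothetical names -->
<aggregate completionSize="2" strategyRef="myStrategy">
    <correlationExpression>
        <constant>SINGLE_GROUP</constant>
    </correlationExpression>
    <!-- Copy the strategy result (assumed to be a list of exchanges)
         into the property that tnd-absorption reads -->
    <setProperty propertyName="CamelGroupedExchange">
        <simple>${property.myGroupedResult}</simple>
    </setProperty>
    <to uri="tnd-absorption:myMapping"/>
</aggregate>
```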

Connector configuration example:

Name: groupedStrategy
Class name: org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy

<aggregate completionSize="2" strategyRef="groupedStrategy">...</aggregate>
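
For comparison, when a route runs in a plain Spring application context rather than through a connector, the same name and class pair is usually registered as a bean. The fragment below is a sketch under that assumption; it is not the connector configuration itself.

```xml
<!-- Assumption: plain Spring registration, outside the connector mechanism described above -->
<bean id="groupedStrategy"
      class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy"/>
```

The bean id plays the role of the connector name: it is the value referenced by strategyRef.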

Using a split and an aggregation

<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:test"/>
        <split>
            <simple>${body}</simple>
            <aggregate completionSize="2" strategyRef="groupedStrategy">
                <correlationExpression>
                    <constant>SINGLE_GROUP</constant>
                </correlationExpression>
                <to uri="tnd-absorption:myMapping"/>
            </aggregate>
        </split>
    </route>
</routes>

This context splits the body (assumed to be a collection) into several messages and absorbs them two at a time.

Using several messages

<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:test"/>
        <aggregate completionSize="2" strategyRef="groupedStrategy">
            <correlationExpression>
                <constant>SINGLE_GROUP</constant>
            </correlationExpression>
            <to uri="tnd-absorption:myMapping"/>
        </aggregate>
    </route>
</routes>

This context receives messages and, once it has received two of them, absorbs them in one chunk.

Using several calls for one message

<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:test"/>
        <setProperty propertyName="tnd-absorption-mapping-name">
            <constant>myFirstMapping</constant>
        </setProperty>
        <to uri="direct:agg"/>

        <setProperty propertyName="tnd-absorption-mapping-name">
            <constant>mySecondMapping</constant>
        </setProperty>
        <to uri="direct:agg"/>
    </route>

    <route>
        <from uri="direct:agg"/>
        <aggregate completionSize="2" strategyRef="groupedStrategy">
            <correlationExpression>
                <constant>SINGLE_GROUP</constant>
            </correlationExpression>
            <to uri="tnd-absorption:dynamic"/>
        </aggregate>
    </route>
</routes>

This context sends the same message twice to direct:agg, each time with a different mapping name in the tnd-absorption-mapping-name property. The two resulting exchanges are absorbed in one chunk.

When using Aggregation to optimize absorption, take care of completion

By default, an aggregation waits indefinitely until it is completed.

Completion options:

  • completionSize – Number of messages aggregated before the aggregation is complete. This option can be set either as a fixed value or as an Expression that evaluates the size dynamically (the result is an Integer). If both are set, Camel falls back to the fixed value when the Expression result is null or 0.
  • completionTimeout – Time in milliseconds that an aggregated exchange must be inactive before it is completed. This option can be set either as a fixed value or as an Expression that evaluates the timeout dynamically (the result is a Long). If both are set, Camel falls back to the fixed value when the Expression result is null or 0. You cannot use this option together with completionInterval; only one of the two can be used.
  • completionInterval – A repeating period in milliseconds after which the aggregator completes all current aggregated exchanges. Camel has a background task that is triggered every period. You cannot use this option together with completionTimeout; only one of the two can be used.
  • completionPredicate – A Predicate that indicates when an aggregated exchange is complete.
  • completionFromBatchConsumer – False by default. Use this option when the exchanges come from a Batch Consumer. When enabled, the Aggregator2 uses the batch size determined by the Batch Consumer in the message header CamelBatchSize. See the Batch Consumer documentation for details. This can be used to aggregate all files consumed from a File endpoint in a given poll.
  • forceCompletionOnStop – False by default. Available as of Camel 2.9. Indicates whether to complete all current aggregated exchanges when the context is stopped.
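
As an illustration of these options, the sketch below combines completionSize with completionTimeout so that an incomplete group does not wait forever. The 5000 ms value is an arbitrary assumption chosen for the example; the endpoint and strategy names are reused from the earlier examples.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:test"/>
        <!-- Complete after 2 messages, or after 5000 ms of inactivity,
             whichever comes first (5000 is an arbitrary example value) -->
        <aggregate completionSize="2" completionTimeout="5000" strategyRef="groupedStrategy">
            <correlationExpression>
                <constant>SINGLE_GROUP</constant>
            </correlationExpression>
            <to uri="tnd-absorption:myMapping"/>
        </aggregate>
    </route>
</routes>
```

With this configuration a lone message is absorbed on its own once the timeout expires, at the cost of a smaller chunk.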

Note: When a route is stopped, Camel does not wait for pending completions (timeout, ...).

Sample:

<routes xmlns="http://camel.apache.org/schema/spring" xmlns:u="http://www.systar.com/aluminium/camel-util">
    <route>
        <from uri="timer:foo?repeatCount=1"/>
        <log message="First tick"/>
        <aggregate completionSize="2" strategyRef="groupedStrategy">
            <correlationExpression>
                <constant>SINGLE_GROUP</constant>
            </correlationExpression>
            <log message="It's aggregated"/>
        </aggregate>
    </route>
</routes>

The logs will never contain "It's aggregated", even on stop, because the timer fires only once and the completionSize of 2 is never reached. The workaround is:

<aggregate completionSize="2" strategyRef="groupedStrategy" forceCompletionOnStop="true">
</aggregate>

Absorption fallback when group fails

The route below first tries to absorb a group of three messages in one chunk. If the grouped absorption throws an exception, it splits the group and absorbs each message individually.

<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="timer:test?repeatCount=1"/>
        <!-- Create message content -->
        <setBody>
            <groovy>
                ["PAY1", 10, "PAY3"] //PAY1 fail, 10 success, PAY3 fail
            </groovy>
        </setBody>
        <split>
            <!-- Get one by one -->
            <simple>${body}</simple>
            <!-- Aggregate by 3 -->
            <aggregate completionSize="3" strategyRef="groupedStrategy">
                <correlationExpression>
                    <constant>SINGLE_GROUP</constant>
                </correlationExpression>
                <!-- Try to absorb in group -->
                <doTry>
                    <to uri="tnd-absorption:P"/>
                    <!-- Catch the absorption failure and absorb the messages one by one -->
                    <doCatch>
                        <exception>java.lang.Exception</exception>
                        <split>
                            <!-- Split the group exchange container -->
                            <simple>${property.CamelGroupedExchange}</simple>
                            <!-- Extract the real body of each split exchange, because the data is embedded inside another Exchange -->
                            <setBody>
                                <groovy>request.getBody().getIn().getBody()</groovy>
                            </setBody>
                            <!-- Remove the grouped exchange property because it is duplicated in each exchange -->
                            <removeProperty propertyName="CamelGroupedExchange"/>
                            <!-- Absorb the message -->
                            <to uri="tnd-absorption:P"/>
                        </split>
                    </doCatch>
                </doTry>
            </aggregate>
        </split>
    </route>
</routes>
