How to ensure message resiliency

Overview

A resilient system is a system that doesn't lose any messages consumed from external sources even in the unfortunate case of node or server crashes.

This page describes a pattern for consuming messages that prevents data loss.

Tip: If you need to ensure the resiliency of a large number messages, see Integration node pattern.


The key idea of this pattern is to store each message on the disk before acknowledging it to the external source. It's then possible to read the files and use data integration states to keep track of the processed files. Thanks to the states, you can know which files were successfully processed before the last checkpoint and then reprocess the other ones.


How to deploy a resilient system

Receiving route

This route retrieves the data from the external source, persists it to disk and then acknowledges it.

In this example, we're using a JMS source but it can be any other source.

Keep in mind that only persisted data will be available in the processing route. For instance, if you need some headers or properties, persist them as well.

<routes xmlns="http://camel.apache.org/schema/spring" xmlns:u="http://www.systar.com/aluminium/camel-util">
	<route>
		<!-- Read from JMS -->
		<from uri="jms:queue:test?connectionFactory=activemq"/>
 
        <!-- Create a unique file name -->
		<setHeader headerName="fileName">
			<groovy>def instant = java.time.Instant.now(); sprintf('data-%08X%08X', instant.epochSecond, instant.nano)</groovy>
		</setHeader>
 
		<!-- Serialize and write to disk -->
		<marshal>
			<serialization/>
		</marshal>
		<to uri="file:{{RESILIENCY_DIR}}?fileName=${header.fileName}"/>
	</route>
</routes> 
Line number Description
Line 8 Generate a unique file name for each message. To ensure messages are received in the correct order, generate a name based on the reception instant with a nanosecond precision. If you don't need to enforce the order of messages, then you can remove this <setHeader> block.
Line 14 Serialize the body of the exchange using the standard java serialization. You can also choose another serialization if you prefer.
Line 17 Persist the file with the proper name into the RESILIENCY_DIR defined in properties (using the "/" separator even with Windows).

When using JMS, the message is automatically acknowledged at the end of the processing (here, after the <to uri="file..." />). Most of the sources work the same way.


Processing route

Since the file is now safely persisted to disk, you can process it in another route

First, create a new data integration state to avoid processing the files over and over since you cannot move them or delete them. Name the State processed and use a deletion after 15 minutes without querying it (see How to configure states).

<routes xmlns:u="http://www.systar.com/aluminium/camel-util" xmlns="http://camel.apache.org/schema/spring">
	<route>
        <!-- Read from the directory -->
		<from uri="file:{{RESILIENCY_DIR}}?noop=true&amp;idempotentRepository=#processed&amp;idempotentKey=${file:onlyname}" />
 
		<!-- Deserialize it -->
		<unmarshal>
			<serialization />
		</unmarshal>
 
        <!-- Process the message -->
        <!-- ... -->
    </route>
</routes> 
Line number Description
Line 5

Poll the directory using the following options:

  • noop=true indicates that the file must not be deleted or moved.
  • idempotentRepository=#processed indicates that you're using the data integration state named processed to avoid processing the same files over and over.
  • idempotentKey=${file:onlyname} indicates that instead of saving the whole file path in the State we only save its name.
Line 8 Deserialize the message using the same data format as in the receiving route.
Line 14 This is the message processing itself with all its logic and absorption.

You can also to define both routes into a single routing context.

Cleaning route

Since you're writing to disk, perform a clean-up and remove the oldest files when you're sure that messages have been processed and persisted into a checkpoint.

Doing a clean-up prevents two things:

  1. Running out of disk.
  2. Slowing down the process due to the huge number of files to scan each time.


Be aware that if misconfigured, this route can either drop other important files or break the resiliency. For all these reasons it's best to keep this route in a dedicated Routing context.


The following route will poll the contents of the directory every hour and remove the files that are older than four hours.

This 4-hour value is only an example. Find the sweet spot for your particular application. Here are the rules and consequences to be aware of:

The value must be at least twice the period of the automatic checkpoint (30 minutes by default) to ensure that content will be absorbed into at least one checkpoint:

  • A small value increases the risk of deleting unprocessed files, for instance:
    • If the processing route is stopped but the cleaning route isn't.
    • If you have many pending message files then you can delete a file before processing it.
  • A high value decreases the performance of the application:
    • You'll lose disk space for files that are already processed and secured in a checkpoint.
    • Scanning files again and again  (even if they're ignored because already processed) takes time.

Be aware that if you stop your node more often than the retention period you must not start this route upon startup because it will delete messages before having a chance to process them.

In order to ease the configuration, we're defining a property named DELETION_THRESHOLD_HOUR with a value of 4.

<routes xmlns:u="http://www.systar.com/aluminium/camel-util" xmlns="http://camel.apache.org/schema/spring">
	<route>
		<!-- Runs every hour and the first run is one hour after startup -->
		<from uri="timer:clean?period=3600s&amp;delay=3600s" />
 
		<!-- List the files of the directory and remove the ones that are older than the threshold -->
		<setBody>
			<groovy>
				def now = System.currentTimeMillis()
				def threshold = java.util.concurrent.TimeUnit.HOURS.toMillis({{DELETION_THRESHOLD_HOUR}})
				def files = new File("{{RESILIENCY_DIR}}").listFiles().findAll { f -> now - f.lastModified() > threshold }
				files.each { f -> f.delete() }
				files
			</groovy>
		</setBody>
	</route>
</routes>

Related Links