Kafka

The product embeds a modified version of the official Apache Camel component for Apache Kafka. The modified component synchronizes message offsets to storage, which prevents message loss on restart. It is compatible only with Apache Kafka 0.10.1.0. The sections below explain how to work with Kafka.

For more configuration options, see the official Apache Camel documentation on Apache Kafka.

TLS configuration

Although it is not mandatory, we advise you to always configure a TLS connection and forbid unencrypted communication. If TLS is not configured:

  • data is not encrypted
  • without mutual authentication, you cannot ensure that connected clients are trusted

See How to configure SSL on a component to learn how to configure the SSL connector.

How to write data into an Apache Kafka topic

Create the following route:

<routes xmlns="http://camel.apache.org/schema/spring" xmlns:u="http://www.systar.com/aluminium/camel-util">
    <route>
        <from uri="timer:clock?period=5s"/>
        <!-- By default Kafka only accepts String messages -->
        <setBody>
            <simple>Hello. It's now ${header.firedTime}</simple>
        </setBody>
        <!-- This header is needed to find the partition in which the message should be stored -->
        <!-- Usually it's the business key -->
        <setHeader headerName="kafka.KEY">
            <simple>tick-${header.firedTime}</simple>
        </setHeader>
        <to uri="kafka:{{KAFKA_SERVER}}?topic=tick&amp;securityProtocol=SSL&amp;sslTruststoreLocation={{sslTruststoreLocation}}&amp;sslTruststorePassword={{sslTruststorePassword}}&amp;sslKeyPassword={{sslKeyPassword}}&amp;sslKeystoreLocation={{sslKeystoreLocation}}&amp;sslKeystorePassword={{sslKeystorePassword}}" />
        <!-- Without TLS: <to uri="kafka:{{KAFKA_SERVER}}?topic=tick" /> -->
    </route>
</routes>
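
The comment in the route above notes that the key is usually the business key. The following is a minimal sketch of that case, not a definitive implementation. The direct:orders endpoint, the orderId header, and the orders topic are hypothetical names chosen for illustration. The TLS options are omitted for brevity and can be appended to the URI exactly as in the route above.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <!-- Hypothetical entry point: another route is assumed to send order messages here -->
        <from uri="direct:orders"/>
        <!-- Hypothetical header: orderId is assumed to be set by the caller -->
        <!-- Messages sharing the same key are stored in the same partition -->
        <setHeader headerName="kafka.KEY">
            <simple>${header.orderId}</simple>
        </setHeader>
        <!-- By default Kafka only accepts String messages, so convert the body first -->
        <convertBodyTo type="java.lang.String"/>
        <to uri="kafka:{{KAFKA_SERVER}}?topic=orders"/>
    </route>
</routes>
```

Using a business key this way keeps all messages for one business entity in a single partition, so they are read back in the order they were written.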

How to read data from an Apache Kafka topic

First, create a new state to store the message offsets. The route below refers to this state as offsets.

Then you can create the following route:

<routes xmlns="http://camel.apache.org/schema/spring" xmlns:u="http://www.systar.com/aluminium/camel-util">
    <route>
        <from uri="kafka:{{KAFKA_SERVER}}?groupId=reader&amp;topic=tick&amp;offsetRepository=#offsets&amp;securityProtocol=SSL&amp;sslTruststoreLocation={{sslTruststoreLocation}}&amp;sslTruststorePassword={{sslTruststorePassword}}&amp;sslKeyPassword={{sslKeyPassword}}&amp;sslKeystoreLocation={{sslKeystoreLocation}}&amp;sslKeystorePassword={{sslKeystorePassword}}"/>
        <!-- Without TLS: <from uri="kafka:{{KAFKA_SERVER}}?groupId=reader&amp;topic=tick&amp;offsetRepository=#offsets" /> -->
        <doTry>
            <log message="Just read: ${body}"/>
            <doCatch>
                <exception>java.lang.Exception</exception>
                <to uri="seda:error"/>
            </doCatch>
        </doTry>
    </route>
    
    <route>
        <from uri="seda:error"/>
        <!-- Proper error management should go here -->
        <log message="Something went wrong"/>
    </route>
</routes>

Tip: The offsetRepository parameter defines which Data Integration State to use in order to keep the offsets of the read messages.
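
When checking that offsets are tracked as expected, it can help to log where each message came from. The following is a minimal sketch. It assumes that the embedded component sets the kafka.TOPIC, kafka.PARTITION, kafka.OFFSET, and kafka.KEY headers on consumed messages, as the stock Apache Camel Kafka component does; the modified version may behave differently. The TLS options are omitted for brevity.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="kafka:{{KAFKA_SERVER}}?groupId=reader&amp;topic=tick&amp;offsetRepository=#offsets"/>
        <!-- Assumption: these headers are populated as in the stock Camel Kafka component -->
        <log message="Read '${body}' from ${header[kafka.TOPIC]}, partition ${header[kafka.PARTITION]}, offset ${header[kafka.OFFSET]}, key ${header[kafka.KEY]}"/>
    </route>
</routes>
```

The bracket form ${header[...]} is used because the header names contain dots.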
