Exactly-Once Processing End-to-End: Achieving Reliable Data Integrity in Apache Kafka

Explore the intricacies of implementing exactly-once processing in Apache Kafka, ensuring data integrity and reliability across the entire data pipeline.

13.4.1 Exactly-Once Processing End-to-End

Introduction

In the realm of distributed systems, ensuring data integrity and consistency is paramount. Apache Kafka, a cornerstone of modern data architectures, offers robust mechanisms to achieve exactly-once processing (EOP) across the entire data pipeline. This capability is crucial for applications where data accuracy is non-negotiable, such as financial transactions, inventory management, and real-time analytics. This section delves into the intricacies of implementing exactly-once processing in Kafka, providing a comprehensive guide for expert software engineers and enterprise architects.

Understanding Exactly-Once Processing

Exactly-once processing ensures that each message is processed precisely once, eliminating the risks of data duplication or loss. This guarantee is achieved through a combination of Kafka’s transactional messaging capabilities and careful coordination between producers, brokers, and consumers.

Key Concepts

  • Transactions: A transaction in Kafka is a sequence of operations that are treated as a single unit. Kafka ensures that all operations within a transaction are either committed or aborted, maintaining data consistency.
  • Transaction IDs: Unique identifiers used to track and manage transactions across producers and consumers.
  • Idempotency: The property that ensures repeated operations have the same effect as a single operation, crucial for achieving exactly-once semantics.
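To make the idempotency idea concrete, the following sketch (plain Java, no Kafka APIs; all names are illustrative) mimics how a broker deduplicates retried sends by tracking the last sequence number it has appended for each producer:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: real brokers track sequences per (producerId, partition),
// but the principle is the same — a retry of an already-appended record is dropped.
public class IdempotentLog {
    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producerId -> last appended sequence
    private int appendedCount = 0;

    // Returns true if the record was appended, false if it was a duplicate retry.
    public boolean append(long producerId, int sequence, String value) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // duplicate: this sequence was already appended
        }
        lastSequence.put(producerId, sequence);
        appendedCount++;
        return true;
    }

    public int size() {
        return appendedCount;
    }
}
```

Retrying a send with the same sequence number has no effect on the log, which is exactly the property that makes producer retries safe.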

Roles of Producers, Brokers, and Consumers

Producers

Producers are responsible for sending messages to Kafka topics. In exactly-once processing, producers must be configured to support idempotent message production and transactions.

  • Idempotent Producers: Guarantee that retried sends do not result in duplicate records in the log, even when a send is retried after a transient network failure; the broker deduplicates using per-producer sequence numbers.
  • Transactional Producers: Use transaction IDs to group messages into transactions, ensuring atomicity.

Brokers

Kafka brokers manage the storage and delivery of messages. They play a critical role in coordinating transactions and ensuring message durability.

  • Transaction Coordinator: A broker component that manages the state of transactions, ensuring that all messages in a transaction are committed or aborted consistently.
  • Log Compaction: Retains only the latest record for each key. It can complement exactly-once pipelines, but it is a retention feature rather than part of the transaction protocol itself.

Consumers

Consumers read messages from Kafka topics. In exactly-once processing, consumers must handle message offsets and commits carefully to avoid processing duplicates.

  • Consumer Groups: Ensure load balancing and fault tolerance by distributing message consumption across multiple instances.
  • Read-Committed Consumers: Set isolation.level=read_committed so that only records from committed transactions are returned. In a consume-transform-produce pipeline, offsets are committed atomically through the producer's transaction rather than by the consumer itself.
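The consume-transform-produce pattern ties these roles together: the consumer's offsets are committed through the producer's transaction, so output records and progress markers land atomically. A minimal Java sketch, assuming a consumer and producer configured as in the examples later in this section (the topic name and the transform are illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class ConsumeTransformProduce {
    // Assumes: consumer has isolation.level=read_committed and auto-commit disabled;
    // producer has a transactional.id and initTransactions() has already been called.
    static void run(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) continue;

            producer.beginTransaction();
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Transform and forward each record within the transaction
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value().toUpperCase()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // Input offsets commit atomically with the output records
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction(); // Offsets stay uncommitted; the batch is reprocessed
            }
        }
    }
}
```

If the transaction aborts, neither the output records nor the offsets become visible, so the batch is simply consumed again — this is what makes the pipeline exactly-once end-to-end rather than merely at-least-once.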

Implementing Exactly-Once Processing

Configuring Producers

To enable exactly-once processing, configure producers to be idempotent and transactional. This involves setting specific properties and handling transactions programmatically.

Java Example:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Enable idempotence
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer-1"); // Stable, unique per producer instance

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // Register the transactional ID and fence zombie producers

        try {
            producer.beginTransaction(); // Begin transaction
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction(); // Commit transaction
        } catch (Exception e) {
            producer.abortTransaction(); // Abort transaction on error
        } finally {
            producer.close();
        }
    }
}

Scala Example:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

object ExactlyOnceProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer-1")

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()

    try {
      producer.beginTransaction()
      producer.send(new ProducerRecord[String, String]("my-topic", "key", "value"))
      producer.commitTransaction()
    } catch {
      case e: Exception =>
        producer.abortTransaction()
    } finally {
      producer.close()
    }
  }
}

Kotlin Example:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
        put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer-1")
    }

    val producer = KafkaProducer<String, String>(props)
    producer.initTransactions()

    try {
        producer.beginTransaction()
        producer.send(ProducerRecord("my-topic", "key", "value"))
        producer.commitTransaction()
    } catch (e: Exception) {
        producer.abortTransaction()
    } finally {
        producer.close()
    }
}

Clojure Example:

(ns exactly-once-producer
  (:import [org.apache.kafka.clients.producer KafkaProducer ProducerConfig ProducerRecord]
           [org.apache.kafka.common.serialization StringSerializer]))

(defn create-producer []
  (let [props (doto (java.util.Properties.)
                (.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092")
                (.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG StringSerializer)
                (.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG StringSerializer)
                (.put ProducerConfig/ENABLE_IDEMPOTENCE_CONFIG true)
                (.put ProducerConfig/TRANSACTIONAL_ID_CONFIG "transactional-producer-1"))]
    (KafkaProducer. props)))

(defn -main []
  (let [producer (create-producer)]
    (.initTransactions producer)
    (try
      (.beginTransaction producer)
      (.send producer (ProducerRecord. "my-topic" "key" "value"))
      (.commitTransaction producer)
      (catch Exception e
        (.abortTransaction producer))
      (finally
        (.close producer)))))

Configuring Consumers

Consumers must disable auto-commit and read only committed data by setting isolation.level=read_committed. Note that committing offsets after processing, as the examples below do, yields at-least-once delivery on its own; exactly-once end-to-end additionally requires committing offsets within a producer transaction (sendOffsetsToTransaction).

Java Example:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ExactlyOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "transactional-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Disable auto commit
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // Read only committed transactional records

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // Process record
                System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
            }
            // At-least-once on its own; commit offsets inside a producer
            // transaction for end-to-end exactly-once
            consumer.commitSync();
        }
    }
}

Scala Example:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import java.time.Duration
import java.util.{Collections, Properties}

object ExactlyOnceConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "transactional-consumer-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // Read only committed transactional records

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("my-topic"))

    while (true) {
      val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100))
      records.forEach { record =>
        println(s"Consumed record with key ${record.key()} and value ${record.value()}")
      }
      consumer.commitSync()
    }
  }
}

Kotlin Example:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "transactional-consumer-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // Read only committed transactional records
    }

    val consumer = KafkaConsumer<String, String>(props)
    consumer.subscribe(listOf("my-topic"))

    while (true) {
        val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(100))
        for (record in records) {
            println("Consumed record with key ${record.key()} and value ${record.value()}")
        }
        consumer.commitSync()
    }
}

Clojure Example:

(ns exactly-once-consumer
  (:import [org.apache.kafka.clients.consumer KafkaConsumer ConsumerConfig ConsumerRecords]
           [org.apache.kafka.common.serialization StringDeserializer]))

(defn create-consumer []
  (let [props (doto (java.util.Properties.)
                (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092")
                (.put ConsumerConfig/GROUP_ID_CONFIG "transactional-consumer-group")
                (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG StringDeserializer)
                (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG StringDeserializer)
                (.put ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG false)
                (.put ConsumerConfig/ISOLATION_LEVEL_CONFIG "read_committed"))]
    (KafkaConsumer. props)))

(defn -main []
  (let [consumer (create-consumer)]
    (.subscribe consumer (java.util.Collections/singletonList "my-topic"))
    (while true
      (let [records (.poll consumer (java.time.Duration/ofMillis 100))]
        (doseq [record records]
          (println (format "Consumed record with key %s and value %s" (.key record) (.value record))))
        (.commitSync consumer)))))

Integrating with External Systems

When integrating Kafka with external systems, such as databases, it is crucial to maintain transactional consistency. This often involves using a two-phase commit protocol or the outbox pattern to ensure that changes in Kafka and the external system are synchronized.

Two-Phase Commit

The two-phase commit protocol involves preparing all systems for a transaction and then committing the transaction across all systems. This ensures that either all systems commit the transaction or none do, maintaining consistency.
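As a concept sketch of those two phases (plain Java, no real resource managers; the participant interface and coordinator are illustrative stand-ins for systems such as a database and a message broker):

```java
import java.util.List;

// Illustrative two-phase commit coordinator: phase 1 collects prepare votes,
// phase 2 commits everywhere only if every participant voted yes.
interface Participant {
    boolean prepare();  // Phase 1: vote yes/no on whether this system can commit
    void commit();      // Phase 2a: make the transaction durable
    void rollback();    // Phase 2b: undo any prepared state
}

class TwoPhaseCommitCoordinator {
    // Returns true if the transaction committed on all participants.
    static boolean execute(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {                        // any "no" vote aborts everywhere
                participants.forEach(Participant::rollback);
                return false;
            }
        }
        participants.forEach(Participant::commit);     // unanimous "yes": commit everywhere
        return true;
    }
}
```

Real 2PC implementations must also persist the coordinator's decision and handle participant crashes between the phases, which is precisely the complexity that makes alternatives like the outbox pattern attractive.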

Outbox Pattern

The outbox pattern involves writing changes to a database and an outbox table within the same transaction. A separate process then reads from the outbox table and publishes messages to Kafka, ensuring that messages are only sent if the database transaction is successful.
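A minimal in-memory sketch of the pattern (plain Java; the "store" and "relay" below are hypothetical stand-ins for a real database transaction and a polling or CDC-based publisher):

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a database whose business table and outbox table
// are written in one atomic transaction: if the write fails, neither row exists.
class OutboxStore {
    final List<String> orders = new ArrayList<>(); // business table
    final List<String> outbox = new ArrayList<>(); // outbox table

    void saveOrder(String order) {
        if (order == null || order.isEmpty()) {
            throw new IllegalArgumentException("invalid order"); // "transaction" rolls back before any write
        }
        orders.add(order);
        outbox.add("OrderCreated:" + order);
    }
}

// Relay process: drains the outbox and publishes each event (here, a list
// stands in for a Kafka topic), then removes the published rows.
class OutboxRelay {
    static void drain(OutboxStore store, List<String> topic) {
        while (!store.outbox.isEmpty()) {
            topic.add(store.outbox.remove(0));
        }
    }
}
```

Because the event row is created in the same transaction as the business data, the relay can never publish an event for a change that was rolled back; combined with an idempotent producer, this gives effectively-once delivery into Kafka.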

Limitations and Considerations

While exactly-once processing offers significant benefits, it is not without limitations. Understanding these limitations is crucial for designing robust systems.

  • Performance Overhead: Exactly-once processing introduces additional overhead due to the need for transactional coordination and state management.
  • Complexity: Implementing exactly-once semantics can increase system complexity, requiring careful management of transaction IDs and offsets.
  • Scalability: The scalability of exactly-once processing may be limited by the need to maintain transactional state across distributed components.
  • External System Support: Not all external systems support exactly-once semantics, which can complicate integration efforts.

Conclusion

Exactly-once processing in Apache Kafka is a powerful feature that ensures data integrity and consistency across distributed systems. By carefully configuring producers, brokers, and consumers, and considering integration with external systems, it is possible to achieve reliable exactly-once processing. However, it is essential to weigh the benefits against the potential limitations and complexity involved.

Revised on Thursday, April 23, 2026