Apache Kafka Streams DSL vs. Processor API: A Comprehensive Comparison

Explore the differences between Apache Kafka's Streams DSL and Processor API, understanding their use cases, advantages, and implementation techniques for advanced stream processing tasks.

5.3.2 Streams DSL vs. Processor API

Introduction

Apache Kafka’s Streams API offers two distinct approaches for stream processing: the high-level Streams DSL (Domain Specific Language) and the low-level Processor API. Each provides unique abstractions and capabilities, catering to different needs and complexities in stream processing tasks. This section delves into the intricacies of both APIs, offering insights into their use cases, advantages, and implementation techniques.

Streams DSL: High-Level Abstractions

Intent

The Streams DSL is designed to simplify stream processing by providing a high-level, declarative approach. It abstracts the complexities of stream processing, allowing developers to focus on defining the “what” rather than the “how.”

Motivation

The Streams DSL is ideal for developers who need to implement common stream processing tasks quickly and efficiently without delving into the underlying mechanics. It is particularly useful for tasks such as filtering, mapping, joining, and aggregating data streams.

Applicability

  • Simple to Moderate Complexity: Use the Streams DSL for straightforward processing tasks that can be expressed in a declarative manner.
  • Rapid Development: Ideal for projects with tight deadlines where rapid prototyping is essential.
  • Common Operations: Suitable for tasks involving standard operations like filtering, mapping, and joining.

Structure

The Streams DSL provides a fluent API that allows chaining operations to define a processing topology.

    graph TD;
	    A["Source Topic"] --> B["Filter"];
	    B --> C["Map"];
	    C --> D["Aggregate"];
	    D --> E["Sink Topic"];

Caption: A typical Streams DSL topology involving filtering, mapping, and aggregating data from a source topic to a sink topic.
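The chained, declarative style of this topology can be illustrated with a plain-Java sketch (no Kafka dependency) using `java.util.stream`. The records and stage names below are made up for illustration; in a real application the stages would be `KStream` operations reading from and writing to Kafka topics.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of the filter -> map -> aggregate chain in the diagram
// above, using java.util.stream to illustrate the declarative chaining style.
// No Kafka is involved; the input list stands in for the source topic.
public class PipelineSketch {

    // Simulates: filter records, map them, then aggregate (count per value).
    public static Map<String, Long> run(List<String> sourceRecords) {
        return sourceRecords.stream()
                .filter(v -> v.contains("important"))   // Filter stage
                .map(String::toUpperCase)               // Map stage
                .collect(Collectors.groupingBy(         // Aggregate stage:
                        v -> v,                         // count per value
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> source = List.of("important: a", "noise", "important: a");
        System.out.println(run(source)); // {IMPORTANT: A=2}
    }
}
```

As in the DSL, each stage consumes the output of the previous one, and the chain as a whole describes "what" happens to each record rather than "how" the processing loop runs.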

Participants

  • KStream: Represents an unbounded stream of records, where each record is an independent event.
  • KTable: Represents a changelog stream materialized as a table: for each key, only the latest value is retained.
  • GlobalKTable: Represents a table whose contents are fully replicated to every application instance.
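The KStream/KTable distinction can be made concrete with a small plain-Java sketch (no Kafka dependency): a KTable behaves like the latest-value-per-key view of a changelog stream, where a record with a null value acts as a tombstone that deletes the key. The `ChangelogView` class below is illustrative only and is not part of the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of KTable semantics: each changelog record upserts its key,
// and a null value is a tombstone that deletes the key. Not Kafka code.
public class ChangelogView {
    private final Map<String, String> table = new HashMap<>();

    // Apply one changelog record to the materialized view.
    public void apply(String key, String value) {
        if (value == null) {
            table.remove(key);      // tombstone: delete the key
        } else {
            table.put(key, value);  // upsert: the newest value wins
        }
    }

    public String get(String key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        ChangelogView view = new ChangelogView();
        view.apply("user-1", "online");
        view.apply("user-1", "offline"); // later record overwrites earlier one
        view.apply("user-2", "online");
        view.apply("user-2", null);      // tombstone removes user-2
        System.out.println(view.get("user-1")); // offline
        System.out.println(view.get("user-2")); // null
    }
}
```

A KStream, by contrast, would keep both `user-1` records as independent events rather than collapsing them to the latest value.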

Collaborations

  • KStream-KStream Joins: Joining two streams.
  • KStream-KTable Joins: Enriching a stream with a table.
  • Windowed Operations: Applying time-based operations.
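To sketch the windowed operations mentioned above: a tumbling window assigns each record to a fixed-size, non-overlapping window whose start is the record timestamp rounded down to a multiple of the window size. This mirrors how Kafka's `TimeWindows` aligns windows, but the class below is a toy illustration, not Kafka code.

```java
// Toy sketch of tumbling-window assignment: the window containing a record
// starts at the timestamp rounded down to a multiple of the window size.
public class TumblingWindow {

    // Start (in ms) of the tumbling window containing the given timestamp.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute windows
        System.out.println(windowStart(125_000L, size)); // 120000
        System.out.println(windowStart(119_999L, size)); // 60000
    }
}
```

All records whose timestamps map to the same window start are aggregated together, which is what makes windowed counts and joins deterministic for a given record timeline.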

Consequences

  • Ease of Use: Simplifies development with a rich set of built-in operations.
  • Limited Flexibility: Less control over the processing logic compared to the Processor API.

Implementation

  • Java:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> source = builder.stream("source-topic");
    KStream<String, String> filtered = source.filter((key, value) -> value.contains("important"));
    filtered.to("sink-topic");
    
  • Scala:

    val builder = new StreamsBuilder()
    val source: KStream[String, String] = builder.stream("source-topic")
    val filtered = source.filter((key, value) => value.contains("important"))
    filtered.to("sink-topic")
    
  • Kotlin:

    val builder = StreamsBuilder()
    val source: KStream<String, String> = builder.stream("source-topic")
    val filtered = source.filter { key, value -> value.contains("important") }
    filtered.to("sink-topic")
    
  • Clojure:

    (def builder (StreamsBuilder.))
    (def source (.stream builder "source-topic"))
    (def filtered (.filter source (reify Predicate
                                    (test [_ key value]
                                      (.contains ^String value "important")))))
    (.to filtered "sink-topic")
    

Processor API: Low-Level Control

Intent

The Processor API offers fine-grained control over the processing logic, allowing developers to implement custom processing logic and manage state explicitly.

Motivation

The Processor API is suitable for complex processing tasks that require custom logic, state management, or integration with external systems.

Applicability

  • Complex Processing Logic: Use the Processor API for tasks that cannot be easily expressed with the Streams DSL.
  • Custom State Management: Ideal for applications requiring explicit state handling.
  • Integration with External Systems: Suitable for scenarios where integration with non-Kafka systems is necessary.

Structure

The Processor API allows developers to define a processing topology using custom processors and state stores.

    graph TD;
	    A["Source Topic"] --> B["Custom Processor"];
	    B --> C["State Store"];
	    C --> D["Custom Processor"];
	    D --> E["Sink Topic"];

Caption: A Processor API topology with custom processors and state stores.

Participants

  • Processor: A node in the processing topology that processes records.
  • StateStore: A storage mechanism for maintaining state.
  • Topology: Constructs the processing topology by wiring together sources, processors, state stores, and sinks (it replaced the older TopologyBuilder class).

Collaborations

  • Custom Processors: Implement custom logic for processing records.
  • State Management: Use state stores for maintaining application state.
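To make the state-management pattern concrete without pulling in Kafka, the toy class below sketches what a stateful processor does: each incoming record updates a local key-value store (here a plain `HashMap` standing in for a `StateStore`) and the running count is what would be forwarded downstream. All names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of a stateful processor: a local map stands in for a Kafka
// StateStore, and process() returns the value a real processor would forward.
public class CountingProcessorSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Process one record and return the updated count for its key.
    public long process(String key) {
        long count = store.getOrDefault(key, 0L) + 1;
        store.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        CountingProcessorSketch p = new CountingProcessorSketch();
        System.out.println(p.process("click")); // 1
        System.out.println(p.process("click")); // 2
    }
}
```

In a real topology, the store would be a fault-tolerant `KeyValueStore` backed by a changelog topic, so the counts survive restarts and rebalances.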

Consequences

  • Flexibility: Provides full control over the processing logic and state management.
  • Complexity: Requires a deeper understanding of stream processing concepts and Kafka internals.

Implementation

  • Java:

    Topology topology = new Topology();
    topology.addSource("Source", "source-topic")
            .addProcessor("Process", CustomProcessor::new, "Source")
            .addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("state-store"),
                Serdes.String(),
                Serdes.String()), "Process")
            .addSink("Sink", "sink-topic", "Process");

    // Implements the classic org.apache.kafka.streams.processor.Processor
    // interface (deprecated since Kafka 2.6 in favor of api.Processor).
    public class CustomProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, String> stateStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.stateStore = (KeyValueStore<String, String>) context.getStateStore("state-store");
        }

        @Override
        public void process(String key, String value) {
            // Forward only records flagged as important, remembering them in the store
            if (value.contains("important")) {
                stateStore.put(key, value);
                context.forward(key, value);
            }
        }

        @Override
        public void close() {
            // Cleanup resources
        }
    }
    
  • Scala:

    val topology = new Topology()
    topology.addSource("Source", "source-topic")
            .addProcessor("Process", () => new CustomProcessor, "Source")
            .addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("state-store"),
                Serdes.String(),
                Serdes.String()), "Process")
            .addSink("Sink", "sink-topic", "Process")

    class CustomProcessor extends Processor[String, String] {
        private var context: ProcessorContext = _
        private var stateStore: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext): Unit = {
            this.context = context
            this.stateStore = context.getStateStore("state-store").asInstanceOf[KeyValueStore[String, String]]
        }

        override def process(key: String, value: String): Unit = {
            if (value.contains("important")) {
                stateStore.put(key, value)
                context.forward(key, value)
            }
        }

        override def close(): Unit = {
            // Cleanup resources
        }
    }
    
  • Kotlin:

    val topology = Topology()
    topology.addSource("Source", "source-topic")
            .addProcessor("Process", { CustomProcessor() }, "Source")
            .addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("state-store"),
                Serdes.String(),
                Serdes.String()), "Process")
            .addSink("Sink", "sink-topic", "Process")

    class CustomProcessor : Processor<String, String> {
        private lateinit var context: ProcessorContext
        private lateinit var stateStore: KeyValueStore<String, String>

        override fun init(context: ProcessorContext) {
            this.context = context
            this.stateStore = context.getStateStore("state-store") as KeyValueStore<String, String>
        }

        override fun process(key: String, value: String) {
            if (value.contains("important")) {
                stateStore.put(key, value)
                context.forward(key, value)
            }
        }

        override fun close() {
            // Cleanup resources
        }
    }
    
  • Clojure:

    (def topology (Topology.))
    (.addSource topology "Source" (into-array String ["source-topic"]))
    (.addProcessor topology "Process"
                   (reify ProcessorSupplier
                     (get [_]
                       ;; atoms hold per-processor state; using (def ...) inside
                       ;; reify would create global vars and is not safe
                       (let [ctx (atom nil)
                             store (atom nil)]
                         (reify Processor
                           (init [_ context]
                             (reset! ctx context)
                             (reset! store (.getStateStore context "state-store")))
                           (process [_ key value]
                             (when (.contains ^String value "important")
                               (.put @store key value)
                               (.forward @ctx key value)))
                           (close [_])))))
                   (into-array String ["Source"]))
    (.addStateStore topology
                    (Stores/keyValueStoreBuilder
                      (Stores/inMemoryKeyValueStore "state-store")
                      (Serdes/String)
                      (Serdes/String))
                    (into-array String ["Process"]))
    (.addSink topology "Sink" "sink-topic" (into-array String ["Process"]))
    

Learning Curve and Complexity

The Streams DSL offers a gentle learning curve, making it accessible to developers familiar with functional programming paradigms. Its declarative nature allows for rapid development and prototyping. In contrast, the Processor API requires a deeper understanding of Kafka’s internals and stream processing concepts, making it more suitable for experienced developers who need fine-grained control over processing logic.

Guidelines for Selecting the Appropriate API

  • Use Streams DSL when:

    • The processing logic is straightforward and can be expressed declaratively.
    • Rapid development and prototyping are priorities.
    • Built-in operations suffice for the application’s needs.
  • Use Processor API when:

    • Custom processing logic is required.
    • There is a need for explicit state management.
    • Integration with external systems is necessary.

Conclusion

Both the Streams DSL and Processor API offer powerful capabilities for stream processing in Apache Kafka. The choice between them depends on the complexity of the processing tasks, the need for custom logic, and the developer’s familiarity with Kafka’s internals. By understanding the strengths and limitations of each approach, developers can select the most appropriate API for their specific use case.

Revised on Thursday, April 23, 2026