Global Tables and Foreign Key Joins in Apache Kafka

Explore the use of global tables and foreign key joins in Apache Kafka for advanced stream processing. Learn how to implement these techniques to enable complex data integration and real-time analytics.

8.4.3 Global Tables and Foreign Key Joins

Introduction

In the realm of stream processing with Apache Kafka, joining streams and tables is a powerful technique that enables complex data integration and real-time analytics. Among the various types of joins, foreign key joins using global tables stand out for their ability to handle scenarios that are not feasible with partitioned tables. This section delves into the concept of global tables, their distinction from regular tables, and their application in foreign key joins.

Understanding Global Tables

What Are Global Tables?

Global tables in Kafka Streams are a special type of table that are fully replicated across all instances of an application. Unlike partitioned tables, which are distributed across multiple nodes and only accessible by the nodes that own the partition, global tables provide every instance with a complete copy of the data. This replication allows for efficient lookups and joins on data that is not co-partitioned with the stream.

Key Differences from Regular Tables

  • Replication: Global tables are fully replicated across all instances, whereas regular tables are partitioned.
  • Access: Every instance of a Kafka Streams application can access the entire dataset in a global table, enabling joins on non-key attributes.
  • Use Cases: Global tables are ideal for scenarios where the join key is not the partition key, such as foreign key joins.

Scenarios Requiring Foreign Key Joins

Foreign key joins are essential in scenarios where the join key in the stream does not match the partition key of the table. Common use cases include:

  • Enriching Event Streams: Joining a stream of transactions with a global table of customer details to enrich each transaction with customer information.
  • Real-Time Analytics: Aggregating data from different sources where the join keys are not aligned with partition keys.
  • Data Integration: Combining data from disparate systems where the foreign key relationship is necessary for meaningful integration.

Implementing Global Tables for Lookups

To implement global tables in Kafka Streams, you need to define the table as a GlobalKTable. This allows you to perform lookups and joins using non-key attributes.

Example: Using Global Tables for Lookups

Consider a scenario where you have a stream of order events and a global table of product details. You want to enrich each order with product information.

Java Example
 1import org.apache.kafka.streams.KafkaStreams;
 2import org.apache.kafka.streams.StreamsBuilder;
 3import org.apache.kafka.streams.kstream.GlobalKTable;
 4import org.apache.kafka.streams.kstream.KStream;
 5import org.apache.kafka.streams.kstream.KeyValueMapper;
 6import org.apache.kafka.streams.kstream.ValueJoiner;
 7
 8public class GlobalTableExample {
 9    public static void main(String[] args) {
10        StreamsBuilder builder = new StreamsBuilder();
11
12        // Define the order stream
13        KStream<String, Order> orders = builder.stream("orders");
14
15        // Define the global table for product details
16        GlobalKTable<String, Product> products = builder.globalTable("products");
17
18        // Perform the join
19        KStream<String, EnrichedOrder> enrichedOrders = orders.join(
20            products,
21            (orderId, order) -> order.getProductId(), // Foreign key extractor
22            (order, product) -> new EnrichedOrder(order, product) // ValueJoiner
23        );
24
25        // Start the Kafka Streams application
26        KafkaStreams streams = new KafkaStreams(builder.build(), new Properties());
27        streams.start();
28    }
29}
Scala Example
 1import org.apache.kafka.streams.scala._
 2import org.apache.kafka.streams.scala.kstream._
 3import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 4
 5object GlobalTableExample extends App {
 6  val builder = new StreamsBuilder()
 7
 8  // Define the order stream
 9  val orders: KStream[String, Order] = builder.stream[String, Order]("orders")
10
11  // Define the global table for product details
12  val products: GlobalKTable[String, Product] = builder.globalTable[String, Product]("products")
13
14  // Perform the join
15  val enrichedOrders: KStream[String, EnrichedOrder] = orders.join(
16    products)(
17    (orderId, order) => order.productId, // Foreign key extractor
18    (order, product) => EnrichedOrder(order, product) // ValueJoiner
19  )
20
21  // Start the Kafka Streams application
22  val streams = new KafkaStreams(builder.build(), new Properties())
23  streams.start()
24}
Kotlin Example
 1import org.apache.kafka.streams.KafkaStreams
 2import org.apache.kafka.streams.StreamsBuilder
 3import org.apache.kafka.streams.kstream.GlobalKTable
 4import org.apache.kafka.streams.kstream.KStream
 5
 6fun main() {
 7    val builder = StreamsBuilder()
 8
 9    // Define the order stream
10    val orders: KStream<String, Order> = builder.stream("orders")
11
12    // Define the global table for product details
13    val products: GlobalKTable<String, Product> = builder.globalTable("products")
14
15    // Perform the join
16    val enrichedOrders: KStream<String, EnrichedOrder> = orders.join(
17        products,
18        { _, order -> order.productId }, // Foreign key extractor
19        { order, product -> EnrichedOrder(order, product) } // ValueJoiner
20    )
21
22    // Start the Kafka Streams application
23    val streams = KafkaStreams(builder.build(), Properties())
24    streams.start()
25}
Clojure Example
 1(ns global-table-example
 2  (:require [org.apache.kafka.streams StreamsBuilder KafkaStreams]
 3            [org.apache.kafka.streams.kstream KStream GlobalKTable]))
 4
 5(defn -main []
 6  (let [builder (StreamsBuilder.)]
 7
 8    ;; Define the order stream
 9    (def orders (.stream builder "orders"))
10
11    ;; Define the global table for product details
12    (def products (.globalTable builder "products"))
13
14    ;; Perform the join
15    (def enriched-orders
16      (.join orders
17             products
18             (fn [order-id order] (.getProductId order)) ;; Foreign key extractor
19             (fn [order product] (EnrichedOrder. order product)))) ;; ValueJoiner
20
21    ;; Start the Kafka Streams application
22    (def streams (KafkaStreams. (.build builder) (Properties.)))
23    (.start streams)))

Considerations for Scaling and Data Replication

When using global tables, consider the following:

  • Data Volume: Since global tables are fully replicated, ensure that the data volume is manageable for each instance.
  • Network Bandwidth: Replicating data across instances can increase network traffic. Monitor and optimize network usage.
  • Consistency: Ensure that updates to the global table are propagated consistently across all instances.
  • Fault Tolerance: Design your application to handle failures gracefully, ensuring that the global table remains consistent.

Conclusion

Global tables and foreign key joins in Kafka Streams provide a robust mechanism for integrating and enriching data in real-time. By leveraging global tables, you can perform complex joins that are not feasible with partitioned tables, enabling a wide range of applications from real-time analytics to data integration. As you implement these techniques, consider the trade-offs in terms of data replication and network usage, and design your system to handle these challenges effectively.

Test Your Knowledge: Global Tables and Foreign Key Joins in Kafka

Loading quiz…
Revised on Thursday, April 23, 2026