Integrating Apache Kafka with MongoDB Connectors

Explore the integration of Apache Kafka with MongoDB using connectors, enabling document-based data storage and real-time data flows.

17.1.5.2 MongoDB Connectors

Integrating Apache Kafka with MongoDB using connectors is a powerful approach to enable flexible, document-based data storage and real-time data flows. This section explores the MongoDB Kafka Connector, providing configuration examples for source and sink connectors, discussing considerations for handling schema-less data and JSON formats, and showcasing use cases such as real-time analytics and operational data synchronization.

Introduction to MongoDB Kafka Connector

The MongoDB Kafka Connector is a tool that enables seamless integration between Apache Kafka and MongoDB, allowing data to flow in real-time between these two systems. It leverages Kafka Connect, a framework for connecting Kafka with external systems, to provide both source and sink connectors. This integration is particularly beneficial for applications that require real-time data processing and storage in a flexible, document-oriented database like MongoDB.

Key Features of MongoDB Kafka Connector

  • Real-Time Data Integration: Facilitates real-time data streaming between Kafka and MongoDB.
  • Schema-Less Data Handling: Supports JSON and BSON formats, making it ideal for schema-less data.
  • Scalability: Designed to handle large volumes of data efficiently.
  • Flexibility: Supports a wide range of configurations and transformations.

For more detailed documentation, see the official MongoDB Kafka Connector documentation on the MongoDB website.

Configuring MongoDB Kafka Connectors

The MongoDB Kafka Connector provides two main types of connectors: source and sink. Each serves a distinct purpose in the data integration process.

Source Connector Configuration

The source connector is used to stream data from MongoDB to Kafka. It watches MongoDB collections via change streams (which require a replica set or sharded cluster) and publishes each change event to a Kafka topic.

Example Configuration:

    {
      "name": "mongodb-source-connector",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb://localhost:27017",
        "database": "myDatabase",
        "collection": "myCollection",
        "topic.prefix": "mongo.",
        "poll.max.batch.size": "1000",
        "poll.await.time.ms": "5000"
      }
    }

  • connector.class: Specifies the class for the MongoDB source connector.
  • connection.uri: The URI for connecting to MongoDB.
  • database and collection: Specify the MongoDB database and collection to monitor.
  • topic.prefix: Prefix for Kafka topics to which data will be published.
  • poll.max.batch.size and poll.await.time.ms: Control the polling behavior.
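
When only a subset of change events is relevant, the source connector's `pipeline` property applies a MongoDB aggregation pipeline to the change stream before events reach Kafka. A minimal sketch, reusing the database and collection names above and forwarding insert operations only:

```json
{
  "name": "mongodb-source-connector-inserts-only",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "myDatabase",
    "collection": "myCollection",
    "topic.prefix": "mongo.",
    "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}]"
  }
}
```

Filtering at the source reduces Kafka traffic, at the cost of downstream consumers never seeing update or delete events.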

Sink Connector Configuration

The sink connector is used to stream data from Kafka to MongoDB. It consumes messages from Kafka topics and writes them to MongoDB collections.

Example Configuration:

    {
      "name": "mongodb-sink-connector",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "connection.uri": "mongodb://localhost:27017",
        "database": "myDatabase",
        "collection": "myCollection",
        "topics": "kafka.topic",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
      }
    }

  • connector.class: Specifies the class for the MongoDB sink connector.
  • topics: The Kafka topics to consume from.
  • change.data.capture.handler: Optional; interprets each Kafka record as a MongoDB change stream (CDC) event and replays it against the target collection. Omit it when records are plain documents to insert.
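
By default the sink connector assigns a fresh BSON ObjectId to every document it writes, so redelivered Kafka records become duplicate documents. If record values already carry an `_id` field, the `document.id.strategy` property can reuse it, making repeated deliveries idempotent. A sketch under that assumption:

```json
{
  "name": "mongodb-sink-connector-idempotent",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "myDatabase",
    "collection": "myCollection",
    "topics": "kafka.topic",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy"
  }
}
```

Check the strategy class names against your connector version's documentation before relying on them.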

Handling Schema-Less Data and JSON Formats

MongoDB’s document-oriented nature allows it to store data in a flexible, schema-less format, typically using JSON or BSON. When integrating with Kafka, it’s important to handle these formats effectively.

Considerations for Schema-Less Data

  1. Dynamic Schemas: MongoDB collections can have documents with varying structures. Ensure your Kafka consumers and producers can handle dynamic schemas.
  2. Data Transformation: Use Kafka Connect’s Single Message Transforms (SMTs) to modify data as it flows between Kafka and MongoDB.
  3. Schema Registry: Consider using a schema registry to manage and enforce schemas, even in a schema-less environment.
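
Point 2 above can be made concrete with a standard Kafka Connect SMT. The `ReplaceField` transform ships with Kafka Connect; the sketch below drops a hypothetical `internalNotes` field from record values before they are written to MongoDB (the alias `dropInternal` is arbitrary, and older Kafka versions named the `exclude` parameter `blacklist`). These properties are merged into the connector's "config" map:

```json
{
  "transforms": "dropInternal",
  "transforms.dropInternal.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropInternal.exclude": "internalNotes"
}
```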

JSON and BSON Handling

  • JSON: Widely used for data interchange, JSON is easy to read and write. Ensure your Kafka setup can serialize and deserialize JSON effectively.
  • BSON: A binary representation of JSON, BSON is more efficient for storage and retrieval in MongoDB. Use appropriate serializers and deserializers in Kafka.
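
Which representation Kafka Connect hands to a connector is controlled by its converter settings. A common choice for schema-less JSON payloads is the built-in `JsonConverter` with schema envelopes disabled; these properties also belong in the connector's "config" map:

```json
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```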

Use Cases for MongoDB Kafka Integration

Integrating Kafka with MongoDB opens up a range of possibilities for real-time data processing and analytics.

Real-Time Analytics

By streaming data from MongoDB to Kafka, you can perform real-time analytics on data as it changes. This is particularly useful for applications like fraud detection, where timely insights are critical.

Operational Data Synchronization

Use the MongoDB Kafka Connector to synchronize operational data between systems. For example, changes in a MongoDB database can be propagated to other systems via Kafka, ensuring data consistency across your architecture.
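
For synchronization scenarios, the source connector can first copy a collection's existing documents and then switch to streaming new changes. In recent connector versions this is controlled by the `startup.mode` property (older releases used a `copy.existing` flag), so verify the option name against your connector version:

```json
{
  "name": "mongodb-source-sync",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "myDatabase",
    "collection": "myCollection",
    "topic.prefix": "mongo.",
    "startup.mode": "copy_existing"
  }
}
```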

Practical Examples

Let’s explore some practical examples of using the MongoDB Kafka Connector in different programming languages.

Java Example

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.bson.Document;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoDatabase;
    import java.util.Properties;

    public class MongoDBKafkaExample {
        public static void main(String[] args) {
            // Connect to MongoDB
            var mongoClient = MongoClients.create("mongodb://localhost:27017");
            MongoDatabase database = mongoClient.getDatabase("myDatabase");
            MongoCollection<Document> collection = database.getCollection("myCollection");

            // Kafka producer configuration
            var props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);

            // Send the current contents of the collection to Kafka as JSON
            // (a one-time snapshot, not a continuous change stream)
            collection.find().forEach(document -> {
                String json = document.toJson();
                producer.send(new ProducerRecord<>("mongo.topic", json));
            });

            producer.close();
            mongoClient.close();
        }
    }

Scala Example

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.bson.Document
    import com.mongodb.client.{MongoClients, MongoCollection, MongoDatabase}

    object MongoDBKafkaExample extends App {
      // Connect to MongoDB
      val mongoClient = MongoClients.create("mongodb://localhost:27017")
      val database: MongoDatabase = mongoClient.getDatabase("myDatabase")
      val collection: MongoCollection[Document] = database.getCollection("myCollection")

      // Kafka producer configuration
      val props = new java.util.Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      // Send the current contents of the collection to Kafka as JSON
      collection.find().forEach(document => {
        val json = document.toJson
        producer.send(new ProducerRecord[String, String]("mongo.topic", json))
      })

      producer.close()
      mongoClient.close()
    }

Kotlin Example

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.bson.Document
    import com.mongodb.client.MongoClients
    import java.util.Properties

    fun main() {
        // Connect to MongoDB
        val mongoClient = MongoClients.create("mongodb://localhost:27017")
        val database = mongoClient.getDatabase("myDatabase")
        val collection = database.getCollection("myCollection")

        // Kafka producer configuration
        val props = Properties().apply {
            put("bootstrap.servers", "localhost:9092")
            put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
            put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        }

        val producer = KafkaProducer<String, String>(props)

        // Send the current contents of the collection to Kafka as JSON
        collection.find().forEach { document ->
            val json = document.toJson()
            producer.send(ProducerRecord("mongo.topic", json))
        }

        producer.close()
        mongoClient.close()
    }

Clojure Example

    (require '[clojure.data.json :as json])
    (require '[monger.core :as mg])
    (require '[monger.collection :as mc])
    (import '[org.apache.kafka.clients.producer KafkaProducer ProducerRecord])

    (defn stream-mongo-to-kafka []
      (let [mongo-client (mg/connect)
            db (mg/get-db mongo-client "myDatabase")
            props (doto (java.util.Properties.)
                    (.put "bootstrap.servers" "localhost:9092")
                    (.put "key.serializer" "org.apache.kafka.common.serialization.StringSerializer")
                    (.put "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"))
            producer (KafkaProducer. props)]
        ;; find-maps takes the database and the collection name; stringify the
        ;; ObjectId in :_id so each document serializes cleanly to JSON
        (doseq [doc (mc/find-maps db "myCollection")]
          (let [json-doc (json/write-str (update doc :_id str))]
            (.send producer (ProducerRecord. "mongo.topic" json-doc))))
        (.close producer)
        (mg/disconnect mongo-client)))

    (stream-mongo-to-kafka)

Visualizing the Integration

To better understand the integration between Kafka and MongoDB, consider the following diagram illustrating the data flow:

    graph TD;
        A["MongoDB"] -->|Source Connector| B["Kafka Topic"];
        B -->|Sink Connector| C["MongoDB"];
        B -->|Consumer| D["Analytics Engine"];

Diagram Explanation: This diagram shows MongoDB as both a source and sink for data flowing through Kafka. Data from MongoDB is published to a Kafka topic via the source connector. The sink connector then writes data from the Kafka topic back to MongoDB, while consumers can also process the data for analytics.

Best Practices and Considerations

  • Data Consistency: Ensure data consistency between MongoDB and Kafka by carefully managing offsets and acknowledgments.
  • Error Handling: Implement robust error handling and retry mechanisms to deal with transient failures.
  • Performance Tuning: Optimize connector configurations for performance, considering factors like batch size and poll intervals.
  • Security: Secure your data flows by configuring SSL/TLS and authentication mechanisms for both Kafka and MongoDB.
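
The security bullet can be made concrete on the MongoDB side of a connector config. The sketch below uses hypothetical host and credential placeholders; TLS and authentication for the Kafka side are configured separately on the Connect worker (`producer.security.protocol`, `consumer.security.protocol`, and the related SSL settings):

```json
{
  "connection.uri": "mongodb://appUser:appPassword@mongo.example.com:27017/?tls=true&authSource=admin"
}
```

In production, keep credentials out of plain-text configs, for example via Kafka Connect's externalized secrets (config providers).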

Conclusion

Integrating Apache Kafka with MongoDB using connectors provides a robust solution for real-time data processing and storage in a flexible, document-oriented database. By leveraging the MongoDB Kafka Connector, you can build scalable, fault-tolerant systems that handle schema-less data efficiently. Whether you’re implementing real-time analytics or synchronizing operational data, this integration offers a powerful toolset for modern data architectures.

Revised on Thursday, April 23, 2026