Kafka Sink Connector : Sharded MongoDB Cluster

Rajesh Vinayagam
5 min readMay 23, 2022

--

Introduction

Recently I was working on a IoT Data Migration project which involves moving data from CosmosDB to Atlas. Initial Snapshotting of around ~1TB of data took around a day, while this was in progress the ongoing changes were streamed using change capture to a Kafka topic.

The change stream was capturing 12K messages per second on an average and there were occasional spikes in the messages during the day. Very quickly around 20M messages were accumulated in the topic until the initial snapshotting completed.

Below are some of the challenges encountered during the migration.

Challenges

  1. Sink the accumulated messages from the topic to target cluster
  2. Identify the correct write model strategy in the Sink connector, so the document could be upserted.
  3. The default ReplaceDocument write model strategy would not work on the sharded cluster
  4. Parallelise the Sink Connector processing, so we could process both accumulated and ongoing changes in parallel.

Further in this article we will see how we solve these challenges.

Partitioned Write

The change stream application was modified to create a topic with 10 partition. Below is a snapshot of the code showcasing the number of partition required while creating the topic.

The decision on the number of partition can be defined based on the number of messages streamed per second and the amount of parallelisation required while sinking

Below is a snapshot of topic(“POC-TEST-POCDB-POCCOLL”) with 10 partition.

When a producer is producing a message — It will specify the topic it wants to send the message to.Does it care about partitions?

The producer will decide target partition to place any message, depending on:

  • Partition id, if it’s specified within the message
  • key % num partitions, if no partition id is mentioned
  • Round robin if neither partition id nor message key is available in the message means only the value is available

Create a record to be sent to Kafka partition topic. The produce API take 3 arguments

Parameters:

topic - The topic the record will be appended to

key - The key that will be included in the record

value - The record contents

Since key is included in the message with no partition, the message with the same key will always end in the same partition. Below is a snapshot of messages distributed across partition in a topic

Sharded Atlas Cluster & Sink Connector

The target Atlas cluster was sharded using the “vin” and “setOn” properties from the message. Below is a snapshot of a sharded collection.

sh.enableSharding(“POCDB”);

db.POCCOLL.createIndex({“vin”:1,”setOn”:1})

sh.shardCollection(“POCDB.POCCOLL”,{“vin”:1,”setOn”:1});

Below is a sample of a message with the sharded keys.

Sink Connector Configuration

The MongoDB Kafka sink connector is a Kafka Connect connector that reads data from Apache Kafka and writes data to MongoDB. The sink connector has different write model strategies more details about the various strategies can be found here

The default write model strategy(ReplaceOneDefaultStrategy) seems to work for a normal collection but for sharded collection I was encountering the below error.

Write errors: [BulkWriteError\{index=0, code=61, message=’Failed to target upsert by query :: could not extract exact shard key’, details={}}].

My intent was to make it work in the same manner as the default ReplaceOneDefaultStrategy. And below configuration was required to get that similar behaviour

“writemodel.strategy”:”com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy”,
“document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”,
“document.id.strategy.overwrite.existing”: “true”,
“document.id.strategy.partial.value.projection.type”: “allowlist”,
“document.id.strategy.partial.value.projection.list”: “_id,vin, setOn”

Once the connector is set up the consumer starts to process the messages that are left behind. In the below snapshot we see 227K messages were left behind to sink.

With only one task in the Sink Connector it was sinking the messages at much lower rate. Since we have 10 partitions in the topic, we can parallelise by creating up-to 10 tasks.

In the below snapshot the sink connector was configured with 5 tasks enabling partition to be read in parallel by 5 different tasks.

Below snapshot shows the messages being written in the target Atlas cluster.

Below snapshot shows all the pending messages are processed and now the production and consumption rate are almost the same enabling real time sync.

Below snapshot shows the messages are continuously being synced in the target.

Demo

A demo of the complete article covering topic creation, sink connector configuration, connector with multiple tasks.

--

--

No responses yet