How to change the name of the topic generated by Kafka Connect Source Connector
Asked Answered
B

3

9

I have an already running production deployed Kafka-Cluster and having Topic "existing-topic". I am using MongoDB-Source-Connector from Debezium.

Here all what I want is to push the CDC events directly to the topic "existing-topic" so that my consumers which are already listening to that topic will process it.

I didn't find any resource to do it so, however it's mentioned that topic is created in below format -

"If your mongodb.name parameter is A, database name is B and collection name is C, the data from database A and collection C will be loaded under the topic A.B.C"

Can I change the topic to "existing-topic" and push the events to it?

Bebel answered 17/5, 2020 at 5:52 Comment(0)
V
4

According to the documentation,

The name of the Kafka topics always takes the form logicalName.databaseName.collectionName, where logicalName is the logical name of the connector as specified with the mongodb.name configuration property, databaseName is the name of the database where the operation occurred, and collectionName is the name of the MongoDB collection in which the affected document existed.


This means that if your connector's logical name is myConnector and your database myDatabase has two collections users and orders

{
  "name": "myConnector",  
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "mongodb.hosts": "mongo-db-host:27017", 
    "mongodb.name": "myDatabase", 
    "collection.whitelist": "myDatabase[.]*", 
  }
}

then Kafka Connect will populate two topics with names:

  • myConnector.myDatabase.users
  • myConnector.myDatabase.orders

Now if you still want to change the name of the target topic, you can make use of Kafka Connect Single Message Transforms (SMT). More precisely, ExtractTopic should help you. Note though that this SMT helps you extract the topic name from the key or value of the message, therefore you somehow need to include the desired topic name in the payload.

For example, the following SMT will extract the value of field myField and use this as the record's topic:

 transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
 transforms.ValueFieldExample.field=myField
Verbality answered 17/5, 2020 at 11:47 Comment(0)
W
4

I was facing the same problem with the JDBC Source Connector and found a different solution:

Using the RegexRouter Single Message Transform with dropPrefix you can just override the whole topic name:

"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C",                 // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic"   // whole exisiting topic name

And it works with regex, so if you're using multiple tables/collections and your created topic name isn't constant, you should be able to make it dynamic.

It's a bit hacky as technically I'm dropping the whole topic name and then adding a new topic name - which isn't the best solution, to me anyway.

Without answered 30/7, 2020 at 16:43 Comment(3)
if i am using this transforms in JDBC sink connector and i am using this transforms, what value should i give to parameter "table.name.format" if i want output of this transforms as my destination table nameImco
Haven't tried transforms yet and I need apply that in mongo source connector. Will check how it can fit in it.Bebel
the type is missing: "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter"Perseid
R
-1

I was having the same issue and i found this asnwer.

https://www.mongodb.com/docs/kafka-connector/current/source-connector/usage-examples/topic-naming/#database-and-collection-names

Here we can add a mapping to connector config, a property called topic.namespace.map. Using that we can map mongo database.collection --> exsiting_topic. For example,

If i have a collection called eventstore in test mongo database and i want to publish it to different_topic in Kafka. topic.namespace.map property would look like this.

{topic.namespace.map: {"test.eventstore": "different_topic"}}

Ratal answered 18/12, 2023 at 1:40 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.