Kafka Connect: Creating a new connector in distributed mode creates a new group

I am currently working with the Confluent 3.0.1 platform. I am trying to create two connectors on two different workers, but each new connector I create gets its own new group.

I created the two connectors with the following requests:

1) POST http://devmetric.com:8083/connectors

{
    "name": "connector1",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka1.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}

2) POST http://devkafka01.com:8083/connectors

{
    "name": "connector2",
    "config": {
        "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
        "tasks.max": "1",
        "topics": "dev.ps_primary_delivery",
        "elasticsearch.cluster.name": "ad_metrics_store",
        "elasticsearch.hosts": "devkafka.com:9300",
        "elasticsearch.bulk.size": "100",
        "tenants": "tenant1"
    }
}

However, each of them was created under a different group ID. I then listed the existing groups:

$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091  --new-consumer  --list

Result was:
connect-connector2
connect-connector1

These groups were created automatically by Kafka Connect, not by me; I had set a different group.id in worker.properties. I want both connectors to be in the same group so that they work in parallel and share the messages. I currently have 1 million records in the topic "dev.ps_primary_delivery", and I want each connector to get 0.5 million.

Please let me know how to do this.

Polypropylene asked 18/1, 2017 at 12:56

I think some clarification is required...

  1. group.id in the worker.properties file does not refer to consumer groups. It is a "worker group" - multiple workers in the same worker group will split work between them - so if the same connector has many tasks (for example the JDBC connector has a task for every table), those tasks will be distributed among the workers in the group.

  2. Sink connectors do have consumers that are part of a consumer group. The group.id of this group is always "connect-"+connector name. In your case, you got "connect-connector1" and "connect-connector2" based on your connector names. This also means that the only way two connectors will be in the same group is... if they have the same name. But names are unique, so you can't have two connectors in the same group. The reason is...

  3. Connectors don't really get events themselves, they just start a bunch of tasks. Each of the tasks has consumers that are part of the connector consumer group and each task will handle a subset of the topics and partitions independently. So having two connectors in the same group, basically means that all their tasks are part of the same group - so why do you need two connectors? Just configure more topics and more tasks for that one connector and you are all set.
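The two kinds of group can be modelled in a few lines to make the distinction concrete. This is a simplified, hypothetical sketch, not Kafka Connect's code: the helper names are invented, the round-robin placement of tasks on workers stands in for Connect's real worker rebalance protocol, and the partition split imitates a range-style assignor for a single topic. It shows a connector's tasks landing on different workers (worker group) while their consumers all sit in one consumer group, "connect-<connector name>", which divides the topic's partitions.

```python
# Simplified model of the two kinds of "group" in Kafka Connect.
# Hypothetical helpers: real Connect uses its own rebalance protocol for
# workers and a pluggable partition assignor for sink-task consumers.


def sink_consumer_group(connector_name: str) -> str:
    """Default consumer group joined by a sink connector's tasks."""
    return "connect-" + connector_name


def spread_tasks(task_ids, workers):
    """Worker group: place a connector's tasks round-robin on the workers."""
    placement = {w: [] for w in workers}
    for i, task in enumerate(task_ids):
        placement[workers[i % len(workers)]].append(task)
    return placement


def split_partitions(num_partitions, task_ids):
    """Consumer group: give each task a contiguous range of partitions,
    roughly what a range-style assignor does for a single topic."""
    per_task, extra = divmod(num_partitions, len(task_ids))
    assignment, start = {}, 0
    for i, task in enumerate(task_ids):
        count = per_task + (1 if i < extra else 0)
        assignment[task] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    tasks = ["connector1-0", "connector1-1"]
    print(sink_consumer_group("connector1"))  # connect-connector1
    print(spread_tasks(tasks, ["workerA", "workerB"]))
    # {'workerA': ['connector1-0'], 'workerB': ['connector1-1']}
    print(split_partitions(4, tasks))
    # {'connector1-0': [0, 1], 'connector1-1': [2, 3]}
```

One consequence for the question: a consumer group splits work at partition granularity, so for two tasks to receive roughly 0.5 million records each, dev.ps_primary_delivery needs at least two partitions with the data spread evenly across them. With a single partition, split_partitions(1, tasks) leaves the second task with nothing.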

The only exception is if the connector you are using doesn't use tasks correctly or limits you to just one task. In that case - either they have a good reason or (more likely) someone needs to improve their connector...
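Applied to the question, this advice means replacing connector1 and connector2 with one connector that runs two tasks. Below is a minimal sketch that builds that request, assuming DeliverySinkConnector honors tasks.max greater than 1 (the exception above applies if it doesn't). The connector name delivery-sink is hypothetical, and the elasticsearch.hosts value is taken from connector1 because the question's two configs differ there. In distributed mode the request can be sent to any worker in the worker group.

```python
import json
import urllib.request

# Settings copied from the question; elasticsearch.hosts is connector1's value.
BASE_CONFIG = {
    "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
    "topics": "dev.ps_primary_delivery",
    "elasticsearch.cluster.name": "ad_metrics_store",
    "elasticsearch.hosts": "devkafka1.com:9300",
    "elasticsearch.bulk.size": "100",
    "tenants": "tenant1",
}


def single_connector_payload(name: str, tasks: int) -> dict:
    """One connector with several tasks instead of several connectors."""
    config = dict(BASE_CONFIG)
    config["tasks.max"] = str(tasks)  # the question passes values as strings
    return {"name": name, "config": config}


def build_create_request(worker_url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a worker."""
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    payload = single_connector_payload("delivery-sink", tasks=2)
    req = build_create_request("http://devmetric.com:8083", payload)
    print(req.get_method(), req.full_url)  # POST http://devmetric.com:8083/connectors
    print(json.dumps(payload, indent=2))
```

Sending it takes one more call, urllib.request.urlopen(req), against a running worker; that is left out so the sketch runs without a cluster. The existing connectors could be removed first with DELETE /connectors/connector1 and DELETE /connectors/connector2.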

Ringlet answered 20/1, 2017 at 2:35
Comments:
Thanks for the clarification. I went through the Kafka Connect code and now understand the difference between a consumer group and a worker group. - Polypropylene
What if I have two Kafka Connect instances that don't know about each other? Does the connector name still need to be unique? - Linus
Regarding the 2nd point, you can override the default group ID by passing in "consumer.override.group.id". - Waal

You can set consumer.group.id to a value that Kafka Connect will take and use as the group.id for the entire application.

Advantage: you get a single consumer group that your application connects to.
Disadvantage: you need to be careful with the consumer group configurations; make them all identical.

Dynode answered 14/8, 2018 at 20:48
Comments:
I even tried adding consumer.group.id to the config, but the consumer is still simply assigned a group in the "connect-<worker_name>" format. - Natachanatal
Actually, it's consumer.override.group.id, and you need to set connector.client.config.override.policy=All in the worker's properties. See docs.confluent.io/platform/current/connect/references/… - Waal
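The override described in the last comment amounts to two pieces of configuration: a worker property and a per-connector consumer setting. The property names come from that comment; the helper functions and the group name shared-delivery-sink are hypothetical. This mechanism (connector client config overrides, KIP-458) arrived in Apache Kafka 2.3, so it is not available on the Confluent 3.0.1 release used in the question.

```python
# Worker-side setting named in the comment; it belongs in the worker's
# properties file so that connectors may override their client configs.
WORKER_OVERRIDE_POLICY = {"connector.client.config.override.policy": "All"}


def worker_properties_lines(props: dict) -> str:
    """Render settings in the key=value format of a .properties file."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


def with_group_override(connector: dict, group_id: str) -> dict:
    """Copy a connector payload, adding consumer.override.group.id."""
    config = dict(connector["config"])
    config["consumer.override.group.id"] = group_id
    return {"name": connector["name"], "config": config}


def effective_group(connector: dict) -> str:
    """Consumer group a sink connector's tasks would join, assuming the
    worker's override policy permits the override (not checked here)."""
    default = "connect-" + connector["name"]
    return connector["config"].get("consumer.override.group.id", default)


if __name__ == "__main__":
    c1 = {"name": "connector1", "config": {"topics": "dev.ps_primary_delivery"}}
    c2 = {"name": "connector2", "config": {"topics": "dev.ps_primary_delivery"}}
    print(worker_properties_lines(WORKER_OVERRIDE_POLICY))
    print(effective_group(c1))  # connect-connector1
    shared = [with_group_override(c, "shared-delivery-sink") for c in (c1, c2)]
    print([effective_group(c) for c in shared])
    # ['shared-delivery-sink', 'shared-delivery-sink']
```

With both connectors in one group, the group would divide the topic's partitions among the tasks of both connectors, which is what the question asked for; a single connector with more tasks reaches the same split without needing the override.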

© 2022 - 2024 — McMap. All rights reserved.