Kafka Connect: No tasks created for a connector

We are running Kafka Connect (Confluent Platform 5.4, i.e. Kafka 2.4) in distributed mode with the Debezium (MongoDB) and Confluent S3 connectors. When we add a new connector via the REST API, the connector is created in the RUNNING state, but no tasks are created for it.

Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should.

The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method.

Can somebody tell me what to do so we can add connectors without restarting the workers? Thanks.

Configuration details

The cluster has 3 nodes with the following connect-distributed.properties:

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000

The connectors configuration

Debezium example:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  }
}

S3 example:

{
  "name": "qa-s3-sink-task|1",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}

The connectors are created using curl: curl -X POST -H "Content-Type: application/json" --data @<json_file> http://<connect_host>:10083/connectors

Yuriyuria answered 21/2, 2020 at 8:15 Comment(15)
Faced the same issue many times (https://mcmap.net/q/1362697/-kafka-jdbc-sink-connector-no-tasks-assigned/7109598), but no solution was found. - Turquoise
I have never had this problem. Please show all relevant configs, commands, installation details, etc. cc @Iskuskov - Butylene
@cricket_007 I added the configuration details. We add the connectors via the REST API using curl -X POST ... - Yuriyuria
That's one file... Where is your connector JSON? What command are you using to run the connector? What OS? What RAM and heap settings? - Butylene
By the way, max.request.size needs to be prefixed with producer. and max.partition.fetch.bytes with consumer. - Butylene
Are you able to describe each of the topics in the config individually? Do they all look healthy? - Butylene
@cricket_007 I added the connector JSON examples. We have 3 differently sized clusters that all display the same problem; the Xmx values vary from 1G to 6G. The topics look healthy, and after the workers are restarted everything works fine until we try to add another connector. - Yuriyuria
All I can say is that tasks get distributed across the cluster. There is no guarantee that all connectors always have running tasks. You may be able to see the rebalancing if you plot some JMX values, but restarting the cluster shouldn't be necessary. - Butylene
I think I've faced this issue in the past. If I recall correctly, renaming the connectors did the trick for me. I know it doesn't make any sense at all, but it might be worth trying. - Muttra
Yes, renaming the connector is a workaround. But it's not clear why this situation happens. - Turquoise
@Iskuskov Do you have Confluent Support? Did you open any JIRAs? If you think it's a bug, there are more targeted audiences to reach out to. - Butylene
> Did you open any JIRAs: My bad, no. I tried asking on StackOverflow and the Confluent Slack, but got no result. Everything has been OK for the past six months, and I no longer have logs or other helpful context. - Turquoise
@user2732824, would you like to open a Jira ticket about this issue? - Turquoise
Creating a Jira issue at issues.apache.org/jira/projects/KAFKA/issues would probably help with the investigation. Please include logs in the ticket (redacted if needed). Debug-level logs will be considerably more informative about how tasks are created and assigned during a rebalance. It might also be worth checking whether you experience the issue with CP 5.3.2 too, or trying each of the three options for connect.protocol. Also, make sure that Connect's internal topics are created with the right settings (the config topic needs to be compacted). - Adder
I think it is insane that Kafka Connect doesn't log a WARN or ERROR with information on why the task can't be created or sustained. I added a comment here: issues.apache.org/jira/browse/KAFKA-9747 - Dempsey

The problem was caused by the | character in the connector names. In older Kafka Connect versions, special characters in connector names were not properly URL-encoded in communication between workers. More details in https://issues.apache.org/jira/browse/KAFKA-9747
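As a rough illustration of the encoding problem, the following Python sketch lists the characters in a connector name that would need percent-encoding in a URL path and shows the encoded form. The helper names (`unsafe_characters`, `connector_path`) are made up for this example and are not part of Kafka Connect; the sketch only shows what a correctly encoded path looks like, not how the workers build their requests internally.

```python
from urllib.parse import quote

# Characters that never need percent-encoding in a URL path segment
# (the "unreserved" set from RFC 3986).
UNRESERVED = set(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789-._~"
)


def unsafe_characters(connector_name):
    """Return the characters that would need percent-encoding, in order of first appearance."""
    found = []
    for ch in connector_name:
        if ch not in UNRESERVED and ch not in found:
            found.append(ch)
    return found


def connector_path(connector_name):
    """Build a REST path for the connector with the name fully percent-encoded."""
    return "/connectors/" + quote(connector_name, safe="")


if __name__ == "__main__":
    name = "qa-s3-sink-task|1"  # connector name from the question
    print(unsafe_characters(name))  # ['|']
    print(connector_path(name))     # /connectors/qa-s3-sink-task%7C1
```

On affected versions, a name for which `unsafe_characters` returns an empty list (for example `qa-s3-sink-task-1`) avoids the issue, which is consistent with the renaming workaround reported in the comments.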

Yuriyuria answered 22/5, 2023 at 17:13 Comment(0)

I had the same problem, so I created a new connector under a different name. It worked, but I don't know the source of the problem, because there is no information about it in the Kafka Connect logs.

Hit answered 26/3, 2021 at 15:35 Comment(1)
Changing the name worked for me too. Weird... - Pachton

Delete the connector and create it again with a different database.server.id. Repeat this process until task(s) show up.

It worked for me after 6-7 attempts; I'm not sure why. Pausing and resuming, or restarting the connector and its tasks, did not help.

Dorsett answered 26/1, 2022 at 21:30 Comment(0)

I got empty tasks when deploying a different connector; see the question "Tasks are empty after deploying ElasticsearchSinkConnector".

Adding these two settings to the config when deploying the connector helps locate the reason why the task failed.

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

In my case, instead of empty tasks, it will show why it failed:

GET /connectors/elasticsearch-sink/status

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
        }
    ],
    "type": "sink"
}
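A status response like the one above can be checked with a few lines of Python. This is a minimal sketch, assuming the JSON has the shape shown in this answer; the function name `summarize_status` is hypothetical, and treating every state other than RUNNING as a problem is a simplification made for the example.

```python
import json


def summarize_status(status):
    """Return human-readable problems found in a /connectors/<name>/status response."""
    name = status["name"]
    tasks = status.get("tasks", [])
    if not tasks:
        # The situation from the question: connector is up, but no tasks exist.
        state = status["connector"]["state"]
        return [f"{name}: connector is {state} but has no tasks"]

    problems = []
    for task in tasks:
        if task["state"] != "RUNNING":
            # Keep only the first line of the stack trace, if one is present.
            lines = task.get("trace", "").strip().splitlines()
            reason = lines[0] if lines else "no trace reported"
            problems.append(f"{name} task {task['id']}: {task['state']} ({reason})")
    return problems


# Sample taken from the status output in this answer.
SAMPLE = json.loads("""
{
    "name": "elasticsearch-sink",
    "connector": {"state": "RUNNING", "worker_id": "10.xxx.xxx.xxx:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\\n"
        }
    ],
    "type": "sink"
}
""")

if __name__ == "__main__":
    for problem in summarize_status(SAMPLE):
        print(problem)
```

An empty "tasks" array is reported separately from a failed task, since the two cases point to different causes.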
Undercoat answered 17/3, 2022 at 21:1 Comment(0)