Tasks are empty after deploying ElasticsearchSinkConnector
Asked Answered
S

1

0

I tried to deploy ElasticsearchSinkConnector by

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS"
    }
}

It successfully deployed, however, when I check status by

GET /connectors/elasticsearch-sink/status

tasks is empty array []:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [],
    "type": "sink"
}

I found this Kafka Connect: No tasks created for a connector

However, I tried those two answers inside, both changing name, and deleting ElasticsearchSinkConnector then redeploy multiple times didn't work for me.

Also, there is no logs in the Kafka Connect pod.

Any idea? Thanks!

Syrupy answered 17/3, 2022 at 20:28 Comment(0)
S
0

After adding these two to the ElasticsearchSinkConnector config

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

in the config so like

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS",
        "errors.log.include.messages": "true",
        "errors.log.enable": "true"
    }
}

And this time when I check status by

GET /connectors/elasticsearch-sink/status

tasks shows error now:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
        }
    ],
    "type": "sink"
}

Now just need fix the permission issue.

Also, I found a good article about how to debug: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

Syrupy answered 17/3, 2022 at 20:49 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.