Kafka Connect - Failed to flush, timed out while waiting for producer to flush outstanding messages

I am trying to use the Kafka Connect JDBC Source Connector in bulk mode with the following properties:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

I get the following error about committing offsets; changing various parameters seems to have little effect.

[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Kurth answered 4/4, 2019 at 17:6 Comment(2)
How are you running Confluent Platform? On your laptop? The message you're seeing indicates some issue with sending messages to the broker, for example if it is overloaded.Grosvenor
Kafka is running as a managed service on HDInsight on Azure. The cluster has three brokers. The 'bulk' data is under 20 MB in size. Are there any good guides for what to check to see if the cluster is overloaded?Kurth

The error indicates that a large number of messages are buffered and cannot be flushed before the timeout is reached.

To address this issue, you can

  • either increase the offset.flush.timeout.ms configuration parameter in your Kafka Connect Worker Configs,
  • or reduce the amount of data being buffered by decreasing producer.buffer.memory in your Kafka Connect Worker Configs. This turns out to be the best option when you have fairly large messages.

Both settings live in the worker properties file, as in the sketch below.
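A minimal sketch of the relevant worker properties (the values are illustrative assumptions, not recommendations; tune them to your message sizes and broker throughput):

# connect-worker.properties (illustrative values)
# Give the producer more time to flush outstanding records before the offset commit times out
offset.flush.timeout.ms=60000
# Or shrink the producer buffer so fewer records can accumulate between flushes
producer.buffer.memory=8388608

Usually only one of the two changes is needed; try raising the flush timeout first, and lower the buffer size only if your messages are large.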
Pyelography answered 4/4, 2019 at 20:49 Comment(4)
producer.buffer.memory=2097152 offset.flush.timeout.ms=300000, so a 2 megabyte buffer and a 5 minute timeout; are these really unsuitable values? I can't send 2 megabytes in 5 minutes?Kurth
I am facing the same issue and I have tried the answer above, but it does not help. I have provided complete details about my issue here: github.com/confluentinc/kafka-connect-jdbc/issues/… Please provide any comments.Haulm
Do you know what this property is called for Kafka Connect workers? Our Confluent Cloud instance does not recognize it under "producer.override.offset.flush.timeout.ms" nor "offset.flush.timeout.ms".Dogeatdog
Giorgos Myrianthous - is there a possibility of duplicate events on the Kafka topic due to the reported error?Heritor

When security.protocol=SSL is enabled, make sure that there are separate SSL parameters for Connect workers and Connect producers. Provide SSL settings for both:

# Authentication settings for Connect workers
ssl.keystore.location=/var/private/ssl/kafka.worker.keystore.jks
ssl.keystore.password=worker1234
ssl.key.password=worker1234

# Authentication settings for Connect producers used with source connectors
producer.ssl.keystore.location=/var/private/ssl/kafka.source.keystore.jks
producer.ssl.keystore.password=connector1234
producer.ssl.key.password=connector1234

see https://docs.confluent.io/5.2.3/connect/security.html#separate-principals

Matelda answered 27/9, 2019 at 4:14 Comment(0)

If you're trying to connect to Confluent Cloud, this error is probably caused by a missing configuration in the worker properties. Make sure you have added the producer and consumer configuration:

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
consumer.security.protocol=SASL_SSL

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="SECRET";
producer.security.protocol=SASL_SSL
Custommade answered 8/11, 2019 at 20:36 Comment(0)

I don't know if this helps anyone, but I had the same error with the Oracle CDC connector, and the cause was that the table did not have a primary key. I added a primary key and it worked fine.

Undesigned answered 13/1, 2022 at 15:6 Comment(0)
