Kafka Connect - JDBC sink SQL exception
I

3

4

I am using the Confluent community edition for a simple setup consisting of a REST client that calls the Kafka REST Proxy, with the data then pushed into an Oracle database by the provided JDBC sink connector.

I noticed that if there is an SQL exception, for instance when a value is longer than the column's defined length, the task stops. If I restart it, the same thing happens: it tries to insert the erroneous entry again and stops. It does not insert the other entries.

Is there a way to log the erroneous entry and let the task continue inserting the other data?

Illjudged answered 15/3, 2019 at 4:24 Comment(2)
Please read minimal reproducible example and enhance your question accordingly. Don't explain what your code is supposedly doing, post the significant portions of the code within your question. If your code would be doing what you want it to do, then you would not be posting here...Williamson
There is no written code. The JDBC Sink Connector is a provided one. All I am asking is whether there is a configuration in Confluent Kafka that keeps inserting records even when there is an exception.Illjudged
A
4

For sink connectors, the Kafka Connect framework can only skip problematic records when the exception is thrown during:

  • Conversion of keys or values (Converter::toConnectData(...))
  • Transformation (Transformation::apply)

For that you can use the errors.tolerance property:

"errors.tolerance": "all"

There are some additional properties for logging details about errors: errors.log.enable and errors.log.include.messages. Original answer: Apache Kafka JDBC Connector - SerializationException: Unknown magic byte
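
As a rough sketch, these properties could sit together in a sink connector's configuration like this. The dead letter queue topic name is an assumed placeholder, and none of these settings change what happens when the error is raised while writing to the database:

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "dlq-jdbc-sink"
}
```

With errors.deadletterqueue.topic.name set, records that fail in conversion or transformation are also routed to that topic rather than only being logged.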

If an exception is thrown while delivering messages, the sink task is killed. If you need to handle communication errors (or others) with an external system, you have to add that support to your connector.

When an SQLException is thrown, the JDBC connector retries but does not skip any records.

The number of retries and the interval between them are controlled by the following properties:

  • max.retries (default 10)
  • retry.backoff.ms (default 3000)
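
For reference, these would appear in the JDBC sink configuration roughly as follows, shown here with their default values. Retries only help with transient failures such as a dropped connection; a value that is too long for its column fails the same way on every attempt.

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "max.retries": "10",
  "retry.backoff.ms": "3000"
}
```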
Argilliferous answered 15/3, 2019 at 8:25 Comment(3)
Thanks for the comments. Even using these properties, it still stops. (ERROR WorkerSinkTask{id=test-sink-oracle-jdbc-autoincrement-0} Task is being killed and will not recover until manually restarted.)Illjudged
Is there a solution for that, or does it make sense to say I need to write my own JDBC sink connector? ThanksIlljudged
This does not answer the question. The issue is that valid data is being passed to the sink connector, but the incoming data doesn't fit the database's column length, so the connector goes into an error state and cannot get past the messages in question. A transformation upstream of the sink connector itself would be necessary to check the column length and throw an exception there, so that errors.tolerance can capture it.Papa
F
3

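The sink cannot currently ignore bad records, but you can skip them manually with the kafka-consumer-groups tool. Stop the connector first, because offsets can only be reset while the consumer group has no active members: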

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

For more info see here.

Filiano answered 15/10, 2019 at 9:27 Comment(3)
I was looking at your article confluent.io/blog/… and after reading it I expected errors.tolerance to fix the problem. So currently no sink connector can ignore bad records? @Robin MoffattNightingale
See Bartosz Wardziński's answer - that setting only applies if there are problems in certain stages of the pipeline. If it's at the point of delivery then it's down to the connector to handle it, which the JDBC sink doesn't currently.Filiano
Oh ok, got the point. The concrete issue I'm facing is with the S3 sink connector on Kafka v1.1.1. When it receives malformed JSON, the connector stops and I need to do the manual skip you suggested. Would upgrading to Kafka 2.0 and using errors.tolerance: all fix this? Or do you know if the S3 sink connector has something for this, like the Elasticsearch sink connector has?Nightingale
P
1

Currently, there is no way to stop this from failing the sink connector, specifically.

However, there is another approach that might be worth looking into. You can apply a Single Message Transform (SMT) on the Connector, check the length of the incoming columns, then decide to either throw an exception, which would bubble up to the errors.tolerance configuration, or return null which will filter the record out entirely.

Since this is a Sink connector, the SMT would be applied before passing the record on to the connector, and therefore records that are skipped via the transform would never make it to the tasks to be sync'd into the database.
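
A minimal sketch of the check such a transform might perform is below. It is deliberately written without the Kafka Connect API so it runs on its own: ColumnLengthCheck, Mode and check are hypothetical names, and the record value is assumed to be a Map of field names to values. In a real SMT this logic would live in Transformation.apply(), where throwing hands the record to the errors.tolerance handling and returning null drops it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect: the length check that a
// custom SMT could call from its Transformation.apply() method.
final class ColumnLengthCheck {

    // FAIL: throw, so the framework's errors.tolerance handling sees it.
    // DROP: return null, which an SMT uses to filter the record out.
    enum Mode { FAIL, DROP }

    private final Map<String, Integer> maxLengths; // field name -> column length
    private final Mode mode;

    ColumnLengthCheck(Map<String, Integer> maxLengths, Mode mode) {
        this.maxLengths = new HashMap<>(maxLengths);
        this.mode = mode;
    }

    // Returns the value unchanged when every limited String field fits.
    Map<String, Object> check(Map<String, Object> value) {
        for (Map.Entry<String, Integer> limit : maxLengths.entrySet()) {
            Object field = value.get(limit.getKey());
            // Note: length() counts UTF-16 code units; a column declared in
            // bytes may need a byte-length comparison instead.
            if (field instanceof String && ((String) field).length() > limit.getValue()) {
                if (mode == Mode.DROP) {
                    return null;
                }
                // A real SMT would more likely throw Connect's DataException.
                throw new IllegalArgumentException(
                        "Field '" + limit.getKey() + "' exceeds " + limit.getValue() + " characters");
            }
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Integer> limits = new HashMap<>();
        limits.put("name", 5);
        ColumnLengthCheck dropper = new ColumnLengthCheck(limits, Mode.DROP);

        Map<String, Object> record = new HashMap<>();
        record.put("name", "far too long");
        System.out.println(dropper.check(record)); // prints "null"
    }
}
```

The field limits would have to be kept in step with the table definition by hand, since the transform has no view of the database schema. Once packaged as a real Transformation, it would be attached to the connector through the transforms and transforms.<alias>.type properties.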

Papa answered 9/9, 2021 at 15:0 Comment(0)
