Confluent connect-jdbc and exactly once delivery

Is kafka-connect-jdbc safe against lost and duplicated rows when the database's auto-incrementing primary key is used as the incrementing field?

Electrometallurgy answered 23/4, 2018 at 5:31 Comment(1)
You might want to look at Debezium if this is a concern. (Razorback)

It is absolutely not safe in incrementing mode. The issue is transaction isolation and the resulting visibility characteristics: the order in which transactions start (and acquire values for any auto-incrementing fields) is not the same as the order in which those transactions commit. This is particularly pronounced in mixed workloads, where transactions take varying amounts of time to complete. So, as an observer, what you'll see in the table is temporary 'gaps' in the visible records until those transactions complete. If transaction T0 with key 0 starts before T1 with key 1, but T1 completes first, the Kafka Connect source connector will observe the effects of T1, publish the record and advance the watermark to key 1. T0 will eventually commit, but by then the source connector will have moved on, and the row with key 0 is never published.

This is a reported issue, and the Kafka Connect documentation is not transparent about the known limitations (in spite of the issue being open with the KC JDBC team since 2016).

One workaround is to use the timestamp mode (which isn't safe on its own), and add a lag via the timestamp.delay.interval.ms property. As per the Confluent documentation:

How long to wait after a row with certain timestamp appears before we include it in the result. You may choose to add some delay to allow transactions with earlier timestamp to complete. The first execution will fetch all available records (that is, starting at timestamp 0) until current time minus the delay. Every following execution will get data from the last time we fetched until current time minus the delay.

This solves one problem (awkwardly), but introduces another. Now the source connector will lag behind the 'tail' (so to speak) of the table for the duration of the timestamp delay, on the off-chance that a latent transaction will commit within that grace period. The longer the grace period, the longer the lag, so this might not be an option in applications that require near-real-time messaging. It also offers no protection against a transaction that takes longer than the grace period to commit.

You could try to relax the isolation level of the source connector's queries, but that has other implications, especially if your application relies on the transactional outbox pattern for guaranteed message delivery.

One safe solution with Kafka Connect is to employ CDC (change data capture) or equivalent, and point the source connector to the CDC table (which will be in commit order). You can either use raw CDC or Debezium as a 'portable' variant. This will add to the database I/O, but gives the connector a linear history to work off.

Felske answered 6/6, 2019 at 5:52 Comment(0)

I've analysed it for that purpose and concluded that it is NOT safe to use the "incrementing" mode with a PK column unless you're dealing with a non-transactional database. This is because sequence numbers for auto-incremented PKs are allocated during a transaction (when the INSERT is executed), but rows only become visible when the transaction commits, so they may appear out of order. Imagine the not-so-uncommon scenario:

  • Transaction A does an INSERT to a table and the PK "1" is allocated to that row.
  • Transaction B does an INSERT to the same table and PK "2" is allocated to that row.
  • Transaction B commits first.
  • If the connect job now performs the read, it will read row "2" first and remember "2" as the last row it read.
  • Transaction A commits second, and only now does row "1" become visible.
  • Since the connect job from now on scans only for rows with a PK greater than 2, row "1" is never read.
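The steps above can be replayed in a few lines of Python. This is a toy model of incrementing-mode polling rather than the connector's real implementation; the `Row` class and `poll` function are hypothetical names used only for this sketch.

```python
from dataclasses import dataclass

@dataclass
class Row:
    pk: int
    committed: bool = False  # uncommitted rows are invisible to the reader

def poll(table, watermark):
    """Return the PKs of committed rows above the watermark, plus the new watermark."""
    visible = sorted(r.pk for r in table if r.committed and r.pk > watermark)
    new_watermark = visible[-1] if visible else watermark
    return visible, new_watermark

# Transactions A and B have both inserted; PKs 1 and 2 are already allocated.
table = [Row(pk=1), Row(pk=2)]
watermark = 0
delivered = []

table[1].committed = True                   # B commits first
rows, watermark = poll(table, watermark)    # reads row 2, watermark becomes 2
delivered += rows

table[0].committed = True                   # A commits second
rows, watermark = poll(table, watermark)    # scans for pk > 2, finds nothing
delivered += rows

print(delivered)  # [2]
```

Row 1 is committed and sitting in the table, but no later poll will ever select it.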

To avoid missing rows like this, you could consider using dirty reads (READ UNCOMMITTED) in your JDBC driver config, but you will then see inserts that were part of transactions that were later rolled back and should never have been read.

Instead of "incrementing", I suggest you consider the "timestamp" or "timestamp+incrementing" mode (https://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode) and set the "timestamp.delay.interval.ms" config appropriately, as a tolerance for long-running transactions completing out of order. I can't say from experience whether that's 100% safe, as the database I had to deal with wasn't ANSI SQL compliant and the timestamp-related features of kafka-connect-jdbc wouldn't work.
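As a rough sketch of what such a setup might look like, the Python snippet below builds a connector definition and prints it as JSON in the shape the Kafka Connect REST API accepts. The connector name, connection URL, table, column names, topic prefix and the 5-second delay are placeholder assumptions, not recommendations; choose the delay to match your longest expected transaction.

```python
import json

# Hypothetical connector definition; every value below is a placeholder.
connector = {
    "name": "orders-jdbc-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://db.example.com:5432/shop",
        "table.whitelist": "orders",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "updated_at",   # assumed column name
        "incrementing.column.name": "id",        # assumed column name
        # Tolerance for transactions that commit out of order, in milliseconds.
        "timestamp.delay.interval.ms": "5000",
        "topic.prefix": "shop-",
    },
}

payload = json.dumps(connector, indent=2)
print(payload)
```

The resulting JSON could then be submitted to a Connect worker's `/connectors` endpoint.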

Agonist answered 24/4, 2018 at 16:47 Comment(0)
