Kafka JDBC connector: load all data initially, then incrementally
I am trying to figure out how to fetch all data from a query initially, and then incrementally fetch only the changes, using the Kafka JDBC connector. The reason is that I want to load all data into Elasticsearch and then keep ES in sync with my Kafka streams. Currently I do this by first running the connector with mode=bulk, then changing it to mode=timestamp. This works fine.

However, if we ever want to reload all data into the streams and ES, we have to write scripts that somehow clean or delete the Kafka topic and ES index data, edit the connector properties files to set mode=bulk, restart everything, give it time to load all that data, then edit the configs back to timestamp mode and restart everything once more. (The reason we occasionally need such a reload is that bulk updates are applied to correct historic data through an ETL process we do not yet control, and that process does not update the timestamp columns, so timestamp mode never sees the corrections.) The toggle is sketched below.
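
For illustration, the toggle amounts to switching between two variants of the connector properties (a sketch; mode and timestamp.column.name are real JDBC source connector settings, but the UPDATED_AT column name is a hypothetical stand-in for whatever timestamp column the query exposes):

# reload pass: re-publish every row the query returns
mode=bulk

# incremental pass (switched back after the reload completes)
mode=timestamp
timestamp.column.name=UPDATED_AT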

Is anyone doing something similar, and have you found a more elegant solution?

Mull answered 4/5, 2017 at 0:38 Comment(0)
Coming back to this after a long time. Here is how I was able to solve this without ever having to use bulk mode:

  1. Stop the connectors.
  2. Wipe the offset files for each connector JVM.
  3. (Optional) If you want to do a complete wipe and load, you probably also want to delete your topics using the Kafka/Connect utilities or the Connect REST API (and don't forget the internal state topics).
  4. Restart the connectors. (A sketch of this procedure follows the list.)
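
A minimal sketch of automating steps 2-3, assuming a standalone Connect worker (which keeps offsets in a file) and illustrative host/topic names; distributed workers keep offsets in an internal topic instead, so the wipe works differently there:

import os

from confluent_kafka.admin import AdminClient

# Assumptions: the path matches the worker's offset.storage.file.filename
# setting, the broker runs on localhost, and the source topic name is
# hypothetical.
OFFSET_FILE = "/tmp/connect.offsets"
SOURCE_TOPICS = ["edge-directors"]

# Step 2: with the worker JVMs stopped, wipe the offset file so the
# source connector forgets its position and re-reads from the start.
if os.path.exists(OFFSET_FILE):
    os.remove(OFFSET_FILE)

# Step 3 (optional): for a complete wipe and load, delete the source
# topics as well. delete_topics returns a dict of topic -> future.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})
for topic, future in admin.delete_topics(SOURCE_TOPICS).items():
    future.result()  # raises if the delete failed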
Mull answered 28/1, 2018 at 16:48 Comment(1)
I'm trying to solve the same sort of problem (i.e., reloading data from the beginning of time). Your solution seems more complicated than updating the config to bulk and back to timestamp. What problem does it solve that the other method doesn't? – Sturgill

"how to fetch all data from a query initially, then incrementally only changes using kafka connector."

Maybe this will help you. For example, I have a table:

╔════╦═════════════╦═══════════╗
║ Id ║    Name     ║  Surname  ║
╠════╬═════════════╬═══════════╣
║  1 ║ Martin      ║ Scorsese  ║
║  2 ║ Steven      ║ Spielberg ║
║  3 ║ Christopher ║ Nolan     ║
╚════╩═════════════╩═══════════╝

In this case I will create a view that collapses all existing rows onto a single sentinel value (EXID = 0), while new rows expose their real, increasing ID:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME   -- existing rows all share the sentinel 0
FROM DIRECTORS WHERE ID <= 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME  -- new rows carry their real ID
FROM DIRECTORS WHERE ID > 2;

In the properties file for the Kafka JDBC connector you may use:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS

So the Kafka JDBC connector will take these steps:

  1. First, it fetches all the data where EXID = 0;
  2. it stores the latest offset value (0) in the connector offsets file;
  3. later, a new row is inserted into the DIRECTORS table;
  4. the connector executes SELECT EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS and notices that EXID has incremented past the stored offset;
  5. the new data is produced to Kafka and flows on to your streams. (A sketch of the polling query follows the list.)
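
For reference, in incrementing mode the connector's poll boils down to a query of roughly this shape (a sketch; exact identifier quoting and parameter binding depend on the JDBC dialect):

SELECT EXID, ID, NAME, SURNAME
FROM EDGE_DIRECTORS
WHERE EXID > ?      -- ? = last stored offset (0 right after the initial load)
ORDER BY EXID ASC;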
Breannebrear answered 9/5, 2017 at 22:58 Comment(1)
Not exactly what I was asking. Currently I'm using timestamp columns. I have to change the mode to bulk to reload everything, then change back to timestamp to have Kafka incrementally load changed or new data (it appends from/to timestamp bounds to the query to do this). I was hoping to avoid that toggling of mode every time I want to start from a 'clean' slate. – Mull
