Restart kafka connect sink and source connectors to read from beginning

4

21

I have searched quite a lot on this, but there doesn't seem to be a good guide on it.

From what I have searched there are a few things to consider:

  • Resetting Sink Connector internal topics (status, config and offset).
  • How a Source Connector stores its offsets is implementation-specific.

Question: Is there even a need to reset these topics?

  • Deleting the consumer group.
  • Restarting the connector with a different name (this is also an option), but it doesn't seem to be the right thing to do.
  • Resetting the consumer group offsets with --reset-offsets --to-earliest.
  • Using the REST API (does it provide the functionality to reset and read from the beginning?).

What would be the best way to restart both a sink and a source connector to read from beginning?

Talich answered 27/3, 2019 at 12:40 Comment(0)
K
25

Source Connector:

  • Standalone mode: remove the offset file (/tmp/connect.offsets) or change the connector name.
  • Distributed mode: change the name of the connector.

Sink Connector (both modes), use one of the following methods:

  • Change name.
  • Reset the offsets of the consumer group. By default the group is named connect-<connectorName>.

To reset the offsets, first delete the connector, then reset the offsets (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connect-connectorName --reset-offsets --to-earliest --execute --topic topicName), and finally create the connector again with the same configuration.
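The three steps for a sink connector can be sketched as a small shell function. This is only a sketch under assumptions: CONNECT_URL, BOOTSTRAP, KAFKA_BIN, the function names and the JSON config file are hypothetical placeholders, and the consumer group is assumed to follow the default connect-<connectorName> naming. The config file is assumed to hold the usual {"name": ..., "config": {...}} body accepted by POST /connectors.

```shell
# Hypothetical defaults; adjust to your environment.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"
KAFKA_BIN="${KAFKA_BIN:-./bin}"

# Print the command instead of executing it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: reset_sink_connector <connectorName> <topicName> <configFile.json>
reset_sink_connector() {
  name="$1"
  topic="$2"
  config_file="$3"
  group="connect-${name}"   # assumed default group name for sink connectors

  # 1. Delete the connector so its consumer group becomes inactive.
  run curl -sf -X DELETE "${CONNECT_URL}/connectors/${name}" &&
  # 2. Rewind the group's committed offsets to the earliest available.
  run "${KAFKA_BIN}/kafka-consumer-groups.sh" --bootstrap-server "${BOOTSTRAP}" \
    --group "${group}" --topic "${topic}" \
    --reset-offsets --to-earliest --execute &&
  # 3. Recreate the connector from the same configuration.
  run curl -sf -X POST -H "Content-Type: application/json" \
    --data "@${config_file}" "${CONNECT_URL}/connectors"
}
```

Running it with DRY_RUN=1 first, for example `DRY_RUN=1 reset_sink_connector my-sink my-topic my-sink.json`, prints the three commands without touching the cluster. The offset reset only succeeds while the group has no active members, so a short wait after the delete may be needed.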

You can also check the following question: Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?

Knee answered 27/3, 2019 at 13:12 Comment(1)
Consumer group connect-<connectorName> worked for me when resetting offsets for the sink consumer. - Chevron
5

Source connector, distributed mode: there is another option, which is to produce a new message to the offsets topic. For example, I use the JDBC source connector. When I look at the offsets topic I see the following:

./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true

["referrer-family-jdbc-source",{"query":"query"}]   {"incrementing":100}

To reset this, I just produce another message for the same key with incrementing set to 0.

For example, a keyed message can be produced from the shell like this:

./kafka-console-producer.sh \
  --broker-list `hostname`:9092 \
  --topic kc-staging--offsets \
  --property "parse.key=true" \
  --property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}

Please note that you need to do the following:

  • Delete the connector.
  • Produce a message with the relevant offset as I described above.
  • Create the connector again.
Pyretotherapy answered 27/7, 2019 at 21:13 Comment(1)
It is also possible to just delete the value for a specific key; see more: rmoff.net/2019/08/15/… - Pickard
3

As of version 3.6.0, Kafka Connect has native support for resetting the offsets of both sink and source connectors via the REST API, as part of KIP-875.

If you are running version 3.6.0 or later, first issue a PUT request to the /connectors/{name}/stop endpoint to stop (but not delete) the connector, then reset its offsets by issuing a DELETE request to the /connectors/{name}/offsets endpoint.
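As a rough sketch, the stop and reset requests could be scripted with curl as follows. CONNECT_URL and the function names are hypothetical placeholders, and the final resume request is an addition not mentioned above, included on the assumption that you want the connector running again afterwards.

```shell
# Hypothetical default; adjust to your Connect worker's REST address.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the command instead of executing it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Usage: reset_connector_offsets <connectorName>
reset_connector_offsets() {
  name="$1"

  # 1. Stop (but do not delete) the connector.
  run curl -sf -X PUT "${CONNECT_URL}/connectors/${name}/stop" &&
  # 2. Reset its offsets; this is expected to fail unless the connector is stopped.
  run curl -sf -X DELETE "${CONNECT_URL}/connectors/${name}/offsets" &&
  # 3. Resume the connector so it starts reading from the beginning (assumed step).
  run curl -sf -X PUT "${CONNECT_URL}/connectors/${name}/resume"
}
```

The stop request may return before the connector has actually reached the stopped state, so a real script would probably poll GET /connectors/{name}/status between steps 1 and 2 rather than firing the requests back to back.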

Wiggler answered 8/8, 2023 at 12:33 Comment(0)
0

A bit late, but I found another way. In standalone mode, just set offset.storage.file.filename to /dev/null:

#worker.properties    
offset.storage.file.filename=/dev/null

#cmdline
connect-standalone /data/config/worker.properties /data/config/connector.properties
Arne answered 2/7, 2021 at 12:56 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.