Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites Everything

2

7

After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.

I assume Kafka Connect stores its current offset positions in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.

The other two internal topics connect-statuses and connect-configs are not empty. connect-statuses has 52 entries. connect-configs has 6 entries; three for each of two sink connectors I have configured: connector-<name>, task-<name>-0, commit-<name>.

I manually created the internal Kafka Connect topics as specified in the docs before running this:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

I can verify that the connect-offsets topic seems to be created correctly:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

This is a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.

Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting a task?

UPDATE: Response to Randall Hauch's answer.

  • The explanation of source connector offsets vs. sink connector offsets accounts for the empty connect-offsets topic. Thanks for the explanation!
  • I'm definitely not changing the connector name.
  • If the connector is down for ~five days and restarted afterwards, is there any reason the connector's offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact.
  • auto.offset.reset should only take effect if there is no position in __consumer_offsets, right?

I'm using mostly system defaults. My Sink config JSON is as follows. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than wallclock time. That feature seems to have been added in Confluent v3.2.2 so that I won't need a custom plugin for that functionality. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
Traumatize asked 25/7, 2017 at 21:28
9

The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop a connector, and therefore make no new commits, for longer than 24 hours, your offsets will expire and the connector will start over as a new consumer when you restart it. You can change the retention period for offsets stored in the __consumer_offsets topic with the broker setting offsets.retention.minutes.
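
As a rough illustration of the arithmetic, here is a minimal shell sketch applied to the roughly five days of downtime mentioned in the question. The function name offsets_expired is made up for this example, and the comparison is a simplification: the broker removes expired offsets on a periodic check, so the real cutoff is not exact to the minute.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): reports whether committed offsets
# would be past their retention period.
#   $1 = minutes since the last offset commit
#   $2 = value of offsets.retention.minutes on the brokers
offsets_expired() {
  if [ "$1" -gt "$2" ]; then
    echo "expired"
  else
    echo "retained"
  fi
}

downtime_minutes=$((5 * 24 * 60))   # about five days without commits = 7200 minutes
retention_minutes=1440              # the 24-hour default cited above

offsets_expired "$downtime_minutes" "$retention_minutes"   # prints "expired"
```

With 7200 minutes of downtime against a 1440-minute retention, the stored position is gone, so the sink would behave like a brand-new consumer group on restart.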

Favourable answered 28/7, 2017 at 7:26
8

Kafka Connect uses the connect-offsets topic (or whatever you name it) to store the offsets for source connectors, but sink connector offsets are stored using the normal Kafka consumer group mechanism.

One reason your connector might start over is if the connector name changes. The connector name is used to define the consumer group's name, so if you change the name of the connector then upon restart the connector will use a different consumer group, and its consumers will start from the beginning.
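
To make the naming concrete, a small sketch: Kafka Connect prefixes the connector name with "connect-" to form the sink's consumer group. The function name consumer_group_for is invented for this example, and my-s3-sink is the connector name from the question's config.

```shell
#!/bin/sh
# Hypothetical helper: derives the consumer group a sink connector consumes with.
#   $1 = connector name (the "name" field in the connector config)
consumer_group_for() {
  printf 'connect-%s\n' "$1"
}

consumer_group_for "my-s3-sink"   # prints "connect-my-s3-sink"
```

The resulting group name is what you would pass to the kafka-consumer-groups tool (with --describe --group) to inspect the committed offsets; the other flags that tool needs depend on your Kafka version.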

Another reason might be that the Kafka Connect consumer is configured to start from the beginning each time, via consumer.auto.offset.reset=earliest.
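
The way a consumer picks its starting position can be sketched roughly as follows. This is a simplified model, not Kafka's actual code: the function name starting_offset and its arguments are made up for illustration, and it ignores cases such as a committed offset that has fallen out of range.

```shell
#!/bin/sh
# Simplified model of where a consumer starts reading one partition.
#   $1 = committed offset for the group, or empty if none is stored
#   $2 = auto.offset.reset policy: earliest | latest
#   $3 = earliest offset still available in the partition
#   $4 = log end offset of the partition
starting_offset() {
  if [ -n "$1" ]; then
    echo "$1"            # a committed position exists: resume from it
  elif [ "$2" = "earliest" ]; then
    echo "$3"            # no position: start at the beginning
  elif [ "$2" = "latest" ]; then
    echo "$4"            # no position: start at the end
  else
    echo "no committed offset and reset policy is '$2'" >&2
    return 1
  fi
}

starting_offset "42000" earliest 0 50000   # prints 42000
starting_offset ""      earliest 0 50000   # prints 0
starting_offset ""      latest   0 50000   # prints 50000
```

In other words, the reset policy only matters when the group has no stored position, which is also what happens after the committed offsets have expired.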

The S3 connector version 3.3.0 (available soon) has fixes for several issues, some of which affect how rotation on time or schema changes works. You've not provided your configuration, so it's hard to say whether or not these would cause the behavior you're seeing.

Sewing answered 27/7, 2017 at 16:12
Comment (Moldy): Wow, you saved my day, amazing answer. Can we set consumer.auto.offset.reset=earliest specifically for a single consumer in Kafka Connect?
