I understand that using Kafka's own offset tracking instead of other methods (like checkpointing) is problematic for streaming jobs.
However, I just want to run a Spark batch job every day that reads all messages from the last committed offset up to the most recent one and does some ETL on them.
In theory, I want to read the data like this:
    val dataFrame = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:6001")
      .option("subscribe", "topic-in")
      .option("includeHeaders", "true")
      .option("kafka.group.id", "consumer-group-for-this-job")
      .load()
and have Spark commit the offsets back to Kafka based on the group.id.
Unfortunately, Spark never commits them back, so I got creative and added this code at the end of my ETL job to manually update the consumer group's offsets in Kafka:
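    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.sql.functions.max
    import spark.implicits._

    // Highest offset read per topic-partition in this batch.
    val offsets: Map[TopicPartition, OffsetAndMetadata] = dataFrame
      .select('topic, 'partition, 'offset)
      .groupBy("topic", "partition")
      .agg(max('offset))
      .as[(String, Int, Long)]
      .collect()
      .map {
        // A committed offset is the position of the next record to consume, hence + 1.
        case (topic, partition, maxOffset) =>
          new TopicPartition(topic, partition) -> new OffsetAndMetadata(maxOffset + 1)
      }
      .toMap

    val props = new Properties()
    props.put("group.id", "consumer-group-for-this-job")
    props.put("bootstrap.servers", "localhost:6001")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("enable.auto.commit", "false")

    // Commit the offsets for the group, then release the consumer's resources.
    val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try kafkaConsumer.commitSync(offsets.asJava)
    finally kafkaConsumer.close()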
The commit itself technically works, but the next time I read with this group.id, Spark still starts from the beginning.
Do I have to bite the bullet and keep track of the offsets somewhere, or is there something I'm overlooking?
BTW I'm testing this with EmbeddedKafka
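For context on what "keeping track of the offsets somewhere" might involve: Spark's Kafka source has a `startingOffsets` option that accepts a JSON string of per-partition offsets for batch reads. Below is a minimal sketch of turning externally tracked offsets into that string. `StartingOffsets` and `toJson` are hypothetical names used for illustration and are not part of Spark or Kafka, and the sketch assumes the map already holds the next offset to read for each partition.

```scala
// Hypothetical helper (not part of Spark): renders externally tracked offsets
// as the JSON string accepted by the Kafka source's `startingOffsets` option.
object StartingOffsets {
  // nextOffsets maps (topic, partition) to the next offset to read,
  // i.e. the last processed offset + 1.
  // Topic names are not escaped; Kafka only allows [a-zA-Z0-9._-] in them.
  def toJson(nextOffsets: Map[(String, Int), Long]): String =
    nextOffsets
      .groupBy { case ((topic, _), _) => topic }
      .toSeq
      .sortBy { case (topic, _) => topic }
      .map { case (topic, parts) =>
        // Render one topic's partitions as "partition":offset pairs, in partition order.
        val partitions = parts.toSeq
          .map { case ((_, partition), offset) => partition -> offset }
          .sortBy { case (partition, _) => partition }
          .map { case (partition, offset) => s""""$partition":$offset""" }
          .mkString(",")
        s""""$topic":{$partitions}"""
      }
      .mkString("{", ",", "}")
}
```

With `Map(("topic-in", 0) -> 42L, ("topic-in", 1) -> 17L)` this yields `{"topic-in":{"0":42,"1":17}}`, which could then be passed as `.option("startingOffsets", json)` on the batch reader above. Where the offsets themselves get stored between runs (a file, a table, or the consumer group in Kafka) is left open here.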