How to get Kafka offsets for a structured query for manual and reliable offset management?

Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on an HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.

But older docs (like https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) say that Spark Streaming checkpoints are not recoverable across application or Spark upgrades and hence not very reliable. As a solution, a common practice is to store offsets in external storage that supports transactions, such as MySQL or Redshift.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain the offsets for a structured streaming batch?

Previously, this could be done by casting the RDD to HasOffsetRanges:

import org.apache.spark.streaming.kafka010.HasOffsetRanges
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

But with the new Structured Streaming API, I have a Dataset of InternalRow and I can't find an easy way to fetch offsets. The Sink API has only the addBatch(batchId: Long, data: DataFrame) method, so how am I supposed to get the offsets for a given batch id?
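
For reference, a bare-bones custom sink (registered through a StreamSinkProvider) only ever sees the batch id and the rows; the class name below is just illustrative:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class MyDbSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // batchId and data are all that is passed in - the Kafka offsets are not exposed here
  }
}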

Dorsey answered 11/9, 2017 at 10:7 Comment(1)
How did you finally implement this? Can you please paste your pseudo code... I need to implement... – Alard

The relevant Spark DEV mailing list discussion thread is here.

Summary from it:

Spark Structured Streaming will support getting offsets in future versions (> 2.2.0). JIRA ticket to follow - https://issues.apache.org/jira/browse/SPARK-18258

For Spark <= 2.2.0, you can get the offsets for a given batch by reading the JSON from the checkpoint directory (the API is not stable, so be cautious):

import org.apache.hadoop.fs.Path
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.OffsetSeqLog
import org.apache.spark.sql.kafka010.JsonUtilsWrapper

val checkpointRoot: String = ??? // read 'checkpointLocation' from the custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

// OffsetSeq holds one entry per source; here we assume a single Kafka source in the query
val endOffset: Map[TopicPartition, Long] = offsetSeqLog.get(batchId)
  .flatMap(_.offsets.headOption.flatten)
  .map(offset => JsonUtilsWrapper.jsonToOffsets(offset.json))
  .getOrElse(Map.empty)


// Hack to access the private[kafka010] API.
// Put this object into the org.apache.spark.sql.kafka010 package.
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

object JsonUtilsWrapper {
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
    JsonUtils.partitionOffsets(partitionOffsets)
  }

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
    JsonUtils.partitionOffsets(str)
  }
}

This endOffset will contain the until offset for each topic/partition. Getting the start offsets is problematic, because you have to read the 'commit' checkpoint directory. But usually you don't care about start offsets, because storing end offsets is enough for a reliable Spark job restart.
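
For example, a restarted job can feed the stored end offsets back to the Kafka source through its startingOffsets option (a sketch; the topic name, bootstrap servers and offset values below are made up):

// The until offsets recorded by Spark are exclusive, i.e. the next positions to read,
// so they can be passed to startingOffsets as-is. In practice, read this JSON from your DB.
val storedOffsetsJson = """{"my-topic":{"0":4711,"1":4802}}"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", storedOffsetsJson)
  .load()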

Please note that you have to store the processed batch id in your storage as well. Spark can re-run a failed batch with the same batch id in some cases, so make sure to initialize a custom Sink with the latest processed batch id (which you should read from external storage) and ignore any batch with id <= latestProcessedBatchId. By the way, the batch id is not unique across queries, so you have to store the batch id for each query separately.
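
Putting the pieces together, a custom Sink could look roughly like the sketch below. OffsetStore and its methods are hypothetical placeholders for your transactional storage (e.g. a JDBC-backed DAO); they are not part of Spark.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

/** Hypothetical transactional storage, not part of Spark. */
trait OffsetStore {
  def readLatestBatchId(queryId: String): Long
  def writeAtomically(queryId: String, batchId: Long,
                      endOffsets: Map[TopicPartition, Long], rows: Array[Row]): Unit
}

class TransactionalSink(queryId: String, store: OffsetStore) extends Sink {

  // Read the last successfully processed batch id for this query on startup
  private var latestProcessedBatchId: Long = store.readLatestBatchId(queryId)

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestProcessedBatchId) {
      return // Spark is replaying an already-committed batch after a restart - skip it
    }
    // Read the end offsets for this batch from the checkpoint as shown above
    val endOffsets: Map[TopicPartition, Long] = readEndOffsetsFromCheckpoint(batchId)
    // Data, end offsets and batch id are written in ONE transaction
    // (collecting to the driver is only for illustration)
    store.writeAtomically(queryId, batchId, endOffsets, data.collect())
    latestProcessedBatchId = batchId
  }

  private def readEndOffsetsFromCheckpoint(batchId: Long): Map[TopicPartition, Long] = ???
}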

Dorsey answered 12/9, 2017 at 7:50 Comment(0)

Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on an HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.

Correct.

On every trigger, Spark Structured Streaming saves offsets to the offsets directory in the checkpoint location (defined using the checkpointLocation option or the spark.sql.streaming.checkpointLocation Spark property, or randomly assigned), which is supposed to guarantee that offsets are processed at most once. The feature is called Write-Ahead Logs.

The other directory in the checkpoint location is the commits directory for completed streaming batches, with a single file per batch (the file name being the batch id).
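
For instance, a query with an explicit checkpoint location could be started as in the sketch below (the paths, topic and bootstrap servers are placeholders); the offsets and commits directories are then created under that path:

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/tmp/output")
  .option("checkpointLocation", "/tmp/checkpoints/my-query") // offsets/ and commits/ live here
  .start()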

Quoting the official documentation in Fault Tolerance Semantics:

To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

Every time a trigger is executed, StreamExecution checks the directories and "computes" what offsets have already been processed. That gives you at-least-once semantics and exactly-once in total.

But older docs (...) say that Spark Streaming checkpoints are not recoverable across application or Spark upgrades and hence not very reliable.

There was a reason why you called them "old", wasn't there?

They refer to the old and (in my opinion) dead Spark Streaming that kept not only offsets but the entire query code, which led to situations where the checkpointing was almost unusable, e.g. when you changed the code.

Those times are over now and Structured Streaming is more cautious about what is checkpointed and when.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain the offsets for a structured streaming batch?

A solution could be to implement or somehow use the MetadataLog interface that is used to deal with offset checkpointing. That could work.

how am I supposed to get the offsets for a given batch id?

It is not currently possible.

My understanding is that you will not be able to do it, as the semantics of streaming are hidden from you. You simply should not be dealing with this low-level "thing" called offsets that Spark Structured Streaming uses to offer exactly-once guarantees.

Quoting Michael Armbrust from his Spark Summit talk "Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark":

you should not have to reason about streaming

and further in the talk (on the next slide):

you should write simple queries & Spark should continuously update the answer


There is a way to get offsets (from any source, Kafka included) using StreamingQueryProgress, which you can intercept using a StreamingQueryListener and the onQueryProgress callback.

onQueryProgress(event: QueryProgressEvent): Unit – Called when there is some status update (ingestion rate updated, etc.)

With StreamingQueryProgress you can access the sources property with SourceProgress entries that give you what you want.
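
As a sketch (it only prints what SourceProgress reports; in practice you would persist it), a listener can be registered on the SparkSession like this:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val offsetListener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // startOffset/endOffset are JSON strings, e.g. {"my-topic":{"0":42}} for Kafka
      println(s"query=${event.progress.id} batch=${event.progress.batchId} " +
        s"source=${source.description} start=${source.startOffset} end=${source.endOffset}")
    }
  }
}

spark.streams.addListener(offsetListener) // `spark` is the active SparkSession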

Smoothen answered 12/9, 2017 at 10:44 Comment(12)
Wow, nice answer :) But the last point should be the first :) Either way, an upvote is deserved :) – Flutist
"You should not. Period." – that's not the answer I was looking for :) If you follow the JIRA ticket on Spark, getting offsets is still a valid use case. For example, what about not locking myself into Spark? If I have offsets in external storage, I can rewrite my ETLs in Apache Flink and just let it pick up the latest offsets from my storage (which is reliable by default because all data/offset updates happen in one transaction) – Dorsey
Looks like StreamingQueryListener is meant for other purposes like monitoring, debugging, etc. I can't see how I can use it inside custom sinks. – Dorsey
Mind sharing the link to the JIRA ticket? StreamingQueryListener is not for custom sinks. That's for sure. You are right. I only mentioned it to give the whole picture of how deeply offsets are hidden from you :) – Smoothen
@JacekLaskowski Sure, issues.apache.org/jira/browse/SPARK-18258. The idea is to have an offset in the addBatch() method. – Dorsey
It's being discussed as we speak, so my recommendation still holds, doesn't it? – Smoothen
@JacekLaskowski I don't think so. It has a nice explanation of what is going on and it helps me get a better understanding, but it's misleading in the part that says "you shouldn't do it". Maybe it would be better to say that it's hard to do this currently: you either pass the topic/partition and offset from the Source to the Sink, or read the offsets for a batch id from the checkpoint directory. – Dorsey
Reworded the answer and added the note about MetadataLog. That could work, but I haven't tried it out myself. – Smoothen
@JacekLaskowski Great answer, thanks. I've read "Furthermore, this same mechanism allows you to upgrade your query between restarts, as long as the input sources and output schema remain the same." in this blog article databricks.com/blog/2017/01/19/… Because in real-life applications the schema changes, we should not rely on the offsets saved inside the checkpoint folder, as we won't be able to retrieve them, and should store them ourselves (github.com/polomarcus/Spark-Structured-Streaming-Examples). Would you agree? – Dollfuss
@PaulLeclercq Offsets are an integral part of a streaming source and they rarely, if ever, change. I would not worry about them. They're stable. I wish I could see an example that would show I'm wrong. Do you have one? State seems risky (as state rows are persisted in the checkpoint location). – Smoothen
@JacekLaskowski Sorry for the misunderstanding. I was not talking about the offsets changing, but the actual message's schema changing (adding a new column, for example). This is how I've understood "as long as the input sources and output schema remain the same" in the Databricks article I mentioned. I need to test that! I'll get back to you. – Dollfuss
"the actual message's schema changing (adding a new column for example)" – unless you elaborate, I'm here to say that it is possible. – Smoothen

A streaming Dataset with a Kafka source has offset as one of its fields. You can simply query for all offsets in the query and save them to a JDBC sink, as sketched below.
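
A sketch of that approach: the topic, partition and offset columns are part of the Kafka source's standard schema; everything else (where you call this and how you persist the result) is illustrative. Inside a custom sink's addBatch (or foreachBatch on Spark 2.4+), aggregate the maximum offset per topic/partition and save it together with the data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.max

def maxOffsetsPerPartition(batch: DataFrame): DataFrame =
  batch
    .groupBy("topic", "partition")
    .agg(max("offset").as("maxOffset"))

// `offset` is the offset of the record itself, so a restarted job should resume
// from maxOffset + 1 for each topic/partition.
// maxOffsetsPerPartition(batch).write.format("jdbc")... would then persist it.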

Sorrow answered 11/9, 2017 at 10:43 Comment(7)
For a simple query, I need to make an additional Spark operation that groups by partition and topic and aggregates the max offset? – Dorsey
Seems like this approach shouldn't be recommended, because the information about the obtained offset ranges is already there (it's in KafkaSourceRDD), but since it gets mapped to InternalRow, I have no access to it. It's just lost, and a client has to waste cluster resources to get it back. – Dorsey
Yes. From the code side it will be easy; however, it will be necessary. – Flutist
Sorry for the off-topic discussion, but wouldn't it be better to pass the optional offset via the addBatch() method? Having a replayable streaming source with some notion of offsets looks like a requirement, so it should be in the public API. Currently, it's hidden in implementation details (InternalRow and the Kafka schema). That makes it harder to implement a reliable custom sink. – Dorsey
@Dorsey Maybe not the offset, but general metadata of the source :) – Flutist
@t-gawęda Agreed – Dorsey
@Dorsey If it's helpful, please upvote and accept the answer. You can also ask Spark developers on the mailing list whether it's possible to add metadata. – Flutist

A disclaimer/warning: this answer relies on undocumented internal implementation details of Spark, which can change (and do change, see the examples for different versions below), and may have other unclear pitfalls, guarantees, limitations, etc.

Also, the following code examples are in Java.


First, you have to get the underlying RDD of the dataframe in foreachBatch:

.writeStream()
// ...
.foreachBatch((dataframe, batchId) -> {
    JavaRDD<Row> rdd = dataframe.javaRDD();

Or, if you are using the batch API, you can use the rdd property or the toJavaRDD method in Scala/Java respectively:

session.read()
    .format("kafka")
    // ...
    .load()
    .toJavaRDD();

Offsets can be obtained from the RDD using a couple of type downcasts to internal Spark classes.

For Spark 3.4.0:

import java.util.ArrayList;
import java.util.Collection;

import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition;
import org.apache.spark.sql.kafka010.*;
// plus a seqAsJavaList helper (Scala-to-Java collection conversion) for the
// Scala Seq returned by inputPartitions()

//...

Collection<KafkaOffsetRange> getOffsetRanges(JavaRDD<Row> rdd) {
    Collection<KafkaOffsetRange> ranges = new ArrayList<>();
    for (Partition partition : rdd.partitions()) {
        for (InputPartition ip : seqAsJavaList(((DataSourceRDDPartition) partition).inputPartitions())) {
            KafkaBatchInputPartition kp = (KafkaBatchInputPartition) ip;
            KafkaOffsetRange range = kp.offsetRange();
            // String topic = range.topic();
            // int pn = range.partition();
            // long from = range.fromOffset();
            // long until = range.untilOffset();
            ranges.add(kp.offsetRange());
        }
    }
    return ranges;
}

In older Spark versions it's a bit different, e.g. in 2.4.0:

Collection<KafkaOffsetRange> getOffsetRanges(JavaRDD<Row> rdd) {
    Collection<KafkaOffsetRange> ranges = new ArrayList<>();
    for (Partition partition : rdd.partitions()) {
        DataSourceRDDPartition dp = ((DataSourceRDDPartition) partition);
        KafkaMicroBatchInputPartition kp = (KafkaMicroBatchInputPartition) dp.inputPartition();
        KafkaOffsetRange range = kp.offsetRange();
        // TopicPartition tp = range.topicPartition();
        // String topic = tp.topic();
        // int pn = tp.partition();
        // long from = range.fromOffset();
        // long until = range.untilOffset();
        ranges.add(range);
    }
    return ranges;
}
Aerugo answered 13/2 at 11:3 Comment(0)
