Spark batch reading from Kafka & using Kafka to keep track of offsets

I understand that using Kafka's own offset tracking instead of other methods (like checkpointing) is problematic for streaming jobs.

However, I just want to run a Spark batch job every day that reads all messages from the last offset up to the most recent and does some ETL with them.

In theory I want to read this data like so:

val dataFrame = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:6001")
      .option("subscribe", "topic-in")
      .option("includeHeaders", "true")
      .option("kafka.group.id", "consumer-group-for-this-job")
      .load()

And have Spark commit the offsets back to Kafka based on the group.id.

Unfortunately, Spark never commits these offsets back, so I got creative and added the following code at the end of my ETL job to manually update the offsets for the consumer group in Kafka:

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.functions.max
import spark.implicits._

val offsets: Map[TopicPartition, OffsetAndMetadata] = dataFrame
      .select('topic, 'partition, 'offset)
      .groupBy("topic", "partition")
      .agg(max('offset))
      .as[(String, Int, Long)]
      .collect()
      .map {
        // the committed offset is the next record to read, hence maxOffset + 1
        case (topic, partition, maxOffset) => new TopicPartition(topic, partition) -> new OffsetAndMetadata(maxOffset + 1)
      }
      .toMap

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.put("group.id", "consumer-group-for-this-job")
props.put("bootstrap.servers", "localhost:6001")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("enable.auto.commit", "false")

val kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
kafkaConsumer.commitSync(offsets.asJava)

This technically works, but the next time I read with this group.id, Spark still starts from the beginning.

Do I have to bite the bullet and keep track of the offsets somewhere, or is there something I'm overlooking?

BTW, I'm testing this with EmbeddedKafka.

Cason answered 27/1, 2021 at 21:16 Comment(0)

I managed to resolve it by leaving the spark.read as is (ignoring the group.id etc.), but surrounding it with my own KafkaConsumer logic.

  // Builds a plain KafkaConsumer for the given group id; it is only used to look up and commit offsets.
  protected val kafkaConsumer: String => KafkaConsumer[Array[Byte], Array[Byte]] =
    groupId => {
      val props = new Properties()
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
      new KafkaConsumer[Array[Byte], Array[Byte]](props)
    }

  protected def getPartitions(kafkaConsumer: KafkaConsumer[_, _], topic: String): List[TopicPartition] = {
    import scala.collection.JavaConverters._

    kafkaConsumer
      .partitionsFor(topic)
      .asScala
      .map(p => new TopicPartition(topic, p.partition()))
      .toList
  }

  protected def getPartitionOffsets(kafkaConsumer: KafkaConsumer[_, _], topic: String, partitions: List[TopicPartition]): Map[String, Map[String, Long]] = {
    Map(
      topic -> partitions
        .map(p => p.partition().toString -> kafkaConsumer.position(p))
        .map {
          // position 0 means nothing consumed yet; map it to -2, Spark's sentinel for "earliest"
          case (partition, offset) if offset == 0L => partition -> -2L
          case mapping                             => mapping
        }
        .toMap
    )
  }

  // Serializes the group's current positions into the JSON accepted by Spark's startingOffsets option.
  def getStartingOffsetsString(kafkaConsumer: KafkaConsumer[_, _], topic: String)(implicit logger: Logger): String = {
    Try {
      import scala.collection.JavaConverters._

      val partitions: List[TopicPartition] = getPartitions(kafkaConsumer, topic)

      kafkaConsumer.assign(partitions.asJava)

      val startOffsets: Map[String, Map[String, Long]] = getPartitionOffsets(kafkaConsumer, topic, partitions)

      logger.debug(s"Starting offsets for $topic: ${startOffsets(topic).filterNot(_._2 == -2L)}")

      implicit val formats = org.json4s.DefaultFormats
      Serialization.write(startOffsets)
    } match {
      case Success(jsonOffsets) => jsonOffsets
      case Failure(e) =>
        logger.error(s"Failed to retrieve starting offsets for $topic: ${e.getMessage}")
        "earliest"
    }
  }
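
For illustration (hypothetical numbers): if partition 0 of topic-in has a committed position of 42 and partition 1 has nothing committed yet, getStartingOffsetsString returns

  {"topic-in":{"0":42,"1":-2}}

which is the JSON shape the Spark Kafka source accepts for the startingOffsets option; in that format -2 stands for earliest and -1 for latest.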

// MAIN CODE

    val groupId              = consumerGroupId(name)
    val currentKafkaConsumer = kafkaConsumer(groupId)
    val topic                = config.topic.getOrElse(name)

    val startingOffsets = getStartingOffsetsString(currentKafkaConsumer, topic)

    val dataFrame = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", config.bootstrapServers)
      .option("subscribe", topic)
      .option("includeHeaders", "true")
      .option("startingOffsets", startingOffsets)
      .option("enable.auto.commit", "false")
      .load()

Try {
  import scala.collection.JavaConverters._

  val partitions: List[TopicPartition] = getPartitions(currentKafkaConsumer, topic)

  val numRecords = dataFrame.cache().count()        // actually read data from kafka
  currentKafkaConsumer.seekToEnd(partitions.asJava) // assume the read has read everything

  val endOffsets: Map[String, Map[String, Long]] = getPartitionOffsets(currentKafkaConsumer, topic, partitions)

  logger.debug(s"Loaded $numRecords records")
  logger.debug(s"Ending offsets for $topic: ${endOffsets(topic).filterNot(_._2 == -2L)}")

  currentKafkaConsumer.commitSync()
  currentKafkaConsumer.close()
} match {
  case Success(_) => ()
  case Failure(e) =>
    logger.error(s"Failed to set offsets for $topic: ${e.getMessage}")
}
Cason answered 28/1, 2021 at 15:31 Comment(3)
Looking back at this, it is unclear why the batch approach still exists given the other answer. – Fonteyn
@Tom Lous I was looking for a resolution for the same use case using Java; is your code working as expected? – Pteryla
It worked at the time. I have no idea if it still works, since I'm not working at that company anymore :-) – Cason

"However I just want to run a Spark batch job every day, reading all messages from the last offset to the most recent and do some ETL with it."

The Trigger.Once trigger is made for exactly this kind of requirement.

There is a nice blog post from Databricks that explains why "Streaming and RunOnce is Better than Batch".

Most importantly:

"When you’re running a batch job that performs incremental updates, you generally have to deal with figuring out what data is new, what you should process, and what you should not. Structured Streaming already does all this for you."

Although your approach technically works, I would really recommend having Spark take care of the offset management.
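
For reference, here is a minimal sketch of what such a run-once streaming job could look like (the sink format, output path and checkpoint path are placeholders I made up; the point is that the offsets live in the checkpoint directory instead of Kafka):

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:6001")
  .option("subscribe", "topic-in")
  .option("includeHeaders", "true")
  .load()

// ... your ETL transformations on `stream` go here ...

stream.writeStream
  .format("parquet")                                          // placeholder sink
  .option("path", "/data/topic-in-etl")                       // placeholder output path
  .option("checkpointLocation", "/checkpoints/topic-in-etl")  // offsets are tracked here
  .trigger(Trigger.Once())                                    // process whatever is new, then stop
  .start()
  .awaitTermination()

Because the checkpoint remembers the last processed offsets, each daily run picks up exactly where the previous one stopped, without any group.id or manual commit handling.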

Your manual-commit approach probably does not work with EmbeddedKafka because it runs only in memory and does not remember that you have committed offsets between runs of your test code. Therefore, it starts reading from the earliest offset again and again.
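
If you want to double-check what the broker actually has committed for the group between runs, a quick look via the Kafka AdminClient can help. This is just a sketch (it assumes the plain kafka-clients admin API is available and reuses the group id from the question):

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "localhost:6001")
val admin = AdminClient.create(adminProps)

// Ask the broker which offsets are currently committed for the consumer group.
val committed = admin
  .listConsumerGroupOffsets("consumer-group-for-this-job")
  .partitionsToOffsetAndMetadata()
  .get()
  .asScala

committed.foreach { case (tp, om) => println(s"$tp -> committed offset ${om.offset()}") }
admin.close()

If this map is empty right after a fresh EmbeddedKafka start, that confirms the committed offsets simply do not survive between test runs.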

Fraught answered 27/1, 2021 at 22:14 Comment(1)
Thanks for the info. I'm aware of the Trigger.Once mechanic, but I was hoping that the batch version would be better integrated with Kafka offsets than the streaming one. For streams it makes sense to keep track of offsets in other places; for batch, a case can be made to integrate with Kafka offsets IMHO. But I am probably mistaken. – Cason