How to set group.id for consumer group in kafka data source in Structured Streaming?
I want to use Spark Structured Streaming to read from a secured Kafka cluster. This means that I need to force a specific group.id. However, as stated in the documentation, this is not possible. Still, the Databricks documentation https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl says that it is possible. Does this only refer to Azure clusters?

Also, by looking at the documentation on the master branch of the apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, we can see that such functionality is intended to be added in a later Spark release. Do you know of any plans for a stable release that will allow setting that consumer group.id?

If not, are there any workarounds for Spark 2.4.0 that allow setting a specific consumer group.id?

Thermograph answered 26/3, 2019 at 10:52 Comment(0)
Currently (v2.4.0) it is not possible.

You can check the following lines in the Apache Spark project:

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 - generates the group.id

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - sets it in the properties that are used to create the KafkaConsumer

On the master branch you can find a modification that enables setting a prefix or a particular group.id:

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - generates the group.id based on a group prefix (groupidprefix)

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - sets the previously generated groupId if kafka.group.id wasn't passed in the properties
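The resolution logic described above can be sketched in plain Scala as follows. This is a simplified approximation of the linked master-branch code, not Spark's exact implementation; the method name resolveGroupId is illustrative, while "kafka.group.id" and "groupidprefix" are the option names discussed in the answer.

```scala
import java.util.UUID

// Simplified sketch of the group-id resolution added on the master branch:
// an explicit kafka.group.id wins; otherwise Spark generates a unique id,
// using the optional "groupidprefix" option as the prefix.
def resolveGroupId(options: Map[String, String]): String =
  options.getOrElse("kafka.group.id", {
    val prefix = options.getOrElse("groupidprefix", "spark-kafka-source")
    s"$prefix-${UUID.randomUUID()}"
  })

println(resolveGroupId(Map("kafka.group.id" -> "myGroup")))  // myGroup
println(resolveGroupId(Map("groupidprefix" -> "myPrefix")))  // myPrefix-<uuid>
```

In Spark 2.4.0 only the generated branch exists, which is why the option has no effect there.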

Proletarian answered 26/3, 2019 at 11:51 Comment(4)
Thanks for the response, any idea on how I would go about implementing those modified classes? Is building a jar out of the package and adding that jar with spark-submit enough?Thermograph
@PanagiotisFytas, you can check the code in the master branch of Apache Spark. I think it is enough to remove the following line (github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/…), build the jar, add it to spark-submit, and pass the kafka.group.id property via an option.Stenophagous
I made this work by adding some commits from the master branch on top of the 2.4.0 branch. You can check my fork github.com/PanagiotisFytas/spark. You build the fat jar with the custom connector by calling [external/kafka-0-10-sql/mvn -DskipTests package]. I am not sure how safe that method is, as I have not yet fully tested it.Thermograph
@PanagiotisFytas, I would be very careful with it. It is definitely not a recommended approach in production. However, it is only a modification of the Kafka streaming part.Stenophagous
Since Spark 3.0.0

According to the Structured Kafka Integration Guide you can provide the consumer group via the option kafka.group.id:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

However, Spark will not commit any offsets back, so the offsets of your consumer groups will not be stored in Kafka's internal topic __consumer_offsets but rather in Spark's checkpoint files.
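Since the offsets live in the checkpoint rather than in __consumer_offsets, the write side of the query must supply a checkpoint location. A minimal sketch, continuing the example above; the sink format, output path, and checkpoint path are placeholders:

```scala
// Offsets for this query are tracked under checkpointLocation,
// not committed back to Kafka. Paths below are illustrative.
val query = df
  .writeStream
  .format("parquet")
  .option("path", "/data/output")
  .option("checkpointLocation", "/data/checkpoints/myQuery") // offsets stored here
  .start()
```

Deleting the checkpoint directory therefore resets the query's progress, regardless of what Kafka reports for the consumer group.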

Being able to set the group.id is meant to deal with Kafka's Authorization using Role-Based Access Control feature, for which your consumer group usually needs to follow naming conventions.

A full example of a Spark 3.x application setting kafka.group.id is discussed and solved here.

Spousal answered 12/10, 2020 at 7:25 Comment(0)
The Structured Streaming guide seems to be quite explicit about it:

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

group.id: Kafka source will create a unique group id for each query automatically.

auto.offset.reset: Set the source option startingOffsets to specify where to start instead.
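As the quoted guide says, the documented alternative to auto.offset.reset is the source option startingOffsets. A sketch of what that looks like; the broker addresses and topic name are placeholders:

```scala
// startingOffsets replaces auto.offset.reset for the Kafka source;
// it only applies when a query starts without an existing checkpoint.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest") // or "latest", or a per-partition JSON spec
  .load()
```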

Scat answered 26/3, 2019 at 10:52 Comment(2)
I know. At the time, I asked for workarounds, which are possible. I managed to make it work with the solution suggested.Thermograph
@PanagiotisFytas Can you show the code (as a separate answer) to help us understand it better? That'd be super helpful. Thank you.Scat
Now with Spark 3.0, you can specify a group.id for Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations

Coniine answered 27/6, 2020 at 20:22 Comment(1)
It has a warning message that we should not use group.id; if you set the log level to WARN, you will see it.Hoarfrost