How to specify the group id of kafka consumer for spark structured streaming?

I would like to run two Spark Structured Streaming jobs on the same EMR cluster, both consuming the same Kafka topic. Both jobs show as running, but only one of them receives the Kafka data. The Kafka part of my configuration is as follows.

        .format("kafka")
        .option("kafka.bootstrap.servers", "xxx")
        .option("subscribe", "sametopic")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.ssl.truststore.location", "./cacerts")
        .option("kafka.ssl.truststore.password", "changeit")
        .option("kafka.ssl.truststore.type", "JKS")
        .option("kafka.sasl.kerberos.service.name", "kafka")
        .option("kafka.sasl.mechanism", "GSSAPI")
        .load()

I did not set group.id. My guess is that both jobs end up using the same group id, which causes this issue. However, when I set group.id, Spark complains that "user-specified consumer groups are not used to track offsets." What is the correct way to solve this problem? Thanks!

Cyclades answered 1/8, 2020 at 8:53 Comment(4)
Does this answer your question? How to manually commit kafka offsets in spark structured streaming? - Dowzall
What Spark version? issues.apache.org/jira/browse/SPARK-26350 - Salta
Any progress to date? - Hypotenuse
I tried kafka.group.id in Spark 3.0, but it does not work as I expected, so I opened a new question: #64003905 - Cyclades

You need to run Spark 3.0 or later, which adds the kafka.group.id option.

From https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

kafka.group.id

The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both, batch and streaming) or sources with the same group id are likely to interfere with each other causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to be very small. When this is set, option "groupIdPrefix" will be ignored.
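
As a rough illustration of the option described above, here is a minimal PySpark-style sketch. It assumes Spark 3.0 or later and an existing SparkSession passed in as `spark`. The helper names `kafka_source_options` and `read_topic` and the group ids `job-a-group` and `job-b-group` are made up for this example; the security options from the question are left out for brevity.

```python
from typing import Dict, Optional


def kafka_source_options(
    bootstrap_servers: str,
    topic: str,
    group_id: Optional[str] = None,
    session_timeout_ms: Optional[int] = None,
) -> Dict[str, str]:
    """Build the option map for a Structured Streaming Kafka source."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
    }
    if group_id is not None:
        # Needs Spark 3.0+; when this is set, "groupIdPrefix" is ignored.
        options["kafka.group.id"] = group_id
    if session_timeout_ms is not None:
        # Forwarded to the Kafka consumer as session.timeout.ms.
        options["kafka.session.timeout.ms"] = str(session_timeout_ms)
    return options


def read_topic(spark, options: Dict[str, str]):
    """Open the streaming source; `spark` is an existing SparkSession."""
    return spark.readStream.format("kafka").options(**options).load()


# Hypothetical group ids: one per job, so the two queries never share a group.
job_a = kafka_source_options("xxx", "sametopic", group_id="job-a-group")
job_b = kafka_source_options("xxx", "sametopic", group_id="job-b-group")
```

Each job would then call `read_topic(spark, job_a)` or `read_topic(spark, job_b)`. Leaving `group_id` unset keeps the default behaviour, where every query generates its own unique group id.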

Hypotenuse answered 3/8, 2020 at 7:8 Comment(1)
Thanks, I will try Spark v3. Is Spark v3 already available on EMR? - Cyclades
