How to use maxOffsetsPerTrigger in pyspark structured streaming?

I want to limit the rate when fetching data from Kafka. My code looks like:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

However, when I call df.count(), the result is 600, whereas I expected 20. Does anyone know why "maxOffsetsPerTrigger" doesn't work?

Overstuffed answered 26/6, 2018 at 0:17 Comment(1)
I am seeing it work perfectly fine. How many files in total are you seeing as a final result? Devoir

You are reading 200 records from each partition (0, 1, 2), from offset 200 up to offset 400, so the total is 600 records.

As you can see here:

Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.

This means that for each trigger (fetch), Spark will get at most 20 records from Kafka, but in total you will still fetch all the records covered by the configured offsets (200 per partition).
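The arithmetic behind those numbers can be checked with a few lines of plain Python. This is only a sketch: the offsets are copied from the question, the helper name `records_per_partition` is made up for illustration, and it assumes each offset range is half-open (start included, end excluded) with no gaps in the log, which is consistent with the count of 600 reported in the question. The trigger estimate only applies when the cap is actually honoured.

```python
import math

# Offsets copied from the question's startingOffsets / endingOffsets for topic "A".
starting_offsets = {"0": 200, "1": 200, "2": 200}
ending_offsets = {"0": 400, "1": 400, "2": 400}
max_offsets_per_trigger = 20


def records_per_partition(starting, ending):
    """Hypothetical helper: size of each partition's [start, end) offset range."""
    return {partition: ending[partition] - starting[partition] for partition in starting}


per_partition = records_per_partition(starting_offsets, ending_offsets)
total_records = sum(per_partition.values())

# Assumption: the per-trigger cap is applied, so each trigger reads at most
# max_offsets_per_trigger records across all partitions combined.
triggers_needed = math.ceil(total_records / max_offsets_per_trigger)

print(per_partition)
print(total_records)
print(triggers_needed)
```

With the question's values this gives 200 records per partition, 600 in total, and 30 triggers of 20 records each.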

Devoir answered 26/6, 2018 at 2:6 Comment(5)
Thank you for the reply! So it will trigger 30 times to fetch 600 records? Is there any way to limit how many records I get when I set endingOffsets to latest, so that the total doesn't exceed some limit? Hairdresser
Yes, that is right! Usually, you can only limit the number of records returned by a fetch/poll operation using max.poll.records; if you prefer to limit by bytes, use replica.fetch.max.bytes or replica.fetch.min.bytes. Devoir
What is the difference, then, between setting max.poll.records and maxOffsetsPerTrigger? Sturges
I set maxOffsetsPerTrigger to 1, and my topic has 10 partitions. I have 4 executors with 2 cores each. When my job runs, it fetches 8 records per batch and assigns 2 messages per executor. Any idea why? Bingo
I have the same question; the only difference is that I keep the starting offset as earliest. How will maxOffsetsPerTrigger behave in that case? Outermost

The maxOffsetsPerTrigger option works only for streaming jobs (spark.readStream), but you are running a batch job (spark.read). That's why the configuration has no effect for you.
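A rough sketch of what the streaming version of the question's code could look like is given below. The function names (`kafka_stream_options`, `build_stream`, `start_count_query`) are hypothetical helpers, not part of Spark. The sketch assumes a working SparkSession with the Kafka source package available, and it assumes Spark 2.4 or later for `foreachBatch`. endingOffsets is left out because it applies to batch queries only.

```python
import json


def kafka_stream_options(bootstrap_servers, topic, starting_offsets, max_offsets_per_trigger):
    """Hypothetical helper: build the Kafka source options for a streaming read."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Same JSON shape as in the question: {"A": {"0": 200, ...}}
        "startingOffsets": json.dumps({topic: starting_offsets}),
        # Caps the total number of offsets read per micro-batch.
        "maxOffsetsPerTrigger": str(max_offsets_per_trigger),
    }


def build_stream(spark, options):
    """Use readStream (not read) so that maxOffsetsPerTrigger takes effect."""
    return spark.readStream.format("kafka").options(**options).load()


def start_count_query(stream_df):
    """Print the record count of every micro-batch (assumes Spark 2.4+)."""
    def log_batch(batch_df, batch_id):
        print(batch_id, batch_df.count())

    return stream_df.writeStream.foreachBatch(log_batch).start()


# Usage, assuming an existing SparkSession named `spark`:
#   opts = kafka_stream_options("...", "A", {"0": 200, "1": 200, "2": 200}, 20)
#   query = start_count_query(build_stream(spark, opts))
#   query.awaitTermination()
```

Under these assumptions each micro-batch should report at most 20 records, instead of the single count of 600 that the batch read returns.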

Listlessness answered 31/5 at 8:0 Comment(0)
