Increase Kafka Streams Consumer Throughput

I have a Spark Streaming application and a Kafka Streams application running side by side for benchmarking purposes. Both consume from the same input topic and write to different target databases. The input topic has 15 partitions, and both Spark Streaming and Kafka Streams have 15 consumers (1:1 ratio). Event payloads are around 2kb. Not sure if it's relevant, but the 90th percentile execution time is around 9ms for Spark Streaming and 12ms for Kafka Streams. The commit() method is invoked in my Processor every time a message is processed.

The problem shows up during high bursts. Spark Streaming can keep up with 700 events per second, while Kafka Streams manages only around 60-70 per second. I can't go beyond that. See graph below: (Green Line - Spark Streaming / Blue line - Kafka Streams)

As per the config below, Spark Streaming can keep up as long as it doesn't exceed 1000 events per consumer (considering the backpressure), regardless of the number of bytes per partition. As for Kafka Streams, if I understood its configs correctly (and please keep me honest), based on the same below, I am able to fetch a max of 1000 records (max.poll.records) every 100ms (poll.ms), as long as it doesn't exceed 1MB per partition (max.partition.fetch.bytes) and 50MB per fetch (fetch.max.bytes).

I see the same results (stuck at 70 events per second) regardless of whether I am using 5, 10 or 15 consumers, which leads me to think it is config related. I tried to tweak these settings by increasing the number of records per fetch and the max bytes per partition, but I didn't get a significant improvement.

I am aware these are different tech and used for different purposes, but I am wondering what values I should use in Kafka Streams for better throughput.

Spark Streaming config:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100

Kafka Streams Config (All bytes and timing related)

# Consumer Config
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
heartbeat.interval.ms = 3000 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 300000 
max.poll.records = 1000 
request.timeout.ms = 30000
enable.auto.commit = false

# StreamsConfig
poll.ms=100 

Processor Code

public class KStreamsMessageProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String payload) {

        ResponseEntity responseEntity = null;
        try {

          // Do Some processing

        } catch (final MyException e) {

          // Do Some Exception Handling

        } finally {

            context.forward(UUID.randomUUID().toString(), responseEntity);
            context.commit();
        }
    }
}

Thanks in advance!

Hageman answered 23/6, 2020 at 18:8 Comment(2)
Not an expert here: I think the problem will be the commit in Kafka. Just to be sure, I cannot see in the config you posted that autocommit is disabled (enable.auto.commit). I'm also surprised you consume 70 events regardless of the number of consumers... I wonder if all records end up going to the same partition. Can you verify that the load is balanced between all the partitions? Looking at the code might also help! And maybe the topic configuration too. - Lashley
Hi Augusto, thanks for your inputs. Just added the code in case that helps. All the partitions are well balanced with the consumers. And yes, autocommit is disabled. Let me know if there's any other config you'd like to see. - Sudarium

UPDATE

The database that Kafka Streams was writing to was the big bottleneck here. After we switched it to a better cluster (better hardware, memory, cores, etc.), I tuned the config as shown below and was able to consume around 2k events per second. The commit interval config was also changed (as per Augusto's suggestion), and I also switched to the G1GC garbage collector.

fetch.max.bytes = 52428800
max.partition.fetch.bytes = 1048576 

fetch.max.wait.ms = 1000 
max.poll.records = 10000 
fetch.min.bytes = 100000
enable.auto.commit = false
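
For reference, here is a minimal sketch of how these values could be passed to a Kafka Streams application as plain properties. It relies on the "consumer." key prefix that Kafka Streams uses to forward settings to its embedded consumers. The class name is hypothetical, and the commit.interval.ms value is an assumption: the exact interval used is not stated above, so the documented default of 30000 ms stands in for it. enable.auto.commit is left out because Kafka Streams manages that setting itself.

```java
import java.util.Properties;

public class TunedStreamsConfig {

    // Keys starting with "consumer." are forwarded by Kafka Streams to its consumers.
    static final String CONSUMER_PREFIX = "consumer.";

    static Properties build() {
        Properties props = new Properties();
        // Fetch-related values copied from the config listed above.
        props.put(CONSUMER_PREFIX + "fetch.max.bytes", "52428800");
        props.put(CONSUMER_PREFIX + "max.partition.fetch.bytes", "1048576");
        props.put(CONSUMER_PREFIX + "fetch.max.wait.ms", "1000");
        props.put(CONSUMER_PREFIX + "fetch.min.bytes", "100000");
        props.put(CONSUMER_PREFIX + "max.poll.records", "10000");
        // ASSUMPTION: the interval actually used is not given; 30000 ms is the default.
        props.put("commit.interval.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With commit.interval.ms in place, the per-record context.commit() call in the processor is no longer needed to get periodic commits.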

Hageman answered 26/6, 2020 at 15:43 Comment(0)

if I understood its configs correctly (and please keep me honest), based on the same below, I am able to fetch a max of 1000 records (max.poll.records) every 100ms (poll.ms), as long as it doesn't exceed 1MB per partition (max.partition.fetch.bytes) and 50MB per fetch (fetch.max.bytes).

That is not correct. :) max.poll.records specifies how many records may be returned by poll(). If a single "fetch" to the broker returns more records, the next poll() call will be served from the consumer's internal buffer (i.e., no network request). max.poll.records is basically a knob to tune your application code, i.e., how many records you want to process before poll() is called again. Calling poll() more frequently makes your application more reactive (for example, a rebalance only happens when poll() is called; you also need to call poll() often enough to not violate max.poll.interval.ms).
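
To make the buffering concrete, here is a toy model of it. This is not the Kafka client's actual code: the class, its fields, and the fixed "records per fetch" are all hypothetical simplifications (the real consumer also prefetches in the background). It only shows that capping the records returned per poll() does not change how often a fetch goes over the network.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PollBufferModel {

    private final Deque<Integer> buffer = new ArrayDeque<>();
    private final int recordsPerFetch; // how many records one simulated fetch returns
    private final int maxPollRecords;  // stand-in for max.poll.records
    private int nextOffset = 0;
    private int fetchRequests = 0;

    PollBufferModel(int recordsPerFetch, int maxPollRecords) {
        this.recordsPerFetch = recordsPerFetch;
        this.maxPollRecords = maxPollRecords;
    }

    // Returns at most maxPollRecords; only "goes to the broker" when the buffer is empty.
    List<Integer> poll() {
        if (buffer.isEmpty()) {
            fetchRequests++;
            for (int i = 0; i < recordsPerFetch; i++) {
                buffer.add(nextOffset++);
            }
        }
        List<Integer> batch = new ArrayList<>();
        while (batch.size() < maxPollRecords && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        return batch;
    }

    int fetchRequests() {
        return fetchRequests;
    }

    public static void main(String[] args) {
        PollBufferModel consumer = new PollBufferModel(500, 100);
        for (int i = 0; i < 5; i++) {
            consumer.poll();
        }
        // Five polls of 100 records each are all served by a single simulated fetch.
        System.out.println("fetch requests after 5 polls: " + consumer.fetchRequests());
    }
}
```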

poll.ms is the maximum blocking time within poll() in case no data is available. This avoids busy waiting. However, if there is data, poll() will return immediately.

Thus, the actual "network throughput" is based on the "fetch request" settings only.
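
As a rough back-of-the-envelope check using the figures from the question, the sketch below estimates how many records fit into one fetch. It assumes each record is exactly 2048 bytes ("around 2kb") and ignores batch overhead and compression, so treat the result as an order of magnitude only. The class and method names are hypothetical.

```java
public class FetchCapacityEstimate {

    // Records that fit into one partition's share of a fetch response.
    static long recordsPerPartitionFetch(long maxPartitionFetchBytes, long recordBytes) {
        return maxPartitionFetchBytes / recordBytes;
    }

    // Whether full per-partition responses for all partitions fit under fetch.max.bytes.
    static boolean fitsInOneFetch(int partitions, long maxPartitionFetchBytes, long fetchMaxBytes) {
        return partitions * maxPartitionFetchBytes <= fetchMaxBytes;
    }

    public static void main(String[] args) {
        long recordBytes = 2 * 1024;              // ASSUMPTION: "around 2kb" taken as 2048 bytes
        long maxPartitionFetchBytes = 1_048_576;  // max.partition.fetch.bytes from the question
        long fetchMaxBytes = 52_428_800;          // fetch.max.bytes from the question
        int partitions = 15;                      // partitions of the input topic

        long perPartition = recordsPerPartitionFetch(maxPartitionFetchBytes, recordBytes);
        System.out.println("records per partition per fetch: " + perPartition);
        System.out.println("records across all partitions:   " + perPartition * partitions);
        System.out.println("fits under fetch.max.bytes:      "
                + fitsInOneFetch(partitions, maxPartitionFetchBytes, fetchMaxBytes));
    }
}
```

Under these assumptions a single fetch can carry several hundred records per partition, so the fetch settings in the question leave plenty of headroom above 70 events per second.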

Squeamish answered 4/7, 2020 at 21:6 Comment(1)
Hi Matthias, thanks again for your inputs! It makes more sense now. - Sudarium