Python librdkafka producer perform against the native Apache Kafka Producer
Asked Answered
M

1

7

I am testing Apache Kafka Producer with native java implementation against Python's confluent-kafka to see which has the maximum throughput.

I am deploying a Kafka cluster with 3 Kafka brokers and 3 zookeeper instances using docker-compose. My docker compose file: https://paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

It's a very simple code with mostly default options for Python confluent-kafka and some config changes in java producer to match that of confluent-kafka.

Python Code:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
    'default.topic.config': {'acks': "all"}})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in xrange(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()

Java implementation. Configuration same as config in librdkafka. Changed the linger.ms and callback as suggested by Edenhill.

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaProducerExampleAsync {

    private final static String TOPIC = "test-topic";
    private final static String BOOTSTRAP_SERVERS = "kafka-1:19092,kafka-2:29092,kafka-3:39092";

    private static Producer<String, String> createProducer() {
        int bufferMemory = 67108864;
        int batchSizeBytes = 1000000;
        String acks = "all";

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSizeBytes);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000);
        props.put(ProducerConfig.ACKS_CONFIG, acks);

        return new KafkaProducer<>(props);
    }

    static void runProducer(final int sendMessageCount) throws InterruptedException {
        final Producer<String, String> producer = createProducer();
        final String msg = "0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357";

        final ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
        final long[] new_time = new long[1];

        try {
            for (long index = 0; index < sendMessageCount; index++) {
                producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        // This if-else is to only start timing this when first message reach kafka
                        if(e != null) {
                           e.printStackTrace();
                        } else {
                            if (new_time[0] == 0) {
                                new_time[0] = System.currentTimeMillis();
                            }
                        }
                    }
                });
            }
        } finally {
            // producer.flush();
            producer.close();
            System.out.printf("Total time %d ms\n", System.currentTimeMillis() - new_time[0]);
        }
    }

    public static void main(String... args) throws Exception {
        if (args.length == 0) {
            runProducer(1000000);
        } else {
            runProducer(Integer.parseInt(args[0]));
        }
    }
}

Benchmark results(Edited after making changes recommended by Edenhill)

Acks = 0, Messages: 1000000

Java: 12.066

Python: 9.608 seconds

Acks: all, Messages: 1000000

Java: 45.763 11.917 seconds

Python: 14.3029 seconds


Java implementation is performing same as Python implementation even after making all the changes that I could think of and the ones suggested by Edenhill in the comment below.

There are various benchmarks about the performance of Kafka in Python but I couldn't find any comparing librdkafka or python Kafka against Apache Kafka.

I have two questions:

  1. Is this test enough to come to the conclusion that with default config's and message of size 1Kb librdkafka is faster?

  2. Does anyone have experience or a source(blog, doc etc) benchmarking librdkafka against confluent-kafka?

Mantooth answered 5/3, 2019 at 9:23 Comment(5)
librdkafka is probably faster than the Java counterpart, but the Python client is not due to the large overhead of Python object creation. You will probably want to set "linger.ms" on both clients to something like 50 ms, and start measuring from when you receive the first message delivery acknowledgement / delivery report, to not include startup costs in your measurements.Suited
What would be the equivalent of queue.buffering.mx.messages in Kafka producer API?Galinagalindo
I'm not sure there is a queue limit in the Java Producer.Suited
I also couldn't find queue.buffering.mx.messages option in Java producer api. It used to be there at least till v0.8 kafka.apache.org/08/documentation.htmlMantooth
@Suited I made the changes you suggested and saw a big speed bump in case of Java with acks='all' case but it's still not faster than Python producer by much. As I increased message count Python performed better ¯_(ツ)_/¯. I have updated the questionMantooth
G
3

Python client uses librdkakfa which overrides some of the default configuration of Kafka.

Paramter = Kafka default
batch.size = 16384
max.in.flight.requests.per.connection = 5 (librdkafka's default is 1000000)

message.max.bytes in librdkafka may be equivalent to max.request.size.

I think there is no equivalent of librdKafka's queue.buffering.max.messages in Kafka's producer API. If you find something then correct me.

Also, remove buffer.memory parameter from Java program.

https://kafka.apache.org/documentation/#producerconfigs https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Next thing is Java takes some time to load classes. So you need to increase the number of messages your producers producer. It would be great if it takes at-least 20-30 minutes to produce all messages. Then you can compare Java client with Python client.

I like the idea of comparison between python and java client. Keep posting your results on stackoverflow.

Galinagalindo answered 5/3, 2019 at 14:56 Comment(2)
It's same ACKS_CONFIG= acks which i am setting to 0 or all, I think batch.size is same as batch.num.messages in librdkafka which I am setting to batchSize = 10000 and props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1000000) this is for max inflight. It's same is librdkafka config github.com/edenhill/librdkafka/blob/master/CONFIGURATION.mdMantooth
I think there is one issue here message.max.bytes in librdkafka is BATCH_SIZE_CONFIG in apache kafka. I was using this as batch.num.messages parameter. BATCH_SIZE_CONFIG controls the batch size in bytes not number. Will update questions with new resultsMantooth

© 2022 - 2025 — McMap. All rights reserved.