kafka-python 1.3.3: KafkaProducer.send with explicit key fails to send message to broker

(Possibly a duplicate of Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner, although the OP of that question didn't mention kafka-python. And anyway, it never got an answer.)

I have a Python program that has been successfully (for many months) sending messages to the Kafka broker, using essentially the following logic:

producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                               retries=3)
...
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg)

Recently, I tried to upgrade it to send messages to different partitions based on a definite key value extracted from the message:

producer = kafka.KafkaProducer(bootstrap_servers=[some_addr],
                               key_serializer=str.encode,
                               retries=3)
...
try: 
    key = some_message[0]
except:
    key = None
msg = json.dumps(some_message)
res = producer.send(some_topic, value=msg, key=key)

However, with this code, no messages ever make it out of the program to the broker. I've verified that the key value extracted from some_message is always a valid string. Presumably I don't need to define my own partitioner, since, according to the documentation:

The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the java client so that messages with the same key are assigned to the same partition.

Furthermore, with the new code, when I try to determine what happened to my send by calling res.get (res being a kafka.FutureRecordMetadata), that call throws a TypeError exception with the message "descriptor 'encode' requires a 'str' object but received a 'unicode'".

(As a side question, I'm not exactly sure what I'd do with the FutureRecordMetadata if I were actually able to get it. Based on the kafka-python source code, I assume I'd want to call either its succeeded or its failed method, but the documentation is silent on the point. The documentation does say that the return value of send "resolves to" RecordMetadata, but I haven't been able to figure out, from either the documentation or the code, what "resolves to" means in this context.)

Anyway: I can't be the only person using kafka-python 1.3.3 who's ever tried to send messages with a partitioning key, and I have not seen anything on teh Intertubes describing a similar problem (except for the SO question I referenced at the top of this post).

I'm certainly willing to believe that I'm doing something wrong, but I have no idea what that might be. Is there some additional parameter I need to supply to the KafkaProducer constructor?

Scorpio answered 2/8, 2017 at 20:9

The fundamental problem turned out to be that my key value was a unicode object, even though I was quite convinced that it was a str. On Python 2, str.encode accepts only str instances, so choosing it as my key_serializer was inappropriate, and that is what led to the exception from res.get. Omitting the key_serializer and calling key.encode('utf-8') myself was enough to get my messages published, and partitioned as expected.
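A serializer that tolerates either type avoids this trap. The following is a minimal sketch in Python 3 syntax (where text is str and binary data is bytes); encode_key is a hypothetical helper name, not part of kafka-python, and UTF-8 is an assumed encoding.

```python
def encode_key(key, encoding="utf-8"):
    """Return the key as bytes, or None if there is no key.

    Hypothetical helper (not part of kafka-python): accepts text or
    bytes so the caller need not know which one it was handed.
    """
    if key is None:
        return None  # no key to encode
    if isinstance(key, bytes):
        return key  # already serialized, pass through unchanged
    if isinstance(key, str):
        return key.encode(encoding)
    raise TypeError(
        "key must be str, bytes or None, got %s" % type(key).__name__
    )
```

Something like this could be passed as key_serializer=encode_key to the KafkaProducer constructor, or called directly on the key before send. On Python 2 the text check would have to be against unicode rather than str.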

A large contributor to the obscurity of this problem (for me) was that the kafka-python 1.3.3 documentation does not go into any detail on what a FutureRecordMetadata really is, or on which exceptions its get method can raise. The sole usage example in the documentation:

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

suggests that the only kind of exception it will raise is KafkaError, which is not true. In fact, get can and will (re-)raise any exception that the asynchronous publishing mechanism encountered in trying to get the message out the door.
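A more defensive pattern is to catch everything around get, not just KafkaError. The sketch below assumes only that the object returned by send has a get(timeout=...) method, as in the documentation example above. The names wait_for_send and _FailingFuture are hypothetical, and the stand-in class merely imitates the TypeError described in the question so the snippet runs without a broker.

```python
import logging

log = logging.getLogger(__name__)


def wait_for_send(future, timeout=10):
    """Block on a send future; return (metadata, error), one of which is None.

    Hypothetical helper: catches any Exception, because get() re-raises
    whatever went wrong during the asynchronous send (serializer errors
    included), not only Kafka-specific errors.
    """
    try:
        return future.get(timeout=timeout), None
    except Exception as exc:  # deliberately broad, see docstring
        log.exception("send failed")
        return None, exc


class _FailingFuture(object):
    """Stand-in for a real future whose send failed while encoding the key."""

    def get(self, timeout=None):
        raise TypeError("descriptor 'encode' requires a 'str' object "
                        "but received a 'unicode'")


metadata, error = wait_for_send(_FailingFuture())
```

With a real producer the call would look like wait_for_send(producer.send(some_topic, value=msg, key=key)), and a non-None error would be the first place to look when nothing reaches the broker.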

Scorpio answered 9/8, 2017 at 17:57

I ran into the same error. Serializing the key with json.dumps and encoding the result as UTF-8 before sending made it work:

producer.send(topic="first_topic",
              key=json.dumps(key).encode('utf-8'),
              value=json.dumps(msg).encode('utf-8')
              ).add_callback(on_send_success).add_errback(on_send_error)

Brightness answered 10/6, 2020 at 14:36
