Can't Consume JSON Messages From Kafka Using Kafka-Python's Deserializer
Asked Answered
I

3

9

I am trying to send a very simple JSON object through Kafka and read it out the other side using Python and kafka-python. However, I keep seeing the following error:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

I’ve done some research and the most common cause of this error is that the JSON is wrong. I have tried printing out the JSON before I send it by adding the following to my code and the JSON prints with no errors.

  while True:
        json_obj1 = json.dumps({"dataObjectID": "test1"})
        print json_obj1
        producer.send('my-topic', {"dataObjectID": "test1"})
        producer.send('my-topic', {"dataObjectID": "test2"})
        time.sleep(1)

This leads me to suspect that I can produce the json, but not consume it.

Here is my code:

import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

I can successfully send and receive strings if I remove the value_serializer and value_deserializer. When I run that code I can see the JSON I am sending in. Here is a short snipit:

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

So I tried removing the value_deserializer from the consumer, and that code executes but without the deserializer the message comes out as a String, which isn't what I need. So, why doesn't the value_deserializer work? Is there a different way to get the JSON from the Kafka Message that I should be using?

Inkerman answered 7/4, 2017 at 16:21 Comment(1)
I prefer a simpler value_deserializer=lambda x: x.decode('utf-8') and then do the message_dict = json.loads(message.value) later, where you can conditionalize or wrap it in exception-handling.Sententious
I
3

It turns out the problem is the decode portion of value_deserializer=lambda m: json.loads(m).decode('utf-8') when I change it to value_deserializer=lambda m: json.loads(m) then I see the type of object being read from Kafka is now a dictionary. Which based on the following information from python's JSON documentation is correct:

|---------------------|------------------|
|       JSON          |     Python       |
|---------------------|------------------|
|      object         |      dict        |
|---------------------|------------------|
|      array          |      list        |
|---------------------|------------------|
|      string         |      unicode     |
|---------------------|------------------|
|      number (int)   |      int, long   |
|---------------------|------------------|
|      number (real)  |      float       |
|---------------------|------------------|
|      true           |      True        |
|---------------------|------------------|
|      false          |      False       |
|---------------------|------------------|
|      null           |      None        |
|---------------------|------------------|
Inkerman answered 7/4, 2017 at 16:40 Comment(1)
So, can you tell how did your problem get solved? I am facing the same error message like yours.Sebastian
M
9

My Problem was solved after decoding the message first into utf-8, and then json.load/dump it:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

instead of:

value_deserializer=lambda m: json.loads(m).decode('utf-8')

Hope this will also work for the Producer's side

Muriah answered 12/7, 2017 at 13:17 Comment(0)
I
3

It turns out the problem is the decode portion of value_deserializer=lambda m: json.loads(m).decode('utf-8') when I change it to value_deserializer=lambda m: json.loads(m) then I see the type of object being read from Kafka is now a dictionary. Which based on the following information from python's JSON documentation is correct:

|---------------------|------------------|
|       JSON          |     Python       |
|---------------------|------------------|
|      object         |      dict        |
|---------------------|------------------|
|      array          |      list        |
|---------------------|------------------|
|      string         |      unicode     |
|---------------------|------------------|
|      number (int)   |      int, long   |
|---------------------|------------------|
|      number (real)  |      float       |
|---------------------|------------------|
|      true           |      True        |
|---------------------|------------------|
|      false          |      False       |
|---------------------|------------------|
|      null           |      None        |
|---------------------|------------------|
Inkerman answered 7/4, 2017 at 16:40 Comment(1)
So, can you tell how did your problem get solved? I am facing the same error message like yours.Sebastian
N
2

You don't need the lambda... instead of

value_deserializer=lambda m: json.loads(m)

you should use

value_deserializer=json.load
Nonoccurrence answered 27/4, 2019 at 0:9 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.