How to decode/deserialize Avro with Python from Kafka

I am receiving Avro-encoded Kafka messages from a remote server in Python (using the consumer from the Confluent Kafka Python library). The messages represent clickstream data: dictionaries with fields such as user agent, location, URL, etc. Here is what a message looks like:

b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:https://website.in/rooms/\x02Hhttps://website.in/wellness-spa/\x02\xaa\x14\x02\x9c\n\x02\xaa\x14\x02\xd0\x0b\x02V0:j3lcu1if:rTftGozmxSPo96dz1kGH2hvd0CREXmf2\x02V0:j3lj1xt7:YD4daqNRv_Vsea4wuFErpDaWeHu4tW7e\x02\x08null\x02\nnull0\x10pageview\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x10Thailand\x02\xa6\x80\xc4\x01\x02\x0eBangkok\x02\x8c\xba\xc4\x01\x020*\xa9\x13\xd0\x84+@\x02\xec\xc09#J\x1fY@\x02\x8a\x02Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36\x02\x10Chromium\x02\x10Chromium\x028Google Inc. and contributors\x02\x0eBrowser\x02\x1858.0.3029.96\x02"Personal computer\x02\nLinux\x02\x00\x02\x1cCanonical Ltd.'

How can I decode it? I tried bson decoding, but the bytes are not valid UTF-8, presumably because this is Avro's own binary encoding. I found https://github.com/verisign/python-confluent-schemaregistry, but it only supports Python 2.7. Ideally I would like to work with Python 3.5+, and with MongoDB to process and store the data, since that is my current infrastructure.
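For reference, the dump above is Avro binary encoding rather than UTF-8 text, which is why a plain string decode fails: Avro writes a string as a zig-zag varint length followed by the raw UTF-8 bytes. A minimal stdlib-only sketch of that length-prefix scheme (an illustration only, not a full Avro decoder):

```python
def read_long(buf: bytes, pos: int):
    # Avro encodes integers as zig-zag varints: low 7 bits per byte,
    # high bit set while more bytes follow
    shift = acc = 0
    while True:
        b = buf[pos]
        pos += 1
        acc |= (b & 0x7F) << shift
        shift += 7
        if not (b & 0x80):
            break
    return (acc >> 1) ^ -(acc & 1), pos  # undo the zig-zag mapping

def read_string(buf: bytes, pos: int):
    # An Avro string is a zig-zag length followed by that many UTF-8 bytes
    n, pos = read_long(buf, pos)
    return buf[pos:pos + n].decode("utf-8"), pos + n

# b'\x10' zig-zag-decodes to 8, so this reads the 8-byte string "pageview"
# (the same b'\x10pageview' sequence appears in the message above)
print(read_string(b"\x10pageview", 0)[0])  # pageview
```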

Jacquerie answered 7/6, 2017 at 8:45 Comment(0)

I thought the Avro library was only for reading Avro files, but it actually solved the problem of decoding Kafka messages. I first import the libraries and pass the schema file as a parameter, then create a function that decodes a message into a dictionary, which I can use in the consumer loop.

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

# avro >= 1.10 spells this avro.schema.parse(); older releases used Parse()
schema = avro.schema.parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

# Consumer() needs a configuration dict; substitute your own broker and group
c = Consumer({'bootstrap.servers': 'broker:9092', 'group.id': 'clickstream'})
c.subscribe([topic])  # subscribe() expects a list of topic names
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:  # poll() returns None when it times out
        continue
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
Jacquerie answered 12/6, 2017 at 11:40 Comment(7)
I am getting this error when trying to import avro: File "/usr/lib64/python3.6/tokenize.py", line 27, in <module> ImportError: cannot import name 'open'. Do you know what might be wrong? – Interpellate
Where was io defined? – Backspin
Just added the missing "import io". Thanks – Jacquerie
And "open()" is a built-in Python function, so it should be callable without importing it – Jacquerie
Hi, I am using kafka-python to do the same work and am getting the following error: [avro.io.SchemaResolutionException: Can't access branch index 55 for union with 2 branches.] Also, can you tell me which avro package you are using? Thanks. – Gravely
Note that the method name (with avro 1.10.1, at least) is avro.schema.parse(), not Parse() :) – Venison
c = Consumer() errors out: TypeError: expected configuration dict – Hadik

If you use Confluent Schema Registry and want to deserialize Avro messages, just add message_bytes.seek(5) to the decode function: Confluent prepends 5 extra bytes (a magic byte plus a 4-byte schema ID) before the usual Avro-formatted data.

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict
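For context on those 5 bytes: Confluent's wire format is one magic byte (always 0) followed by the big-endian 4-byte ID of the schema in the registry, then the Avro payload. A small stdlib-only sketch that splits the header out (handy for checking which registered schema a message was written with) instead of just skipping it:

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent wire-format message into (schema_id, avro_bytes)."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent wire-format message")
    # byte 0: magic byte; bytes 1-4: schema ID as a big-endian unsigned int
    schema_id = struct.unpack(">I", payload[1:5])[0]
    return schema_id, payload[5:]

# example: magic byte 0, schema ID 42, then the Avro body
schema_id, body = parse_confluent_header(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
print(schema_id)  # 42
```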
Hemeralopia answered 3/6, 2020 at 16:43 Comment(10)
Thanks for this, I don't think I would have thought of that myself... – Aspirate
Thanks! I was blocked by just that ;-) – Sill
Whaaaaaaat, this is crazy, but thank you very much for the fix – Mortenson
You are a saviour _/\_ – Copperplate
How do I import reader on the 5th line? – Clisthenes
@EkremGurdal check Alexandre's answer – Hemeralopia
I can't thank you enough for this! You're a hero. – Hanse
See the Confluent wire format: docs.confluent.io/platform/current/schema-registry/fundamentals/… – Shabbir
Thank you for this. This absolutely saved my day. – Scrape
It was awesome! Thank you for the clarification. – Corazoncorban

If you have access to a Confluent Schema Registry server, you can also use Confluent's own AvroDeserializer to avoid dealing with the 5-byte wire-format header yourself:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
    deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
    # the second argument is an optional SerializationContext; returns a dict
    return deserializer(record, None)
Upspring answered 20/1, 2021 at 8:36 Comment(4)
With the caveat that confluent_kafka doesn't install on Windows Python (I believe) – Mortenson
@Mortenson I've got it running on WSL 2. There are a couple of gotchas, but it works well enough for development purposes; it's not intended for production use. confluent.io/blog/set-up-and-run-kafka-on-windows-and-wsl-2 – Architectural
And if we don't have a Confluent Schema Registry server? – Tamikotamil
I like this answer, but if you have access to the Schema Registry server, why do you need to pass in the schema as a string in order to deserialize? I was able to remove the schema_str argument to AvroDeserializer and just pass the schema client, and it worked. – Gooey

Decoding msg_value (c.poll().value()) was failing in my case; the code below decoded the value correctly:

import io
import avro.schema  # "import avro" alone does not expose avro.schema
from avro.io import DatumReader, BinaryDecoder

message_bytes = io.BytesIO(msg.value())
message_bytes.seek(5)  # skip the 5-byte Confluent wire-format header
decoder = BinaryDecoder(message_bytes)
schema = avro.schema.parse(jstr)  # jstr holds the schema as a JSON string
reader = DatumReader(schema)
event_dict = reader.read(decoder)
print(event_dict)
Signory answered 11/7, 2023 at 6:17 Comment(0)