How to produce Kafka messages with JSON format in Python

How can I remove the extra quotes and send the data in its original format? The original JSON is:

{
  "@timestamp": "2020-06-02T09:38:03.183186Z"
}

After forwarding, the same kind of record arrives in the other topic as:

"{\"@timestamp\": \"2020-05-25T17:40:47.582778Z\"}"

This is the code that forwards the data between servers:

import json
from json import dumps

from kafka import KafkaConsumer, KafkaProducer

def parse(d):
    # Pass dict payloads through; anything else is signalled with -1.
    # (The original returned an undefined name `r`; the dict itself is returned here.)
    if isinstance(d, dict):
        return d
    return -1

producer = KafkaProducer(bootstrap_servers=param["BOOTSTRAP_SERVERS"],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))  # utf-8
consumer = KafkaConsumer(bootstrap_servers=param["BOOTSTRAP_SERVERS"]+'1',
                         auto_offset_reset=param["AUTO_OFFSET_RESET"],
                         consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
                         enable_auto_commit=False,
                         auto_commit_interval_ms=60000,
                         group_id=param["GROUP_ID"],
                         client_id=param["CLIENT_ID"]
                         )
consumer.subscribe([param["TOPIC_IN"]])
while True:
    num_rows = 0
    for msg in consumer:
        num_rows = num_rows + 1
        m = json.loads(msg.value)
        j = parse(m)
        if j != -1:
            data = json.dumps(j)
            producer.send(param["TOPIC_OUT"], value=data)
Spoor answered 2/6, 2020 at 9:49

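Your values are being serialized twice. The loop calls json.dumps(j) and passes the resulting string to send, and the producer's value_serializer then calls dumps on that string again. The second pass is what wraps the payload in quotes and escapes the inner ones. If you want a JSON object instead of a JSON-encoded string, pass the dict itself to send and let value_serializer do the only serialization.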
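
The quoted, escaped output from the question can be reproduced without Kafka by JSON-encoding a value twice. A minimal sketch using only the standard library (the `record` variable is illustrative and not part of the original code):

```python
import json

record = {"@timestamp": "2020-06-02T09:38:03.183186Z"}

# Encoding once yields a JSON object.
once = json.dumps(record)

# Encoding the resulting string again yields a JSON string literal:
# the whole payload is wrapped in quotes and the inner quotes are escaped.
twice = json.dumps(once)

print(once)   # {"@timestamp": "2020-06-02T09:38:03.183186Z"}
print(twice)  # "{\"@timestamp\": \"2020-06-02T09:38:03.183186Z\"}"
```

A consumer of the double-encoded topic would have to call json.loads twice to get the dict back, which is usually a sign that the producer side should be fixed instead.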


The following should do the trick:

import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    # Serialize each value to UTF-8 encoded JSON exactly once
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
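
Applied to the forwarding loop from the question, this would mean handing the dict straight to send and leaving the encoding to value_serializer. The following is only a sketch under assumptions: `serialize_value`, `forward`, and `build_producer` are hypothetical helper names that do not appear in the original code, and the kafka-python client is assumed.

```python
import json


def serialize_value(value):
    """Encode a Python object as UTF-8 JSON bytes (runs once, in the producer)."""
    return json.dumps(value).encode("utf-8")


def forward(messages, send, topic_out):
    """Decode each consumed message and pass dict payloads on unencoded.

    `messages` is any iterable of objects with a `.value` attribute holding
    JSON bytes or text; `send` is a callable shaped like KafkaProducer.send.
    Returns the number of messages forwarded.
    """
    forwarded = 0
    for msg in messages:
        payload = json.loads(msg.value)
        if isinstance(payload, dict):
            # No json.dumps here: the producer's value_serializer handles it.
            send(topic_out, value=payload)
            forwarded += 1
    return forwarded


def build_producer(bootstrap_servers):
    """Create a producer that serializes values exactly once (assumes kafka-python)."""
    from kafka import KafkaProducer  # imported lazily; requires kafka-python

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=serialize_value,
    )
```

With the consumer from the question, the call would look something like forward(consumer, producer.send, param["TOPIC_OUT"]). Keeping the loop independent of the Kafka client also makes it easy to check with plain Python objects.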
Burny answered 2/6, 2020 at 10:20