Reading data written to S3 by an Amazon Kinesis Firehose stream

I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.

My record object looks like

ItemPurchase {
    String personId,
    String itemId
}

The data written to S3 looks like this:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

NO COMMA SEPARATION.

NO STARTING BRACKET as in a JSON array

[

NO ENDING BRACKET as in a JSON array

]

I want to read this data and get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

What is the correct way to read this data?

Cacophonous answered 26/12, 2015 at 3:48 Comment(1)
Kinesis delivers the data in this odd format so that it can be queried via Athena (link). It is strange that they settled on this format in the first place.Ballata

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSON module's raw_decode method.

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
Greater answered 21/3, 2018 at 22:39 Comment(1)
A bit concerned with the time complexity of this solutionNovosibirsk

I also had the same problem; here is how I solved it.

  1. Replace "}{" with "}\n{".
  2. Split lines by "\n".

    import re

    (input_json_rdd
        .map(lambda x: re.sub("}{", "}\n{", x, flags=re.UNICODE))
        .flatMap(lambda line: line.split("\n")))
    

A nested JSON object has several "}"s, so splitting lines by "}" doesn't solve the problem.

Flavoring answered 15/2, 2017 at 22:3 Comment(3)
I considered doing something like this, but I think that if one of the strings inside the JSON object happens to include a }{ then this technique will break. Maybe if you go through each character, toggle a boolean if you hit a " (to indicate either entering or leaving a string), count the levels of objects you are in (increment on seeing { outside a string, decrement on seeing } outside a string), then consider the end of the object as when your level counter hits 0 again.Plowshare
The separator }{ is problematic because inner strings can have jsons inside them like this: }{\" (with escaped quotes) , so using }{" as a separator is a bit better because inner strings can't have quotesWithdrawn
To build on Eran's answer, I used a negative lookahead to account for the case when }{ occurs at the end of a string: re.sub('}{"(?![,}])', '}\n{"', string)Chiaki
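
To make the quote-and-brace tracking idea from these comments concrete, here is a rough Python sketch (an illustration only, not production code). It walks the text character by character, tracks whether it is inside a quoted string (honouring backslash escapes), and emits an object whenever the brace depth returns to zero:

import json


def split_concatenated_json(text):
    # Yield each top-level JSON object in a concatenated blob,
    # ignoring braces that appear inside quoted strings.
    depth = 0
    in_string = False
    escaped = False
    start = 0
    for i, c in enumerate(text):
        if in_string:
            if escaped:
                escaped = False
            elif c == '\\':
                escaped = True
            elif c == '"':
                in_string = False
        elif c == '"':
            in_string = True
        elif c == '{':
            if depth == 0:
                start = i
            depth += 1
        elif c == '}':
            depth -= 1
            if depth == 0:
                yield json.loads(text[start:i + 1])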

I've had the same issue.

It would have been better if AWS allowed us to set a delimiter, but we can do it on our own.

In my use case, I've been listening to a stream of tweets, and upon receiving a new tweet I immediately put it to Firehose.

This, of course, resulted in a 1-line file which could not be parsed.

So, to solve this, I concatenated each tweet's JSON with a \n before putting it to Firehose. This, in turn, let me use packages that can split the stream contents into lines and parse the file easily.
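
For illustration, a minimal sketch of this producer-side approach, assuming boto3 and a hypothetical delivery stream name:

import json

import boto3

firehose = boto3.client('firehose')


def put_tweet(tweet):
    # Append '\n' so each record lands on its own line in the S3 object
    # ('my-tweets-stream' is a hypothetical delivery stream name).
    firehose.put_record(
        DeliveryStreamName='my-tweets-stream',
        Record={'Data': (json.dumps(tweet) + '\n').encode('utf-8')}
    )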

Hope this helps you.

Hen answered 15/7, 2016 at 22:35 Comment(0)

I think the best way to tackle this is to first create a properly formatted JSON file containing well-separated JSON objects. In my case I added ',' to the events that were pushed into Firehose. Then, after a file is saved in S3, all the files contain JSON objects separated by some delimiter (a comma, in our case). Another thing that must be added is '[' and ']' at the beginning and end of the file. Then you have a proper JSON file containing multiple JSON objects, and parsing them becomes possible.
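
For illustration, a minimal sketch of one way to do the reading side of this approach, assuming each record was pushed with a trailing comma and that the brackets are added at read time (the function name is hypothetical):

import json


def parse_comma_separated_records(s3_body):
    # Drop the comma left after the last record, then wrap everything in
    # brackets to form a valid JSON array.
    return json.loads('[' + s3_body.rstrip().rstrip(',') + ']')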

Renaterenato answered 29/1, 2018 at 15:47 Comment(2)
This works for JSON but not for more complex markup such as XML. If every record is an XML document, they would need to be parsed and their root elements wrapped into a new XML document with some kind of enclosing element (I've used <array></array>). I'm currently trying to figure out how to read from S3 this way.Mccarter
This won't work if you have multiple producers sending valid json arrays to Firehose.Styliform

I used a transformation Lambda to add a line break at the end of every record

import base64
import copy


def lambda_handler(event, context):
    output = []

    for record in event['records']:

        # Decode from base64 (Firehose records are base64 encoded)
        payload = base64.b64decode(record['data'])

        # Read json as utf-8    
        json_string = payload.decode("utf-8")

        # Add a line break
        output_json_with_line_break = json_string + "\n"

        # Encode the data
        encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
        encoded_string = str(encoded_bytes, 'utf-8')

        # Create a deep copy of the record and append to output with transformed data
        output_record = copy.deepcopy(record)
        output_record['data'] = encoded_string
        output_record['result'] = 'Ok'

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
Gangplank answered 30/1, 2020 at 2:55 Comment(0)

If the input source for the Firehose is an Analytics application, this concatenated JSON without a delimiter is a known issue, as cited here. You should use a Lambda function, as here, that outputs JSON objects on multiple lines.

Inadvisable answered 26/11, 2018 at 19:35 Comment(0)

Use this simple Python code.

import json

input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

data_str = "[{}]".format(input_str.replace("}{","},{"))
data_json = json.loads(data_str)

And then (if you want) convert it to a Pandas DataFrame.

import pandas as pd   
df = pd.DataFrame().from_records(data_json)
print(df)

And this is the result:

  itemId personId
0  i-111    p-111
1  i-222    p-222
2  i-333    p-333
Maurili answered 17/5, 2019 at 10:19 Comment(0)

If there's a way to change how the data is written, separate all the records with a newline. That way you can read the data simply, line by line. If not, then simply build a scanner object which takes "}" as a delimiter and use the scanner to read. That would do the job.
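
As a rough illustration of that delimiter idea in Python (the function name is hypothetical): splitting on "}" and re-appending it works only for flat objects whose string values never contain "}".

import json


def parse_flat_records(content):
    # Split on '}' and re-append it. Only safe when objects are flat and no
    # string value contains '}'.
    return [json.loads(chunk + '}') for chunk in content.split('}') if chunk.strip()]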

Desperation answered 19/5, 2016 at 8:41 Comment(0)

You can find each valid JSON object by counting the brackets. Assuming the file starts with a {, this Python snippet should work:

import json

def read_block(stream):
    open_brackets = 0
    block = ''
    while True:
        c = stream.read(1)
        if not c:
            break

        if c == '{':
            open_brackets += 1
        elif c == '}':
            open_brackets -= 1

        block += c

        if open_brackets == 0:
            yield block
            block = ''


if __name__ == "__main__":
    with open('firehose_json_blob', 'r') as f:
        for block in read_block(f):
            record = json.loads(block)
            print(record)
Defile answered 15/8, 2018 at 14:45 Comment(1)
Warning: This is just a blind stream reader, so it will break if any of the JSON blobs contain strings that happen to have escaped brackets in them.Greater

This problem can be solved with a JSON parser that consumes objects one at a time from a stream. The raw_decode method of the JSONDecoder exposes just such a parser, but I've written a library that makes it straightforward to do this with a one-liner.

from firehose_sipper import sip

for entry in sip(bucket=..., key=...):
    do_something_with(entry)

I've added some more details in this blog post.

Stopoff answered 14/2, 2022 at 7:32 Comment(0)

In Spark, we had the same problem. We're using the following:

import re

from pyspark.sql.functions import *

@udf
def concatenated_json_to_array(text):
  final = "["
  separator = ""
  
  for part in text.split("}{"):
    final += separator + part
    separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
      
  return final + "]"


def read_concatenated_json(path, schema):
  return (spark.read
          .option("lineSep", None)
          .text(path)
          .withColumn("value", concatenated_json_to_array("value"))
          .withColumn("value", from_json("value", schema))
          .withColumn("value", explode("value"))
          .select("value.*"))  

It works as follows:

  1. Read the data as one string per file (no delimiters!)
  2. Use a UDF to introduce the JSON array and split the JSON objects by introducing a comma. Note: be careful not to break any strings with }{ in them!
  3. Parse the JSON with a schema into DataFrame fields.
  4. Explode the array into separate rows
  5. Expand the value object into columns.

Use it like this:

from pyspark.sql.types import *

schema = ArrayType(
  StructType([
    StructField("type", StringType(), True),
    StructField("value", StructType([
      StructField("id", IntegerType(), True),
      StructField("joke", StringType(), True),
      StructField("categories", ArrayType(StringType()), True)  
    ]), True)
  ])
)

path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)

I've written more details and considerations here: Parsing JSON data from S3 (Kinesis) with Spark. Do not just split by }{, as it can mess up your string data! For example: { "line": "a\"r}{t" }.

Olag answered 23/11, 2021 at 9:13 Comment(0)

You can use the script below.

If the streamed data size does not exceed the buffer size you set, each S3 file will contain one pair of brackets ([]) and comma-separated records.

import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')+',\n'

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    last = len(event['records'])-1
    print('Successfully processed {} records.'.format(len(event['records'])))
    
    # Prepend '[' to the first record of the batch
    start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
    output[0]['data'] = base64.b64encode(start.encode('utf-8')).decode('utf-8')

    # Strip the trailing ',\n' from the last record and append ']'
    end = base64.b64decode(output[last]['data']).decode('utf-8').rstrip(',\n')+']'
    output[last]['data'] = base64.b64encode(end.encode('utf-8')).decode('utf-8')
    return {'records': output}

Consanguinity answered 1/12, 2021 at 5:49 Comment(0)

Using a JavaScript regex:

JSON.parse(`[${item.replace(/}\s*{/g, '},{')}]`);
Abort answered 11/1, 2022 at 4:27 Comment(0)

There is an option to add a newline delimiter to each record, which will make the files valid JSON Lines (JSONL) format: https://jsonlines.org/

Navigate to the AWS console, select your Kinesis Firehose stream, select the configuration tab, and scroll down to Destination settings.

Ensure that the newline delimiter option is enabled.

Amazon Data Firehose > Firehose streams > {your-firehose-name} > Edit destination settings
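
Once the newline delimiter is enabled, each S3 object is plain JSON Lines and can be parsed line by line. A minimal sketch, assuming boto3; the bucket and key names are hypothetical:

import json

import boto3

s3 = boto3.client('s3')

# Bucket and key are hypothetical; substitute your Firehose destination.
obj = s3.get_object(Bucket='my-firehose-bucket', Key='2024/01/01/delivery-file')
records = [json.loads(line)
           for line in obj['Body'].read().decode('utf-8').splitlines()
           if line.strip()]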

Paregoric answered 28/2 at 21:31 Comment(0)
