Can I automatically append newlines to AWS Firehose records?
I am trying to configure a Kinesis Analytics application with the following settings:

  • Input stream is a Kinesis Firehose which is taking stringified JSON values
  • The SQL is a simple passthrough (it needs to be more complicated later but for testing, it just sends the data through)
  • Output stream is a second Kinesis Firehose which delivers records to an S3 bucket

Later down the line, I will import the contents of the S3 bucket using Hive + JSONSERDE which expects each JSON record to live on its own line. The Firehose output just appends all of the JSON records which breaks JSONSERDE.
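To make the failure concrete, here is a minimal sketch (with made-up record values) of why concatenated JSON breaks line-oriented readers such as JSONSERDE, while newline-delimited JSON parses cleanly:

```python
import json

# Two records as Firehose writes them today: concatenated with no delimiter.
concatenated = '{"a": "1"}{"b": "2"}'

# A JSON parser rejects the concatenated blob as a single document...
try:
    json.loads(concatenated)
except json.JSONDecodeError:
    print("concatenated records are not valid JSON")

# ...whereas newline-delimited JSON can be parsed one line at a time.
ndjson = '{"a": "1"}\n{"b": "2"}\n'
records = [json.loads(line) for line in ndjson.splitlines()]
print(records)  # [{'a': '1'}, {'b': '2'}]
```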

I could attach an AWS Lambda data formatter to the output stream but that seems expensive. All I want is to split each record using a newline.

If I were doing this without an Analytics app, I would append the newline to each Firehose record myself. It seems strange that there is no way to do that in the app's SQL:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

Is the best answer to add the Lambda data formatter? I'd really like to avoid this.

Asymmetric answered 29/5, 2017 at 16:1 Comment(0)

I am posting an answer just to keep the question updated with recent AWS announcements. AWS recently announced Dynamic Partitioning for Kinesis Firehose delivery streams, which supports appending a newline character to each record. For more information, please see this and this.
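For reference, this option surfaces in the Firehose API as an `AppendDelimiterToRecord` processor in the delivery stream's processing configuration. A hedged sketch follows; the stream name, bucket, and role ARNs are hypothetical, and this processor is tied to the dynamic partitioning feature:

```python
# Processing-configuration fragment that appends a newline to each record.
# This is a sketch; names and ARNs would come from your own setup.
processing_configuration = {
    "Enabled": True,
    "Processors": [
        {"Type": "AppendDelimiterToRecord"}  # delimiter defaults to "\n"
    ],
}

# Used when creating the delivery stream, roughly like:
# boto3.client("firehose").create_delivery_stream(
#     DeliveryStreamName="my-stream",
#     ExtendedS3DestinationConfiguration={
#         "BucketARN": "arn:aws:s3:::my-bucket",
#         "RoleARN": "arn:aws:iam::111122223333:role/firehose-role",
#         "DynamicPartitioningConfiguration": {"Enabled": True},
#         "ProcessingConfiguration": processing_configuration,
#     },
# )
print(processing_configuration["Processors"][0]["Type"])
```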

Gogh answered 28/9, 2021 at 10:58 Comment(1)
The caveat is that it only works for newly created delivery streams (created from scratch), so if you have an existing one you'll need to use the transformation Lambda approach. – Canorous

I had a similar requirement to add new lines to the Firehose-generated files. In our application, Firehose is invoked via API Gateway.

This is specified in the Body Mapping Templates under the Integration Request section.

The following mapping template in API Gateway appends a new line to each Kinesis Firehose record.

Method 1:

    #set($payload="$input.path('$.Record.Data')
")
    {
        "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
        "Record": {
            "Data": "$util.base64Encode($payload)"
        }
    }

This works perfectly if you are invoking firehose via API Gateway.
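For comparison, if you are calling Firehose directly from code rather than through API Gateway, the same idea is just appending the newline before the record goes out. A minimal Python sketch (the payload and stream name are placeholders, not from this answer):

```python
import json

def build_record(payload):
    # Serialize the payload and append a newline, mirroring what the
    # mapping template's embedded line break achieves.
    data = json.dumps(payload) + "\n"
    return {"Data": data.encode("utf-8")}

record = build_record({"id": 1})
print(record["Data"])  # b'{"id": 1}\n'

# With boto3, the record could then be sent with, e.g.:
# boto3.client("firehose").put_record(
#     DeliveryStreamName="my-stream", Record=record)
```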

Thanks & Regards, Srivignesh KN

Aconite answered 2/8, 2017 at 0:0 Comment(8)
This works for the Firehose but not the Analytics App. The Analytics App stripped out the newlines in its output. – Asymmetric
You could try data transformation within Firehose, adding new lines with a Lambda function, and then let Kinesis pass the records on to S3. – Aconite
I mentioned that in the question. I'd rather not add a Lambda transformer just for this. – Asymmetric
Did you find any other ways to add new line delimiters? Last Friday AWS released a new update to some regions which broke your solution. Adding Cg== no longer helps; it now throws a SerializationException in most cases. For example, this code doesn't work anymore: { "DeliveryStreamName": "fus-bear-csv-dev", "Records": [ { "Data": "$util.base64Encode('a')Cg==" } ] } Method response body after transformations: {"__type":"SerializationException"} – Macaulay
Thanks for letting me know. Can you tell me which region you are seeing this in? I shall try it there and post an alternative. This works perfectly for me in the us-east and us-west regions. – Aconite
{"__type":"SerializationException"} usually occurs when the incoming payload does not match the delivery mapping template or when the JSON is malformed. In your example you use Records instead of Record, in which case I believe you should have the action as PutRecords, and you are passing just 'a' instead of a buffer of values. – Aconite
@AndreyCheptsov this did affect me as of last week, and I have modified the template to the new format as described in the post. – Aconite
@AndreyCheptsov I can also confirm that usage of "Cg==" stopped working recently after a successful year in production in us-east-1. Thank you for sharing your finding! Did AWS ever confirm this? Or do they have release notes? I wish I had known about this breaking change sooner. – Whittle

A basic example of the way we implemented it: we used JavaScript to put records into a Kinesis stream and used Firehose to deliver them to an S3 location with gzip compression. Later, Athena queries the S3 location to fetch the records.

Below is the JavaScript code for adding a new line to each record before sending it to the Kinesis stream.

var parsed = JSON.parse(payload);               // payload arrives as a JSON string
var finalData = JSON.stringify(parsed) + "\n";  // re-stringify with a trailing newline

var kinesisPayload = {};
kinesisPayload.Data = finalData;
kinesisPayload.StreamName = "kinesisStreamName";
kinesisPayload.PartitionKey = "124";
Japhetic answered 6/3, 2018 at 3:29 Comment(1)
I'm guessing this won't work with the Analytics App, though. Adding newlines to the original records isn't hard but the Analytics App stripped them out before sending them into Firehose. – Asymmetric

Solution using Python or Node.js

I am using DynamoDB Streams and needed those records saved to S3, so I implemented a Kinesis Firehose stream along with a Lambda function. This worked for getting my records into S3 as JSON strings; however, every record saved to the file in S3 was inline, that is, in one single continuous row, so I needed to add a new line at the end of each record so that each record sat on its own line. For my solution, I ended up having to do some base64 decoding/encoding.

Here is how I did it:

  1. When you create your Kinesis Firehose stream, enable "Transform source records with AWS Lambda" (select "Enabled"). If you have already created your stream, you can still enable this feature by editing the existing stream.
  2. At this point you will need to select another Lambda function that performs this transformation. In my case, I needed to add a new line at the end of each record so that when I open the file in a text editor, every entry is on a separate line.

Below is the tested solution code for both Python and Node.js that I used for that second Lambda:

Python solution to add a newline:

import base64

def lambda_handler(event, context):
    output = []  # collect the transformed records for this invocation

    for record in event['records']:
        # Firehose delivers each record's data base64-encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)

        # Append the newline, then re-encode for the transform response.
        row_w_newline = payload + "\n"
        encoded = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded
        })

    print('Processed {} records.'.format(len(event['records'])))

    return {'records': output}

Node.js solution to add a newline:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {

    /* Process the list of records and transform them */
    const output = event.records.map((record) => {

        // Decode the record data, append the newline, then re-encode.
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const result = entry + "\n";
        const payload = Buffer.from(result, 'utf8').toString('base64');

        return {
            recordId: record.recordId,
            result: 'Ok',
            data: payload,
        };
    });
    console.log(`Processing completed.  Successful records ${output.length}.`);
    callback(null, { records: output });
};


In the original question above, MrHen wanted to do this without using a second Lambda. I was able to get it working in the first Lambda instead of using Firehose's transform-source-records feature, by taking the newImage from DynamoDB and doing, in this order: encode, decode, add a new line ("\n"), encode, decode. There's probably a much cleaner way. I chose to go with the transform-source-records feature and the second Lambda function, as it seems cleaner to me at this time.

In my case, the single Lambda solution looked like this:

    # Not pretty, but it works! Successfully adds a new line to the record.
    # newImage comes from the DynamoDB Stream as a Python dictionary object;
    # I convert it to a string before running the code below.

    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
    newImage = newImage + "\n"
    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
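As a side note, the back-to-back encode/decode pairs above cancel each other out, so the five lines reduce to a single append; a base64 encode is only needed at the end if the destination expects it (as a Firehose transform response does). A small sketch with a made-up image value:

```python
import base64

newImage = '{"id": "123"}'   # stand-in for the stringified DynamoDB image
newImage = newImage + "\n"   # the only transformation the five lines perform

# Only if the destination expects base64-encoded data:
encoded = base64.b64encode(newImage.encode('utf-8')).decode('utf-8')
```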
Might answered 29/12, 2020 at 21:18 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.