Solution using Python or Node.js
I am using DynamoDB Streams and needed to get those records saved into S3. I implemented a Kinesis Firehose stream along with a Lambda function. This got my records into S3 as JSON strings, but every record was written inline, as one single continuous row. I needed to add a newline at the end of each record so that each one sits on its own line. My solution ended up involving some base64 decoding and encoding.
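To illustrate why the missing newline matters, here is a minimal sketch. The sample records and the `parse_lines` helper are made up for illustration and are not part of my actual setup. It only shows that back-to-back JSON documents cannot be split by line or parsed in one call, while newline-terminated ones can.

```python
import json

# Hypothetical sample records standing in for DynamoDB stream items.
records = [{"id": 1, "status": "new"}, {"id": 2, "status": "done"}]

# Records written back to back with no delimiter: one continuous row.
concatenated = "".join(json.dumps(r) for r in records)

# Records each terminated by a newline: one JSON document per line.
newline_delimited = "".join(json.dumps(r) + "\n" for r in records)


def parse_lines(text):
    """Parse newline-delimited JSON, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


print(len(concatenated.splitlines()))  # 1

try:
    json.loads(concatenated)
except json.JSONDecodeError as err:
    # The parser stops after the first document and rejects the rest.
    print("cannot parse concatenated records:", err.msg)

print(parse_lines(newline_delimited) == records)  # True
```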
Here is how I did it:
- When you create your Kinesis Firehose stream, enable "Transform source records with AWS Lambda" (select "Enabled"). If you have already created your stream, you can still enable this feature by editing your existing stream.
- At this point you will need to select another Lambda function that performs this transformation. In my case, I needed to add a new line at the end of each record so that when I open the file up in a text editor and view it, every entry is on a separate line.
Below is the tested solution code for both Python and Node.js that I used for that second Lambda:
Python solution to add a newline:
import base64


def lambda_handler(event, context):
    # Build the list per invocation. A module-level list would keep
    # growing across warm invocations of the same Lambda container.
    output = []

    for record in event['records']:
        # Firehose hands each record's data to the Lambda base64-encoded.
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)

        row_w_newline = payload + "\n"

        # Re-encode, then decode the resulting bytes to str so the
        # response can be serialized to JSON under Python 3.
        data = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': data,
        })

    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
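If you want to check the transformation before wiring it to Firehose, a local smoke test along these lines may help. This is only a sketch: `make_event` is a hypothetical helper that builds just the fields the handler reads (`recordId` and `data`), not a full Firehose event, and the handler here is a condensed stand-in for the Python function above.

```python
import base64
import json


def lambda_handler(event, context):
    # Condensed stand-in for the transformation Lambda:
    # decode each record, append a newline, re-encode.
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = base64.b64encode((payload + "\n").encode('utf-8')).decode('utf-8')
        output.append({'recordId': record['recordId'], 'result': 'Ok', 'data': data})
    return {'records': output}


def make_event(payloads):
    """Hypothetical helper: build a minimal Firehose-style event from strings."""
    return {
        'records': [
            {
                'recordId': str(i),
                'data': base64.b64encode(p.encode('utf-8')).decode('utf-8'),
            }
            for i, p in enumerate(payloads)
        ]
    }


event = make_event(['{"id": 1}', '{"id": 2}'])
response = lambda_handler(event, None)

# Raises TypeError if any value in the response is still bytes.
json.dumps(response)

decoded = [base64.b64decode(r['data']).decode('utf-8') for r in response['records']]
print(decoded)  # ['{"id": 1}\n', '{"id": 2}\n']

# A second call should return 2 records again, not 4.
print(len(lambda_handler(event, None)['records']))  # 2
```

The second call is there to catch the case where the output list lives at module level and accumulates records between invocations.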
Node.js solution to add a newline:
'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => {
    /* Process the list of records and transform them */
    const output = event.records.map((record) => {
        // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
        const entry = Buffer.from(record.data, 'base64').toString('utf8');
        const result = entry + "\n";
        const payload = Buffer.from(result, 'utf8').toString('base64');
        return {
            recordId: record.recordId,
            result: 'Ok',
            data: payload,
        };
    });
    console.log(`Processing completed. Successful records ${output.length}.`);
    callback(null, { records: output });
};
In the original question above, MrHen wanted to do this without using a second Lambda. I was able to get this working in the first Lambda, without the Kinesis Firehose transform source records feature. I did this by taking the newImage from DynamoDB and doing, in this order: encode, decode, add a newline ("\n"), encode, decode. There's probably a much cleaner way. For now I chose to go with the transform source records feature and the second Lambda function, as it seems cleaner to me.
In my case, the single Lambda solution looked like this:
# Not pretty, but it works! Successfully adds new line to record.
# newImage comes from the DynamoDB Stream as a Python dictionary object,
# I convert it to a string before running the code below.
newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
newImage = newImage + "\n"
newImage = base64.b64encode(newImage.encode('utf-8'))
newImage = base64.b64decode(newImage).decode('utf-8')
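Looking at it again, each `b64encode` above is immediately undone by the `b64decode` that follows it, so the net effect should be just appending the newline. A possibly cleaner sketch, assuming the record is a plain dictionary that `json.dumps` can serialize and that whatever sends it to Firehose takes care of any encoding (as far as I know, boto3's `put_record` does this for the `Data` blob). The `to_json_line` helper and the sample item are hypothetical:

```python
import base64
import json


def to_json_line(new_image):
    """Serialize a dict as a single newline-terminated JSON string."""
    return json.dumps(new_image) + "\n"


# Hypothetical item in DynamoDB's attribute-value format.
new_image = {"id": {"S": "123"}, "status": {"S": "shipped"}}
line = to_json_line(new_image)

# An encode followed directly by a decode returns the same string,
# which is why the extra round trips can be dropped.
round_tripped = base64.b64decode(base64.b64encode(line.encode('utf-8'))).decode('utf-8')

print(round_tripped == line)  # True
print(line.endswith("\n"))    # True
```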