Backfill AWS Kinesis Firehose to Elasticsearch Service failed records

We have a Firehose that sends records to an Elasticsearch Service cluster. Our cluster filled up and some records failed over to S3. The documentation at https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry indicates that failed records can be used to backfill: "The skipped documents are delivered to your S3 bucket in the elasticsearch_failed/ folder, which you can use for manual backfill," but I haven't been able to find any documentation on how to actually accomplish this.

Looking at the records, they appear to be gzipped text files containing JSON blobs, each with a "rawData" field holding a base64-encoded string of the original record we sent to Firehose.

Is there an existing tool to pull these gzip files out of S3, break them down, and re-submit the records? The documentation implies that you can "just manually backfill," and it's a fairly standardized flow, so my assumption is that someone has done this before, but I haven't been able to find out how.
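
For reference, pulling apart a single record from one of these files looks something like this (a minimal sketch; the field names are what appear in the failure files, and the local filename is made up):

import base64
import gzip
import json

# One downloaded elasticsearch_failed/ file (hypothetical local name).
with gzip.open('failed-records.gz', 'rt') as f:
    record = json.loads(f.readline())

# 'rawData' is the base64-encoded record we originally sent to Firehose.
original = base64.b64decode(record['rawData'])
print(record['esIndexName'], record['esDocumentId'], original)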

Comestible answered 13/4, 2018 at 17:45 Comment(1)
Did you find any proper solution for this? I was also searching the web for some "built-in" support for this. AWS Data Pipeline provides a better approach, in the sense that you can just re-run the failed instance. – Judoka

I suppose manual backfill means using one of the AWS SDKs to send the documents into Elasticsearch again. Here is an example in Python of reading a failure file from S3 (with boto3) and sending the documents within to Elasticsearch; note that boto3's 'es' client only exposes the domain-management API, so the elasticsearch-py client does the actual indexing:

import base64
import gzip
import json
import logging

import boto3
from elasticsearch import Elasticsearch, RequestError

logger = logging.getLogger(__name__)

# boto3's 'es' client only manages domains, so index through elasticsearch-py.
# ES_ENDPOINT / ES_USER / ES_PASSWORD are placeholders, like the AWS keys below.
es_client = Elasticsearch(ES_ENDPOINT, http_auth=(ES_USER, ES_PASSWORD))
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

# Per the question, the failure files are gzipped newline-delimited JSON.
file = s3_client.get_object(Bucket=bucket, Key=key)
text = gzip.decompress(file['Body'].read()).decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))

for case in failure_cases:
    try:
        # 'rawData' holds the base64-encoded record originally sent to Firehose.
        data = base64.b64decode(case['rawData'])
        es_client.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}\nReason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))
Bigamist answered 14/4, 2019 at 6:58 Comment(0)

Had the same problem; I modified the above script to backfill the failed documents (those with error code 403) to an existing Elasticsearch instance:

import base64
import gzip
import json

import boto3
import requests

s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")

s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')
for s3key in s3keys['Contents']:
    print(s3key['Key'])
    file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
    # The failure files are gzipped newline-delimited JSON.
    text = gzip.decompress(file['Body'].read()).decode("utf-8")
    failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
    for case in failure_cases:
        data = base64.b64decode(case['rawData'])
        esid = case['esDocumentId']
        esIndexName = case['esIndexName']
        doc = data.decode('utf-8')
        url = "https://es-domain-name/%s/_doc/%s" % (esIndexName, esid)
        headers = {"Content-Type": "application/json", "Accept-Charset": "UTF-8"}
        # Only re-submit the documents that failed with a 403.
        if case['errorCode'] == '403':
            try:
                print(case['errorCode'])
                r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
                print(r.json())
            except (requests.RequestException, ValueError) as exc:
                print("Retry failed for %s: %s" % (esid, exc))
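
One caveat: list_objects returns at most 1,000 keys, so a larger backfill needs a paginator. A sketch, with a hypothetical process_file helper standing in for the inner loop above:

# Iterate every key under the prefix; list_objects alone caps at 1,000 keys.
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket="bucketname", Prefix='path/to/folder/file'):
    for s3key in page.get('Contents', []):
        process_file(s3key['Key'])  # hypothetical helper wrapping the loop body above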
Cameroncameroon answered 12/8, 2020 at 10:21 Comment(1)
For both answers, this line: failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n')))) is hideous and not Pythonic. A better alternative: [json.loads(d) for d in body.strip().split(b"\n")], where body isn't decoded (hence the split on b"\n"). – Glossy
