Can I customize partitioning in Kinesis Firehose before delivering to S3?

I have a Firehose stream intended to ingest millions of events from different sources and of different event types. The stream should deliver all data to one S3 bucket as a store of raw/unaltered data.

I was thinking of partitioning this data in S3 based on metadata embedded within the event message, like event-source, event-type, and event-date.

However, Firehose follows its default partitioning based on record arrival time. Is it possible to customize this partitioning behavior to fit my needs?

Update: The accepted answer has been changed, since a newer answer notes that the feature became available in September 2021.

Singlehanded answered 12/7, 2018 at 20:27 Comment(2)
Similar to: Partitioning AWS Kinesis Firehose data to s3 by payloadVizierate
@JohnRotenstein Unfortunately, the answers there do not address the question. Both suggest attaching a Lambda function that routes the incoming data to different streams based on a particular ID. This question, like the other one, asks whether it is possible to define the partitioning methodology for Firehose itself. Thank you for the reference, though!Singlehanded

Since September 1st, 2021, AWS Kinesis Firehose supports this feature. Read the announcement blog post here.

From the documentation:

You can use the Key and Value fields to specify the data record parameters to be used as dynamic partitioning keys and jq queries to generate dynamic partitioning key values. ...
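
To make the Key/Value mechanics concrete, here is a minimal sketch of how a record maps to a partitioned S3 prefix. The record fields, key names, and prefix below are my own illustration, not from the announcement:

// Suppose two dynamic partitioning keys are configured in the console:
//   Key "event_source" with jq query ".source"
//   Key "event_type"   with jq query ".type"
const sampleRecord = {
  source: "billing",    // extracted as partitioning key "event_source"
  type: "invoice.paid", // extracted as partitioning key "event_type"
};

// With a bucket prefix of
//   raw/source=!{partitionKeyFromQuery:event_source}/type=!{partitionKeyFromQuery:event_type}/
// Firehose would deliver this record under a key beginning with:
//   raw/source=billing/type=invoice.paid/
console.log(JSON.stringify(sampleRecord));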

Here is how it looks in the UI:

[Screenshots: dynamic partitioning configuration in the Firehose console]

Campestral answered 1/9, 2021 at 6:58 Comment(1)
this is awesome!Detergent

As of writing this, the dynamic partitioning feature Vlad has mentioned is still pretty new. I needed it to be part of a CloudFormation template, which was not yet properly documented. I had to add DynamicPartitioningConfiguration to get it working properly. The MetadataExtractionQuery syntax was also not properly documented.

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        # The prefix references the client_id key extracted by the
        # MetadataExtractionQuery below (see the comments: you may need
        # !{partitionKeyFromQuery:client_id} here instead).
        Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
        ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
        # Dynamic partitioning must be enabled explicitly.
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            # Append a newline after each record so objects stay line-delimited.
            - Type: AppendDelimiterToRecord
            # Extract the partition key from each record with a jq query.
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{client_id:.client_id}"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

Humanism answered 3/9, 2021 at 16:20 Comment(4)
It is April 2022 and there is still no documentation I could find on MetadataExtractionQuery. This answer was really helpful to figure out the syntax. Thanks!Hewett
There are some examples here: docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/… ...but they are rather unclear compared to the simple partitioning in this answerHorsemint
Great to have a working example! However, I had to write Prefix: "clients/client_id=!{partitionKeyFromQuery:client_id}/dt=!{timestamp:yyyy-MM-dd}/", maybe it's useful for someone else.Endure
For anyone else not being able to figure it out on the spot: the "Dynamic partitioning keys" in the UI correspond to the "ProcessingConfiguration" here in the template.Endure

No. You cannot 'partition' based upon event content.

Some options are:

  • Send to separate Firehose streams
  • Send to a Kinesis Data Stream (instead of Firehose) and write your own custom Lambda function to process and save the data (See: AWS Developer Forums: Athena and Kinesis Firehose); a minimal sketch of this option is included at the end of this answer
  • Use Kinesis Analytics to process the message and 'direct' it to different Firehose streams

If you are going to use the output with Amazon Athena or Amazon EMR, you could also consider converting it into Parquet format, which has much better performance. This would require post-processing of the data in S3 as a batch rather than converting the data as it arrives in a stream.
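
For the Kinesis Data Stream option above, here is a minimal sketch of such a consumer. The partition fields (eventSource, eventType, eventDate) and the bucket name are hypothetical stand-ins for your own:

// Lambda consumer for a Kinesis Data Stream that writes each record to S3
// under a content-based prefix. Field names and bucket are assumptions.
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import type { KinesisStreamEvent } from "aws-lambda";

const s3 = new S3Client({});

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  for (const record of event.Records) {
    // Kinesis delivers the payload base64-encoded.
    const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
    const body = JSON.parse(payload);
    // Build the partition path from metadata embedded in the event itself.
    const key =
      `source=${body.eventSource}/type=${body.eventType}/date=${body.eventDate}/` +
      `${record.kinesis.sequenceNumber}.json`;
    await s3.send(
      new PutObjectCommand({
        Bucket: "my-raw-data-bucket", // hypothetical bucket name
        Key: key,
        Body: payload,
      })
    );
  }
};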

Vizierate answered 14/7, 2018 at 21:42 Comment(0)

To expand on Murali's answer, we have implemented it in CDK:

Our incoming JSON data looks something like this:

{
    "data": {
        "timestamp": 1633521266990,
        "defaultTopic": "Topic",
        "data": {
            "OUT1": "Inactive",
            "Current_mA": 3.92
        }
    }
}

The CDK code looks as follows:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    // The name after "partitionKeyFromQuery:" must match the key produced by
    // the MetadataExtractionQuery below.
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              // Extract the partition key; the key name here ("defaultTopic")
              // must match the one referenced in the prefix above.
              parameterValue: '{defaultTopic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          // Append a newline after each record.
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})
Embrey answered 6/10, 2021 at 17:37 Comment(3)
Do you know how to use 2 fields as 2 separate values?Malemute
sample parameter with 2 fields, { parameterName: 'MetadataExtractionQuery', parameterValue: '{Topic:.data.defaultTopic,out1:.data.data.OUT1}', }Toughen
Thank you, @Toughen ! I think this is the only place on the web that this is documented.Forelady

To build on John's answer, if you don't have near-real-time streaming requirements, we've found batch processing with Athena to be a simple solution for us.

Kinesis streams into a table, unpartitioned_event_data, which can make use of the native record-arrival-time partitioning.

We define another Athena table, partitioned_event_table, with custom partition keys, and make use of Athena's INSERT INTO capabilities. Athena will automatically repartition your data in the format you want without requiring any custom consumers or new infrastructure to manage. This can be scheduled with cron, SNS, or something like Airflow.

What's cool is you can create a view that does a UNION of the two tables to query historical and real-time data in one place.
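
A minimal sketch of the repartitioning step, run on whatever schedule you choose. The column names, database, and result location are hypothetical; only the two table names come from the description above:

// Kick off the Athena INSERT INTO query that repartitions newly arrived data.
import { AthenaClient, StartQueryExecutionCommand } from "@aws-sdk/client-athena";

const athena = new AthenaClient({});

// Column names are assumptions for illustration.
const repartitionSql = `
  INSERT INTO partitioned_event_table
  SELECT event_source, event_type, event_date, payload
  FROM unpartitioned_event_data
  WHERE ingest_date = CAST(current_date AS varchar)
`;

export const runRepartition = async (): Promise<string | undefined> => {
  const result = await athena.send(
    new StartQueryExecutionCommand({
      QueryString: repartitionSql,
      QueryExecutionContext: { Database: "events_db" }, // hypothetical database
      ResultConfiguration: { OutputLocation: "s3://my-athena-results/" }, // hypothetical
    })
  );
  return result.QueryExecutionId;
};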

We actually dealt with this problem at Radar and talk about more trade-offs in this blog post.

Corinthians answered 14/6, 2021 at 17:58 Comment(0)

My scenario is:

Firehose needs to send data to S3, tied to a Glue table, with Parquet as the output format and dynamic partitioning enabled, since I want to partition by the year, month, and day contained in the data I push to Firehose instead of the default arrival-time partitioning.

Below is the working code:

  rawdataFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: !Join ["-", [rawdata, !Ref AWS::StackName]]
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt rawdataS3bucket.Arn
        # The names after "partitionKeyFromQuery:" must match the keys produced
        # by the MetadataExtractionQuery below.
        Prefix: parquetdata/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/
        BufferingHints:
          IntervalInSeconds: 300
          SizeInMBs: 128
        ErrorOutputPrefix: errors/
        RoleARN: !GetAtt FirehoseRole.Arn
        DynamicPartitioningConfiguration:
          Enabled: true
        ProcessingConfiguration:
          Enabled: true
          Processors:
            # Pull the year, month, and day fields out of each record with jq.
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "{year:.year,month:.month,day:.day}"
                - ParameterName: "JsonParsingEngine"
                  ParameterValue: "JQ-1.6"
        # Convert incoming JSON records to Parquet using the Glue table schema.
        DataFormatConversionConfiguration:
          Enabled: true
          InputFormatConfiguration:
            Deserializer:
              HiveJsonSerDe: {}
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe: {}
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            RoleARN: !GetAtt FirehoseRole.Arn
            DatabaseName: !Ref rawDataDB
            TableName: !Ref rawDataTable
            Region:
              Fn::ImportValue: AWSRegion
            VersionId: LATEST

  FirehoseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: !Sub firehose-glue-${Envname}
          PolicyDocument: |
            {
              "Version": "2012-10-17",
              "Statement":
                [
                  {
                    "Effect": "Allow",
                    "Action":
                      [
                        "glue:*",
                        "iam:ListRolePolicies",
                        "iam:GetRole",
                        "iam:GetRolePolicy",
                        "tag:GetResources",
                        "s3:*",
                        "cloudwatch:*",
                        "ssm:*"
                      ],
                    "Resource": "*"
                  }
                ]
            }

Note:

  • rawDataDB is a reference to the Glue database
  • rawDataTable is a reference to the Glue table
  • rawdataS3bucket is a reference to the S3 bucket

Ambert answered 2/11, 2022 at 19:24 Comment(0)
