Which class in AWS CDK have option to configure Dynamic partitioning for Kinesis delivery stream
Asked Answered
P

1

5

I'm using kinesis delivery stream to send stream, from event bridge to s3 bucket. But i can't seem to find which class have the option to configure dynamic partitioning?

this is my code for delivery stream:

new CfnDeliveryStream(this, `Export-delivery-stream`, {
        s3DestinationConfiguration: {
            bucketArn: bucket.bucketArn,
            roleArn: kinesisFirehoseRole.roleArn,
            prefix: `test/!{timestamp:yyyy/MM/dd}/`

        }
    });
Proustite answered 3/9, 2021 at 2:57 Comment(1)
I dont see dynamic partition config option available in aws-cdk yet. But it is available for aws-cli and cloudformationSealed
A
9

I have been working on the same issue for a few days, and have finally gotten something to work. Here is an example of how it can be implemented in CDK. In short, the partitioning has to be enables as you have done, but you need to set the key and .jq expression in the so-called processingConfiguration.

Our incomming json data looks something like this:

{
    "data": 
        {
        "timestamp":1633521266990,
        "defaultTopic":"Topic",
        "data":
        {
            "OUT1":"Inactive",
            "Current_mA":3.92
        }
    }
}

The CDK code looks as following:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: {
    cloudWatchLoggingOptions: {
      enabled: true,
    },
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
    errorOutputPrefix: 'error/!{firehose:error-output-type}/',
    bufferingHints: {
      intervalInSeconds: 60,
    },
    dynamicPartitioningConfiguration: {
      enabled: true,
    },
    processingConfiguration: {
      enabled: true,
      processors: [
        {
          type: 'MetadataExtraction',
          parameters: [
            {
              parameterName: 'MetadataExtractionQuery',
              parameterValue: '{defaultTopic: .data.defaultTopic}',
            },
            {
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            },
          ],
        },
        {
          type: 'AppendDelimiterToRecord',
          parameters: [
            {
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            },
          ],
        },
      ],
    },
  },
})
Aurelio answered 6/10, 2021 at 17:32 Comment(1)
It works but in your example, this line: parameterValue: '{Topic: .data.defaultTopic}' should be: parameterValue: '{defaultTopic: .data.defaultTopic}',Kwarteng

© 2022 - 2024 — McMap. All rights reserved.