AWS Glue: transform a struct into a DynamicFrame

I am fairly new to AWS Glue. I am working on transforming raw CloudWatch JSON output into CSV with AWS Glue. The transformation script is pretty straightforward, but the documentation and examples don't seem to be comprehensive. The data structure looks something like this:

{
"Label": "RequestCount",
"Datapoints": [
    {
        "Timestamp": "2017-07-23T00:00:00Z",
        "Sum": 41960.0,
        "Unit": "Count"
    },
    {
        "Timestamp": "2017-07-30T00:00:00Z",
        "Sum": 46065.0,
        "Unit": "Count"
    },
    {
        "Timestamp": "2017-08-24T00:00:00Z",
        "Sum": 43915.0,
        "Unit": "Count"
    },

The tricky part is transforming it from a single dynamic frame (Label: string, Datapoints: array) into a dynamic frame with one row per datapoint (Timestamp: string, Sum: double, Unit: string). I am not sure which DynamicFrame method to use.

Speedwriting answered 13/12, 2017 at 5:20

I don't think AWS Glue provides a mapping method for this. After some struggling, I found the transformation was relatively easy in PySpark. Here is the pseudocode:

  • Retrieve the data source from the catalog database

    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = ...)
    
  • Convert it to a Spark DataFrame and transform it in Spark

    from pyspark.sql.functions import col, explode
    mapped_df = datasource0.toDF().select(explode(col("Datapoints")).alias("collection")).select("collection.*")
    
  • Convert back to a DynamicFrame and continue the rest of the ETL process

    from awsglue.dynamicframe import DynamicFrame
    mapped_datasource0 = DynamicFrame.fromDF(mapped_df, glueContext, "mapped_datasource0")
    

Thanks to this reference
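
To make the middle step easier to follow, here is a rough plain-Python sketch of what explode followed by select("collection.*") does to the sample record from the question. It does not use Spark or Glue at all; the helper names (explode_datapoints, select_star, to_csv) are made up for illustration, and only the first two datapoints are included.

```python
import csv
import io

# Sample record copied from the question (first two datapoints only).
record = {
    "Label": "RequestCount",
    "Datapoints": [
        {"Timestamp": "2017-07-23T00:00:00Z", "Sum": 41960.0, "Unit": "Count"},
        {"Timestamp": "2017-07-30T00:00:00Z", "Sum": 46065.0, "Unit": "Count"},
    ],
}


def explode_datapoints(records):
    """Mimic explode(col("Datapoints")).alias("collection"):
    emit one row per array element, held in a "collection" column."""
    return [{"collection": dp} for rec in records for dp in rec["Datapoints"]]


def select_star(rows):
    """Mimic select("collection.*"): promote the struct's fields
    to top-level columns."""
    return [dict(row["collection"]) for row in rows]


def to_csv(rows):
    """Render the flattened rows as CSV text (hypothetical helper)."""
    buf = io.StringIO()
    writer = csv.DictWriter(
        buf, fieldnames=["Timestamp", "Sum", "Unit"], lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


flat_rows = select_star(explode_datapoints([record]))
csv_text = to_csv(flat_rows)
print(csv_text)
```

Note that the Label column is dropped by the second select, just as in the Spark one-liner; if you need it in the output, it would have to be selected alongside the exploded column.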

Speedwriting answered 14/12, 2017 at 14:46

Check the split_rows function, documented at the following link:

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_rows

Here is also an example of how to transform the DynamicFrame directly, in case you need to do that first:

datasource = glueContext.create_dynamic_frame.from_catalog(database = ...)

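# Function to modify a single record
def process_record(record):
    # Change existing fields or add new ones here.
    # "Datapoints" is an array, so it cannot be concatenated with a string;
    # this example edits the string field "Label" instead.
    record["Label"] = record["Label"] + "_suffix"  # Any change you need
    ...
    return record

# Apply the function to every record in the DynamicFrame
processed_datasource = datasource.map(process_record)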
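
As far as I understand it, map calls the function once per record and gives back one record for each input, so it is suited to adding or changing fields rather than producing one row per datapoint. Below is a small local sketch of that pattern on plain dicts, with no Glue involved. The function name summarize_record and the fields DatapointCount and TotalSum are hypothetical, and the list comprehension only stands in for datasource.map(...).

```python
# Plain dicts standing in for the records of a DynamicFrame (assumption:
# the function passed to map sees each record as a dict-like object).
records = [
    {
        "Label": "RequestCount",
        "Datapoints": [
            {"Timestamp": "2017-07-23T00:00:00Z", "Sum": 41960.0, "Unit": "Count"},
            {"Timestamp": "2017-07-30T00:00:00Z", "Sum": 46065.0, "Unit": "Count"},
        ],
    }
]


def summarize_record(record):
    """Hypothetical per-record function: add summary fields and return
    the same record, one output per input."""
    sums = [dp["Sum"] for dp in record["Datapoints"]]
    record["DatapointCount"] = len(sums)  # hypothetical field name
    record["TotalSum"] = sum(sums)        # hypothetical field name
    return record


# Local stand-in for: processed_datasource = datasource.map(summarize_record)
processed = [summarize_record(dict(rec)) for rec in records]
print(processed[0]["DatapointCount"], processed[0]["TotalSum"])
```

The number of records stays the same before and after, which is why the explode approach in the other answer is the better fit when each datapoint should become its own CSV row.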

Epigeous answered 14/7, 2023 at 20:35