Adding a column to an AWS Glue DynamicFrame

I am very new to AWS Glue. I am working on a small project where I need to read a file from an S3 bucket, transpose it, and load it into a MySQL table. The source data in the S3 bucket looks like this:

    +----+----+-------+-----+---+--+--------+
    |cost|data|minutes|name |sms|id|category|
    +----+----+-------+-----+---+--+--------+
    |  5 |1000|  200  |prod1|500|p1|service |
    +----+----+-------+-----+---+--+--------+

The target table has the columns Product_id, Parameter, and value.

I expect the target table to contain the following rows:

p1, cost, 5

p1, data, 1000

I am able to load the target table with the ID and the value, but I cannot populate the Parameter column. This column is not present in the input data; I want to fill it with a string that depends on which source column the value came from.

Here is the code I used for cost.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5")

job.commit()

Can somebody help me to add this new column to my data frame so that it can be made available in the table?

Thanks

Lianaliane answered 11/11, 2019 at 20:1 Comment(3)
Btw, just noticed that the spark variable is unused... – Garik
Could you be more clear on how you want to derive the column which is not present in your source data? – Russophobe
I am planning to use the column name. Basically it will be a name-value pair, and there will be one record for each name: cost, data and so on. I can use the column name as a hardcoded string for this new column. – Lianaliane

For a smaller DataFrame you can do the following:

  1. convert the dynamic frame to spark dataframe
  2. add a column
  3. convert back to dynamic frame

step 1

datasource0 = datasource0.toDF()

step 2

from pyspark.sql.functions import col, udf
getNewValues = udf(lambda val: val + 1)  # you can do what you need to do here instead of val + 1

# col() must be imported too, and the withColumn call needs its closing parenthesis
datasource0 = datasource0.withColumn('New_Col_Name', getNewValues(col('some_existing_col')))

step 3

from awsglue.dynamicframe import DynamicFrame
datasource0 = DynamicFrame.fromDF(datasource0, glueContext, "datasource0")

The drawback is that on a large dataset the toDF() operation is very expensive!

Depend answered 6/8, 2020 at 20:46 Comment(1)
So what's the solution for bigger dataframes? – Berny

One way to add columns to a dynamicframe directly, without converting to a Spark DataFrame in between, is to use a Map transformation (note that this is different from ApplyMapping).

So let's assume that your input dynframe (with the data looking like in your example row) is called dyf_in.

You can do something like the following to create 2 separate dynamicframes, one with the cost entries, and another with the data entries:

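from awsglue.gluetypes import _create_dynamic_record

# Map itself is available through "from awsglue.transforms import *" in the question's script

def getCosts(rec):
  # One output record per input record, carrying the cost value
  return _create_dynamic_record({
    'Product_id': rec['id'],
    'Parameter': 'cost',
    'value': rec['cost']
  })

def getDatas(rec):
  # Same shape, but for the data column
  return _create_dynamic_record({
    'Product_id': rec['id'],
    'Parameter': 'data',
    'value': rec['data']
  })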

dyf_costs = Map.apply(frame=dyf_in, f=getCosts, transformation_ctx='dyf_costs')
dyf_datas = Map.apply(frame=dyf_in, f=getDatas, transformation_ctx='dyf_datas')
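
Since getCosts and getDatas differ only in the column they read, both can be generated from one helper. The following is a minimal sketch rather than anything from the Glue API: `make_param_mapper` is a hypothetical name, and it returns plain dicts so that it can be tried outside Glue. Inside a job you would presumably wrap the returned dict with `_create_dynamic_record` as in the functions above.

```python
def make_param_mapper(param, id_field="id"):
    """Return a record-level function that emits one (Product_id, Parameter, value) row.

    Hypothetical helper for illustration. `param` is the source column name,
    which is also used as the literal value of the Parameter column.
    """
    def mapper(rec):
        return {
            "Product_id": rec[id_field],
            "Parameter": param,
            "value": rec[param],
        }
    return mapper


# Plain dict standing in for a record shaped like the question's sample row
# (assumption: a DynamicRecord supports the same rec["field"] lookups).
row = {"cost": 5, "data": 1000, "minutes": 200, "name": "prod1",
       "sms": 500, "id": "p1", "category": "service"}

get_costs = make_param_mapper("cost")
get_datas = make_param_mapper("data")

print(get_costs(row))
print(get_datas(row))
```

The same factory covers minutes and sms by calling `make_param_mapper("minutes")` and `make_param_mapper("sms")`, so adding a parameter does not require another hand-written function.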

And then you either push those dynamicframes into the same sink, or use something like Join (after adding an extra column in the Map funcs to use as a unique join key, and then dropping it afterwards) to concatenate the two dynamicframes into a single one.

One thing I'm not sure about is whether Glue (at least with Map) can do this sort of transpose (which is roughly what you're trying to do) directly, without running through the same dynamicframe twice as my example does.
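
For reference, the single-pass reshaping in question can be written as a plain Python function that turns one wide record into several long ones. This is only a sketch of the logic, with hypothetical names (`unpivot_record`, `value_fields`). As far as I can tell Map produces one output record per input record, so a function like this would have to run somewhere that allows one-to-many output, for example Spark's `rdd.flatMap` after `toDF()`. That wiring is an untested assumption here.

```python
def unpivot_record(rec, id_field, value_fields):
    """Yield one (Product_id, Parameter, value) dict per requested column."""
    for field in value_fields:
        yield {
            "Product_id": rec[id_field],
            "Parameter": field,   # the column name becomes the Parameter value
            "value": rec[field],
        }


# Plain dict standing in for the question's sample row.
row = {"cost": 5, "data": 1000, "minutes": 200, "name": "prod1",
       "sms": 500, "id": "p1", "category": "service"}

rows = list(unpivot_record(row, "id", ["cost", "data", "minutes", "sms"]))
for r in rows:
    print(r)
```

Each input record is read once and yields four output rows, which is the name-value layout the target table expects.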

Glue DataBrew seems to have some sort of transpose function available, but I don't know much about DataBrew, and it may not even be applicable to your situation, so I won't comment on that further.

Abjuration answered 10/2, 2022 at 13:11 Comment(0)
