AWS Glue: How to add a column with the source filename in the output?

Does anyone know of a way to add the source filename as a column in a Glue job?

We created a flow where we crawled some files in S3 to create a schema. We then wrote a job that transforms the files to a new format and writes those files back to another S3 bucket as CSV, to be used by the rest of our pipeline. What we would like to do is get access to some sort of job meta properties so we can add a new column to the output file that contains the original filename.

I looked through the AWS documentation and the aws-glue-libs source, but didn't see anything that jumped out. Ideally there would be some way to get metadata from the awsglue.job package (we're using the python flavor).

I'm still learning Glue, so apologies if I'm using the wrong terminology. I tagged this with the spark tag as well, because I believe that's what Glue is using under the covers.

Nowicki answered 10/5, 2018 at 16:35 Comment(0)

You can do it with Spark in your ETL job:

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.sql.functions.input_file_name

val df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))

Note that this works only with the getCatalogSource() API, not with create_dynamic_frame_from_options().
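
For the Python flavor mentioned in the question, a rough PySpark equivalent might look like the sketch below (the database, table, and transformation context names are placeholders; DST_S3_PATH is assumed to be passed as a job argument as in the Scala example above):

from pyspark.sql.functions import input_file_name
from awsglue.dynamicframe import DynamicFrame

# Read from the Data Catalog so input_file_name() can resolve the source paths
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",          # placeholder
    table_name="my_table",           # placeholder
    transformation_ctx="source")

# Add the source filename as a column, then convert back to a DynamicFrame
df = dyf.toDF().withColumn("input_file_name", input_file_name())
dyf_with_filename = DynamicFrame.fromDF(df, glueContext, "dyf_with_filename")

glueContext.write_dynamic_frame.from_options(
    frame=dyf_with_filename,
    connection_type="s3",
    connection_options={"path": args["DST_S3_PATH"]},  # placeholder job argument
    format="parquet")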

Churchgoer answered 11/5, 2018 at 14:24 Comment(2)
Works great! I imported input_file_name with from pyspark.sql.functions import input_file_name.Nowicki
Is this still expected to work in 2022 with glueContext.create_sample_dynamic_frame_from_catalog? When reading from S3 files, the filename can be added, but when reading from a catalog created by a Glue crawler it cannot. I am now looking into using Athena to write a new table with a query to get it to work aws.amazon.com/premiumsupport/knowledge-center/…Muth

With an AWS Glue Python auto-generated script, I've added the following lines:

from pyspark.sql.functions import input_file_name

## Add the input file name column
datasource1 = datasource0.toDF().withColumn("input_file_name", input_file_name())

## Convert DataFrame back to DynamicFrame
datasource2 = datasource0.fromDF(datasource1, glueContext, "datasource2")

Then, in the ApplyMapping or datasink portions of the code, you reference datasource2.
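
For instance, a minimal sketch of that change (the mapping list below is just a placeholder for your own field mappings):

# Point ApplyMapping at the frame that now carries the filename column
applymapping1 = ApplyMapping.apply(
    frame = datasource2,
    mappings = [("input_file_name", "string", "input_file_name", "string")],  # plus your other fields
    transformation_ctx = "applymapping1")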

Ford answered 20/12, 2018 at 4:32 Comment(1)
doesn't work for meNganngc

I am using an AWS Glue Python auto-generated script. I tried the solution from JcMaco, since it is exactly what I needed and input_file_name() is a very simple approach.

However, I could not get that to work: the column always came back empty aside from its header. I was able to get the name of the Glue job instead and use that as a constant in a new column, which serves the same purpose as input_file_name() in this particular use case.

Near the top of your script you will see where the args variable gets created. Use args to get access to JOB_NAME as shown below.

How I did it:

from pyspark.sql.functions import lit

job_name = args['JOB_NAME'] # define new variable

(The JOB_NAME is passed in as a command line argument.)

Then, after the datasource0 definition in your script, use job_name along with the lit function:

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = […] , transformation_ctx = "applymapping1") 
applymapping2 = applymapping1.toDF().withColumn("job_name", lit(job_name))
applymapping3 = applymapping1.fromDF(applymapping2, glueContext, "applymapping3")

In the example above, you would change the assignment of your frame parameter in your datasink definition to applymapping3.
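
A minimal sketch of what that datasink might look like, assuming an S3 CSV sink (the output path is a placeholder):

# Write the frame that now includes the job_name column
datasink4 = glueContext.write_dynamic_frame.from_options(
    frame = applymapping3,
    connection_type = "s3",
    connection_options = {"path": "s3://my-output-bucket/prefix/"},  # placeholder
    format = "csv",
    transformation_ctx = "datasink4")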

Hachure answered 16/12, 2020 at 0:26 Comment(0)

You can also use the Map transformation to apply a function to every record of a DynamicFrame. Inside the function you can add fields; one field can be the name of the job, and that value can be sent as an argument to the AWS Glue job:

# add the argument when starting the job (e.g. from another script or Lambda)
import boto3

client = boto3.client("glue")
client.start_job_run(
    JobName="my_test_Job",
    Arguments={"--job_name": "name"})

# read the argument inside the job
import sys
from typing import Any, Dict

from awsglue.transforms import Map
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["job_name"])
arg_job_name = args["job_name"]

inputDyf = glueContext.create_dynamic_frame_from_options(
    ...
)


def mapping(record: Dict[str, Any]):
    # add a constant field carrying the job name to every record
    record["Job"] = arg_job_name
    return record


mapped_dyF = Map.apply(frame=inputDyf, f=mapping)
Reremouse answered 2/7, 2022 at 13:9 Comment(0)

If you read your data with glueContext.create_dynamic_frame.from_options, you can use the attachFilename key in format_options to add a column with the source filename (aws-glue-programming-etl-format):

dynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://s3path"]},
    format="json",
    format_options={
        "multiline": True,
        "attachFilename": "your_filename_column_name"
    }
)
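
A quick way to confirm the column was attached, using the frame from the snippet above:

# Each record should now carry the full S3 path of the file it came from
dynamicFrame.toDF().select("your_filename_column_name").show(5, truncate=False)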
Hath answered 26/10, 2023 at 21:16 Comment(0)
