Why is input_file_name() empty for S3 catalog sources in pyspark?

I'm trying to get the input file name (or path) for every file loaded through an S3 data catalog in AWS Glue.

I've read in a few places that input_file_name() should provide this information (with the caveat that it only works when calling from_catalog and not from_options, which I believe I am doing!).

So the code below seems like it should work, but it always returns an empty value for input_file_name on every row.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name


args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session


job = Job(gc)
job.init(args['JOB_NAME'], args)


# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)

df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)

Resulting output:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

Is there another way that I should be creating the frame that ensures input_file_name() is populated? I've now tried to build a source frame through create_dynamic_frame.from_catalog, create_dynamic_frame.from_options and getSource().getFrame(), but I get the same result of an empty input_file_name column for each.

Gillmore answered 28/6, 2019 at 16:58 Comment(8)
It might happen due to the groupFiles option being enabled (see docs.aws.amazon.com/glue/latest/dg/grouping-input-files.html). However, I'm not sure whether it's possible to disable it when reading from a catalog. – Flaunt
@YuriyBondaruk thanks! If I am reading the docs correctly, it seems like it should be automatically disabled for fewer than 50,000 files (my test bucket has ~100). I can't see the syntax to disable the option, even without reading from a catalog, though. Does this also mean that if my catalog includes (automatic) partitions, single-file operations like input_file_name() will never work? – Gillmore
For anyone else looking, the relevant groupFiles setting is 'none' when you wish to explicitly disable this functionality for inputs of more than 50,000 files. – Gillmore
I'm not sure whether it's possible to disable this option when reading directly from a catalog. I used it when loading data directly from S3 with the create_dynamic_frame_from_options(...) function. – Flaunt
Have you tried reading the data into a DataFrame and then converting it to a DynamicFrame? – Shockproof
@WillCroft have you found a solution? Thanks!! – Sneaker
@Sneaker apologies for the slow reply, but yes, we found a "solution", albeit with major caveats. See my answer below! – Gillmore
@WillCroft THANK YOU!! – Sneaker

I believe this is impossible when using the groupFiles option, given that behind the scenes Glue concatenates files to create the optimal number of inputs. The concept of input_file_name therefore does not make sense in this context, since the original file path is no longer a direct input.

However, the docs are slightly misleading: even with fewer than 50,000 input files, Glue will still concatenate inputs depending on their file size unless the option is explicitly disabled. In our case, thousands of tiny input files (<1 MB each) triggered this behaviour.

You can easily verify this by explicitly disabling the grouping (note that this will have a severe performance impact for scenarios similar to ours):

# Read directly from S3 with grouping disabled, so every original file
# remains a distinct input and input_file_name() is populated.
ds_s3 = self.gc.getSource(
    connection_type='s3',
    paths=paths,
    groupFiles='none',
)
fm_s3 = ds_s3.getFrame()
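
If you want to stay on the catalog reader, the grouping options can reportedly also be passed via additional_options; this is only an untested sketch reusing the names from the question's snippet, and (per the comments above) it is not clear whether Glue honours it for catalog sources:

# Untested sketch: pass groupFiles through additional_options when reading
# from the Data Catalog (database/table names are the question's placeholders).
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    additional_options={'groupFiles': 'none'},
    transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())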

Of course, it is better not to have to depend on the input state or context, and so we ended up writing an AWS Lambda triggered on S3 PUT which writes metadata (including the filename and path) into the file itself.

Gillmore answered 13/2, 2020 at 11:30 Comment(0)

I'll also add my experience: in my case I received an empty result because I was calling the cache() method.

For example:

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df.cache()  # caching here, before the column is added, leaves input_file_name empty
df = df.withColumn("input_file_name", F.input_file_name())

df.show()

I receive

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

But if I remove the df.cache() line, the input_file_name column correctly shows the input file names.

A workaround might be calling F.input_file_name() before caching.
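
For example, a minimal sketch of that reordering (same path as above):

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df = df.withColumn("input_file_name", F.input_file_name())  # add the column first
df.cache()  # then cache; the file names are already part of the cached plan

df.show()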

Hillinck answered 9/7, 2020 at 8:52 Comment(5)
Brilliant insight! – Peterec
This also worked for me. Consider using spark.catalog.clearCache() if you need to drop the cached table in an interactive Spark session (i.e. a Jupyter notebook). – Gumm
Yes!! That was the issue, but why? Should I then cache later? – Kimura
After df.cache(), I cannot get anything from input_file_name. If I run df.unpersist(), it starts to work. – Dingle
Once you cache the data on the Spark nodes, it is no longer seen as the original files from Spark's perspective. When you unpersist, the execution plan starts from the beginning again, i.e. reading from the raw input files. – Yungyunick

I found another weird behavior: when I add a limit() before input_file_name(), it does not work either.

This code gives an empty filename:

import pyspark.sql.functions as f

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath, header="true", inferSchema=True)
          .select("COL1", "COL2")
          .limit(1000)      # limit() applied before the filename column is added
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())  # comes back empty
          )

This code works:

df_raw = (spark
          .read
          .option("delimiter", ";")
          .csv(filepath, header="true", inferSchema=True)
          .select("COL1", "COL2")
          )

df_transform = (df_raw
          .withColumn("filename", f.input_file_name())  # column added before limit()
          .limit(1000)
          )

It took me some time to figure that out, because I had tried to speed up debugging by reading only a few rows.

Kimura answered 10/5, 2021 at 8:10 Comment(0)

There are a few edge cases where input_file_name() does not work as expected with a DynamicFrame. In such cases, you can try the 'attachFilename' option of DynamicFrame to retrieve the file path for each record.

Here is an example:

dyf6_ = glue_context.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://awsglue-datasets/examples/highly-partitioned-table/year=2021/month=12/"],
        "recurse": True,
        "groupFiles": "inPartition",
        "groupSize": "1048576"
    },
    format="json",
    format_options={
        "attachFilename": "filenameCol"  # attaches the source file path as a column
    }
)
dyf6_.toDF().show()
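
To double-check that the attached column is populated for each record, you can inspect just that column (reusing the names from the snippet above):

# List the distinct source files that contributed records
dyf6_.toDF().select("filenameCol").distinct().show(truncate=False)
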
Ia answered 29/8, 2022 at 11:57 Comment(2)
Where is this attachFilename option documented? – Veloz
This is exactly what I wanted. Works like a charm!!! – Crouch

This issue is important, since some people might be using this feature in production. We did some investigation within the Data Team at MediaQX; here are our findings. Kudos to @Leyla Helin Çetin and @Eren Sakarya.

Before the findings: make sure you are not relying on the input_file_name feature in Glue, since it breaks when file sizes are small. Add the necessary columns to your files instead, and do not depend on the path. If you are using pure Spark, you might not need to worry about this.

We checked all the responses in related posts; I will link them here for anyone who runs into this issue later:

In this context, this is the most plausible solution, but within Glue it is not clear whether it works perfectly, and you might lose some Glue features like bookmarks: How to append a new column with source name in the AWS Glue job?

Several answers show a mapping-function approach, but they all rely on the input_file_name function, so they are not reliable for a production solution: Using S3 folder structure as meta data in AWS Glue

Some people solved their issue by using the job name, but that is not feasible for different configurations: AWS Glue: How to add a column with the source filename in the output?

And in this thread the groupFiles option is discussed; the AWS documentation states that it does not work with Parquet files, but it actually does, and it is not a direct solution to this issue.

Another suggestion is to remove the dots from file names, but that was not a solution in our case: Glue dynamic frame is not populating from s3 bucket

You can make the function work by setting this parameter: spark.conf.set("spark.sql.files.maxPartitionBytes", "1MB")

However, this increases the reading time, even though it makes the input_file_name function work properly. We think the function gets skipped when the reading process is very fast. Also note that the total size of the files read should be larger than the value you set, so it is not a perfect solution either. In addition, it will slow jobs down, since the default value for this parameter is 128MB.
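
As a rough illustration of that workaround (a sketch only; the S3 path and column name are placeholders, and the performance caveats above apply):

import pyspark.sql.functions as F

# Force smaller read partitions so input_file_name() gets populated.
# The default is 128MB; lowering it will slow the job down.
spark.conf.set("spark.sql.files.maxPartitionBytes", "1MB")

df = (spark.read
      .json("s3://example-bucket/example-prefix/")   # placeholder path
      .withColumn("source_file", F.input_file_name()))
df.select("source_file").distinct().show(truncate=False)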

I hope this answer helps. Please share your own findings, so that maybe AWS staff can use this answer when fixing this bug.

Cheers.

Narton answered 28/4, 2023 at 14:5 Comment(0)

I faced the same issue as Vzzarr. When I created the input_file_name column after calling cache on the DataFrame, the file path was empty. But when I created it before calling cache, it worked.

Fossick answered 19/12, 2020 at 16:56 Comment(0)

If all of the above still doesn't work, add this:

spark.catalog.clearCache()

to clear all cached data in the cluster.

Palaeobotany answered 13/6, 2022 at 5:13 Comment(0)

This happened to me because I was on a deprecated Databricks cluster runtime (7.6); I tried it on 7.3 LTS, which worked. :)

Livable answered 2/3, 2022 at 11:10 Comment(0)
