Dataflow/Apache Beam - how to access the current filename when passing in a file pattern?

I have seen this question answered before on Stack Overflow (https://mcmap.net/q/1172015/-how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added splittable DoFn functionality for Python. How can I access the filename of the file currently being processed when I pass a file pattern for a GCS bucket?

I want to pass the filename into my transform function:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )

Ultimately, I want to pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to look up a value in a different BQ table. Once I know how to get the filename, I think I will be able to figure out the side input part needed to do the lookup in the BQ table and get the unique value.

Tops answered 21/11, 2018 at 2:42 Comment(5)
You can take a look at this (mind the solution in the question too). - Bowes
@GuillemXercavins your example reads an entire file as a string, right? If I am reading newline-delimited JSON like in this post and want to use the filename to look something up in another BQ table, do you have any suggestions as to how to do that? - Tops
I don't see how exactly it is BQ related - but if you insist :o) ... - Hoy
@MikhailBerlyant I want to take in side inputs from BQ once I can get the file name :/ - Tops
Obviously BQ is part of your overall project, but I still don't see how BQ is related to your particular issue! - Hoy

I tried to implement a solution based on the previously cited case. That case, like other approaches such as this one, also gets a list of file names but loads each whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record instead.

As input I used two csv files:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata containing the file path and size in bytes. In my example:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]

The code is:

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)  # flatten the per-pattern lists into one list, as in the full code
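
As a side note on the shape of that result: match is given a list of patterns and returns one match result per pattern, so collecting metadata_list for each produces a list of lists that has to be flattened before indexing into it (the full code does this with reduce(add, result)). Below is a minimal sketch of that flattening step on stand-in data. FakeMetadata is a hypothetical substitute for Beam's FileMetadata so the snippet runs without GCS access, and itertools.chain is used here as an alternative to reduce, not as the author's method.

```python
from collections import namedtuple
from itertools import chain

# Hypothetical stand-in for Beam's FileMetadata, only so the sketch runs without GCS.
FakeMetadata = namedtuple('FakeMetadata', ['path', 'size_in_bytes'])

# One inner list per pattern passed to match(); here a single pattern matched two files.
nested = [[
    FakeMetadata('gs://BUCKET_NAME/countries1.csv', 29),
    FakeMetadata('gs://BUCKET_NAME/countries2.csv', 29),
]]

# Flatten the list of lists into a single list of metadata entries.
flat = list(chain.from_iterable(nested))

# Each entry now exposes .path directly, which is what the read loop relies on.
paths = [m.path for m in flat]
print(paths)
```

With several patterns the inner lists are simply concatenated in order, so the same index-based loop keeps working.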

We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to programmatically create a list of names for the PCollections (p0, p1, ..., pN-1) and ensure that each step has a unique label ('Read file 0', 'Read file 1', etc.):

variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

Then we proceed to read each different file into its corresponding PCollection with ReadFromText and then we call the AddFilenamesFn ParDo to associate each record with the filename.

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

where AddFilenamesFn is:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}

My first approach used a Map function directly, which results in simpler code. However, result[i].path inside the lambda was only resolved after the loop had finished, when i already pointed at the last file, so each record was incorrectly mapped to the last file in the list:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
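
This looks like Python's usual late-binding closure behaviour rather than anything specific to Beam. The sketch below reproduces it in plain Python, with no pipeline involved, and shows one common workaround: binding the current path as a default argument so it is captured when the lambda is created. The paths list and the 'row' element are made-up sample values.

```python
# Made-up sample paths standing in for result[i].path.
paths = ['gs://BUCKET_NAME/countries1.csv', 'gs://BUCKET_NAME/countries2.csv']

late = []
early = []
for i in range(len(paths)):
    # Late binding: `i` is looked up when the lambda is called, after the loop has ended.
    late.append(lambda elem: (paths[i], elem))
    # Early binding: the default argument is evaluated now, while `i` is still current.
    early.append(lambda elem, path=paths[i]: (path, elem))

late_result = [fn('row') for fn in late]
early_result = [fn('row') for fn in early]

print(late_result)   # both entries carry the last path
print(early_result)  # each entry carries its own path
```

Passing the path as an extra argument, as the ParDo version above does with AddFilenamesFn, avoids the problem in the same way because the value is fixed when the transform is constructed.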

Finally, we flatten all the PCollections into one:

merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()

and we check the results by logging the elements:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}

I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0.

I hope this addresses the main issue here and that you can now continue by integrating BigQuery into your full use case. You might need to use the Python Client Library for that; I wrote a similar Java example.
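
For the lookup itself, here is a rough sketch of the per-record step only, written as a plain function so it can be tried without a pipeline. Everything in it is an assumption for illustration: the lookup dict, its values, and the lookup_value field name are hypothetical. In a pipeline, a function like this could be applied with beam.Map and the dict supplied as a side input (for example via beam.pvalue.AsDict over key/value pairs read from BigQuery); that wiring is not shown here.

```python
def add_lookup_value(record, lookup):
    """Return a copy of `record` with the value found for its filename.

    `record` is a dict shaped like the output of AddFilenamesFn;
    `lookup` maps filename -> value (hypothetical table contents).
    """
    enriched = dict(record)  # copy, so the input element is not mutated
    enriched['lookup_value'] = lookup.get(record['filename'])  # None if missing
    return enriched


# Hypothetical contents of the other table, keyed by filename.
lookup = {'countries1.csv': 'batch-a', 'countries2.csv': 'batch-b'}

records = [
    {'filename': 'countries1.csv', 'row': '1,sweden'},
    {'filename': 'countries2.csv', 'row': '3,italy'},
    {'filename': 'unknown.csv', 'row': '9,nowhere'},
]

enriched = [add_lookup_value(r, lookup) for r in records]
print(enriched)
```

Copying the record before adding the field keeps the function free of side effects, which matters in Beam because input elements should not be modified in place.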

Full code:

import argparse, logging
from functools import reduce  # a built-in in Python 2, but must be imported in Python 3
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

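if __name__ == '__main__':
  # the root logger defaults to WARNING, so raise it to see the logging.info output
  logging.getLogger().setLevel(logging.INFO)
  run()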
Bowes answered 24/11, 2018 at 15:33 Comment(5)
Thanks so much, this worked! It works if I merge the PCollections afterwards, and it also works if I do for i in range(len(result)): p | ... and then write to BigQuery inside the for loop. Is there a benefit to merging and then writing to BigQuery all together? - Tops
Glad to be of help. I flattened the PCollections together in case we want to apply some operations across data from different files (such as a GroupByKey, etc.) while still keeping track of its origin. In terms of performance, I think it could also help to distribute the load evenly among workers if the GCS files have very disparate sizes, but that would depend on the exact data and full use case. Feel free to omit the flatten step and process each file separately if there is no actual need for it and the pipeline works well without it. - Bowes
Thank you for the great answer @GuillemXercavins, this works. I was wondering, though, how this would perform if we have, say, 30,000 files. It will load 30,000 file names in memory and, assuming that is not a bottleneck, we will then create 30,000 PTransforms (with distinct labels) to add filenames before doing other PTransforms. I don't know much about Dataflow, but I wouldn't want to find out by running that on the cloud and getting surprising bills. What is your opinion? (I also cannot imagine the label graph in the GUI.) - Quantic
Interesting observation. I tested it with 30k files using the following script. You'll already face an issue when building the pipeline execution graph locally (>1 hr, job not sent yet) and might exceed the 10MB request limit of the Dataflow API. I would use this example for a low number of large files and the alternatives I cited in my answer for many small files. If your use case involves many large files, I'd try to convert it to one of the other manageable situations. - Bowes
Thank you for the great answer @Guillem Xercavins. This works great. Can you please help with extending this to enter each file's metadata into an audit file and then delete the file within the loop? In fact, I am not merging the PCollections but writing to BQ within the loop. The idea is to continue with the loop even if one file fails (and move the failed file to another bucket). - Cenobite

I had to read some metadata files and use the filename for further processing. I struggled until I finally came across apache_beam.io.ReadFromTextWithFilename:

def run(argv=None, save_main_session=True):
    import argparse
    import json
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io import ReadFromTextWithFilename

    BUCKET = 'BUCKET_NAME'  # placeholder: the original snippet does not define BUCKET

    class ExtractMetaData(beam.DoFn):
        def process(self, element):
            # ReadFromTextWithFilename emits (filename, line) tuples
            filename, meta = element
            # [-2] is the name of the folder that contains the metadata file
            image_name = filename.split("/")[-2]
            labels = json.loads(meta)["labels"]
            image = {"image_name": image_name, "labels": labels}
            print(image)
            # yield the dict; returning it would make Beam iterate over its keys
            yield image

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    # the with block runs the pipeline on exit, so no explicit pipeline.run() is needed
    with beam.Pipeline(options=pipeline_options) as pipeline:
        meta = (
            pipeline
            | "Read Metadata" >> ReadFromTextWithFilename(f'gs://{BUCKET}/dev-set/**/*metadata.json')
            | beam.ParDo(ExtractMetaData())
        )
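
To make the element shape concrete, here is a small plain-Python sketch of the extraction step applied to one (filename, line) pair of the kind ReadFromTextWithFilename produces. The sample path and JSON are made up, and the function is a stand-alone generator rather than a DoFn so it can run without Beam. It assumes each metadata file holds its JSON on a single line, since the text source reads line by line.

```python
import json


def extract_metadata(element):
    """Turn one (filename, line) pair into a dict with the image name and labels."""
    filename, meta = element
    # The second-to-last path segment is the folder containing the metadata file.
    image_name = filename.split("/")[-2]
    labels = json.loads(meta)["labels"]
    # Yield (rather than return) so the caller receives the dict as a single element.
    yield {"image_name": image_name, "labels": labels}


# Made-up sample element: a path inside a per-image subfolder and one line of JSON.
sample = (
    "gs://BUCKET_NAME/dev-set/img_001/metadata.json",
    '{"labels": ["cat", "dog"]}',
)

records = list(extract_metadata(sample))
print(records)
```

If the JSON spans several lines, each line would arrive as a separate element and json.loads would fail, so a whole-file reader would be needed in that case.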
Jeffcott answered 20/1, 2021 at 23:52 Comment(1)
Note: This solution does not seem to scale very well. I ran it on a folder containing subfolders, with the JSON files in the subfolders. When running it on the full dataset of >40,000 subfolders, the job did not appear in Dataflow, even after running for hours. This suggests that ReadFromTextWithFilename iterates over and reads the files before even creating the job, which would mean that this work is not spread across multiple workers. I'll try to put the JSON files in one single folder and alter the pipeline to improve performance. - Jeffcott
