Read Files from multiple folders in Apache Beam and map outputs to filenames

I'm working on reading files from multiple folders and then outputting the file contents together with the file name, like (filecontents, filename), to BigQuery in Apache Beam, using the Python SDK and a Dataflow runner.

I originally thought I could create a PCollection for each file and then map the file contents to the filename.

import apache_beam as beam
from apache_beam.io.textio import ReadFromText


def read_documents(pipeline):
    """Read the documents at the provided URIs and return (line, uri) pairs."""
    pcolls = []
    # TESTIN is a local file listing one URI per line.
    with open(TESTIN) as uris:
        for uri in uris:
            uri = str(uri).strip("[]/'\n")
            pcolls.append(
                pipeline
                | 'Read: uri' + uri >> ReadFromText(uri, compression_type='gzip')
                | 'WithKey: uri' + uri >> beam.Map(lambda v, uri: (v, uri), uri)
            )
    return pcolls | 'FlattenReadPColls' >> beam.Flatten()

This worked fine but was slow and wouldn't run on Dataflow in the cloud beyond roughly 10,000 files; at around that point it would fail with a broken pipe.

I'm currently trying to subclass the ReadAllFromText transform from textio. textio is designed to read tons of files quickly from a PCollection of filenames or patterns. There is a bug in this module when reading from Google Cloud Storage and the file has a content encoding: Google Cloud Storage automatically gunzips files and transcodes them, but for some reason ReadAllFromText doesn't work with that. You have to change the metadata of the file to remove the content encoding and set the compression type on ReadAllFromText to gzip. I'm including the issue URL in case anyone else has problems with ReadAllFromText: https://issues.apache.org/jira/browse/BEAM-1874
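
For reference, a minimal sketch of that workaround (assuming a hypothetical gs://my-bucket/docs/file.json.gz object): clear the Content-Encoding metadata so Cloud Storage serves the raw gzipped bytes, then tell ReadAllFromText to decompress them itself.

# Run once from a shell to strip the Content-Encoding metadata
# (hypothetical object name):
#   gsutil setmeta -h "Content-Encoding:" gs://my-bucket/docs/file.json.gz

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline() as p:
    lines = (
        p
        | 'FileList' >> beam.Create(['gs://my-bucket/docs/file.json.gz'])  # hypothetical path
        | 'ReadAll' >> ReadAllFromText(compression_type=CompressionTypes.GZIP)
    )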

My current code looks like this:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText


class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type='gzip')

    def expand(self, pvalue):
        files = self._read_all_files
        return (
            pvalue
            | 'ReadAllFiles' >> files
            # 'filename' is a placeholder for the input filename that I'm
            # trying to figure out how to include in the output.
            | 'Map values' >> beam.Map(lambda v: (v, filename))
        )

ReadAllFromText is contained in textio.py, calls ReadAllFiles from filebasedsource.py, and inherits from PTransform.

I believe I'm just missing something simple.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py

Bumbailiff answered 20/7, 2018 at 0:58

As you found, ReadFromText doesn't currently support dynamic filenames, and you definitely don't want to create individual steps for each URL. From your initial sentence I understand you want to get the filename and the file content out as one item. That means you won't need or benefit from any streaming of parts of the file; you can simply read the file contents. Something like:

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


def read_all_from_url(url):
    # FileSystems picks the right filesystem (local, GCS, ...) from the URL scheme.
    with FileSystems.open(url) as f:
        return f.read()


def read_from_urls(pipeline, urls):
    return (
        pipeline
        | beam.Create(urls)
        | 'Read File' >> beam.Map(lambda url: (
            url,
            read_all_from_url(url)
        ))
    )

You can customise it if you think you're having issues with metadata. The output will be a tuple (url, file contents). If your file contents are very large, you might need a slightly different approach depending on your use case.
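
To round out the pipeline from the question, here is a minimal sketch of writing those (url, file contents) tuples to BigQuery, reusing read_from_urls from above and assuming hypothetical project, dataset, table and bucket names plus a simple two-column schema.

import apache_beam as beam


def to_bq_row(element):
    url, contents = element
    return {'filename': url, 'contents': contents}


with beam.Pipeline() as p:
    urls = ['gs://my-bucket/docs/a.json.gz']  # hypothetical input list
    (
        read_from_urls(p, urls)
        | 'ToRow' >> beam.Map(to_bq_row)
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            'my-project:my_dataset.file_contents',  # hypothetical table
            schema='filename:STRING,contents:STRING',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )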

Tephra answered 27/10, 2018 at 12:5
I don't remember what I did right now, but I believe I tried something like this initially and it caused a memory error or something like that. I'll try this way and see how it goes. I ended up putting the file names and locations into BigQuery and working on it that way. – Bumbailiff
Using beam.Create, the URLs still need to fit into memory, and the whole list is serialised, sent to the cloud and deserialised. If it is indeed a long list, then it makes sense to keep it in the cloud, e.g. as a text file in Google Storage (but it can also be BigQuery); anything that emits the URLs, really. You will also want to prevent fusion if you want to run it in parallel, but that's another question. – Tephra
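
A minimal sketch of that suggestion, reusing read_all_from_url from the answer and assuming a hypothetical gs://my-bucket/url_list.txt file with one URL per line; beam.Reshuffle() breaks fusion so the downstream reads can run in parallel across workers.

import apache_beam as beam
from apache_beam.io.textio import ReadFromText


def read_from_url_file(pipeline, url_file):
    return (
        pipeline
        # url_file: e.g. gs://my-bucket/url_list.txt, one URL per line (hypothetical).
        | 'ReadUrlList' >> ReadFromText(url_file)
        # Break fusion so the reads below are distributed across workers.
        | 'PreventFusion' >> beam.Reshuffle()
        | 'ReadFile' >> beam.Map(lambda url: (url, read_all_from_url(url)))
    )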
