I'm reading files from multiple folders and want to write each file's contents together with its file name, as (file contents, filename) pairs, to BigQuery. I'm using Apache Beam with the Python SDK and the Dataflow runner.
I originally thought I could create a PCollection for each file and then map the file contents to the filename.
import apache_beam as beam
from apache_beam.io import ReadFromText

# TESTIN: path to a text file listing the input URIs, one per line.

def read_documents(pipeline):
    """Read the documents at the provided URIs and return (line, uri) pairs."""
    pcolls = []
    with open(TESTIN) as uris:
        for uri in uris:
            # print str(uri).strip("[]/'")
            pcolls.append(
                pipeline
                | 'Read: uri' + str(uri) >> ReadFromText(str(uri).strip("[]/'"), compression_type='gzip')
                # uri is passed as a side argument so each Map keeps its own value.
                | 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
            )
    return pcolls | 'FlattenReadPColls' >> beam.Flatten()
This worked, but it was slow, and on Dataflow it stopped working at around 10,000 files, where it failed with a broken pipe error.
I'm currently trying to subclass ReadAllFromText from textio, which is designed to read large numbers of files quickly from a PCollection of filenames or patterns. There is a bug in this module when reading from Google Cloud Storage if the file has a content encoding set. Google Cloud Storage automatically gunzips and transcodes such files, but for some reason ReadAllFromText doesn't work with that. You have to change the file's metadata to remove the content encoding and set the compression type on ReadAllFromText to gzip. I'm including the issue URL in case anyone else has problems with ReadAllFromText: https://issues.apache.org/jira/browse/BEAM-1874
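When debugging this, it can help to check whether the bytes that reach the reader are still gzip or were already decompressed on the way out of storage. A minimal sketch, assuming you can get hold of the raw bytes of an object; `looks_gzipped` and `decode_payload` are hypothetical helper names and not part of Beam:

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # the first two bytes of every gzip stream


def looks_gzipped(head):
    """Return True if the given leading bytes start a gzip stream."""
    return head[:2] == GZIP_MAGIC


def decode_payload(data):
    """Decompress data only if it is still gzip; otherwise return it unchanged."""
    return gzip.decompress(data) if looks_gzipped(data) else data
```

If `looks_gzipped` returns False for a file you expected to be compressed, the content was probably transcoded before it reached your code, which would match the behaviour described above.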
My current code looks like this:
class ReadFromGs(ReadAllFromText):

    def __init__(self):
        super(ReadFromGs, self).__init__(compression_type="gzip")

    def expand(self, pvalue):
        files = self._read_all_files
        return (
            pvalue
            | 'ReadAllFiles' >> files  # self._read_all_files
            # filename is a placeholder for the input filename that I'm
            # trying to figure out how to include in the output.
            | 'Map values' >> beam.Map(lambda v: (v, filename))
        )
ReadAllFromText is defined in textio.py, inherits from PTransform, and calls ReadAllFiles from filebasedsource.py.
I believe I'm just missing something simple.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py
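One possible way to keep the filename next to each line is to do the read inside a function that receives the filename and emits (line, filename) pairs itself, instead of trying to recover the name after ReadAllFromText has produced the lines. The sketch below is only an illustration under stated assumptions: it uses the standard-library `gzip` module on local paths so that it runs on its own, and `lines_with_filename` and `demo` are hypothetical names. In a real pipeline the `gzip.open` call would likely be replaced with `FileSystems.open` from `apache_beam.io.filesystems` so that `gs://` paths work.

```python
import gzip
import os
import tempfile


def lines_with_filename(path):
    """Yield (line, path) for each line of the gzip-compressed file at path."""
    # Assumption: a local path and UTF-8 text. For gs:// paths in a pipeline,
    # Beam's FileSystems.open(path) would take the place of gzip.open.
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        for line in handle:
            yield line.rstrip("\n"), path


def demo():
    """Write two small gzip files and collect (line, base filename) pairs."""
    tmp_dir = tempfile.mkdtemp()
    paths = []
    for name, text in [("a.txt.gz", "one\ntwo\n"), ("b.txt.gz", "three\n")]:
        path = os.path.join(tmp_dir, name)
        with gzip.open(path, "wt", encoding="utf-8") as handle:
            handle.write(text)
        paths.append(path)
    # Reduce each path to its base name so the result is easy to compare.
    return [
        (line, os.path.basename(source))
        for path in paths
        for line, source in lines_with_filename(path)
    ]
```

Inside a pipeline, a function like this might be applied as `pipeline | beam.Create(uris) | beam.FlatMap(lines_with_filename)`, so there is one transform for all files rather than one read per file, and the filename travels with every line.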