How to write to a file name defined at runtime?

I want to write to a file on Google Cloud Storage (a gs:// path), but I don't know the file name at compile time. The name depends on behavior that is only defined at runtime. How can I proceed?

Yehudi answered 30/1, 2018 at 11:3 Comment(4)
OK, do you know the folder where the files could be stored? - Garlan
Are you using Beam Java or Python? - Clemence
Java, using the standard TextIO. - Cityscape
Let's say the folder is gs://bucket/myfolder/ - Cityscape

If you're using Beam Java, you can use FileIO.writeDynamic() for this (starting with Beam 2.3 which is currently in the process of being released - but you can already use it via the version 2.3.0-SNAPSHOT), or the older DynamicDestinations API (available in Beam 2.2).

Example of using FileIO.writeDynamic() to write a PCollection of bank transactions to different paths on GCS depending on the transaction's type:

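PCollection<BankTransaction> transactions = ...;
transactions.apply(
    // Type parameters are <destination type, element type>.
    FileIO.<TransactionType, BankTransaction>writeDynamic()
      .by(BankTransaction::getType)
      // Newer Beam releases may require Contextful.fn(BankTransaction::toString) here.
      .via(BankTransaction::toString, TextIO.sink())
      .to("gs://bucket/myfolder/")
      // Include the type in the prefix so each destination gets its own files.
      .withNaming(type -> defaultNaming("transactions_" + type, ".txt")));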

For an example of DynamicDestinations use, see example code in the TextIO unit tests.

Alternatively, if you want to write each record to its own file, just use the FileSystems API (in particular, FileSystems.create()) from a DoFn.

Clemence answered 30/1, 2018 at 22:47 Comment(5)
Great. Thank you. - Cityscape
@Clemence is there a performance difference between using FileSystems.create and writeDynamic? - Officialdom
I haven't benchmarked them, but there's not much else going on in writeDynamic performance-wise beyond what you'd already be doing with FileSystems.create. It does use FileSystems.create under the hood. - Clemence
How does this work with Beam 2.6? The signatures and/or the functions seem to have changed. - Deli
If you are referring to the .via(___, ___) line, then it looks like the first argument needs to be wrapped in Contextful.fn(___). - Dunsany

For the Python crowd:

An experimental write transform, beam.io.fileio.WriteToFiles, was added to the Beam Python SDK in 2.14.0:

# AvroSink and CsvSink are placeholders for your own sink classes.
my_pcollection | beam.io.fileio.WriteToFiles(
    path='/my/file/path',
    destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
    sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
    file_naming=beam.io.fileio.destination_prefix_naming())

This transform can write records to different files on a per-record basis.

If your file name is based on data within your PCollection, you can use destination and file_naming to create files based on each record's data.
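
As a rough sketch of the file_naming side: WriteToFiles calls the naming function with the window, pane, shard index, total shard count, compression and destination, in that order, which is the argument order the built-in naming helpers in apache_beam.io.fileio use. The helper below, per_destination_naming, is a hypothetical name and not part of Beam. It keeps the shard numbers in the file name so that several shards for one destination do not end up with the same name.

```python
def per_destination_naming(suffix=".txt"):
    """Build a callable suitable for WriteToFiles(file_naming=...).

    Hypothetical helper for illustration; not part of Apache Beam.
    """

    def _naming(window, pane, shard_index, total_shards, compression, destination):
        # Defensive defaults in case the shard information is not provided.
        shard = 0 if shard_index is None else shard_index
        total = 1 if total_shards is None else total_shards
        # The destination (computed per record by `destination=`) becomes the prefix.
        return f"{destination}-{shard:05d}-of-{total:05d}{suffix}"

    return _naming


naming = per_destination_naming(".csv")
# Call it by hand, the way the transform would for shard 0 of 1.
print(naming(None, None, 0, 1, None, "csv"))  # csv-00000-of-00001.csv
```

In a pipeline this would be passed as file_naming=per_destination_naming(".csv") in place of destination_prefix_naming().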

More documentation here:

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.html#dynamic-destinations

And the JIRA issue here:

https://issues.apache.org/jira/browse/BEAM-2857

Mameluke answered 18/8, 2019 at 8:18 Comment(0)

As @anrope mentioned already, apache_beam.io.fileio seems to be the latest Python API for writing files. The WordCount example is currently outdated, since it uses the WriteToText class, which inherits from the now deprecated apache_beam.io.filebasedsink / apache_beam.io.iobase.

To add to existing answers, here is my pipeline in which I dynamically name the output files during runtime. My pipeline takes N input files and creates N output files, which are named based on their corresponding input file name.

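import apache_beam as beam
from apache_beam.io.fileio import MatchAll, ReadMatches

# pipeline_options and input_file_paths are defined elsewhere.
with beam.Pipeline(options=pipeline_options) as p:
    (p
        | 'CreateFiles' >> beam.Create(input_file_paths)
        | 'MatchFiles' >> MatchAll()
        | 'OpenFiles' >> ReadMatches()
        | 'LoadData' >> beam.Map(custom_data_loader)
        | 'Transform' >> beam.Map(custom_data_transform)
        | 'Write' >> custom_writer
    )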

When I load the data I create a PCollection of tuple records (file_name, data). All my transforms are applied to data, but I pass file_name through to the end of the pipeline to generate the output file names.

def custom_data_loader(f: beam.io.fileio.ReadableFile):
    file_name = f.metadata.path.split('/')[-1]
    data = custom_read_function(f.open())
    return file_name, data

def custom_data_transform(record):
    file_name, data = record
    data = custom_transform_function(data)  # not defined
    return file_name, data

And I save the file with:

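from apache_beam.io.fileio import TextSink, WriteToFiles

def file_naming(record):
    # Used as the destination function: maps each record to its output name.
    file_name, data = record
    file_name = custom_naming_function(file_name)  # not defined
    return file_name

def return_destination(*args):
    """Optional: Return only the last arg (destination) to avoid sharding name format"""
    return args[-1]

# Note: TextSink calls .encode() on each element, so the elements reaching
# this step need to be strings (or use a custom sink for tuple records).
custom_writer = WriteToFiles(
    path='path/to/output',
    file_naming=return_destination,
    destination=file_naming,
    sink=TextSink()
)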

Replace all of the custom_* functions with your own logic.
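
Since custom_naming_function is left undefined above, here is one possible shape for it, purely as an illustration. The "_processed" suffix and the example path are assumptions of mine, not something from the pipeline above. The last lines call return_destination by hand with the six arguments in the order WriteToFiles passes them (window, pane, shard index, total shards, compression, destination) to show which name comes out.

```python
import posixpath


def custom_naming_function(file_name, suffix="_processed"):
    """Hypothetical naming rule: 'input.csv' -> 'input_processed.csv'."""
    # basename() drops any folder part; splitext() separates the extension.
    stem, ext = posixpath.splitext(posixpath.basename(file_name))
    return f"{stem}{suffix}{ext}"


def return_destination(*args):
    """Return only the last argument (the destination)."""
    return args[-1]


# The destination function runs first and yields the output name...
destination = custom_naming_function("gs://bucket/in/input.csv")
# ...and the naming function later receives it as its final argument.
output_name = return_destination(None, None, 0, 1, None, destination)
print(output_name)  # input_processed.csv
```

Because return_destination discards the shard index, every shard written for the same destination would get the same name, so this approach assumes one output shard per destination.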

Exquisite answered 11/8, 2020 at 21:16 Comment(0)

I know this is a bit of an old question but I struggled with the examples in the documentation.

Here is a simple example of how to split files based on dict items.

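import json

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = False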


def file_names(*args):
    file_name = fileio.destination_prefix_naming()(*args)
    destination, *_ = file_name.split("----")
    return f"{destination}.json"


class JsonSink(fileio.TextSink):
    def write(self, element):
        record = json.loads(element)
        record.pop("id")
        self._fh.write(json.dumps(record).encode("utf8"))
        self._fh.write("\n".encode("utf8"))


def destination(element):
    return json.loads(element)["id"]


with beam.Pipeline(options=pipeline_options) as p:

    data = [
        {"id": 0, "message": "whhhhhhyyyyyyy"},
        {"id": 1, "message": "world"},
        {"id": 1, "message": "hi there!"},
        {"id": 1, "message": "what's up!!!?!?!!?"},
    ]

    (
        p
        | "CreateEmails" >> beam.Create(data)
        | "JSONify" >> beam.Map(json.dumps)
        | "Write Files"
        >> fileio.WriteToFiles(
            path="path/",
            destination=destination,
            sink=lambda dest: JsonSink(),
            file_naming=file_names,
        )
    )
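
To check the routing logic without running a pipeline, the same destination and record-stripping steps can be dry-run in plain Python. This is only a sketch: preview and sink_line are hypothetical helper names, and the grouping below imitates what the transform does rather than calling Beam itself.

```python
import json
from collections import defaultdict


def destination(element):
    # Same rule as above: route each JSON string by its "id" field.
    return json.loads(element)["id"]


def sink_line(element):
    # Mirrors JsonSink.write: drop "id" and serialize the rest as one line.
    record = json.loads(element)
    record.pop("id")
    return json.dumps(record)


def preview(elements):
    """Group the lines each output file would receive, keyed by file name."""
    files = defaultdict(list)
    for element in elements:
        files[f"{destination(element)}.json"].append(sink_line(element))
    return dict(files)


data = [
    {"id": 0, "message": "whhhhhhyyyyyyy"},
    {"id": 1, "message": "world"},
    {"id": 1, "message": "hi there!"},
]
result = preview(json.dumps(d) for d in data)
print(sorted(result))  # ['0.json', '1.json']
```

In a real run the order of lines within a file is not guaranteed, so treat the preview as a check of which records go where, not of their order.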
Impedance answered 14/2, 2020 at 22:46 Comment(0)
