Writing to text files in Apache Beam / Dataflow Python streaming
Asked Answered
A

2

6

I have a very basic Python Dataflow job that reads some data from Pub/Sub, applies a FixedWindow and writes to Google Cloud Storage.

transformed = ...
transformed | beam.io.WriteToText(known_args.output)

The output is written to the location specific in --output, but only the temporary stage, i.e.

gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...

The file never gets placed into the correctly named location with the sharding template.

Tested on local and DataFlow runner.


When testing further, I have noticed that the streaming_wordcount example has the same issues, however the standard wordcount example writes fine. Perhaps the issues is to with windowing, or reading from pubsub?


It appears WriteToText is not compatible with the streaming source of PubSub. There are likely workarounds, or the Java version may be compatible, but I have opted to use a different solution altogether.

Arachnoid answered 19/11, 2018 at 17:10 Comment(3)
can you please post the code?Pumphrey
Same problem here. Have you found a solution ? I tried playing with triggers but it not related to triggers. When I call "Drain" on my Dataflow job, data is written in the correct folder.Goodman
Try fileio.WriteToFiles.Lanie
L
11

The WriteToText transform in the Python SDK does not support streaming.

Instead, you may consider the transforms in apache_beam.io.fileio. In this case, you can write something like this (let's suppose 10-minute windows):

my_pcollection = (p | ReadFromPubSub(....)
                    |  WindowInto(FixedWindows(10*60))
                    |  fileio.WriteToFiles(path=known_args.output))

This is enough to write out separate files for each window, and continue to do it as the stream advances.

You'd see files like this (let's suppose output is gs://mybucket/). The files would be printed as the windows get triggered:

gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...

The files, by default have $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix names - where prefix is output by default, but you can pass a more complex function for file naming.


If you want to customize how the files get written (e.g. naming of the files, or format of the data, or anything like that), you can look at the extra arguments in WriteToFiles.

You can see an example here of the transform being used in a Beam test, with more complex arguments - but it sounds like the default behavior should be enough for you.

Lanie answered 20/12, 2019 at 6:31 Comment(4)
Thanks, I try to figure out why I cannot use io.fileio in my code. There may be some version issue, I use Python 3.7 and SDK 2.16.0 but fileio just isn't there. This post tells me it is renamed but the filesystem module has no WriteToText #46787928Loella
let me know if that works, or if you need assistance finding the right set of parameters.Lanie
Regarding the window size: What's the timeestamp used for windowing?Kimkimball
Is there a way to limit the size of part files generated when using WriteToText ?Brinson
B
0

Python streaming pipeline execution is experimentally available (with some limitations).

Unsupported features apply to all runners. State and Timers APIs, Custom source API, Splittable DoFn API, Handling of late data, User-defined custom WindowFn.

Additionally, DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.

Streaming autoscaling Updating existing pipelines Cloud Dataflow Templates Some monitoring features, such as msec counters, display data, metrics, and element counts for transforms. However, logging, watermarks, and element counts for sources are supported.

https://beam.apache.org/documentation/sdks/python-streaming/

As you're using FixedWindowFn and the pipeline was able to writing the data into tmp location, please recheck the output location --output gs://<your-gcs-bucket>/<you-gcs-folder>/<your-gcs-output-filename>

Blades answered 6/12, 2018 at 21:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.