Watching for new files matching a filepattern in Apache Beam

I have a directory on GCS or another supported filesystem to which new files are being written by an external process.

I would like to write an Apache Beam streaming pipeline that continuously watches this directory for new files and reads and processes each new file as it arrives. Is this possible?

Abel answered 19/12, 2017 at 23:5 Comment(0)

This is possible starting with Apache Beam 2.2.0. Several APIs support this use case:

If you're using TextIO or AvroIO, they support this explicitly via TextIO.read().watchForNewFiles() and the same on readAll(), for example:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));

If you're using a different file format, you may use FileIO.match().continuously() and FileIO.matchAll().continuously() which support the same API, in combination with FileIO.readMatches().

The APIs support specifying how often to check for new files, and when to stop checking (supported conditions are e.g. "if no new output appears within a given time", "after observing N outputs", "after a given time since starting to check" and their combinations).
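
To make the polling and termination semantics concrete, here is a minimal plain-Java sketch of the idea. It is not Beam's implementation: it assumes each poll yields a snapshot of the file names that currently match the pattern, and it counts consecutive idle polls as a stand-in for "no new output appears within a given time". The names WatchSketch, watch and maxIdlePolls are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; Beam's Watch transform is far more involved.
class WatchSketch {
    /**
     * Emits each file name the first time it appears in a poll snapshot.
     * Stops after maxIdlePolls consecutive polls that produce nothing new,
     * or when the snapshots run out.
     */
    static List<String> watch(List<List<String>> snapshots, int maxIdlePolls) {
        Set<String> seen = new LinkedHashSet<>();
        List<String> emitted = new ArrayList<>();
        int idlePolls = 0;
        for (List<String> snapshot : snapshots) {
            boolean sawNew = false;
            for (String file : snapshot) {
                // Set.add returns false for names that were already emitted.
                if (seen.add(file)) {
                    emitted.add(file);
                    sawNew = true;
                }
            }
            idlePolls = sawNew ? 0 : idlePolls + 1;
            if (idlePolls >= maxIdlePolls) {
                break; // termination condition reached
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<String>> polls = List.of(
            List.of("a.txt"),
            List.of("a.txt", "b.txt"),
            List.of("a.txt", "b.txt"),
            List.of("a.txt", "b.txt"),
            List.of("a.txt", "b.txt", "c.txt"));
        // Watching stops after two idle polls, so c.txt is never observed.
        System.out.println(watch(polls, 2)); // prints [a.txt, b.txt]
    }
}
```

The sketch shows why the choice of termination condition matters: a file that arrives after the condition has fired is never picked up, which is why the earlier example uses Watch.Growth.never() for a pipeline that should keep watching indefinitely.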

Note that this feature currently works only in the Direct runner and the Dataflow runner, and only in the Java SDK. In general, it will work in any runner that supports Splittable DoFn (see the capability matrix).

Abel answered 19/12, 2017 at 23:5 Comment(14)
How reliable is this watch method? Can it be configured so that files are processed exactly once? Do we still need GCS notifications or GCS Pub/Sub notifications (both are at-least-once, with no SLA on how delayed the signal can be)? Does this feature obviate the need for Pub/Sub and a notification system? - Stylopodium
I am also curious how state is handled with this feature: if my streaming pipeline fails, or I deploy or switch to a new one, how do I tell which files have been processed? I have a GCS bucket holding several months of already-processed data. When I deploy a new pipeline after draining the old one, how can this feature tell what has been processed and avoid reading all the past files that match the pattern? - Stylopodium
For as long as your pipeline is running, this method will return every file (existing and incoming new ones) exactly once - in that sense it obviates GCS and Pub/Sub notifications. If you cancel your pipeline and start a new instance of the same pipeline, it will see old files again - there's no way to carry over state between two unrelated pipelines. However, I believe if you use the Pipeline Update feature, it should work correctly and not have duplication. - Abel
You can try to address duplication between unrelated pipelines by filtering the returned files using a ParDo. However, if you want something like "exclude files that were already fully processed by a previous instance of the pipeline", you'll need to define what "fully processed" means and use some sort of external storage, e.g. Cloud Bigtable, and have your pipeline explicitly write to it when things are "fully processed" and read from it before deciding to process them. - Abel
Interesting. Could you shed some light on how the update feature works? I believe it internally stores state and then resumes. We would want to build something similar to handle cases where an existing streaming pipeline fails (quota, user code exception, GCS failures). The pain point is that we have millions of files, so reading them from Bigtable (BT) to filter them out will also take quite a bit of time when readAll is probably listing millions of files. - Stylopodium
I'm not very familiar with how pipeline update works, but at a high level, I believe it drains the currently running pipeline (stops consuming data and waits for it to quiesce), stops it without deleting stored state (e.g. buffered data, pending timers, etc.), and starts a new pipeline initialized with that state. The format and location of the storage are unspecified and inaccessible. I think a few million Bigtable lookups shouldn't take long - certainly comparable to the time it takes to do a pipeline update anyway (a couple of minutes), or even to the time it takes to ask GCS to list a few million files. - Abel
It seems this API is not production ready and cannot scale to watching millions of files. I would suggest trying spark.readstream to watch GCS. - Stylopodium
This is indeed a new API and it hasn't yet been tested at the scale of millions of files on GCS - though such testing and the corresponding improvements are planned in the near future. I'd be curious to know your experience with using spark.readstream over GCS for this amount of data. - Abel
I found the likely bug that's happening at this scale: issues.apache.org/jira/browse/BEAM-3499. I'm currently heavily distracted by some family issues but I'll try to get to it asap. - Abel
The fix has been committed. I've tested that with the fix we can successfully watch 1M files in a single filepattern or distributed over 10,000 concurrently watched more granular filepatterns. - Abel
So this should be in Beam 2.3? Also, can I watch ~500k/1M granular filepatterns? Are there any limits to scaling that way? Any advantages or disadvantages to either approach? - Stylopodium
Unfortunately, by the time I was able to finish this fix, 2.3.0 was already cut and only accepting critical cherry-picks - so it'll be in 2.4.0 (but already available in 2.4.0-SNAPSHOT if you're willing to use a nightly snapshot version). Generally I've found that partitioning 1M files into 10,000 filepatterns performs somewhat better than a single filepattern, probably because it's parallelized. I haven't tried with 500k filepatterns but I expect you might run into GCS API quota issues with that, issuing that many concurrent list() calls. - Abel
@Abel I am using FileIO.match() with a filepattern to process files in GCS using Java SDK 2.24.0 in a streaming pipeline. Currently, if a file gets overwritten in GCS, it is not being processed. How can I trigger the pipeline on object overwrites? - Stig

To add to Eugene's excellent answer: in addition to the watchForNewFiles option, there are a couple of other choices.

Several options can meet this requirement, depending on your latency requirements. As of SDK 2.9.0:

Option 1: Continuous read mode:

Java: FileIO, TextIO and several other IO sources support continuously reading the source for new files.

The FileIO class can watch a single filepattern continuously. This example matches a single filepattern every 30 seconds, continuously returns newly matched files as an unbounded PCollection, and stops if no new files appear for 1 hour.

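// Assumes imports of FileIO, MatchResult.Metadata, Watch, PCollection (Beam) and Duration (Joda-Time).
PCollection<Metadata> matches = p.apply(FileIO.match()
    .filepattern("...")
    .continuously(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Stop watching the filepattern if no new files appear within an hour
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));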

The TextIO class supports streaming matches of new files via the watchForNewFiles method.

PCollection<String> lines = p.apply(TextIO.read()
    .from("/local/path/to/files/*")
    .watchForNewFiles(
        // Check for new files every minute
        Duration.standardMinutes(1),
        // Stop watching the filepattern if no new files appear within an hour
        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

It is important to note that the file list is not retained across restarts of the pipeline. To deal with that scenario, you can move the files either through a process downstream of the pipeline or as part of the pipeline itself. Another option would be to store processed file names in an external file and de-dupe the lists at the next transform.
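
As a rough illustration of the "external file of processed names" idea, here is a plain-Java sketch, independent of Beam. It assumes a ledger file with one processed file name per line. The class ProcessedFileLedger and its methods are hypothetical names, and what counts as "fully processed" is left to the caller.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: tracks processed file names in a plain text ledger.
class ProcessedFileLedger {
    private final Path ledger; // assumed format: one processed file name per line

    ProcessedFileLedger(Path ledger) {
        this.ledger = ledger;
    }

    /** Returns the matched names not yet recorded in the ledger, preserving order. */
    List<String> filterUnprocessed(List<String> matched) throws IOException {
        Set<String> done = Files.exists(ledger)
            ? new HashSet<>(Files.readAllLines(ledger))
            : new HashSet<>();
        List<String> fresh = new ArrayList<>();
        for (String name : matched) {
            if (!done.contains(name)) {
                fresh.add(name);
            }
        }
        return fresh;
    }

    /** Appends a name once the caller considers the file fully processed. */
    void markProcessed(String name) throws IOException {
        Files.write(ledger, List.of(name),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Inside a pipeline, the same check would sit in the transform that follows the match step. A single local file is only reasonable for modest file counts; for very large listings, a keyed external store such as the Cloud Bigtable suggestion in the comments above is probably a better fit.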

Python: The continuously option is not available in the Python SDK as of 2.9.0.

Option 2: Stream processing triggered from external source

You can run a Beam pipeline in streaming mode with an unbounded source, for example PubSub. A process external to Beam detects when new files arrive and sends a PubSub message whose payload is the URI of the file. A DoFn that follows the PubSub source can then use that URI to process the file.

Java: Use an unbounded source IO (PubsubIO, KafkaIO, etc.)

Python: Use an unbounded source IO (PubSubIO, etc.)
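
The shape of Option 2 can be sketched in plain Java, without Beam. This is an illustration under stated assumptions only: an in-memory BlockingQueue stands in for the PubSub subscription, local file paths stand in for file URIs, and NotificationDrivenReader and processPending are hypothetical names. In a real pipeline the queue would be the unbounded source and the loop body would live in a DoFn.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for "message with a file URI arrives, then read that file".
class NotificationDrivenReader {
    /** Drains the currently queued notifications and reads every line of each referenced file. */
    static List<String> processPending(BlockingQueue<String> notifications) {
        List<String> lines = new ArrayList<>();
        String uri;
        // poll() returns null once no notification is waiting.
        while ((uri = notifications.poll()) != null) {
            try {
                lines.addAll(Files.readAllLines(Paths.get(uri)));
            } catch (IOException e) {
                throw new UncheckedIOException("Could not read " + uri, e);
            }
        }
        return lines;
    }
}
```

Because the comments above describe GCS notifications as at-least-once, a real handler would likely also need to tolerate receiving the same URI more than once.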

Option 3: Batch mode processing triggered from external source

This approach introduces more latency than Options 1 and 2, because the pipeline needs to start up before processing can begin. Here, a triggering event from your source file system schedules or immediately starts a Dataflow job. This option is best suited to low-frequency updates of large files.

Tetrahedron answered 6/2, 2019 at 0:10 Comment(0)