How to Get Filename when using file pattern match in google-cloud-dataflow
Does anyone know how to get the filename when using file pattern matching in google-cloud-dataflow?

I'm new to Dataflow. How do I get the filename when using a file pattern match like this:

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))

I'd like to know how I can detect the filenames, e.g. kinglear.txt, Hamlet.txt, etc.

Zechariah answered 1/5, 2015 at 8:13 Comment(0)

If you would like to simply expand the filepattern and get a list of filenames matching it, you can use GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt") (see GcsIoChannelFactory).
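If you are on a current Apache Beam SDK rather than the old Dataflow SDK, the same expansion can be done with FileSystems.match. A minimal sketch, using the filepattern from the question (the class name ExpandFilepattern is only for illustration):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExpandFilepattern {
  public static void main(String[] args) throws IOException {
    // Register the available filesystems (e.g. gs://) when running outside a pipeline.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // Expand the filepattern into the list of matching filenames.
    MatchResult result = FileSystems.match("gs://dataflow-samples/shakespeare/*.txt");
    List<String> filenames = new ArrayList<>();
    for (MatchResult.Metadata metadata : result.metadata()) {
      filenames.add(metadata.resourceId().toString());  // e.g. .../kinglear.txt
    }
    filenames.forEach(System.out::println);
  }
}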

If you would like to access the "current filename" from inside one of the DoFn's downstream in your pipeline - that is currently not supported (though there are some workarounds - see below). It is a common feature request and we are still thinking about how best to fit it into the framework in a natural, generic and high-performance way.

Some workarounds include:

  • Writing a pipeline like this (the tf-idf example uses this approach; a concrete sketch appears at the end of this answer):
    DoFn readFile = ...(takes a filename, reads the file and produces records)...
    p.apply(Create.of(filenames))
     .apply(ParDo.of(readFile))
     .apply(the rest of your pipeline)

This has the downside that dynamic work rebalancing features won't work particularly well, because they currently apply only at the level of Read PTransforms, not at the level of ParDos with high fan-out (like the one here, which reads a file and produces all of its records); and parallelization will only work at the level of whole files, which will not be split into sub-ranges. At the scale of reading Shakespeare this is not an issue, but if you are reading a set of files of wildly different sizes, some extremely large, then it may become an issue.

  • Implementing your own FileBasedSource (javadoc, general documentation) which would return records of type something like Pair<String, T> where the String is the filename and the T is the record you're reading. In this case the framework would handle the filepattern matching for you, dynamic work rebalancing would work just fine, however it is up to you to write the reading logic in your FileBasedReader.

Both of these workarounds are non-ideal, but depending on your requirements, one of them may do the trick for you.
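A minimal sketch of the first workaround, written against the current Beam Java SDK (the 2015-era Dataflow SDK classes differ slightly); the ReadFileFn name and the two concrete filenames are only illustrative:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

public class ReadFilesWithNames {
  /** Takes a filename, reads the file and emits (filename, line) pairs. */
  static class ReadFileFn extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      String filename = c.element();
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(
          Channels.newInputStream(
              FileSystems.open(FileSystems.matchSingleFileSpec(filename).resourceId())),
          StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          c.output(KV.of(filename, line));
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // The filenames could come from the FileSystems.match expansion shown earlier.
    List<String> filenames = Arrays.asList(
        "gs://dataflow-samples/shakespeare/kinglear.txt",
        "gs://dataflow-samples/shakespeare/hamlet.txt");
    p.apply(Create.of(filenames))
        .apply(ParDo.of(new ReadFileFn()));
    // ... the rest of your pipeline ...
    p.run();
  }
}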

Throstle answered 1/5, 2015 at 17:5 Comment(6)
@jkff: does this mean we also lose TextIO's automatic compression detection based on filename extension? – Refute
@Refute Unfortunately, yes - in both cases. We're considering some fundamental improvements in the way sources and sinks are handled, which would solve this whole class of problems and more, but there's no timeline yet. – Throstle
@Throstle has this feature been implemented yet in Java and/or Python? – Readjust
Not implemented yet, but it will be implemented as part of issues.apache.org/jira/browse/BEAM-65, which is currently in active development. – Throstle
@Throstle do you know what I would do for this problem, sort of similar but not sure if with new releases things changed #53405079 – Intercourse
At this point the best way to solve this problem is to use FileIO - specifically FileIO.match/matchAll, FileIO.read, and a ParDo using regular Java facilities to parse lines from the result of FileIO.read. – Throstle

Update based on the latest Java SDK (2.9.0):

Beam's TextIO readers do not give access to the filename itself. For these use cases we need to make use of FileIO to match the files and gain access to the information stored in the file name. Unlike TextIO, the reading of the file needs to be taken care of by the user in transforms downstream of the FileIO read. The result of a FileIO read is a PCollection of ReadableFile; the ReadableFile class contains the file name as metadata, which can be used along with the contents of the file.

FileIO's ReadableFile does have a convenience method readFullyAsUTF8String(), which reads the entire file into a String object; note that this loads the whole file into memory first. If memory is a concern, you can work with the file directly using utility classes like FileSystems (see the sketch after the example below).

From the documentation:

PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(kvs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
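If loading whole files with readFullyAsUTF8String() is a memory concern, one option is to stream each matched file through ReadableFile.open() and emit one record per line. A minimal sketch (the LinesFn name is illustrative, not part of the documented example):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Emits (filename, line) pairs without reading the whole file into one String. */
class LinesFn extends DoFn<FileIO.ReadableFile, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    FileIO.ReadableFile file = c.element();
    String filename = file.getMetadata().resourceId().toString();
    // open() returns a ReadableByteChannel (decompressed according to the
    // compression chosen in readMatches()), which we wrap in a buffered reader.
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
        Channels.newInputStream(file.open()), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        c.output(KV.of(filename, line));
      }
    }
  }
}

To use it, replace the MapElements step in the snippet above with .apply(ParDo.of(new LinesFn())); the FileIO.match and FileIO.readMatches steps stay the same.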

Python (sdk 2.9.0):

For 2.9.0 in Python you will need to collect the list of URIs outside of the Dataflow pipeline and feed it in as a parameter to the pipeline, for example by using FileSystems to list the files matching a glob pattern and then passing that list into a PCollection for processing.

Once fileio (see PR https://github.com/apache/beam/pull/7791/) is available, the following code will also be an option for Python.

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  readable_files = (p
                    | fileio.MatchFiles('hdfs://path/to/*.txt')
                    | fileio.ReadMatches()
                    | beam.Reshuffle())
  files_and_contents = (readable_files
                        | beam.Map(lambda x: (x.metadata.path,
                                              x.read_utf8())))
Gantline answered 29/1, 2019 at 5:5 Comment(0)

One approach is to build a List<PCollection> where each entry corresponds to an input file, then use Flatten. For example, if you want to parse each line of a collection of files into a Foo object, you might do something like this:

public static class FooParserFn extends DoFn<String, Foo> {
  private String fileName;
  public FooParserFn(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void processElement(ProcessContext processContext) throws Exception {
    String line = processContext.element();
    // here you have access to both the line of text and the name of the file
    // from which it came.
  }
}

public static void main(String[] args) {
  ...
  List<String> inputFiles = ...;
  List<PCollection<Foo>> foosByFile =
          Lists.transform(inputFiles,
          new Function<String, PCollection<Foo>>() {
            @Override
            public PCollection<Foo> apply(String fileName) {
              return p.apply(TextIO.Read.from(fileName))
                      .apply(ParDo.of(new FooParserFn(fileName)));
            }
          });

  PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
  ...
}

One downside of this approach is that, if you have 100 input files, you'll also have 100 nodes in the Cloud Dataflow monitoring console. This makes it hard to tell what's going on. I'd be interested in hearing from the Google Cloud Dataflow people whether this approach is efficient.

Bertilla answered 26/2, 2016 at 21:46 Comment(1)
do you know what I would do for this problem, sort of similar but not sure if with new releases things changed: #53405079 – Intercourse

I also had the 100 input files = 100 nodes on the dataflow diagram when using code similar to @danvk. I switched to an approach like this which resulted in all the reads being combined into a single block that you can expand to drill down into each file/directory that was read. The job also ran faster using this approach rather than the Lists.transform approach in our use case.

GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String> filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());

PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) {
    pcl = pcl.and(
            p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
                    .from(fileName)
                    .withSchema(SomeClass.class)
            )
            .apply(ParDo.of(new MyDoFn(fileName)))
    );
}

// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());
Cooke answered 2/6, 2016 at 2:2 Comment(0)

This might be a very late post for the above question, but I wanted to add an answer using Beam's bundled classes.

This could also be seen as code extracted from the solution provided by @Reza Rokni.

PCollection<String> listOfFilenames =
    pipe.apply(FileIO.match().filepattern("gs://apache-beam-samples/shakespeare/*"))
        .apply(FileIO.readMatches())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (FileIO.ReadableFile file) -> {
                      String f = file.getMetadata().resourceId().getFilename();
                      System.out.println(f);
                      return f;
                    }));

pipe.run().waitUntilFinish();

The above PCollection<String> will contain the list of filenames available in the provided directory.

Buddie answered 2/7, 2019 at 15:1 Comment(0)

I was struggling with the same use case while using a wildcard to read files from GCS, but I also needed to modify the collection based on the file name. The key is to use ReadFromTextWithFilename instead of ReadFromText. In Java you already have a way out: you can use String filename = context.element().getMetadata().resourceId().getCurrentDirectory().toString() inside your processElement method.

For Python, the following technique will work: use beam.io.ReadFromTextWithFilename to read the wildcard path from GCS; as per the documentation, ReadFromTextWithFilename returns both the file's name and the file's content.

Below is the code snippet:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class GetFileNameFromWildcard(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # ReadFromTextWithFilename yields (file_path, line) tuples.
        file_path, content = element
        schema = ["id", "name", "mob", "email", "dept", "store"]
        store_name = file_path.split("/")[-2]
        content_list = content.split(",")
        content_list.append(store_name)
        out_dict = dict(zip(schema, content_list))
        print(out_dict)
        yield out_dict


def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        # saving main session so that it can load global namespace on the Cloud Dataflow worker
        init = (
            p
            | 'Begin Pipeline With Initiator' >> beam.Create(["pcollection initializer"])
            | 'Read From GCS' >> beam.io.ReadFromTextWithFilename(
                "gs://<bkt-name>/20220826/*/dlp*", skip_header_lines=1)
            | beam.ParDo(GetFileNameFromWildcard())
            | beam.io.WriteToText('df_out.csv')
        )
    
Junejuneau answered 26/8, 2022 at 21:36 Comment(0)
