Does Apache Beam support custom file names for its output?

In a distributed processing environment it is common to get "part" file names such as "part-000". Is it possible to write an extension of some sort to customize the individual output file names in Apache Beam, for example one file name per window?

To do this, one might have to be able to assign a name for a window or infer a file name based on the window's content. I would like to know if such an approach is possible.

The solution can be streaming or batch, but a streaming-mode example is preferable.

Luxuriate answered 9/10, 2017 at 3:29 Comment(0)
A
7

Yes, as suggested by jkff, you can achieve this using TextIO.write().to(FilenamePolicy).

Examples are below:

If you want to write the output to a particular local file, you can use:

lines.apply(TextIO.write().to("/path/to/file.txt"));

Below is a simple way to write the output using a filename prefix. This example writes to Google Cloud Storage; you can use local or S3 paths instead.

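import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalWordCountJava8 {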

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make following runner specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    // or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    // For BlockingDataflowRunner, set project and temp location as follows:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    // for more details.
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     .apply(FlatMapElements
         .into(TypeDescriptors.strings())
         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
     .apply(Filter.by((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .into(TypeDescriptors.strings())
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     // CHANGE 3/3: Set the Google Cloud Storage path that the results are written to.
     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run().waitUntilFinish();
  }
}

This example code gives you more control over how the output files are named:

 /**
   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
   * being written. This always includes the shard number and the total number of shards. For
   * windowed writes, it also includes the window and pane index (a sequence number assigned to each
   * trigger firing).
   */
  protected static class PerWindowFiles extends FilenamePolicy {

    private final ResourceId prefix;

    public PerWindowFiles(ResourceId prefix) {
      this.prefix = prefix;
    }

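    // Note: `formatter` is not defined in this snippet; it is assumed to be a
    // Joda-Time DateTimeFormatter field declared in the enclosing class.
    public String filenamePrefixForWindow(IntervalWindow window) {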
      String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
      return String.format(
          "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
    }

    @Override
    public ResourceId windowedFilename(int shardNumber,
                                       int numShards,
                                       BoundedWindow window,
                                       PaneInfo paneInfo,
                                       OutputFileHints outputFileHints) {
      IntervalWindow intervalWindow = (IntervalWindow) window;
      String filename =
          String.format(
              "%s-%s-of-%s%s",
              filenamePrefixForWindow(intervalWindow),
              shardNumber,
              numShards,
              outputFileHints.getSuggestedFilenameSuffix());
      return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        int shardNumber, int numShards, OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }

  @Override
  public PDone expand(PCollection<InputT> teamAndScore) {
    if (windowed) {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
    } else {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(TextIO.write().to(filenamePrefix));
    }
    return PDone.in(teamAndScore.getPipeline());
  }
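
The naming logic in PerWindowFiles can be tried out without a Beam runner. The following is a minimal sketch in plain Java, assuming java.time in place of the Joda-Time formatter used above. The class WindowFileNames, its method names, and the timestamp pattern are hypothetical and chosen only for illustration; they are not part of Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not part of Beam): mirrors the format strings used in PerWindowFiles.
final class WindowFileNames {

  // Assumption: a compact UTC timestamp without colons; the original formatter's pattern is not shown.
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm").withZone(ZoneOffset.UTC);

  // Same "%s-%s-%s" layout as filenamePrefixForWindow: prefix, window start, window end.
  static String prefixForWindow(String filePrefix, Instant start, Instant end) {
    return String.format("%s-%s-%s", filePrefix, FORMATTER.format(start), FORMATTER.format(end));
  }

  // Same "%s-%s-of-%s%s" layout as windowedFilename: window prefix, shard, total shards, suffix.
  static String shardName(String windowPrefix, int shardNumber, int numShards, String suffix) {
    return String.format("%s-%s-of-%s%s", windowPrefix, shardNumber, numShards, suffix);
  }

  public static void main(String[] args) {
    // A one-minute window, as a fixed-window streaming pipeline might produce.
    Instant start = Instant.parse("2017-10-11T17:00:00Z");
    Instant end = start.plusSeconds(60);
    String windowPrefix = prefixForWindow("scores", start, end);
    System.out.println(shardName(windowPrefix, 0, 4, ".txt"));
  }
}
```

Keeping the formatting in small static methods like these makes it easy to check the resulting names before wiring them into a FilenamePolicy.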
Argentic answered 11/10, 2017 at 17:4 Comment(3)
Thanks! Looks like this isn't in release 2.1.0, and 2.2.0 isn't out yet: issues.apache.org/jira/projects/BEAM/versions/12341044 - Luxuriate
I have tested this with Beam version 2.1.0 in a local environment, and it works perfectly fine. - Argentic
As per the 2.1.0 documentation, this is available in it: beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/… - Argentic
B
1

Yes. Per documentation of TextIO:

If you want better control over how filenames are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy)

Bombacaceous answered 9/10, 2017 at 16:8 Comment(2)
Could you please give some sample code? When trying this approach out I got a ClassCastException... - Luxuriate
Please include your code and the complete stack trace of the error you got. - Bombacaceous
C
0

This is a perfectly valid example with Beam 2.1.0. You can apply it to your data (e.g. a PCollection):

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.display.DisplayData;

@SuppressWarnings("serial")
public class FilePolicyExample {

    public static void main(String[] args) {
        FilenamePolicy policy = new WindowedFilenamePolicy("somePrefix");

        // `data` is not defined in this snippet; it stands for your windowed PCollection<String>.
        data.apply(TextIO.write().to("your_DIRECTORY")
            .withFilenamePolicy(policy)
            .withWindowedWrites()
            .withNumShards(4));

    }

    private static class WindowedFilenamePolicy extends FilenamePolicy {

        final String outputFilePrefix;

        WindowedFilenamePolicy(String outputFilePrefix) {
            this.outputFilePrefix = outputFilePrefix;
        }

        @Override
        public ResourceId windowedFilename(
                ResourceId outputDirectory, WindowedContext input, String extension) {
            String filename = String.format(
                    "%s-%s-%s-of-%s-pane-%s%s%s",
                    outputFilePrefix,
                    input.getWindow(),
                    input.getShardNumber(),
                    input.getNumShards() - 1,
                    input.getPaneInfo().getIndex(),
                    input.getPaneInfo().isLast() ? "-final" : "",
                    extension);
            return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
        }

        @Override
        public ResourceId unwindowedFilename(
                ResourceId outputDirectory, Context input, String extension) {
            throw new UnsupportedOperationException("Expecting windowed outputs only");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
                    .withLabel("File Name Prefix"));
        }
    }
}
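
The format string in WindowedFilenamePolicy can be checked in isolation. Below is a minimal sketch in plain Java, assuming the window's string form contains characters such as brackets or colons that are awkward in file names. The class PaneFileNames and its sanitize helper are hypothetical additions for illustration and are not part of Beam or of the answer's code.

```java
// Hypothetical helper (not part of Beam): reproduces the format string from WindowedFilenamePolicy.
final class PaneFileNames {

  // Replaces every character other than letters, digits, '.', '_' and '-' with '_'.
  // Assumption: this whitelist is acceptable for the target file system.
  static String sanitize(String raw) {
    return raw.replaceAll("[^A-Za-z0-9._-]", "_");
  }

  // Layout: prefix-window-shard-of-(numShards - 1)-pane-index[-final]extension, as in the answer.
  static String fileName(String prefix, String window, int shardNumber, int numShards,
      long paneIndex, boolean isLastPane, String extension) {
    return String.format(
        "%s-%s-%s-of-%s-pane-%s%s%s",
        prefix,
        sanitize(window),
        shardNumber,
        numShards - 1,
        paneIndex,
        isLastPane ? "-final" : "",
        extension);
  }

  public static void main(String[] args) {
    // Assumption: an interval window prints roughly as "[start..end)".
    String window = "[2017-10-18T04:00:00.000Z..2017-10-18T04:01:00.000Z)";
    System.out.println(fileName("somePrefix", window, 0, 4, 0, true, ".txt"));
  }
}
```

Including the pane index and the "-final" marker keeps files from different trigger firings of the same window from overwriting each other.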
Colleencollege answered 18/10, 2017 at 4:3 Comment(0)
B
0

See https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/FileIO.html for more information; look for "File naming" under "Writing files".

.apply(
    FileIO.<RootElement>write()
        .via(XmlIO
            .sink(RootElement.class)
            .withRootElement(ROOT_XML_ELEMENT)
            .withCharset(StandardCharsets.UTF_8))
        .to(FILE_PATH)
        // The lambda receives the window, pane, shard count, shard index and compression.
        .withNaming((window, pane, numShards, shardIndex, compression) -> NEW_FILE_NAME));
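
To illustrate what a naming function with this five-argument shape might return in place of NEW_FILE_NAME, here is a minimal sketch in plain Java. The interface ShardNaming and the factory zeroPadded are hypothetical stand-ins written for this example; they only imitate the shape of the lambda above and are not Beam's own types.

```java
// Hypothetical demo (not part of Beam): a naming callback with the same five-argument shape.
final class ShardNamingDemo {

  // Stand-in for a naming callback; window and compression are simplified to strings here.
  @FunctionalInterface
  interface ShardNaming {
    String fileName(
        String window, long paneIndex, int numShards, int shardIndex, String compressionSuffix);
  }

  // Builds names like "<baseName>-<window>-00001-of-00004<extension><compressionSuffix>".
  // The pane index is ignored in this sketch.
  static ShardNaming zeroPadded(String baseName, String extension) {
    return (window, paneIndex, numShards, shardIndex, compressionSuffix) ->
        String.format(
            "%s-%s-%05d-of-%05d%s%s",
            baseName, window, shardIndex, numShards, extension, compressionSuffix);
  }

  public static void main(String[] args) {
    ShardNaming naming = zeroPadded("report", ".xml");
    System.out.println(naming.fileName("2022-05-25", 0, 4, 1, ".gz"));
  }
}
```

Whatever the function returns, it should be unique per window, pane and shard so that shards do not overwrite one another.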
Baras answered 25/5, 2022 at 9:17 Comment(0)