How can I use the AvroParquetWriter and write to S3 via the AmazonS3 api?
Asked Answered
R

2

7

I am currently using the code below to write parquet via Avro. This code writes it to a file system but I want to write to S3.

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      StopWatch stopWatch = StopWatch.createStarted();
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach((k, v) -> {
        record.put(k, v);
      });
      writer.write(record);
    }
    //todo:  Write to S3.  We should probably write via the AWS objects.  This does not show that.
    //https://mcmap.net/q/266144/-how-to-generate-parquet-file-using-pure-java-including-date-amp-decimal-types-and-upload-to-s3-windows-no-hdfs
    writer.close();
    System.out.println("Total Time: " + sw);

  } catch (Exception e) {
    //do somethign here.  retryable?  non-retryable?  Wrap this excetion in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
  }

This writes to a file fine, but how do I get it to stream it into the AmazonS3 api? I have found some code on the web using the Hadoop-aws jar, but that requires some Windows exe files to work and, of course, we want to avoid that. Currently I am using only:

 <dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

So the question is, is there a way to intercept the output stream on the AvroParquetWriter so I can stream it to S3? The main reason I want to do this is for retries. S3 automagically retries up to 3 times. This would help us out a lot.

Rossy answered 25/2, 2020 at 16:59 Comment(5)
In this way maybe? Albeit it uses the libs/exes you would like to avoid :( :)Grandniece
Yes, that is close, but it has a dependency on Hadoop running on the server.Rossy
I am afraid this is not really feasible. If you have a look at the class source you can see that the superclass is org.apache.parquet.hadoop.ParquetWriter, so I would say (without digging too deep) that the existence of a configured hadoop is a requirement for this. It seems Spark can be a better candidate for this.Grandniece
Thanks for the response! I am really trying to overcome two things: the external dependency on hadoop and the S3 dependencies. If parquet is just a file format, why is it so hard to write out?!Rossy
You can find some answers to a similar question here - have a look at the linked Jira issues.Grandniece
C
8

This does depend on the hadoop-aws jar, so if you're not willing to use that I'm not sure I can help you. I am, however, running on a mac and do not have any windows exe files, so I'm not sure where you say those are coming from. The AvroParquetWriter already depends on Hadoop, so even if this extra dependency is unacceptable to you it may not be a big deal to others:

You can use an AvroParquetWriter to stream directly to S3 by passing it a Hadoop Path that is created with a URI parameter and setting the proper configs.

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

I used the following dependencies (sbt format):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"
Christa answered 14/4, 2020 at 21:16 Comment(4)
This is the correct answer but needs to be fleshed out. Please include the jars you used. We did it this way and discovered that only Windows needs the extra .exe files and such. Linux (fargate on AWS) did not. These jars are updated and supported by AWS and the documentation is great. Nice work, noobie!Rossy
As far as I can tell, you don't need to include avro dependency explicitly. It comes with parquet-avro.Warison
I ended up also running on Windows, and needed a winutils.exe file in my HADOOP_HOME environment variable path.Christa
This worked for me, might be obvious to others but I had to call writer.close() to actually complete the upload.Kinslow
U
0

Hopefully I am not misunderstanding the question, but it seems here what you are doing is converting a avro to parquet and you'd like to upload the parquet to s3

After you close your ParquetWriter, you should call a method that looks like this (granted this doesn't intercept the stream writing from avro to parquet, it just streams the parquet file that is no longer being written to):

        AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"))).build();
        S3Path outputPath = new S3Path();
        outputPath.setBucket("YOUR_BUCKET");
        outputPath.setKey("YOUR_FOLDER_PATH");
        try {
            InputStream parquetStream = new FileInputStream(new File(parquetFile));
            s3Client.putObject(outputPath.getBucket(), outputPath.getKey(), parquetStream, null);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

using the AWS SDK

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.749</version>
</dependency>

Of course the method would reside in a different utils class and the constructor of this method should initialize the AmazonS3 s3Client with the credentials, so all you'd need to do is invoke and access it's s3Client member to put objects

hope this helps

Unarm answered 24/3, 2020 at 5:13 Comment(4)
Good try but this just shows how to upload a file to S3. Hadoop makes this difficult as the file is saved to disk first. I want to go DIRECTLY to s3 from ParquetWriter.Rossy
@Rossy Hadoop uses HDFS which models POSIX file system behavior while S3 is a object store, not a file system- so to write directly to s3 you'd have to configure your clusters HDFS configurations to synchronize with S3- but again, your Hadoop job would still leverage the Hadoop API's HDFS implementation and you'd be writing on cluster disk space. If you use AWS EMR- this sync is set up already- and you can push objects from HDFS into s3. What would your use case be to do such an operation where writing to HDFS is being avoided?Unarm
We are merely converting data to parquet. As is seen above, the parquetWriter only writes to the local file system. We need to write to s3 without hadoop being involved.Rossy
@Rossy You cannot write directly to S3 without some wrapper which is modifying the Hadoop API HDFS implementation to work with an object store (s3) by creating some sudo filesystem tree based on S3 paths (which m4gic provided a link for). The idea of Hadoop is to use HDFS because during map reduce, you'd be able to locally retrieve stored blocks and a shuffle is internal- where YARN can manage it. I guess, what I am trying to understand is why go through this effort to use a wrapper that will probably be harder to maintain in the long run anyways? Unless disk space is the concern?Unarm

© 2022 - 2024 — McMap. All rights reserved.