Encoder for Row Type Spark Datasets

I would like to write an encoder for the Row type in a Dataset, for a map operation that I am doing. Essentially, I do not understand how to write encoders.

Below is an example of a map operation:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
    @Override
    public Iterator<String> call(Row row) throws Exception {
        ArrayList<String> obj = //some map operation
        return obj.iterator();
    }
}, Encoders.STRING());

I understand that, instead of a string encoder, an Encoder<Row> needs to be written as follows:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

However, I do not understand the clsTag() in the encoder, and I am trying to find a running example which can demonstrate something similar (i.e., an encoder for a Row type).

Edit: This is not a copy of the question mentioned (Encoder error while trying to map dataframe row to updated row), as that answer talks about using Spark 1.x in Spark 2.x (I am not doing so). Also, I am looking for an encoder for the Row class rather than a fix for an error. Finally, I was looking for a solution in Java, not in Scala.

Pitching answered 5/4, 2017 at 18:13

The answer is to use a RowEncoder together with the dataset's schema, expressed as a StructType.

Below is a working example of a flatMap operation with Datasets:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // schema of the rows produced by the flatMap
    StructType structType = new StructType();
    structType = structType.add("id1", DataTypes.LongType, false);
    structType = structType.add("id2", DataTypes.LongType, false);

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            // a static map operation to demonstrate
            List<Object> data = new ArrayList<>();
            data.add(1L);
            data.add(2L);
            ArrayList<Row> list = new ArrayList<>();
            list.add(RowFactory.create(data.toArray()));
            return list.iterator();
        }
    }, encoder);
Pitching answered 8/4, 2017 at 14:37

Comment: shouldn't this fail in cluster mode because ArrayList is not serializable? (Wiburg)
Comment: In Spark 3.5, I had to use RowEncoder.encoderFor instead of RowEncoder.apply. (Manage)
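
For reference, a minimal sketch of the Spark 3.5 variant mentioned in that comment, assuming the same structType as above (RowEncoder.encoderFor replaces the removed apply):

    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;

    // Spark 3.5+: RowEncoder.apply(StructType) is gone;
    // encoderFor produces an Encoder<Row> for the same schema
    Encoder<Row> encoder = RowEncoder.encoderFor(structType);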

I had the same problem... Encoders.kryo(Row.class) worked for me.

As a bonus, the Apache Spark tuning docs recommend Kryo since it is faster at serialization, "often as much as 10x":

https://spark.apache.org/docs/latest/tuning.html
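
A minimal sketch of that approach, assuming the same join Dataset as in the accepted answer (note that Kryo serializes each Row as opaque binary, so the columnar schema a RowEncoder preserves is lost):

    import java.util.Collections;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    // Kryo falls back to generic binary serialization for Row
    Dataset<Row> output = join.flatMap(
            (FlatMapFunction<Row, Row>) row -> Collections.singletonList(row).iterator(),
            Encoders.kryo(Row.class));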

Fondea answered 26/1, 2018 at 16:41

Comment: Doesn't that break columnar storage if you serialize the dataset to a Parquet file, though? (Reform)
Comment: What's the difference between using the Kryo encoder and the RowEncoder, considering both work? (Weatherspoon)
Comment: Isn't Kryo supposed to be a fallback encoder? Kryo is a generalized encoder, and custom row encoders should perform better (basically allowing more optimized encoding for your schema), as far as I remember; please correct me if I'm missing something. (Edlyn)

RowEncoder is an internal, undocumented class, and it can break unexpectedly when you upgrade Spark. (As a comment above notes, RowEncoder.apply no longer works in Spark 3.5.)

Here is a method that relies only on the documented public API. The trick is to create an empty DataFrame with the intended schema and obtain the row encoder from that empty DataFrame.

import java.util.List;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType structType = new StructType()
    .add("id1", DataTypes.LongType, false)
    .add("id2", DataTypes.LongType, false);
List<Row> emptyRow = List.of();

// the empty DataFrame exists only so we can ask it for an
// Encoder<Row> matching the schema
SparkSession spark = SparkSession.builder().getOrCreate();
Encoder<Row> encoder = spark.createDataFrame(emptyRow, structType).encoder();

Alternatively, if you are using Spark 3.5 or later, there is a simpler way that no longer needs the trick:

import org.apache.spark.sql.Encoders;
Encoder<Row> encoder = Encoders.row(structType);
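
As a usage sketch (assuming a join Dataset like the one in the accepted answer), either encoder is then passed to map/flatMap as usual:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// hypothetical transformation: rebuild each row from its two long columns
Dataset<Row> mapped = join.map(
    (MapFunction<Row, Row>) row -> RowFactory.create(row.getLong(0), row.getLong(1)),
    encoder);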
Randee answered 16/7 at 1:12
