java.lang.IllegalStateException: Unable to return a default Coder in dataflow 2.X

I have a simple pipeline using the Dataflow 2.1 SDK. It reads data from Pub/Sub and then applies a DoFn to it.

PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));

I get the following error from this pipeline:

java.lang.IllegalStateException: Unable to return a default Coder for ToEvents/ParMultiDo(MyDoFNClass).out0 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.X.X.model.MyClass.

MyClass, the DoFn's output element type, is below:

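import java.util.HashMap;
import java.util.HashSet;

import com.google.gson.annotations.SerializedName;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class MyClass {

    public long id;
    public HashMap<String, HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}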
Lewak answered 8/12, 2017 at 0:34
1) What does MyDoFNClass look like? 2) Alternatively, have you tried specifying a coder manually using .setCoder(), as the message suggests? – Federalist
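Following the comment's suggestion, the coder can be set explicitly on the DoFn's output so the CoderRegistry never has to infer one. The sketch below is an illustration under assumptions: Event and ToEventFn are hypothetical stand-ins for MyClass and MyDoFNClass, Event is assumed to implement Serializable so that SerializableCoder applies, and the code only constructs the pipeline graph and inspects the coder rather than running it. For a class that is not Serializable, AvroCoder.of(MyClass.class) would be the analogous argument to .setCoder().

```java
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class SetCoderExample {

  // Hypothetical element type standing in for MyClass.
  static class Event implements Serializable {
    private static final long serialVersionUID = 1L;
    public long id;
    public String ae;
  }

  // Hypothetical DoFn standing in for MyDoFNClass: wraps each input string in an Event.
  static class ToEventFn extends DoFn<String, Event> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Event event = new Event();
      event.ae = c.element();
      c.output(event);
    }
  }

  // Applies the DoFn and sets the output coder explicitly,
  // so no coder has to be inferred for Event.
  static PCollection<Event> buildEvents(Pipeline p) {
    return p
        .apply("Input", Create.of("x", "y"))
        .apply("ToE", ParDo.of(new ToEventFn()))
        .setCoder(SerializableCoder.of(Event.class));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<Event> events = buildEvents(p);
    System.out.println(events.getCoder());
  }
}
```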

Found the solution: I just needed to add implements Serializable to MyClass.

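import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;

import com.google.gson.annotations.SerializedName;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class MyClass implements Serializable {

    public long id;
    public HashMap<String, HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}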
Lewak answered 8/12, 2017 at 1:43
To clarify: for this error, add implements Serializable to the class that defines the objects you're storing in the PCollection, i.e. to any class that describes the entity being serialized/deserialized. – Toomer
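Beam's SerializableCoder encodes elements with standard Java serialization, so it only works for classes that implement java.io.Serializable. A plausible reading of the fix above (not confirmed in the thread) is that once MyClass is Serializable, the default CoderRegistry can fall back to SerializableCoder. The JDK-only sketch below, with hypothetical Entity and PlainEntity classes, round-trips an object shaped like MyClass through Java serialization and shows that a class without implements Serializable is rejected with NotSerializableException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;

public class SerializationRoundTrip {

  // Hypothetical class shaped like MyClass (Gson annotations omitted).
  static class Entity implements Serializable {
    private static final long serialVersionUID = 1L;
    public long id;
    public HashMap<String, HashSet<String>> a = new HashMap<>();
    public String ae;
  }

  // Hypothetical class that does NOT implement Serializable.
  static class PlainEntity {
    public long id;
  }

  // Writes the object with Java serialization and reads it back.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Java serialization failed", e);
    }
  }

  public static void main(String[] args) {
    Entity entity = new Entity();
    entity.id = 42L;
    entity.ae = "hello";
    entity.a.computeIfAbsent("k", k -> new HashSet<>()).add("v");

    Entity copy = (Entity) roundTrip(entity);
    System.out.println(copy.id + " " + copy.ae + " " + copy.a);

    try {
      roundTrip(new PlainEntity());
    } catch (IllegalStateException e) {
      // The cause is a java.io.NotSerializableException naming PlainEntity.
      System.out.println(e.getCause() instanceof NotSerializableException);
    }
  }
}
```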

Below are some excerpts about coders from the Beam programming guide:

The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.
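A minimal sketch of looking up and registering default coders, assuming the Beam Java SDK's CoderRegistry methods createDefault, registerCoderForClass and getCoder. Event and the lookup helper are hypothetical; inside a real pipeline you would call pipeline.getCoderRegistry() rather than create a standalone registry.

```java
import java.io.Serializable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;

public class RegistryExample {

  // Hypothetical element type standing in for MyClass.
  static class Event implements Serializable {
    private static final long serialVersionUID = 1L;
    public long id;
  }

  // Creates a registry with Beam's built-in defaults, then registers
  // SerializableCoder as the default coder for Event.
  static CoderRegistry registryWithEventCoder() {
    CoderRegistry registry = CoderRegistry.createDefault();
    registry.registerCoderForClass(Event.class, SerializableCoder.of(Event.class));
    return registry;
  }

  // Looks up the default coder for a class, converting the checked
  // CannotProvideCoderException into an unchecked exception for brevity.
  static <T> Coder<T> lookup(CoderRegistry registry, Class<T> clazz) {
    try {
      return registry.getCoder(clazz);
    } catch (CannotProvideCoderException e) {
      throw new IllegalStateException("No coder for " + clazz.getName(), e);
    }
  }

  public static void main(String[] args) {
    CoderRegistry registry = registryWithEventCoder();
    System.out.println(lookup(registry, String.class)); // built-in default for String
    System.out.println(lookup(registry, Event.class));  // the coder registered above
  }
}
```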

See the following link for the default coders used by the Beam libraries: https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry

If the type you use in your PCollections is not covered by a default coder, you may have to provide a custom coder for it. For example, the implementations of the PubsubIO.write()/PubsubIO.read() methods use a custom coder such as PubsubMessagePayloadOnlyCoder.

Suppose you are converting a string into a PubsubMessage. You can supply this coder to your PCollection:

PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());
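For your own element type, one option is to subclass Beam's CustomCoder and implement encode/decode by delegating each field to an existing coder. A minimal sketch, assuming a hypothetical Event class with a long id and a non-null String name; EventCoder and roundTrip are illustrative names, not Beam APIs. The resulting coder would be attached the same way as PubsubMessagePayloadOnlyCoder, e.g. .setCoder(EventCoder.of()).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class EventCoderExample {

  // Hypothetical element type; note that it does not implement Serializable.
  static class Event {
    final long id;
    final String name; // must be non-null for this coder

    Event(long id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Hypothetical coder: writes the id as a variable-length long, then the
  // name as a length-prefixed UTF-8 string (the nested encoding).
  static class EventCoder extends CustomCoder<Event> {
    private static final VarLongCoder ID_CODER = VarLongCoder.of();
    private static final StringUtf8Coder NAME_CODER = StringUtf8Coder.of();

    static EventCoder of() {
      return new EventCoder();
    }

    @Override
    public void encode(Event value, OutputStream out) throws CoderException, IOException {
      ID_CODER.encode(value.id, out);
      NAME_CODER.encode(value.name, out);
    }

    @Override
    public Event decode(InputStream in) throws CoderException, IOException {
      long id = ID_CODER.decode(in);
      String name = NAME_CODER.decode(in);
      return new Event(id, name);
    }
  }

  // Encodes and decodes one Event with EventCoder via Beam's CoderUtils.
  static Event roundTrip(Event event) {
    try {
      byte[] bytes = CoderUtils.encodeToByteArray(EventCoder.of(), event);
      return CoderUtils.decodeFromByteArray(EventCoder.of(), bytes);
    } catch (CoderException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Event copy = roundTrip(new Event(42L, "hello"));
    System.out.println(copy.id + " " + copy.name);
  }
}
```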
Steddman answered 8/12, 2020 at 7:31
