Coder issues with Apache Beam and CombineFn
We are building a pipeline using Apache Beam and DirectRunner as the runner. We are currently attempting a simple pipeline whereby we:

  1. Pull data from Google Cloud Pub/Sub (currently using the emulator to run locally)
  2. Deserialize into a Java object
  3. Window events using fixed windows of 1 minute
  4. Combine these windows using a custom CombineFn that transforms them from events into a list of events.

Pipeline code:

    pipeline
        .apply(PubsubIO.<String>read().topic(options.getTopic()).withCoder(StringUtf8Coder.of()))
        .apply("ParseEvent", ParDo.of(new ParseEventFn()))
        .apply("WindowOneMinute", Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply("CombineEvents", Combine.globally(new CombineEventsFn()));

ParseEvent function:

    static class ParseEventFn extends DoFn<String, Event> {
        // Gson instance used for deserialization (assumed to be a field in the original class)
        private static final Gson gson = new Gson();

        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            c.output(gson.fromJson(json, Event.class));
        }
    }

CombineEvents function:

    public static class CombineEventsFn extends CombineFn<Event, CombineEventsFn.Accum, EventListWrapper> {
        public static class Accum {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }

        @Override
        public Accum createAccumulator() {
            return new Accum();
        }

        @Override
        public Accum addInput(Accum accumulator, Event event) {
            accumulator.eventListWrapper.events.add(event);
            return accumulator;
        }

        @Override
        public Accum mergeAccumulators(Iterable<Accum> accumulators) {
            Accum merged = createAccumulator();
            for (Accum accum : accumulators) {
                merged.eventListWrapper.events.addAll(accum.eventListWrapper.events);
            }
            return merged;
        }

        @Override
        public EventListWrapper extractOutput(Accum accumulator) {
            return accumulator.eventListWrapper;
        }
    }

When attempting to run this locally using Maven and DirectRunner, we are getting the following error:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Unable to return a default Coder for CombineEvents/Combine.perKey(CombineEvents)/Combine.GroupedValues/ParDo(Anonymous).out [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder based on value with class java.lang.Object: No CoderFactory has been registered for the class.
  Building a Coder from the @DefaultCoder annotation failed: Class java.lang.Object does not have a @DefaultCoder annotation.
  Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.lang.Object: org.apache.beam.sdk.coders.protobuf.ProtoCoder$2@6e610150 could not provide a Coder for type java.lang.Object: Cannot provide ProtoCoder because java.lang.Object is not a subclass of com.google.protobuf.Message; org.apache.beam.sdk.coders.SerializableCoder$1@7adc59c8 could not provide a Coder for type java.lang.Object: Cannot provide SerializableCoder because java.lang.Object does not implement Serializable.
  Building a Coder from the @DefaultCoder annotation failed: Class org.apache.beam.sdk.values.KV does not have a @DefaultCoder annotation.
  Using the default output Coder from the producing PTransform failed: Unable to provide a default Coder for org.apache.beam.sdk.values.KV<K, OutputT>. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<K, OutputT>: Unable to provide a default Coder for java.lang.Object. Correct one of the following root causes:
  Building a Coder using a registered CoderFactory failed: Cannot provide coder based on value with class java.lang.Object: No CoderFactory has been registered for the class.
  Building a Coder from the @DefaultCoder annotation failed: Class java.lang.Object does not have a @DefaultCoder annotation.
  Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.lang.Object: org.apache.beam.sdk.coders.protobuf.ProtoCoder$2@6e610150 could not provide a Coder for type java.lang.Object: Cannot provide ProtoCoder because java.lang.Object is not a subclass of com.google.protobuf.Message; org.apache.beam.sdk.coders.SerializableCoder$1@7adc59c8 could not provide a Coder for type java.lang.Object: Cannot provide SerializableCoder because java.lang.Object does not implement Serializable.
  Building a Coder from the @DefaultCoder annotation failed: Class org.apache.beam.sdk.values.KV does not have a @DefaultCoder annotation.
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.TypedPValue.getCoder(TypedPValue.java:51)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:130)
    at org.apache.beam.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:90)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:143)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:418)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:334)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1459)
    at org.apache.beam.sdk.transforms.Combine$Globally.expand(Combine.java:1336)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:420)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:350)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
at ***************************.main(***************.java:231)
... 6 more

Apologies for the huge code dump - I wanted to provide all the context.

I'm curious as to why it's complaining about no default coder for both java.lang.Object and org.apache.beam.sdk.values.KV<K, OutputT> - as far as I can tell our pipeline is changing types between String, Event, and EventListWrapper - the latter two classes have their default coders set on the class itself (AvroCoder in both cases).

The error is occurring on the line where we apply the CombineFn - can confirm that without this transform the pipeline works.

I suspect we've set up the combine transform incorrectly somehow, but as of yet have found nothing in the Beam documentation to point us in the right direction.

Any insight would be appreciated - thanks in advance!

Aconite answered 16/5, 2017 at 14:38 Comment(6)
Which version of Beam are you using? – Dosi
Oops, probably should have included that: 0.6.0 – Aconite
Is the Event.class coder registered? – Pretzel
I believe so - I have @DefaultCoder(AvroCoder.class) on that class definition. – Aconite
Does the CombineEventsFn.Accum class have a coder registered? – Dosi
It does not! I was just about to post that that was the issue :) Added @DefaultCoder(AvroCoder.class) to the Accum class and that fixed it. Feel free to answer and I'll accept - thanks! – Aconite
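For reference, the fix arrived at in the comments above amounts to a one-line annotation on the accumulator class (a sketch; `Event` and `EventListWrapper` are the poster's own classes, assumed Avro-compatible):

```java
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// Annotating the accumulator lets Beam infer an AvroCoder for it,
// the same approach already used on Event and EventListWrapper.
@DefaultCoder(AvroCoder.class)
public static class Accum {
    EventListWrapper eventListWrapper = new EventListWrapper();
}
```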

The probable reason you are seeing java.lang.Object is that Beam is trying to infer a coder for an unresolved type variable, which resolves to Object. This may be a bug in how coder inference is done within Combine.

Separately, I would expect the Accum class to also cause a failure of coder inference. You can override getAccumulatorCoder in your CombineFn to provide one quite directly.
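A sketch of that override, to be added inside the CombineEventsFn class (signature per Beam's CombineFn; assumes AvroCoder can handle the Accum class):

```java
// Needed imports: org.apache.beam.sdk.coders.AvroCoder, Coder,
// CoderRegistry, CannotProvideCoderException
@Override
public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<Event> inputCoder)
        throws CannotProvideCoderException {
    // Tell Beam explicitly how to encode the accumulator instead of
    // relying on coder inference, which fails for the plain Accum class.
    return AvroCoder.of(Accum.class);
}
```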

Talos answered 16/5, 2017 at 17:55 Comment(1)
Thanks! I added @DefaultCoder(AvroCoder.class) to the class definition for CombineEventsFn.Accum, which I believe accomplished exactly that :) – Aconite

Did you check whether making your accumulator Serializable works directly?

That is, add "implements Serializable" to the Accum class, so that Beam's fallback SerializableCoder can encode it:

    public static class Accum implements Serializable {
        EventListWrapper eventListWrapper = new EventListWrapper();
    }
Shirl answered 6/3, 2019 at 14:19 Comment(3)
Can you explain further how that resolves the given exception? There is already an existing, upvoted answer that takes another approach, so you should definitely explain your code. – Borg
Instead of specifying a Coder for the accumulator explicitly, one may mark the Accum class as Serializable (assuming that it actually is). The above error occurs because no Coder has been specified for the accumulator. Providing an explicit Coder works as well, but assuming that EventListWrapper can be serialized, adding "implements Serializable" is the easier fix. – Shirl
Please add all such information to the answer itself, not to the comment section. – Borg
