Apache Beam seems to refuse to recognise Kotlin's Iterable. Here is some sample code:
    @ProcessElement
    fun processElement(
        @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
I get the following weird error:
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
Sure enough, if I replace Iterable with java.lang.Iterable, the same code works just fine. What am I doing wrong?
Versions of dependencies:
- kotlin-jvm: 1.3.21
- org.apache.beam: 2.11.0
Here is a gist with the full code and stack trace:
Update:
After a bit of trial and error, I found that List<String> throws a similar exception, whereas MutableList<String> actually works:
    class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
        @ProcessElement
        fun processElement(
            @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
        ) {
            val output = input.key + "|" + input.value.toString()
            println("output: $output")
            receiver.output(output)
        }
    }
So, this reminded me that Kotlin's immutable collections are actually only interfaces and that the underlying collection is still mutable. However, an attempt to replace Iterable with MutableIterable continues to raise the error.
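One pattern fits these observations: Iterable, MutableIterable and List are declared with `out` variance in Kotlin, while MutableList is invariant. Kotlin can emit Java wildcards (`? extends`) for `out` types, and it may do so differently in a supertype than in a function parameter. The sketch below is only a way to inspect that with plain reflection. `Fn` and `Box` are hypothetical stand-ins for DoFn and KV so that it runs without Beam, and `@JvmSuppressWildcards` is the standard Kotlin annotation for turning the wildcards off.

```kotlin
import java.lang.reflect.ParameterizedType

// Hypothetical stand-ins so the sketch runs without Beam on the classpath:
// Fn plays the role of DoFn, Box the role of KV (both invariant in T).
open class Fn<T>
class Box<T>(val value: T)

// Same shape as the failing DoFn: a covariant Kotlin type nested in a type argument.
class Plain : Fn<Box<Iterable<String>>>() {
    fun process(input: Box<Iterable<String>>) = input.value.toString()
}

// Same shape, with wildcards suppressed on the nested argument in both places.
class Suppressed : Fn<Box<Iterable<@JvmSuppressWildcards String>>>() {
    fun process(input: Box<Iterable<@JvmSuppressWildcards String>>) = input.value.toString()
}

// Generic type argument recorded on the superclass.
fun superTypeArgument(c: Class<*>): String =
    (c.genericSuperclass as ParameterizedType).actualTypeArguments[0].typeName

// Generic type recorded for the single parameter of process().
fun parameterType(c: Class<*>): String =
    c.getDeclaredMethod("process", Box::class.java).genericParameterTypes[0].typeName

fun main() {
    for (c in listOf(Plain::class.java, Suppressed::class.java)) {
        println("${c.simpleName} supertype argument: ${superTypeArgument(c)}")
        println("${c.simpleName} parameter type:     ${parameterType(c)}")
    }
}
```

If the two lines printed for Plain differ while those for Suppressed agree, that would match the error above. Annotating the nested argument in both the DoFn supertype and the @Element parameter might then let the types line up, though this is untested against Beam 2.11.0.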
Update 2:
I deployed my Kotlin Dataflow job using MutableList as above, and the job failed with:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
I had to switch back to java.lang.Iterable.
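The ClassCastException suggests that the grouped values arrive as some Iterable implementation that is not a List, so declaring the element as a list cannot work at runtime even if the signature check passes. A minimal sketch of the difference, with no Beam involved: `GroupedValues` and `format` are hypothetical names, and `GroupedValues` only imitates an Iterable that is not a List.

```kotlin
// Hypothetical stand-in for the runner's grouped values: an Iterable that is not a List.
class GroupedValues(private val items: Array<String>) : Iterable<String> {
    override fun iterator(): Iterator<String> = items.iterator()
}

// Format one grouped element without assuming the values are a List.
fun format(key: String, values: Iterable<String>): String =
    key + "|" + values.toList().toString()

fun main() {
    val values: Iterable<String> = GroupedValues(arrayOf("a", "b"))
    println(values is List<*>)      // prints "false": a cast to List would throw
    println(format("k", values))    // prints "k|[a, b]"
}
```

Note that toList() copies every value into memory, so for very large groups it may be better to iterate over the values directly.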