Kotlin Iterable not supported in Apache Beam?

Apache Beam seems to refuse to recognise Kotlin's Iterable. Here is some sample code:

@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}

I get the following weird error:

java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

Sure enough, if I replace Iterable with java.lang.Iterable, the same code works just fine. What am I doing wrong?

Dependency versions:

  • kotlin-jvm: 1.3.21
  • org.apache.beam: 2.11.0

Here is a gist with the full code and stack trace:

Update:

After a bit of trial and error, I found that List<String> throws a similar exception, while MutableList<String> actually works:

class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}

This reminded me that Kotlin's read-only collections are only interfaces and that the underlying collection is still mutable. However, replacing Iterable with MutableIterable still raises the error.

Update 2:

I deployed my Kotlin Dataflow job using MutableList as above, and the job failed with:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

I had to switch back to use java.lang.Iterable.
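
The ClassCastException above suggests that the grouped values arrive as some Iterable that is not a List. Here is a minimal sketch of that failure mode without Beam. LazyValues and sizeViaListCast are hypothetical names, with LazyValues standing in for the runner's WindowReiterable; this is not how Dataflow is implemented.

```kotlin
// Hypothetical stand-in for a runner-provided Iterable that is NOT a List.
class LazyValues(private val items: List<String>) : Iterable<String> {
    override fun iterator(): Iterator<String> = items.iterator()
}

// Tries to treat the values as a List, the way a List-typed parameter would.
// Returns null when the runtime object cannot be cast to List.
fun sizeViaListCast(values: Iterable<String>): Int? =
    try {
        (values as List<String>).size
    } catch (e: ClassCastException) {
        null
    }

fun main() {
    val grouped: Iterable<String> = LazyValues(listOf("a", "b"))
    println(sizeViaListCast(grouped)) // the cast fails for LazyValues
    println(grouped.count())          // iterating works for any Iterable
}
```

Declaring the parameter as an Iterable and only iterating over it avoids depending on the concrete class the runner happens to supply.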

Cowitch answered 29/4, 2019 at 18:32 Comment(2)
Is this at runtime or at compile time? Can you share more of the stack trace? - Scenario
@Scenario I added the stack trace to gist.github.com/marcoslin/…. Thanks - Cowitch
6

I ran into this problem as well when using a ParDo after a GroupByKey. It turns out that a @JvmWildcard annotation is needed on the type argument of Iterable when writing a transform that accepts the result of a GroupByKey.

See the contrived example below that reads a file and groups by the first character of each line.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.transforms.Contextful
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.GroupByKey
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.ProcessFunction
import org.apache.beam.sdk.transforms.WithKeys
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.TypeDescriptors

class BeamPipe {
  class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
      receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
  }

  fun pipe(options: PipelineOptions) {
    val file =
        "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
        .apply("Key lines by first character",
            WithKeys.of { line: String -> line[0].toString() }
                .withKeyType(TypeDescriptors.strings()))
        .apply("Group lines by first character", GroupByKey.create<String, String>())
        .apply("Concatenate lines", ParDo.of(ConcatLines()))
        .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
            .by { it.key }
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
            .to("whatever")
            .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
        )
    p.run()
  }
}
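
To see what the annotation changes without pulling in Beam, here is a minimal sketch that prints the generic JVM parameter types via Java reflection. Keyed, Signatures and genericParamType are hypothetical names for illustration only; Keyed merely stands in for Beam's KV.

```kotlin
// Hypothetical invariant pair type, standing in for Beam's KV so the sketch needs no Beam dependency.
class Keyed<K, V>(val key: K, val value: V)

// Two otherwise identical methods; only the second annotates the Iterable's type argument.
class Signatures {
    fun plain(input: Keyed<String, Iterable<String>>) {}
    fun withWildcard(input: Keyed<String, Iterable<@JvmWildcard String>>) {}
}

// Returns the generic JVM type of the first parameter of the named method, as Java reflection reports it.
fun genericParamType(methodName: String): String =
    Signatures::class.java.declaredMethods
        .first { it.name == methodName }
        .genericParameterTypes[0]
        .typeName

fun main() {
    println("plain:        " + genericParamType("plain"))
    println("withWildcard: " + genericParamType("withWildcard"))
}
```

The withWildcard line should show the Iterable<? extends java.lang.String> shape that the error message in the question asks for, which is presumably why Beam's signature check accepts the annotated parameter.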
Wicked answered 16/3, 2020 at 4:0 Comment(0)
3

This looks like a bug in the Beam Kotlin SDK. The reflection analysis of your @ProcessElement method does not work correctly. You can probably work around this by using a ProcessContext parameter instead of the @Element parameter.

Enticement answered 8/5, 2019 at 9:39 Comment(2)
Thanks @mxm. I tried that but I am still getting the same IllegalArgumentException. I added sample code and the stack trace to the gist above. As in the other example, MutableList works just fine. - Cowitch
Got it. Thanks for the update! We will look into this. - Enticement
1

I am not very familiar with Kotlin, but it seems that you need to import java.lang.Iterable before using it in your code.

Roughrider answered 29/4, 2019 at 23:3 Comment(1)
Thanks, but as my question says, it does work if I add the import. That shouldn't be needed in Kotlin, as Iterable is equivalent to java.lang.Iterable. This problem manifests itself in other areas as well. For example, GroupByKey returns an Iterable (whose type I cannot set), and it raises the same error when combined with ParDo. - Cowitch
0

How can I fix this issue when the Iterable comes from GroupByKey.create()? I could not get GroupByKey to work with java.lang.Iterable the way you did.

Fetial answered 7/12, 2019 at 1:22 Comment(3)
This is not an answer to the question. - Americanize
@Fetial You should ask a question with some sample code that others can easily run. I can try to help further with more info. - Cowitch
@Fetial I just noticed you are experiencing a similar problem. My answer above might be useful to you. - Revolutionist
0

For those who are experiencing this issue and found their way here: my current workaround, which lets me keep writing the pipeline in Kotlin, is to create a Java class with static function(s) that create, contain, and process your Iterable(s). The result (in a non-Iterable form) can then be passed back to Kotlin.

Revolutionist answered 9/12, 2019 at 4:24 Comment(0)
