It turns out that the exception
This type (GenericType<Test>) cannot be used as key
is thrown even for an ordinary case class (one not generated via reflection):
case class Test(a: String, b: Int, c: String, d: Long)
The first issue is that this case class is not a POJO:
Java and Scala classes are treated by Flink as a special POJO data
type if they fulfill the following requirements:
The class must be public.
It must have a public constructor without arguments (default constructor).
All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter
methods must be named getFoo() and setFoo().
The type of a field must be supported by a registered serializer.
So you should replace
case class Test(a: String, b: Int, c: String, d: Long)
with
import scala.beans.BeanProperty
case class Test(
  @BeanProperty var a: String,
  @BeanProperty var b: Int,
  @BeanProperty var c: String,
  @BeanProperty var d: Long) {
  // public no-argument constructor, as the POJO rules require
  def this() = {
    this(null, 0, null, 0)
  }
}
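To see what the annotations change, here is a small sketch that checks two of the quoted requirements with plain Java reflection. The helper object PojoCheck and the class Plain are made up for illustration, and the checks only approximate what Flink actually does when it analyzes a type.

```scala
import scala.beans.BeanProperty

// Plain case class: no no-arg constructor, no getFoo/setFoo accessors.
case class Plain(a: String, b: Int)

// Case class rewritten to satisfy the quoted POJO requirements.
case class Test(
  @BeanProperty var a: String,
  @BeanProperty var b: Int,
  @BeanProperty var c: String,
  @BeanProperty var d: Long) {
  def this() = this(null, 0, null, 0)
}

// Hypothetical helper, not part of Flink.
object PojoCheck {
  // Is there a public constructor without arguments?
  def hasNoArgConstructor(cls: Class[_]): Boolean =
    cls.getConstructors.exists(_.getParameterCount == 0)

  // Are there public getFoo and setFoo methods for the given field name?
  def hasBeanAccessors(cls: Class[_], field: String): Boolean = {
    val suffix = field.capitalize
    val names = cls.getMethods.map(_.getName).toSet
    names("get" + suffix) && names("set" + suffix)
  }
}
```

With these definitions, PojoCheck.hasNoArgConstructor(classOf[Test]) and PojoCheck.hasBeanAccessors(classOf[Test], "a") both hold, while the same checks fail for Plain.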
The second issue may be that Flink doesn't allow POJOs that are non-static inner classes, whereas the reflective toolbox generates a local class nested inside a method.
Rules for POJO types
Flink recognizes a data type as a POJO type (and allows “by-name”
field referencing) if the following conditions are fulfilled:
- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public
getter- and a setter- method that follows the Java beans naming
conventions for getters and setters.
Here is the decompiled version of the toolbox-generated code:
public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {
  public Object wrapper() {
    new LazyRef();
    class Test$1 implements Product, Serializable {
      private String a;
      private int b;
      private String c;
      private long d;
      // ...
    }
    return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
  }
}
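The same kind of nesting can be reproduced without the toolbox. The sketch below uses only Java reflection to tell a class declared inside a method from a class declared directly in an object. The names LocalClassDemo, Member, Local and isNestedInMethod are made up for illustration, and this is not the check Flink itself performs.

```scala
object LocalClassDemo {
  // Declared directly in an object: not nested in any method.
  class Member

  // Declared inside a method body, like Test$1 inside wrapper() above.
  def makeLocal(): Class[_] = {
    class Local
    classOf[Local]
  }

  // A class nested in a method records that method as its enclosing method.
  def isNestedInMethod(cls: Class[_]): Boolean =
    cls.getEnclosingMethod != null
}
```

LocalClassDemo.isNestedInMethod(LocalClassDemo.makeLocal()) should be true, while the same call on classOf[LocalClassDemo.Member] should be false.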
The full decompiled code:
So if it's really necessary to generate a class for Flink, it may have to be generated manually rather than via the toolbox.
But the code with a class generated manually
still throws This type (GenericType<java.lang.Object>) cannot be used as key
I guess the reason for that is the following (and this is the third issue).
The code with an ordinary (not generated) case class seems to work.
But if we replace the type Test
with Any,
then it throws This type (GenericType<java.lang.Object>) cannot be used as key.
And with reflection we can't return anything but Any.
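As far as I understand, Flink's Scala API picks the TypeInformation from the static element type at compile time, so the runtime class of the value doesn't help. The sketch below shows the same effect with a plain ClassTag instead of Flink. Record and StaticTypeDemo are made-up names, and ClassTag is only an analogy for how the implicit TypeInformation is resolved.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical stand-in for the Test class from the question.
case class Record(a: String, b: Int)

object StaticTypeDemo {
  // The ClassTag is resolved from the static type T at the call site,
  // not from the runtime class of the value.
  def staticClassOf[T: ClassTag](value: T): Class[_] = classTag[T].runtimeClass

  val asRecord: Record = Record("aaa", 1)
  val asAny: Any = asRecord // same object, wider static type

  val seenForRecord: Class[_] = staticClassOf(asRecord) // class Record
  val seenForAny: Class[_] = staticClassOf(asAny)       // java.lang.Object
  val runtimeClassOfAny: Class[_] = asAny.getClass      // still class Record
}
```

The value typed as Any is reported as java.lang.Object, which matches the GenericType<java.lang.Object> in the exception message.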
Now I'm creating TypeInformation[Test]
inside the generated code. This seems to fix This type (GenericType<java.lang.Object>) cannot be used as key,
but now I get
org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable.
The object probably contains or references non serializable fields.
I resolved the issue with InvalidProgramException: UTF-8 is not serializable
by annotating the fields of MapFunc
with @transient.
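A function object has to be Java-serializable, and every non-transient field is serialized along with it. Here is a minimal sketch of why @transient helps, using plain Java serialization instead of Flink. Helper, PlainField, TransientField and isSerializable are made-up names, with Helper standing in for a non-serializable value such as a reflection symbol.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object TransientDemo {
  // Hypothetical non-serializable value (does not extend Serializable).
  class Helper

  // Serializable holder with an ordinary field: serialization fails.
  class PlainField extends Serializable {
    var helper: Helper = new Helper
  }

  // Same holder, but the field is skipped during serialization.
  class TransientField extends Serializable {
    @transient var helper: Helper = new Helper
  }

  // Returns true if Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

A transient field is null after deserialization, so it has to be initialized again on the worker side, for example in open().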
It turned out that if we create the TypeInformation
inside the generated code, then the toolbox is enough:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String, Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    // Defines the class at runtime and builds its TypeInformation
    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |  @scala.beans.BeanProperty var a: String,
           |  @scala.beans.BeanProperty var b: Int,
           |  @scala.beans.BeanProperty var c: String,
           |  @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null, 0, null, 0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval( /* ... expression missing ... */ ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
      createClassInstance(classSymbol, values: _*)
    }
  }

  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
    // pass the runtime-built TypeInformation explicitly instead of the implicit one
    val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
  }

  // Instantiates the generated class through its primary constructor
  def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
