Infinite recursion in createDataFrame for Avro types

I'm getting a StackOverflowError from inside the createDataFrame call in this example. It originates in Scala code doing Java type inference, which calls itself in an infinite loop.

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>() {
            public Event call(String line) throws Exception {
                Event event = parser.parse(line);
                log.info("event: " + event.toString());
                return event;
            }
        });
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

The bottom of the stack trace looks like this:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

This looks similar to the bug reported in http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html, but I'm using Spark 1.4.1, which is later than the release in which that bug was fixed.

The Event class is generated by Avro from the avsc below. It does contain double and long fields, which have been reported as causing problems, but replacing double with string doesn't change the symptoms.

{
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}

Could someone please advise? Thanks!

Askja answered 10/4, 2016 at 16:17 Comment(12)
Where is the infinite recursion? Please provide a minimal reproducible example! – Paterson
The recursion starts at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) and continues until a stack overflow error. The minimal example involves a complex pom, an avsc file, a parser, and several other pieces, so I've packed them all up in a tgz. How should I get that to you? – Askja
OK, that would be a bit overwhelming then, I think. Let's isolate part of your code to figure out what's breaking, e.g. before transforming the RDD into a DataFrame, perform a count action and/or an RDD collect. – Paterson
I am honestly surprised parser serializes. Are you using local mode only? – Lilybelle
Parser creates Event instances by parsing a line of input text, as opposed to serializing. The same class happens to also implement Encoder<> and Decoder<> to minimize the number of classes (personal taste), but those have passed unit tests. I've backed off to using local mode to diagnose the same problem I experienced on the cluster via yarn-client. – Askja
elias: I tried your idea, which did expose a bug that I've since fixed. Both collect and count now work fine, but I keep getting the same problem I reported before. – Askja
How did you come to the conclusion that it's infinitely looping? I still don't see that point. – Paterson
From the stack trace in Eclipse, or what remains of it at least; the console limits its length. – Askja
By replacing the avro-generated bean with a hand-coded one and steadily pruning it down to bare essentials, I've confirmed that the stack overflow is triggered by precisely this avro-generated line: public org.apache.avro.Schema getSchema() { return SCHEMA$; }. Does anybody have any idea what's causing this and how to get around it? – Askja
@Askja is this fixed at your side? I am having the same problem. – Weidar
No. The only fix I've found is to avoid avro altogether (a sketch of that approach follows these comments). Stackoverflow was no help with this one. – Askja
Also found it too confusing to remember which types are supported in avro versus parquet. Wound up working exclusively with parquet. – Askja
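
For reference, a minimal sketch of the "avoid Avro" workaround mentioned in the comments above. It assumes a hand-written plain bean (EventBean is a hypothetical name, and the getTs()/getUid() accessors on Event are the usual Avro-generated getters) that mirrors the avsc fields but exposes no getSchema() property, so Spark's Java bean inference never walks into org.apache.avro.Schema:

// Plain serializable bean holding only data fields; it has no getSchema(),
// so JavaTypeInference has nothing self-referential to recurse into.
public class EventBean implements java.io.Serializable {
    private double ts;
    private String uid;
    // ... remaining fields from the avsc, each with a getter and setter

    public double getTs() { return ts; }
    public void setTs(double ts) { this.ts = ts; }
    public String getUid() { return uid; }
    public void setUid(String uid) { this.uid = uid; }
}

// In the driver, copy each Avro Event into an EventBean before createDataFrame.
JavaRDD<EventBean> beanRDD = eventRDD.map(new Function<Event, EventBean>() {
    public EventBean call(Event e) {
        EventBean b = new EventBean();
        b.setTs(e.getTs());
        // Avro string fields are CharSequence, so convert explicitly.
        b.setUid(String.valueOf(e.getUid()));
        return b;
    }
});
DataFrame df = sqlContext.createDataFrame(beanRDD, EventBean.class);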

There is work being done in the spark-avro project to address this issue; see https://github.com/databricks/spark-avro/pull/217 and https://github.com/databricks/spark-avro/pull/216

Once these are merged, there should be a function to convert an RDD of Avro objects into a Dataset (a Dataset of Rows is equivalent to a DataFrame) without the circular-reference issue in the getSchema() function of the generated class.
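
Until those pull requests are merged and released, one way to sidestep the problem is to avoid Java bean inference entirely by supplying the schema explicitly and converting each Avro record to a Row. This is only a sketch using plain Spark 1.x APIs rather than spark-avro; the field subset shown and the Avro getter names (getTs(), getUid(), getProto()) are assumptions based on the schema in the question:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Explicit schema: only a few fields shown; extend with the rest of the avsc.
StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("ts", DataTypes.DoubleType, false),
        DataTypes.createStructField("uid", DataTypes.StringType, false),
        DataTypes.createStructField("proto", DataTypes.StringType, false)
});

// Map Avro records to Rows; no bean introspection, so getSchema() is never touched.
JavaRDD<Row> rowRDD = eventRDD.map(new Function<Event, Row>() {
    public Row call(Event e) {
        // Avro string fields may be CharSequence/Utf8, so convert explicitly.
        return RowFactory.create(e.getTs(),
                String.valueOf(e.getUid()),
                String.valueOf(e.getProto()));
    }
});

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();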

Grill answered 28/9, 2017 at 23:0 Comment(0)
