I'm getting a StackOverflowError from inside the createDataFrame call in this example. It originates in Scala code that performs Java type inference and calls itself in an infinite loop.
final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
    .map(new Function<String, Event>() {
        public Event call(String line) throws Exception {
            Event event = parser.parse(line);
            log.info("event: " + event.toString());
            return event;
        }
    });
log.info("eventRDD:" + eventRDD.toDebugString());
DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();
The bottom of the stack trace looks like this:
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
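The repeating inferDataType frames suggest that the inference keeps descending into bean properties without ever reaching a leaf type. One possibility I can't rule out is a getter whose type leads back to itself (Avro-generated classes do expose getSchema(), for example). The sketch below is not Spark's code. It is a minimal, self-contained illustration using java.beans.Introspector, with hypothetical Node and Flat beans and a hypothetical inferDepth helper, showing how a naive property walk never terminates on a self-referential bean unless it is capped:

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

class BeanInferenceSketch {
    // Hypothetical bean whose getParent() property has the bean's own type.
    public static class Node {
        public String getName() { return "n"; }
        public Node getParent() { return null; }
    }

    // Hypothetical flat bean with only leaf-typed properties.
    public static class Flat {
        public String getName() { return "f"; }
        public int getPort() { return 0; }
    }

    // Recursively visits the type of every readable bean property and returns
    // the deepest level reached. Without the maxDepth cap, a self-referential
    // bean such as Node would recurse until the stack overflows.
    static int inferDepth(Class<?> type, int depth, int maxDepth) {
        if (depth >= maxDepth || type == null || type.isPrimitive() || type == String.class) {
            return depth;
        }
        int deepest = depth;
        try {
            // Stop at Object so the synthetic "class" property is skipped.
            BeanInfo info = Introspector.getBeanInfo(type, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() == null) {
                    continue; // ignore write-only properties
                }
                deepest = Math.max(deepest, inferDepth(pd.getPropertyType(), depth + 1, maxDepth));
            }
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
        return deepest;
    }
}
```

With a cap of 50, inferDepth(BeanInferenceSketch.Flat.class, 0, 50) stops at depth 1, while inferDepth(BeanInferenceSketch.Node.class, 0, 50) runs all the way to the cap, which is the pattern an unbounded version would turn into a StackOverflowError.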
This looks similar to the bug reported in http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html, but I'm using Spark 1.4.1, which was released after that bug was fixed.
The Event class is generated by Avro from the .avsc schema below. It does contain double and long fields, which have been reported to cause problems, but replacing double with string doesn't change the symptoms.
{
"namespace": "mynamespace",
"type": "record",
"name": "Event",
"fields": [
{ "name": "ts", "type": "double", "doc": "Timestamp"},
{ "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
{ "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
{ "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
{ "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
{ "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
{ "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
{ "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
{ "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
{ "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
{ "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
{ "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
{ "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
{ "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
{ "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
{ "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
{ "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
{ "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of ORIG IP bytes (via IP total_length header field)"},
{ "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
{ "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
{ "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent(s)"},
{ "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
{ "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
]
}
Could someone please advise? Thanks!
parser serializes. Are you using local mode only? – Lilybelle