Modify collection inside a Spark RDD foreach
I'm trying to add elements to a map while iterating over the elements of an RDD. I'm not getting any errors, but the modifications are not happening.

Everything works fine when adding entries directly or iterating over other collections:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

But when I try to do the same from an RDD:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

I've tried printing an entry that was added to the map before the foreach, from inside it, to make sure the variable is the same one, and it prints correctly:

scala> fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

I've also printed the modified entry of the map from inside the foreach code, and it prints as modified, but when the operation completes the map seems unmodified.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Converting the RDD to an array first (via collect) also works fine:

scala> fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

Is this a context problem? Am I accessing a copy of the data that is being modified somewhere else?

Monegasque answered 30/4, 2014 at 17:19

It becomes clearer when running on a Spark cluster (not a single machine). The RDD is now spread over several machines. When you call foreach, you tell each machine what to do with the piece of the RDD that it has. If you refer to any local variables (like myMap), they are serialized and sent to the machines so they can use them. But nothing comes back, so your original copy of myMap is unaffected.
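
To illustrate the idiomatic alternative, here is a minimal sketch (not your original session; it assumes the same sc and tests.txt from the question): instead of mutating a driver-side map from inside foreach, produce the key/value pairs as the RDD's result and collect them on the driver.

val fromFile = sc.textFile("tests.txt")

// Each executor deserializes its own private copy of any captured
// variable (like myMap) and mutates only that copy; nothing is
// shipped back to the driver.

// To build the map on the driver instead, map each element to a
// (key, value) pair and collect the pairs:
val built: collection.Map[String, String] =
  fromFile.map(w => (w, w)).collectAsMap()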

I think this answers your question, but obviously you are trying to accomplish something and you will not be able to get there this way. Feel free to explain here or in a separate question what you are trying to do, and I will try to help.

Citrange answered 30/4, 2014 at 19:29
It does indeed answer my question, and don't worry about what I was trying to accomplish; I just found this an interesting case I didn't have an explanation for. I do now, thanks! – Monegasque
Yes, as Daniel points out, you can't mutate state; palako is kind of missing the point of functional programming. You're not supposed to mutate state, because then you can't parallelize. By designing code so that you do not mutate state, your code can parallelize for free, and you can use frameworks like Spark and Scalding to distribute it across a cluster. – Foretop
There is no single general workaround, I think; you just need to solve the problem without this sort of thing, which is pretty much always possible. For example, say you want to put some elements of the RDD in a collection, as in the question. The solution then is to use RDD.filter instead of RDD.foreach, so you get another RDD with the elements you wanted. Then you can process them further via Spark, or use collect to fetch them to the driver and process them locally (see the sketch below). – Citrange
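
A minimal sketch of the filter approach described in the last comment (the predicate is made up for illustration):

// Keep the elements you want as another RDD; this still runs distributed.
val wanted = fromFile.filter(w => w.startsWith("test"))

// Either keep processing with Spark, or fetch the results to the driver:
val local: Array[String] = wanted.collect()
local.foreach(w => myMap(w) = w)  // runs on the driver, so the mutation sticks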
