I'm trying to use Spark MLlib LDA to summarize my document corpus.
My problem setting is as follows:
- about 100,000 documents
- about 400,000 unique words
- 100 clusters (topics)
I have 16 servers (each has 20 cores and 128GB memory).
When I execute LDA with OnlineLDAOptimizer, it throws an out-of-memory error suggesting that I increase spark.driver.maxResultSize, like:
Total size of serialized results of 11 tasks (1302 MB) is bigger than spark.driver.maxResultSize
I increased spark.driver.maxResultSize to 120GB (and also spark.driver.memory to 120GB) and re-ran LDA, but had no luck. It still says:
Total size of serialized results of 11 tasks (120.1 GB) is bigger than spark.driver.maxResultSize
I tried another dataset with about 100,000 unique words and it worked.
So, how can I estimate the memory usage of Spark MLlib LDA? I couldn't find anything about it in the official documentation.
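For reference, my own rough back-of-envelope guess, assuming (and this is only my assumption, not anything from the docs) that during the online E-step each task sends a dense k x vocabSize matrix of Doubles back to the driver:

// Rough size guess, assuming each task returns a dense k x vocabSize
// matrix of Doubles to the driver (my assumption, not from the docs).
val k = 100L            // topics
val vocabSize = 400000L // unique words
val numTasks = 11L      // tasks mentioned in the error message

val perTaskBytes = k * vocabSize * 8L   // 8 bytes per Double
val totalBytes = perTaskBytes * numTasks

println(f"per task : ${perTaskBytes / 1e6}%.1f MB") // ~320 MB
println(f"all tasks: ${totalBytes / 1e9}%.1f GB")   // ~3.5 GB

These numbers don't match the 1302 MB or 120.1 GB reported in the errors, which is exactly why I'd like to know how the memory usage actually scales.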
Note that I used sparse vectors to construct the document RDD[(Long, Vector)] passed to LDA.run(), but I don't know whether Spark LDA handles the sparse format correctly internally.
(edited) I used the Scala version of LDA, not the Python version.
This may be a related issue, but no clear answer was given: Spark LDA woes - prediction and OOM questions
(edited)
This is a snippet of my code (gist). https://gist.github.com/lucidfrontier45/11420721c0078c5b7415
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel}
import org.apache.spark.mllib.linalg.Vectors

def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
  val src = sc.textFile(args.fname, minPartitions = args.n_partitions)
    .map(_.split("\t"))
    .flatMap {
      // input file's format is (user_id, product_name, count)
      case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
      case _ => None
    }.persist()

  // maps to convert user_id or product_name into a unique sequential id
  val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
  val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
  val inverse_userid_map = userid_map.map(_.swap)

  // broadcast to speed up the RDD map operations
  val b_userid_map = sc.broadcast(userid_map)
  val b_productid_map = sc.broadcast(productid_map)
  val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

  // remap raw ids to the sequential ids
  val transformed_src = src.map { case (u, p, r) =>
    (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
  }

  println("unique items = %d".format(b_productid_map.value.size))

  // prepare the LDA input RDD[(Long, Vector)]
  val documents = transformed_src
    .map { case (u, p, r) => (u, (p, r)) }
    .groupByKey()
    .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }
    .persist()

  documents.count()
  src.unpersist()

  // run online variational LDA
  val ldamodel = new LDA()
    .setK(args.k)
    .setMaxIterations(args.n_iter)
    .setOptimizer("online")
    .run(documents)
    .asInstanceOf[LocalLDAModel]

  val result = ldamodel.topicDistributions(documents)
    .map { case (i, v) =>
      val u = b_inverse_userid_map.value(i)
      "%d,%s".format(u, v.toArray.mkString(","))
    }

  result.saveAsTextFile(args.out)
}
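To double-check the sparse-format concern above, a small sanity check I could add inside startJob, right after documents.count(); the numNonzeros-based average is just my own diagnostic, not part of the LDA API:

// Sanity check: confirm the vectors handed to LDA.run() really are
// SparseVector, and see how many non-zeros an average document has.
import org.apache.spark.mllib.linalg.SparseVector

val firstIsSparse = documents.first()._2.isInstanceOf[SparseVector]
val avgNonzeros = documents.map { case (_, v) => v.numNonzeros.toDouble }.mean()
println(s"first vector sparse: $firstIsSparse, avg non-zeros per doc: $avgNonzeros")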
Actually, I use LDA for dimensionality reduction of transaction data. My data is in the format (u, p, r), where u is a user id, p is a product name, and r is the number of times user u interacted with p. A user corresponds to a document and a product to a word in this case. Since user ids and product names are arbitrary strings, I converted them to unique sequential integers before submitting to LDA.
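To make that conversion concrete, here is a tiny made-up illustration (the ids and counts are invented, and I'm assuming 4 unique products indexed 0..3 after zipWithIndex):

import org.apache.spark.mllib.linalg.Vectors

// Toy (user, product, count) data after remapping to sequential ids:
//   user 0 interacted with product 1 twice and with product 3 once
//   user 1 interacted with product 0 five times
val toyDocuments = Seq(
  (0L, Vectors.sparse(4, Seq((1, 2.0), (3, 1.0)))),
  (1L, Vectors.sparse(4, Seq((0, 5.0))))
)
// sc.parallelize(toyDocuments) has exactly the RDD[(Long, Vector)]
// shape that LDA.run() expects.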
Thank you.
mllib LDA does handle sparse vectors properly when training. – Limacine
… topicsMatrix (#docs * #clusters) and describeTopics (this is proportional to #clusters * #tokens * 2). At first glance it shouldn't account for 120GB, but it is still a lot. – Parrakeet
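(Taking the two quantities from the comment above at face value and plugging in the numbers from this question, assuming dense 8-byte Doubles, my arithmetic looks roughly like this:)

// Rough sizes for 100,000 docs, 100 clusters, 400,000 tokens,
// assuming dense 8-byte Doubles (my assumption).
val docs = 100000L
val clusters = 100L
val tokens = 400000L

val topicsMatrixBytes = docs * clusters * 8L          // #docs * #clusters
val describeTopicsBytes = clusters * tokens * 2L * 8L // #clusters * #tokens * 2

println(s"topicsMatrix   ~ ${topicsMatrixBytes / 1e6} MB")   // 80 MB
println(s"describeTopics ~ ${describeTopicsBytes / 1e6} MB") // 640 MB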
… HashingTF in the Spark pipeline to 2^18-1 (one less than the default), everything works fine and I can save the entire pipeline with spark.driver.memory=1g. Using the default 2^18 causes the save operation to produce a heap-space OOM error. I also tried increasing spark.driver.memory and spark.driver.maxResultSize to 8g, but that didn't help with 2^18 features - it's not clear to me whether it's the driver or the workers that run out of memory. – Bendix
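For reference, the HashingTF setting described in the comment above would look roughly like this in the spark.ml pipeline API (the column names are placeholders; the 2^18 default and the minus-one workaround come from that comment, not from my own setup):

import org.apache.spark.ml.feature.HashingTF

// 2^18 (262144) is HashingTF's default number of features; the comment
// above reports that using one fewer avoided the heap-space OOM on save.
val hashingTF = new HashingTF()
  .setInputCol("tokens")    // placeholder input column
  .setOutputCol("features") // placeholder output column
  .setNumFeatures((1 << 18) - 1)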