Spark LDA consumes too much memory
Asked Answered
S

1

14

I'm trying to use spark mllib lda to summarize my document corpus.

My problem setting is as bellow.

  • about 100,000 documents
  • about 400,000 unique words
  • 100 cluster

I have 16 servers (each has 20 cores and 128GB memory).

When I execute LDA with OnlineLDAOptimizer, it gives out of memory error, suggesting me to increase spark.driver.maxResultSize like Total size of serialized results of 11 tasks (1302 MB) is bigger than spark.driver.maxResultSize

I increased spark.driver.maxResultSize to 120GB (and also spark.driver.memory to 120GB) and re-run LDA but no lack. It still says Total size of serialized results of 11 tasks (120.1 GB) is bigger than spark.driver.maxResultSize

I tried another dataset with about 100,000 unique words and it worked.

So, how can I estimate the memory usage when using Spark mllib LDA? I couldn't find any specification in the official documentation.

Note I used sparse vector for constructing docuemnt RDD[(Long, Vector)] passed to LDA.run() but don't know whether spark lda can handle sparse format correctly in internal.

(editted) I used Scala version of LDA. Not Python version.

This may be a related issue but no clear answer was given. Spark LDA woes - prediction and OOM questions

(edited)

This is a snippet of my code (gist). https://gist.github.com/lucidfrontier45/11420721c0078c5b7415

def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
    val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
        .flatMap {
            // input file's format is (user_id, product_name, count)
            case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
            case _ => None
        }.persist()

    // Map to convert user_id or product_name into unique sequencential id
    val userid_map = src.map(_._1).distinct().zipWithIndex().collect().toMap
    val productid_map = src.map(_._2).distinct().zipWithIndex().collect().toMap
    val inverse_userid_map = userid_map.map(_.swap)

    // broadcat to speedup RDD map operation
    val b_userid_map = sc.broadcast(userid_map)
    val b_productid_map = sc.broadcast(productid_map)
    val b_inverse_userid_map = sc.broadcast(inverse_userid_map)

    // run map
    val transformed_src = src.map { case (u, p, r) =>
        (b_userid_map.value(u), b_productid_map.value(p).toInt, r)
    }

    println("unique items = %d".format(b_productid_map.value.size))

    // prepare for LDA input RDD[(LONG, Vector)]
    val documents = transformed_src.map { case (u, p, r) => (u, (p, r)) }
        .groupByKey()
        .map { t => (t._1, Vectors.sparse(b_productid_map.value.size, t._2.toSeq)) }.persist()

    documents.count()
    src.unpersist()

    // run Online Variational LDA
    val ldamodel = new LDA()
        .setK(args.k)
        .setMaxIterations(args.n_iter)
        .setOptimizer("online")
        .run(documents)
        .asInstanceOf[LocalLDAModel]


    val result = ldamodel.topicDistributions(documents)
        .map { case (i, v) =>
            val u = b_inverse_userid_map.value(i)
            "%d,%s".format(u, v.toArray.mkString(","))
        }
    result.saveAsTextFile(args.out)
}

Actually, I use LDA for dimensional reduction of transaction data. My data is in the format of (u, p, r) where u is user id, p is product name, r is the number user u interacted with p. user corresponds to document and product to word in this case. Since user id and product name are arbitrary string, I converted them to unique sequential integers before submitting to LDA.

Thank you.

Sister answered 14/3, 2016 at 3:59 Comment(10)
As a note, mllib LDA does handle sparse vector properly when training.Limacine
@Mai Yes, I thought that. Still don't know why I'm in short of memory.Sister
Could you please show your code?Parrakeet
@Parrakeet I updated my post to include snippet of my code. Thanks for your help in advance.Sister
Thanks, it would be better to have this inside a question.. LDA returns at least two relatively large local objects topicsMatrix (#docs * #clusters) and describeTopics (this is proportional to (#clusters * #tokens * 2). At first glance it shouldn't account for 120GB but it still a lot.Parrakeet
@Parrakeet I know that LDA scales with O(#doc*#clusters + #words*#clusters) and should not consume that much memory. In fact, the case of #words = 100,000 was OK with 20GB memory, and I thought 80GB should be enough for #wores = 400,000.Sister
This is still a lot for a single machine. How much memory did it consume with 100K words? Driver alone.Parrakeet
Let us continue this discussion in chat.Sister
This is an old question, but I'm still running into this problem (spark 1.6.1) - what I found is that setting the number of features for HashingTF in the spark pipeline to 2^18-1 (one less than the default) everything works fine and I can save the entire pipeline with spark.driver.memory=1g. Using the default 2^18 causes the save operation to produce a heap space OOM error. I also tried increasing the spark.driver.memory and spark.driver.maxResultSize to 8g but that didn't help with 2^18 features - it's not clear to me if it's the driver of the workers that run out of memory.Bendix
What is your output path that you supply to .saveAsTextFile( ... ).Ruckus
T
1

There are three common causes for this problem, which may work independently or in tandem.

  1. The job returns a lot of data to the driver using something like collect. Alas, some of the SparkML code does this. If you cannot blame (2) or (3) below for the problem, it's likely the outcome of how your data interacts with the OnlineLDAOptimizer implementation.

  2. The job involves a large number of tasks, each of which returns results to the driver as part of Spark's job management (as opposed to with something like collect). Check the number of tasks in the SparkUI. Also see Exceeding `spark.driver.maxResultSize` without bringing any data to the driver Are org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults or org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask on the stack trace?

  3. An estimation error: Spark significantly over-estimates the size of data that's about to be returned to the driver and throws this error in order to prevent a cluster's driver from OOM. See What is spark.driver.maxResultSize? One way to test for this is to set spark.driver.maxResultSize to 0 (no limit) and see what happens.

Hope this helps!

Thar answered 17/3, 2019 at 17:46 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.