Spark 'limit' does not run in parallel?

I have a simple join where I limit one of the sides. In the explain plan I see that before the limit is executed there is an Exchange SinglePartition operation, and indeed I see that at this stage there is only one task running in the cluster.

This of course affects performance dramatically (removing the limit removes the single task bottleneck but lengthens the join as it works on a much larger dataset).

Is limit truly not parallelizable? And if so, is there a workaround for this?

I am using Spark on a Databricks cluster.

Edit: regarding the possible duplicate. The answer does not explain why everything is shuffled into a single partition. Also, I asked for advice on how to work around this issue.
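For reference, here is a minimal sketch that reproduces the behaviour; the table names and sizes are made up, not my actual data:

    // Hypothetical repro: one side of the join is limited before joining.
    val big = spark.range(0L, 100000000L).toDF("id")
    val dim = spark.range(0L, 10000000L).toDF("id")

    val joined = big.join(dim.limit(1000000), Seq("id"))
    joined.explain()
    // On the Spark versions I tried, the limited side of the physical plan shows
    // LocalLimit -> Exchange SinglePartition -> GlobalLimit,
    // and the Exchange SinglePartition stage runs as a single task.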

Togoland asked 22/7, 2018 at 13:56
Possible duplicate of Towards limiting the big RDD – Curse
"why everything is shuffled into" - because this is the only reasonable implementation which ensures a single pass over the data and exact results. "advice to work around this issue" - consider relaxing the requirements and using sample, not limit? – Curse
@user8371915, at first I thought that Spark knows the exact size of each partition in advance, but I think I got it wrong. Suppose there is a filter followed by a limit. Spark does not know the size of the partition in advance and has to evaluate it in order to take elements from it. I guess Spark could speculate and evaluate more than one partition concurrently, but the designers chose not to go into that. Did I understand correctly? – Togoland
@user8371915, I switched the limit to sample and it worked like a charm. If you post it as an answer I'll accept it. If you can throw in an explanation of the implementation details as a bonus for the community, all the better. Thanks. – Togoland
In general Spark doesn't know the size of a partition. It might in some cases know the input size (from input splits) or the number of records (if the cost-based optimizer or another form of statistics is used - this requires an additional scan), but that is not the case in general. You could write some form of optimizer rule, but it is hard to generalize. – Curse
@user8371915, thanks again. Reminding you to post this as an answer if you wish. I will accept it. – Togoland
I don't think there is a need for that, and I'm sure there is a better duplicate target out there - just my search skills are not so good today. I am glad my comment helped you, and I don't think that the question really deserves the downvote, so don't mind me and delete it, or if you prefer, self-answer :) – Curse

Following the advice given by user8371915 in the comments, I used sample instead of limit, and it uncorked the bottleneck.

A small but important detail: I still needed a predictable size constraint on the result set, but sample takes a fraction, so the size of the result set can vary greatly depending on the size of the input.

Fortunately for me, running the same query with count() was very fast, so I first counted the size of the entire result set and used that to compute the fraction I later passed to sample.
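For completeness, a minimal sketch of what this looked like; joinedDf and the target size are placeholders, not my actual query:

    // Count once, derive the fraction, then sample instead of limit.
    val targetRows = 100000L
    val totalRows  = joinedDf.count()                  // this was fast for my data
    val fraction   = math.min(1.0, targetRows.toDouble / totalRows)

    // sample() keeps the existing partitioning, so it runs in parallel;
    // note the result size is only approximately targetRows.
    val sampled = joinedDf.sample(withReplacement = false, fraction = fraction)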

Togoland answered 24/7, 2018 at 6:25

Workaround to restore parallelism after a limit: .repartition(200)

This redistributes the data again so that you can work in parallel.
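A minimal sketch, assuming a hypothetical df and target row count (200 is just an example partition count):

    // The limit itself still funnels everything through one partition,
    // but repartitioning right after lets downstream work run in parallel again.
    val limited = df.limit(100000).repartition(200)
    // subsequent joins/aggregations on `limited` now run with 200 tasks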

Whish answered 25/3, 2022 at 9:21

Answer

Spark's limit does not run in parallel: the global limit step always executes in a single partition.

Reason

There are several physical operators in Spark that implement the limit logic:

  1. CollectLimitExec
  • collects the data to a single partition; it does not work in parallel, but it does "perform limit incrementally".

  • it is only used when a logical Limit operation is the final operator in a logical plan, which happens when the user collects results back to the driver, e.g. spark.sql("select * from x limit 100").collect()

  2. LocalLimitExec & GlobalLimitExec
  • they work together

    LocalLimitExec: takes the first limit elements of each child partition, but does not collect or shuffle them.

    GlobalLimitExec: takes the first limit elements of the child's single output partition.

    There is an Exchange (shuffle) between them.

  • the global limit step works in a single partition, not in parallel (see the explain() sketch after this list)

  3. other operators: CollectTailExec / TakeOrderedAndProjectExec
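A quick way to see the second pattern in a plan; this is just a sketch with a hypothetical non-terminal limit (the limit is followed by a filter, so the planner picks LocalLimitExec/GlobalLimitExec instead of CollectLimitExec). Exact output varies by Spark version:

    import org.apache.spark.sql.functions.col

    spark.range(0L, 1000000L)
      .limit(1000)
      .filter(col("id") % 2 === 0)
      .explain()
    // Physical plan (roughly):
    //   Filter ((id % 2) = 0)
    //   +- GlobalLimit 1000
    //      +- Exchange SinglePartition      <- the single-task step
    //         +- LocalLimit 1000
    //            +- Range (0, 1000000, ...)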

Solution

Random sampling

SELECT * FROM test TABLESAMPLE (50 PERCENT)

SELECT * FROM x WHERE rand() < 0.01

df.sample(0.01)  or  rdd.sample(withReplacement = false, fraction = 0.01)

mapPartitions and take

take directly (note: this takes up to that many rows from each partition, so the combined result can be larger than a single global limit)

df.mapPartitions(a => a.take(2853557))

take after countByPartitions

   //  ①  count rows in each partition / countByPartitions
    val x = df5.mapPartitions((a) => {
      val pid = TaskContext.getPartitionId()
      Iterator((pid, a.size))
    })
    val countByPart = x.collectAsList()
    print(countByPart) //[(0,400), (1,400), (2,400), (3,400), (4,400)]

   //  ②  allocate how many rows each partition should take / allocate
    var limit = 900
    val takeByPart = new Array[Int](countByPart.size)
    for (a <- 0 until countByPart.size) {
      val take = if (limit > 0) {
        Math.min(limit, countByPart.get(a)._2)
      } else {
        0
      }
      limit = limit - take
      takeByPart(a) = take
    }
    print(takeByPart.mkString("(", ", ", ")")) //(400, 400, 100, 0, 0)
    val takeByPartBC = spark.sparkContext.broadcast(takeByPart)

   //  ③  take the allocated number of rows from each partition
    val result = df5.mapPartitions((a) => {
      val pid = TaskContext.getPartitionId()
      val take = takeByPartBC.value(pid)
      a.take(take)
    })
    assert(result.count() == 900)


Go to my blog for further reading.

Complexity answered 10/11, 2023 at 7:18
