Skip/Take with Spark SQL
How would one go about implementing a skip/take query (typical server-side grid paging) using Spark SQL? I have scoured the net and can only find very basic examples such as this one: https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html

I don't see any concept of ROW_NUMBER() or OFFSET/FETCH like with T-SQL. Does anyone know how to accomplish this?

Something like:

scala> csc.sql("select * from users skip 10 limit 10").collect()
Brazilein answered 15/5, 2015 at 12:56 Comment(0)
Try something like this:

val rdd = csc.sql("select * from <keyspace>.<table>")
val rdd2 = rdd.view.zipWithIndex()
rdd2.filter(x => { x._2 > 5 && x._2 < 10;}).collect()
rdd2.filter(x => { x._2 > 9 && x._2 < 12;}).collect()
Coed answered 16/5, 2015 at 0:22 Comment(5)
Are you sure view (in rdd.view) should be there? It only works if I omit view. - Brazilein
This is great, but how does it perform? Does it have to pull all the records from the database or does it only query them when collect() is called for the first time? I would think it pulls all the records from the database when you call zipWithIndex()... how else would it number all the records? What happens if you have 2M records, etc. Thanks! - Brazilein
That's what view is for - Coed
Yeah, but view doesn't work like I was saying. I had to remove it. You get this: 64: error: value view is not a member of org.apache.spark.sql.SchemaRDD - Brazilein
var rdd = csc.sql("select * from demo.users"); var rdd2 = rdd.view.zipWithIndex() <== view here doesn't work - Brazilein
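
For reference, a minimal sketch of what the comments above converge on: drop .view and call zipWithIndex directly on the result of csc.sql (a SchemaRDD is an RDD[Row]; on newer Spark versions where sql() returns a DataFrame, call .rdd first). Note that zipWithIndex launches a Spark job to compute per-partition offsets, so the whole result set is still scanned. The demo.users table and the page bounds are just placeholders:

// pair each row with a stable, zero-based index across partitions
val rdd = csc.sql("select * from demo.users")
val indexed = rdd.zipWithIndex()

// "skip 10, take 10": keep rows with index 10 through 19, then drop the index
val page = indexed.filter { case (_, idx) => idx >= 10 && idx < 20 }
                  .map { case (row, _) => row }
page.collect()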

I found that neither Spark SQL nor the DataFrame API supports LIMIT with OFFSET. This is probably because data in a distributed system has no inherent order, so an offset is only meaningful together with an ORDER BY. We can use a window function to implement it:

1. Suppose we want the products whose revenue rank is between 2 and 5.

2. Implementation:

from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, row_number

# revenue rank range we want to keep (2 to 5, per step 1)
start, end = 2, 5

windowSpec = Window.partitionBy().orderBy(df.revenue.asc())
result = df.select(
    "product",
    "category",
    "revenue",
    row_number().over(windowSpec).alias("row_number"),
    dense_rank().over(windowSpec).alias("rank"))
result.show()
result = result.filter((col("rank") >= start) & (col("rank") <= end))
result.show()

Please refer to https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
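
Since the question was asked from the Scala shell, here is a rough Scala equivalent of the same window-function approach (a sketch only; df, start, and end are the same placeholders as in the Python version above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank, row_number}

// revenue rank range we want to keep
val start = 2
val end = 5

val windowSpec = Window.partitionBy().orderBy(col("revenue").asc)

val ranked = df.select(
  col("product"),
  col("category"),
  col("revenue"),
  row_number().over(windowSpec).alias("row_number"),
  dense_rank().over(windowSpec).alias("rank"))

ranked.filter(col("rank") >= start && col("rank") <= end).show()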

Burtonburty answered 1/12, 2016 at 6:28 Comment(0)
