R and sparklyr: Why is a simple query so slow?

This is my code; I run it in Databricks.

library(sparklyr)
library(dplyr)
library(arrow)

sc <- spark_connect(method = "databricks")
tbl_change_db(sc, "prod")
trip_ids <- spark_read_table(sc, "signals",memory=F) %>% 
            slice_sample(10) %>% 
            pull(trip_identifier)

The code is extremely slow and takes an hour to run even though I am only sampling 10 rows. Why is that? Is there a way to improve the performance?

Thank you!

Schreiner answered 30/3, 2023 at 13:19 Comment(0)

It seems you're using dplyr's slice_sample function to sample your dataset and then pulling a single column from the result. The problem is that the Spark engine does not know about this: the sampling happens in R. That means the full dataset is read from wherever it is stored and sent in its entirety to your R session, only to be subsampled there.

What you need to do is get your subset and column within Spark itself. You can do that with select() (to grab a single column) and head() (to grab the first N rows):

trip_ids <- head(select(spark_read_table(sc, "signals", memory = FALSE), trip_identifier), 10L)
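
Note that this keeps everything lazy in Spark and returns a Spark table reference rather than an R vector. If you want the identifiers back in R as a plain vector (as in the question's original pull(trip_identifier)), you would still collect that small result at the end; a minimal sketch, reusing the sc connection, table, and column name from the question:

library(sparklyr)
library(dplyr)

# Subsetting stays in Spark; only the 10 values are transferred to R at the end
trip_ids <- spark_read_table(sc, "signals", memory = FALSE) %>%
  select(trip_identifier) %>%
  head(10L) %>%
  pull(trip_identifier)   # pull() collects just this small single column into R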
Minica answered 2/4, 2023 at 9:28 Comment(1)
Spark is able to push down transformations its Catalyst engine knows about. It doesn't know about anything defined outside of Spark; that code is a black box to it, so it cannot make any optimizations. Here's some reading on it. – Sartorius
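
To illustrate what the comment describes, you can ask for the SQL that a lazy tbl_spark will actually run with show_query() and confirm that the select/head are pushed down into Spark rather than executed in R; a small sketch, assuming the sc connection and column name from the question:

library(sparklyr)
library(dplyr)

signals <- spark_read_table(sc, "signals", memory = FALSE)  # lazy reference, nothing is read yet

signals %>%
  select(trip_identifier) %>%
  head(10) %>%
  show_query()   # prints the translated SQL, roughly: SELECT trip_identifier FROM signals LIMIT 10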

The other answer and comment already covered why the query was taking so long (you were pulling the entire table into the driver/R earlier than you intended), but I wanted to include an example that truly samples the data, and another approach that gives you more control (i.e., one that uses Spark SQL). When working with Spark, I try to do all my heavy lifting as actual SQL queries, so I would prefer option 2, but I included both in case one is more helpful than the other.

library(sparklyr)
library(dplyr)

sc <- spark_connect(method = "databricks")
tbl_change_db(sc, "prod")

# Option 1, using a fraction (proportion in this case) to pull a random sample
spark_read_table(sc, "signals", memory = FALSE) %>%
  select(trip_identifier) %>%
  sdf_sample(fraction = .0001, replacement = FALSE, seed = NULL) %>%
  collect() %>% # this is not necessary, but it makes the pull-down to R explicit
  pull(trip_identifier)


# Option 2, using SparkSQL to run the query as you intended (sampling 10 rows)
sc %>%
  sdf_sql("SELECT trip_identifier FROM signals TABLESAMPLE (10 ROWS)") %>%
  collect() %>% # this is not necessary, but it makes the pull-down to R explicit
  pull(trip_identifier)
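
One practical note on Option 1: sdf_sample() samples by fraction, so the number of rows that come back is only approximately fraction times the table size and varies between runs; fixing the seed at least makes a given run reproducible. A small sketch with illustrative values:

# Reproducible fractional sample; the row count is approximate, not exactly 10
spark_read_table(sc, "signals", memory = FALSE) %>%
  select(trip_identifier) %>%
  sdf_sample(fraction = 0.0001, replacement = FALSE, seed = 42) %>%
  collect() %>%
  pull(trip_identifier)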
Theodora answered 4/4, 2023 at 12:45 Comment(0)
  1. I think you should use spark_tbl() instead of spark_read_table(): the spark_tbl() function in sparklyr provides a more efficient way to create a Spark DataFrame reference by pointing directly at the table in Spark's catalog, without reading the entire table into R memory.
  2. Also use sample_n() instead of slice_sample() to sample a fixed number of rows directly from the Spark DataFrame.

Like this example:

trip_ids <- spark_tbl(sc, "prod.signals") %>%
  sample_n(10) %>% 
  pull(trip_identifier)
  3. You can also use the arrow package to speed up serialization of data between R and Spark (see the sketch below).
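
On that last point, as far as I know you don't call any serialization functions yourself: sparklyr switches to Arrow-based transfer automatically once the arrow package is attached (which the question's code already does). A minimal sketch, using the connection, table, and column names from the question:

library(sparklyr)
library(dplyr)
library(arrow)   # attaching arrow is enough; sparklyr then uses Arrow when moving data between R and Spark

sc <- spark_connect(method = "databricks")
tbl_change_db(sc, "prod")

# the final pull() of a small result then goes through Arrow serialization
trip_ids <- spark_read_table(sc, "signals", memory = FALSE) %>%
  select(trip_identifier) %>%
  head(10) %>%
  pull(trip_identifier)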

I hope this helps.

Boardwalk answered 9/4, 2023 at 5:44 Comment(0)
