I've been working with sparklyr to bring large Cassandra tables into Spark, register them with R, and run dplyr operations on them.
I have been successfully importing Cassandra tables with code that looks like this:
# import Cassandra table into Spark
cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format",
  list(keyspace = "cass_keyspace", table = "cass_table")
) %>%
  invoke("load")

# register the table so it is visible to R
cass_tbl <- sparklyr:::spark_partition_register_df(
  sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)
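For reference, a roughly equivalent import can also be written against sparklyr's public API rather than the internal ::: functions, assuming your sparklyr version exports spark_read_source(); the keyspace and table names below simply mirror the snippet above, so treat this as a sketch rather than a drop-in replacement:

library(sparklyr)

# same Cassandra source and options as above, via the exported reader
cass_tbl <- spark_read_source(
  sc,
  name = "cass_table",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "cass_keyspace", table = "cass_table"),
  memory = FALSE   # skip caching the full table up front
)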
Some of these Cassandra tables are pretty large (> 8.5 billion rows) and take a while to import/register, and some lead to memory overruns, even with 6 nodes running a total of 60 cores and 192 GB of RAM. However, I typically only need a few of the columns from each Cassandra table.
My questions are:
- Is it possible to filter the Cassandra table on import/registration so that it only imports some columns, or so that it is filtered on the primary key (i.e. by passing SQL/CQL-style queries such as SELECT name FROM cass_table WHERE id = 5)?
- Where would such a query go in the above code, and what form does the syntax take?
I have tried adding such a query as an additional option in the options list, i.e.:
list(..., select = "id")
as well as invoking it as a separate pipe before %>% invoke("load"), i.e.:
invoke("option", "select", "id") %>%
# OR
invoke("option", "query", "select id from cass_table") %>%
But these do not work. Any suggestions?
Register cass_df so it is possible to use dplyr verbs on it (since sparklyr comes with a dplyr backend). Registration is done with: R_cass_df = sdf_register(cass_df, "spark_cass_df")
Then dplyr verbs can be applied, for example: library("dplyr"); R_cass_df %>% filter(foo == "bar") %>% select(id)
– Allhallows
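Expanding on that comment, here is a minimal sketch of the suggested approach; the column names id, name, and foo are placeholders taken from the question and comment, and whether the connector actually pushes the filter down to Cassandra depends on the connector version and on id being a partition/primary key column:

library(sparklyr)
library(dplyr)

# expose the Spark DataFrame to dplyr
R_cass_df <- sdf_register(cass_df, "spark_cass_df")

# dplyr builds a lazy Spark SQL query; nothing is pulled into R yet
slim <- R_cass_df %>%
  filter(id == 5) %>%   # predicate on the primary key
  select(id, name)      # column pruning

# materialise only the reduced result in R
local_df <- collect(slim)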