Importing a Cassandra table into Spark via sparklyr - possible to select only some columns?
I've been working with sparklyr to bring large Cassandra tables into Spark, register them with R, and conduct dplyr operations on them.

I have been successfully importing Cassandra tables with code that looks like this:

# import cassandra table into spark

cass_df <- sparklyr:::spark_data_read_generic(
  sc, "org.apache.spark.sql.cassandra", "format", 
  list(keyspace = "cass_keyspace", table = "cass_table")
  ) %>% 
  invoke("load")


# register table in R

cass_tbl <- sparklyr:::spark_partition_register_df(
  sc, cass_df, name = "cass_table", repartition = 0, memory = TRUE
)

Some of these Cassandra tables are pretty large (> 8.5bn rows) and take a while to import/register, and some lead to memory overruns, even with 6 nodes running a total of 60 cores and 192 GB of RAM. However, I typically need only a few of the columns from each Cassandra table.

My questions are:

  1. Is it possible to filter the Cassandra table on import/registration so that only some columns are imported, or so that rows are filtered on the primary key (i.e. by passing a SQL/CQL-style query such as SELECT name FROM cass_table WHERE id = 5)?
  2. Where would such a query go in the above code, and what form does the syntax take?

I have tried adding such a query as an additional option in the options list, i.e.:

list(..., select = "id")

as well as invoking it as a separate pipe before %>% invoke("load"), i.e.:

invoke("option", "select", "id") %>%

# OR

invoke("option", "query", s"select id from cass_table") %>%

But these do not work. Any suggestions?

Alcestis answered 2/3, 2017 at 15:07
You can skip the eager cache and select only the columns of interest:

session <- spark_session(sc)

# Some columns to select
cols <- list("x", "y", "z")

cass_df <- session %>% 
  invoke("read") %>% 
  invoke("format", "org.apache.spark.sql.cassandra") %>% 
  invoke("options", as.environment(list(keyspace="test"))) %>% 
  invoke("load") %>% 
  # select has signature select(col: String, cols: String*), so the first
  # column has to be passed separately. If you want only one column, the
  # third argument has to be an empty list
  invoke("select", cols[[1]], cols[2:length(cols)]) %>%
  # Standard lazy cache if you need one
  invoke("cache")

If you use predicates that can significantly reduce the amount of data fetched, set the pushdown option to "true" (the default) and apply a filter before caching.
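
For example, a minimal sketch assuming the same keyspace/table names as above and a hypothetical predicate on the partition key id:

cass_df <- session %>%
  invoke("read") %>%
  invoke("format", "org.apache.spark.sql.cassandra") %>%
  # pushdown = "true" is the connector default; shown explicitly here
  invoke("options", as.environment(list(
    keyspace = "cass_keyspace", table = "cass_table", pushdown = "true"))) %>%
  invoke("load") %>%
  # Dataset.filter(conditionExpr: String); predicates on partition /
  # clustering key columns can be pushed down to Cassandra
  invoke("filter", "id = 5") %>%
  invoke("cache")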

If you want to pass a more complex query, register a temporary view and use the sql method:

session %>%
  invoke("read") %>% 
  ...
  invoke("load") %>% 
  invoke("createOrReplaceTempView", "some_name")

cass_df <- session %>% 
  invoke("sql", "SELECT id FROM some_name WHERE foo = 'bar'") %>%
  invoke("cache")
Luce answered 3/3, 2017 at 14:07
Wonderful, this post has helped me so much... I have done something inspired by this, but loading just the right columns from a CSV file. I wanted to add that one may want to register cass_df so it is possible to use dplyr verbs on it (since sparklyr comes with a dplyr backend). Registration is done with: R_cass_df = sdf_register(cass_df, "spark_cass_df"). Then dplyr verbs can be applied, for example: library("dplyr"); R_cass_df %>% filter(foo == "bar") %>% select(id) – Allhallows
@Luce How can one connect to a remote Cassandra using host, username and password? – Shoulders
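
A minimal sketch of the registration pattern the first comment describes (spark_cass_df is the commenter's name for the view; the filter/select assume columns foo and id exist):

library(dplyr)

# Register the Spark DataFrame so dplyr verbs can be applied to it
R_cass_df <- sdf_register(cass_df, "spark_cass_df")

# dplyr verbs are now translated to Spark SQL
R_cass_df %>%
  filter(foo == "bar") %>%
  select(id)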
