Slowdown with repeated calls to spark dataframe in memory

Say I have 40 continuous (DoubleType) variables that I've bucketed into quartiles using ft_quantile_discretizer. Identifying the quartiles on all of the variables is super fast, as the function supports execution of multiple variables at once.

Next, I want to one-hot encode those bucketed variables, but there is currently no supported way to one-hot encode all of them with a single call. So I'm piping ft_string_indexer, ft_one_hot_encoder, and sdf_separate_column for each of the bucketed variables one at a time, by looping through the variables. This gets the job done. However, as the loop progresses, it slows down considerably. I suspect it's running out of memory, but I can't figure out how to write this so that it executes at the same speed across all the variables.

If q_vars is a character array of variable names (say 40 of them) for continuous variables, how can I code this up in a more spark-efficient way?

for (v in q_vars) {
   data_sprk_q<-data_sprk_q %>% 
       ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
       ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
       sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]]) 
}

I also tried executing as a single massive pipeline with all of the variables referenced, but that too didn't solve the issue, so I'm thinking it doesn't have anything to do with the loop itself.

test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))

Any help would be appreciated.

Otisotitis answered 21/8, 2018 at 23:23 Comment(2)
Why do you apply StringIndexer to data that you say is already discretized? And why do you use sdf_separate_column? - Maniac
@user6910411 I'm using sdf_separate_column to break out the columns that are otherwise stored as a list. In my model, I'd like to allow for flexibility in which buckets to include, and from my understanding, I wouldn't be able to do that if I left the one-hot encoded list in (unless I applied some regularization, dropping the coefficients to 0). I'm applying the StringIndexer to have control over which buckets get which names since, at least to my understanding, the default naming when separating columns depends on the frequency of the buckets. - Otisotitis

In general, some (sometimes substantial) slowdown with long ML Pipelines is expected, as a result of the worse-than-linear complexity of the Catalyst optimizer. Short of splitting the process into multiple pipelines and breaking the lineage in between (either by using checkpoints or by writing the data to persistent storage and loading it back), there is not much you can do about it at the moment.
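As a rough illustration of breaking the lineage between pipeline segments, here is a minimal sketch. It assumes a local Spark connection; the checkpoint directory, the Parquet path, and the table names are hypothetical placeholders, and on a real cluster both locations would normally be on shared storage.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation is available
sc <- spark_connect(master = "local")

# Hypothetical checkpoint directory (use shared storage such as HDFS on a cluster)
spark_set_checkpoint_dir(sc, file.path(tempdir(), "checkpoints"))

set.seed(1)
df <- copy_to(sc, tibble(x = rnorm(100), y = runif(100)),
              "lineage_demo", overwrite = TRUE)

# First segment of the processing
stage_1 <- df %>%
  ft_quantile_discretizer(input_col = "x", output_col = "x_d", num_buckets = 4)

# Option 1: eager checkpoint; later stages start from the checkpointed data
stage_1_cp <- sdf_checkpoint(stage_1, eager = TRUE)

# Option 2: write to persistent storage and load the result back
path <- file.path(tempdir(), "stage_1_parquet")  # hypothetical location
spark_write_parquet(stage_1, path, mode = "overwrite")
stage_1_reloaded <- spark_read_parquet(sc, "stage_1_reloaded", path)
```

Either result can then be fed into the next segment, so the optimizer only has to plan the stages added after the break.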

However, your current code adds a number of problems on top of that:

  • Unless you use more than 10 buckets, StringIndexer

    ft_string_indexer(v, paste0(v, "b"), "keep", string_order_type = "alphabetAsc")
    

    just duplicates the labels assigned by QuantileDiscretizer. With a larger number of levels, the behavior becomes even less useful, because lexicographic order no longer matches the numeric order of the bucket labels.

  • Applying one-hot encoding might not be required at all (and in the worst case can be harmful), depending on the downstream process. Even with linear models it might not be strictly necessary: you could argue that the assigned labels are valid ordinals, and that recoding them as nominal values and increasing the dimensionality is not the desired outcome.

  • However the biggest problem is application of sdf_separate_column. It

    • Increases the cost of computing the execution plan by increasing the number of expressions.
    • Increases amount of memory required for processing by converting sparse data into dense.
    • Internally, sparklyr uses a UserDefinedFunction on each index, effectively causing repeated allocation, decoding and garbage collection for the same row, which puts a lot of pressure on the cluster.
    • Last but not least it discards column metadata, extensively used by Spark ML.

    I would strongly advise against using this function here. Based on your comments it looks like you want to subset columns before passing the result to some other algorithm - for that you can use VectorSlicer.
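To illustrate the earlier point about lexicographic order, here is a small plain-R sketch that needs no Spark. It assumes the double-valued bucket labels are rendered as strings such as "0.0" and "10.0" before being indexed alphabetically; that rendering is an assumption made for illustration.

```r
# Bucket labels 0..11 as they might look once converted to strings
labels <- sprintf("%.1f", 0:11)

# method = "radix" sorts character vectors in the C locale,
# i.e. a plain byte-wise lexicographic comparison
sorted <- sort(labels, method = "radix")
print(sorted)

# Zero-based position each label would get under alphabetical ordering
alphabetical_index <- match(labels, sorted) - 1
print(data.frame(label = labels, index = alphabetical_index))
```

With 10 or fewer buckets the alphabetical index equals the original label, which is why the indexer adds nothing. With 12 buckets, "10.0" and "11.0" sort ahead of "2.0", so the indices stop following the bucket order.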

Overall you can rewrite your pipeline as

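library(sparklyr)
library(dplyr)

# Assumes an existing Spark connection `sc`,
# e.g. sc <- spark_connect(master = "local")

set.seed(1)

df <- copy_to(sc, tibble(x=rnorm(100), y=runif(100), z=rpois(100, 1)))

# Derive the intermediate and final column names from the input names
input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
encoded_cols <- paste0(discretized_cols, "_e") %>% setNames(discretized_cols)

# One discretizer stage handles all input columns at once
discretizer <- ft_quantile_discretizer(
  sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
# One encoder stage per discretized column
encoders <- lapply(
  discretized_cols, 
  function(x) ft_one_hot_encoder(sc, input_col=x, output_col=encoded_cols[[x]])
)

# Combine all stages into a single pipeline, then fit and apply it
transformed_df <- do.call(ml_pipeline, c(list(discretizer), encoders)) %>%
  ml_fit(df) %>% 
  ml_transform(df)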

and apply ft_vector_slicer when needed. For example, to take the values corresponding to the first, third and sixth buckets of x, you can use:

transformed_df %>% 
  ft_vector_slicer(
    input_col="x_d_e", output_col="x_d_e_s", indices=c(0, 2, 5)) 
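If the slice indices for several encoded columns live in a list, the slicers can be built as extra pipeline stages in the same way as the encoders. The following is only a sketch under assumptions: `slice_spec` and the table name are hypothetical, a local Spark connection is assumed, and the `input_col`/`output_col` arguments of ft_one_hot_encoder follow the code above (newer sparklyr releases may expect `input_cols`/`output_cols` instead).

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")  # assumption: local Spark installation

set.seed(1)
df <- copy_to(sc, tibble(x = rnorm(100), y = runif(100)),
              "slicer_demo", overwrite = TRUE)

input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
encoded_cols <- paste0(discretized_cols, "_e")

discretizer <- ft_quantile_discretizer(
  sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
encoders <- lapply(seq_along(discretized_cols), function(i) {
  ft_one_hot_encoder(sc, input_col = discretized_cols[i],
                     output_col = encoded_cols[i])
})

# Hypothetical specification: encoded column name -> zero-based indices to keep
slice_spec <- list(x_d_e = c(0, 2, 5), y_d_e = c(1, 3))

# Iterate over the names so the resulting list of stages is unnamed
slicers <- lapply(names(slice_spec), function(col) {
  ft_vector_slicer(sc, input_col = col, output_col = paste0(col, "_s"),
                   indices = slice_spec[[col]])
})

# A single stage must be wrapped in list() before c(); lists of stages are
# already lists. unname() keeps do.call from matching list names to arguments.
stages <- unname(c(list(discretizer), encoders, slicers))

sliced_df <- do.call(ml_pipeline, stages) %>%
  ml_fit(df) %>%
  ml_transform(df)
```

do.call passes list names as argument names, so a named list of stages is one possible cause of an error such as argument "x" is missing; that is why the stages are kept unnamed here.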
Maniac answered 24/8, 2018 at 19:25 Comment(2)
Thank you for the very thorough response! This is very cool. Much appreciated. - Otisotitis
I have a couple of follow-up questions about your example, which I'm trying to generalize to my application. (1) Why, in the do.call(ml_pipeline, ...) statement, does discretizer need to be within a list, while encoders does not? When I try to create another pipeline with a similar method using ft_vector_slicer, I get an error saying argument "x" is missing. (2) How would you add the ft_vector_slicer instructions to a pipeline if the vector slice instructions for each encoded variable live within a list? - Otisotitis
