Thread creation on function return in Rust WASM

I am working with Polars in a WASM environment.

I have noticed an inconsistency in the LazyFrame.collect operation: it sometimes creates threads when working with certain datasets.

Here is the code related to the issue:

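use std::io::Cursor;

use polars::prelude::*;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
pub fn start(buff: &[u8],
    item_id: &str,
    order_id: &str,
    item_name: &str) -> JsValue {

    // Parse the CSV bytes into a DataFrame, then switch to the lazy API
    let cursor = Cursor::new(buff);
    let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();

    // Group by the order column and aggregate the item columns into lists
    let df = lf.groupby([col(order_id)]);
    let df = df.agg([col(item_id), col(item_name)]);

    // Error occurs here
    let df = df.collect().unwrap();

    // Placeholder return (not in the original post) so the declared JsValue type is satisfied
    JsValue::from_str(&df.to_string())
}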

With one particular dataset, I get the error:

panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'

because it is attempting to spawn a thread in a WASM environment.

However, with other datasets, the same code executes flawlessly and never tries to create threads. File size does not seem to be the cause; I have tested with various datasets.

I would like to know which part of the LazyFrame.collect operation causes this inconsistency and how to avoid it.

working.csv

Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5

notworking.csv

Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006

The Polars fork that supports WASM is available at https://github.com/universalmind303/polars/tree/wasm

You can see the full project here, as well as both CSV files: https://github.com/KivalM/lazyframe-min-test

EDIT: Output of describe_plan()

working dataset

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

not working dataset

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

Output of schema()

working dataset

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

not working dataset

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

Output of describe_optimized_plan() (the same for both datasets):

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
    project 3/3 columns |   details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
    selection: "None"

EDIT: After a closer look at the source code, the problem doesn't seem to come directly from any Polars code. I have tracked the issue down to this function in polars-lazy/src/physical_plan/executors/groupby.rs:

impl Executor for GroupByExec {
    fn execute

which returns the value of:

groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)

However, groupby_helper runs to completion and the DataFrame is created successfully. The error appears when the DataFrame is returned from groupby_helper to fn execute. It is odd that a thread is spawned only at the moment this function returns. Is there something in Rust WASM that could cause behaviour like this?

Asked by Gwendolin on 27/5/2022 at 18:52. Comments (6):
Maecenas: Does explicitly setting the environment variable POLARS_MAX_THREADS=1 fix this? I'm curious whether Polars switches to non-threaded mode when it is allowed only one thread, or whether it actually spawns a single thread.
Niobous: Many Polars methods, such as groupby, use parallel threads only for certain data types. Can you confirm that both CSVs have the same schema? You could also run describe_plan on the LazyFrame to see whether it provides any insight.
Gwendolin: @CoryGrinstead Unfortunately, the output is the same for both. Both outputs have been added to the post verbatim.
Gwendolin: @Maecenas Correct me if I am wrong, but I believe this is not possible in a WASM environment.
Maecenas: What does describe_optimized_plan() say? That is what the engine actually executes.
Gwendolin: @Maecenas Unfortunately, the outputs are the same. It has been added to my post.

So it looks like there is a std::thread operation in the groupby code that I missed when creating the branch:

impl Drop for GroupsIdx {
    fn drop(&mut self) {
        let v = std::mem::take(&mut self.all);
        // ~65k took approximately 1ms on local machine, so from that point we drop on other thread
        // to stop query from being blocked
        if v.len() > 1 << 16 {
            std::thread::spawn(move || drop(v));
        } else {
            drop(v);
        }
    }
}
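Because this logic lives in Drop, it runs whenever a GroupsIdx goes out of scope. For a local value that is the moment the owning function returns, after its return value has already been built, which matches the timing described in the question. A minimal sketch of that ordering (Guard, record, and run_demo are hypothetical illustrations, not Polars code):

```rust
use std::cell::RefCell;

thread_local! {
    // Records events in the order they happen.
    static EVENTS: RefCell<Vec<&'static str>> = RefCell::new(Vec::new());
}

fn record(event: &'static str) {
    EVENTS.with(|e| e.borrow_mut().push(event));
}

// Hypothetical stand-in for an internal value with a custom Drop impl.
struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        // In GroupsIdx, this is where std::thread::spawn may be called.
        record("Guard::drop");
    }
}

// Owns a Guard for its whole body, like a helper that builds groups.
fn helper() -> u32 {
    let _groups = Guard; // a named binding keeps the value alive until scope end
    record("helper body finished");
    42 // the return value is produced first, then `_groups` is dropped
}

/// Runs `helper` and returns the recorded event order.
pub fn run_demo() -> Vec<&'static str> {
    EVENTS.with(|e| e.borrow_mut().clear());
    let value = helper();
    record("caller received value");
    assert_eq!(value, 42);
    EVENTS.with(|e| e.borrow().clone())
}
```

Note that `let _ = Guard;` would drop the value immediately; the named `_groups` binding is what keeps it alive until the function returns.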

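The dataset is what determines the thread spawn: self.all holds one entry per group, so any groupby that produces more than 1 << 16 (65,536) groups will spawn a thread to drop them.

This is also likely why the panic only shows up when groupby_helper returns: the groups are dropped when they go out of scope at the end of that function, which runs this Drop impl.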
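Since the trigger is the number of groups rather than the file size, one way to predict whether a dataset will hit this path is to count its distinct group keys. A minimal sketch, assuming the key is the first CSV column and that fields contain no quoted commas (a naive split, not a real CSV parser); distinct_keys, would_spawn_on_drop, and SPAWN_THRESHOLD are hypothetical names:

```rust
use std::collections::HashSet;

/// Threshold from the Drop impl above: more than 1 << 16 groups.
pub const SPAWN_THRESHOLD: usize = 1 << 16;

/// Counts distinct values in the first column of a CSV string, skipping
/// the header row. Naive: splits on ',' and ignores quoting.
pub fn distinct_keys(csv: &str) -> usize {
    csv.lines()
        .skip(1) // header row
        .filter_map(|line| line.split(',').next())
        .filter(|key| !key.is_empty())
        .collect::<HashSet<&str>>()
        .len()
}

/// True if a groupby on the first column would exceed the threshold.
pub fn would_spawn_on_drop(csv: &str) -> bool {
    distinct_keys(csv) > SPAWN_THRESHOLD
}
```

The six sample rows shown in the question stay far below the threshold; the check is only meaningful on the full files.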

Feature-flagging that impl so it only compiles for non-WASM targets should fix your issue.
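A minimal sketch of one way to do this, using a hypothetical stand-in struct (GroupsIdxSketch, not the real Polars type) and gating only the thread-spawning branch with #[cfg(not(target_arch = "wasm32"))] instead of the whole impl, so on wasm32 the allocation is always freed inline. This is an assumption about how the fork could be patched, not code taken from it:

```rust
use std::mem;

/// Hypothetical stand-in for the groups index: one Vec of row indices per group.
pub struct GroupsIdxSketch {
    pub all: Vec<Vec<u32>>,
}

/// Threshold from the original Drop impl: more than 1 << 16 groups.
pub const BACKGROUND_DROP_THRESHOLD: usize = 1 << 16;

/// Whether dropping `n_groups` groups is handed to another thread on this target.
pub fn drops_on_background_thread(n_groups: usize) -> bool {
    cfg!(not(target_arch = "wasm32")) && n_groups > BACKGROUND_DROP_THRESHOLD
}

impl Drop for GroupsIdxSketch {
    fn drop(&mut self) {
        let v = mem::take(&mut self.all);
        // Compiled only for non-wasm32 targets: on wasm32 without thread
        // support, std::thread::spawn panics with "failed to spawn thread".
        #[cfg(not(target_arch = "wasm32"))]
        {
            if drops_on_background_thread(v.len()) {
                std::thread::spawn(move || drop(v));
                return;
            }
        }
        // Small inputs, and every input on wasm32, are freed inline.
        drop(v);
    }
}
```

Gating the whole impl, as suggested above, also works: without a custom Drop, the Vec field is simply freed inline when the struct is dropped.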

Answered by Niobous on 1/6/2022 at 14:08. Comments (1):
Gwendolin: Thank you, that seems to fix the issues I've been having. I would never have guessed to look at the Drop impl for a thread being created.
