Issue while using py-polars sink_parquet method on a LazyFrame

I am getting the error below when using sink_parquet on a LazyFrame. Earlier I was using .collect() on the output of scan_parquet() to convert the result into a DataFrame, but unfortunately that does not work with larger-than-RAM datasets. Here is the error I received:

PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

I am trying to write the LazyFrame (the output from scan_parquet) to a local file after adding some filter and join conditions to it. The error seems to come from these locations:

https://github.com/pola-rs/polars/blob/master/py-polars/polars/internals/lazyframe/frame.py#L1235 (In Python)

https://github.com/pola-rs/polars/blob/master/polars/polars-lazy/src/physical_plan/planner/lp.rs#L154 (In Rust)

I have tried updating from 0.15.16 to the latest version (0.16.1), but the issue still exists.

Sample code:

(
    pl.scan_parquet("path/to/file1.parquet")
    .select([
        pl.col("col2"),
        pl.col("col2").apply(lambda x: ...).alias("splited_levels"),
        # ...followed by more columns and .alias()
    ])
    .join(<another lazyframe>, on="some key", how="inner")
    .filter(...)
    .filter(...)
    # ...followed by some more filters
    .sink_parquet("path/to/result2.parquet")
)

The Parquet file should be written to the local filesystem. Instead, I get the error below:

PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

Here is the output of polars.show_versions():

---Version info---
Polars: 0.15.16
Index type: UInt32
Platform: Linux-4.15.0-191-generic-x86_64-with-glibc2.28
Python: 3.9.16
[GCC 8.3.0]
---Optional dependencies---
pyarrow: 11.0.0
pandas: not installed
numpy: 1.24.1
fsspec: 2023.1.0
connectorx: not installed
xlsx2csv: not installed
deltalake: not installed
matplotlib: not installed

Update: I have raised a GitHub issue for this (https://github.com/pola-rs/polars/issues/6603), and it seems that not all types of queries are supported for streaming at the moment. So I am looking for a workaround or an alternative way of doing this with Polars.
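
As a sanity check before calling sink_parquet, explain(streaming=True) shows which parts of a plan the streaming engine can actually run. The sketch below uses the placeholder names from above, and str.split is just a stand-in for whatever my .apply lambda does; Python UDFs like .apply cannot run in the streaming engine, so replacing them with native expressions where possible helps:

import polars as pl

lf = (
    pl.scan_parquet("path/to/file1.parquet")
    .select([
        pl.col("col2"),
        # native expression instead of .apply(lambda x: ...);
        # Python UDFs cannot run in the streaming engine
        pl.col("col2").str.split("/").alias("splited_levels"),
    ])
)

# anything outside the STREAMING section of this plan falls back
# to the default in-memory engine
print(lf.explain(streaming=True))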

Calling answered 31/1, 2023 at 17:7 Comment(3)
The latest version isn't 0.15.16. It's (at least) 0.16.1. – Sedgewick
@DeanMacGregor Yes, I tried with that one as well. I raised a GitHub issue here: github.com/pola-rs/polars/issues/6603. According to them, this type of query is not yet supported for streaming. – Calling
I only run into the problem when I read from a Hadoop filesystem; if I do the reads/writes on a local disk, it seems to work. – Buttery

I don't know what's going on under the hood, but one thing I've found to work for me has been using .collect(streaming=True), and also setting pl.Config.set_streaming_chunk_size() if it still blows up memory.

We can see that pl.concat_list() isn't currently supported by streaming, which means we won't be able to .sink_parquet():

(pl.LazyFrame({'a': 'word', 'b': 'word2'})
 .with_columns(joined = pl.concat_list(pl.col('a'),
                                       pl.col('b'))
              )
 .explain(streaming=True))

This prints the plan below; note that the WITH_COLUMNS step holding concat_list sits outside the STREAMING section, so it will run in the default in-memory engine:

WITH_COLUMNS:
 [col("a").list.concat([col("b")]).alias("joined")]
  --- STREAMING
DF ["a", "b"]; PROJECT */2 COLUMNS; SELECTION: "None"  --- END STREAMING

    DF []; PROJECT */0 COLUMNS; SELECTION: "None"
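
For contrast (a small sketch of my own, not part of the original query): a plan built only from operations the streaming engine supports, such as a plain filter, shows the whole pipeline inside the STREAMING section:

(pl.LazyFrame({'a': ['word'], 'b': ['word2']})
 .filter(pl.col('a') == 'word')   # simple filters are streamable
 .explain(streaming=True))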

So instead, I'll use the following:

pl.Config.set_streaming_chunk_size(1000)

(pl.LazyFrame({'a':'word', 'b': 'word2'})
 .with_columns(joined = pl.concat_list(pl.col('a'), 
                                       pl.col('b'))
              )
 .collect(streaming=True)
 .write_parquet('test.parquet')
)

In practice, this seems to stream as much as possible and resort to collecting in memory once it gets to the operations that require it. (I can watch the memory spike as it reaches an expensive row/set, then settle down until the next large portion.)

On particularly large data, such as a list or string almost too big to fit into memory, setting a small chunk size sometimes allows me to write to file.
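
If even the chunked streaming collect runs out of memory, a cruder fallback (my own sketch; the chunk size and paths are placeholders) is to collect the query in row slices and append each slice to a single Parquet file with pyarrow:

import polars as pl
import pyarrow.parquet as pq

CHUNK = 1_000_000  # placeholder; tune to available memory
lf = pl.scan_parquet("path/to/file1.parquet")  # plus your filters/joins

# note: counting rows runs the query once before the chunked pass
n_rows = lf.select(pl.count()).collect().item()

writer = None
for offset in range(0, n_rows, CHUNK):
    tbl = lf.slice(offset, CHUNK).collect().to_arrow()
    if writer is None:
        # open the output once, with the schema of the first chunk
        writer = pq.ParquetWriter("path/to/result2.parquet", tbl.schema)
    writer.write_table(tbl)
if writer is not None:
    writer.close()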

Hornmad answered 2/3 at 11:40 Comment(0)
