How to apply a function to multiple columns of a polars DataFrame in Rust

I'd like to apply a user-defined function that takes a few inputs (corresponding to some columns of a polars DataFrame) to those columns of a polars DataFrame in Rust. The pattern I'm using is shown below. Is this the best practice?

fn my_filter_func(col1: &Series, col2: &Series, col3: &Series) -> ReturnType {
    let n = col1.len();
    let it = (0..n).map(|i| {
        let v1 = match col1.get(i) {
            AnyValue::UInt64(val) => val,
            _ => panic!("Wrong type of col1!"),
        };
        // similar for col2 and col3
        // apply the user-defined function to v1, v2 and v3
    });
    // convert `it` to a collection of the required type
}
Cid asked 25/5, 2022 at 6:27
This might help: #70387339 (Startle)

You can downcast the Series to the concrete type you want to iterate over, and then use Rust iterators to apply your logic.

use polars::prelude::*;

fn my_black_box_function(a: f32, b: f32) -> f32 {
    // do something
    a
}

fn apply_multiples(col_a: &Series, col_b: &Series) -> Float32Chunked {
    match (col_a.dtype(), col_b.dtype()) {
        (DataType::Float32, DataType::Float32) => {
            // Downcast the type-erased Series to typed ChunkedArrays
            let a = col_a.f32().unwrap();
            let b = col_b.f32().unwrap();

            // Walk both columns in lockstep; a null in either input gives a null output
            a.into_iter()
                .zip(b.into_iter())
                .map(|(opt_a, opt_b)| match (opt_a, opt_b) {
                    (Some(a), Some(b)) => Some(my_black_box_function(a, b)),
                    _ => None,
                })
                .collect()
        }
        _ => panic!("unexpected dtypes"),
    }
}
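The question's function takes three columns rather than two. Here is a Polars-free sketch of the same pattern with three inputs. It assumes each column is modeled as a `Vec<Option<u64>>`, which matches what iterating a `UInt64Chunked` yields (one `Option` per row, `None` for null), and it uses a hypothetical `my_udf`. It chains two `zip`s instead of indexing every row with `get(i)`:

```rust
/// Hypothetical three-argument UDF standing in for the question's logic.
fn my_udf(a: u64, b: u64, c: u64) -> u64 {
    a * 100 + b * 10 + c
}

/// Walk three nullable columns in lockstep. A null (None) in any input
/// produces a null in the output, mirroring the `match` in the answer.
fn apply3(col1: &[Option<u64>], col2: &[Option<u64>], col3: &[Option<u64>]) -> Vec<Option<u64>> {
    col1.iter()
        .zip(col2)
        .zip(col3)
        .map(|((a, b), c)| match (a, b, c) {
            (Some(a), Some(b), Some(c)) => Some(my_udf(*a, *b, *c)),
            _ => None,
        })
        .collect()
}

fn main() {
    let col1 = [Some(1), Some(2), None];
    let col2 = [Some(3), Some(4), Some(5)];
    let col3 = [Some(6), None, Some(7)];
    println!("{:?}", apply3(&col1, &col2, &col3));
}
```

With real Polars, the iterators would come from the downcast arrays instead (for example `col1.u64()?`). The iterator of `Option<u64>` can then be collected into a `UInt64Chunked`, the same way the answer collects into a `Float32Chunked`.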

Lazy API

You don't have to leave the lazy API to be able to access my_black_box_function.

We can pack the columns we want to use into a Struct column and then apply a closure over that Series.

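fn apply_multiples() -> PolarsResult<DataFrame> {
    df![
        "col_a" => [1.0f32, 2.0, 3.0],
        "col_b" => [3.0f32, 5.1, 0.3]
    ]?
    .lazy()
    // Pack both columns into one Struct column (needs the `dtype-struct` feature).
    // The original answer used `concat_lst`, which builds a List column and fails
    // with a SchemaMismatch; `.struct_()` below expects a Struct.
    .select([as_struct(vec![col("col_a"), col("col_b")]).map(
        |s| {
            let ca = s.struct_()?;

            let a = ca.field_by_name("col_a")?;
            let b = ca.field_by_name("col_b")?;
            let a = a.f32()?;
            let b = b.f32()?;

            let out: Float32Chunked = a
                .into_iter()
                .zip(b.into_iter())
                .map(|(opt_a, opt_b)| match (opt_a, opt_b) {
                    (Some(a), Some(b)) => Some(my_black_box_function(a, b)),
                    _ => None,
                })
                .collect();

            // Older Polars versions expect `Ok(out.into_series())` here
            Ok(Some(out.into_series()))
        },
        GetOutput::from_type(DataType::Float32),
    )])
    .collect()
}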
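This approach only works if the names passed to `field_by_name` match the columns packed into the struct. The Polars-free model below makes that lookup explicit: an unknown name is an error rather than a silently swapped column. It assumes a hypothetical `StructColumn` type that stores named fields of equal length, and it gives `my_black_box_function` a hypothetical body:

```rust
/// Hypothetical stand-in for a Polars Struct column: named fields of equal length.
struct StructColumn {
    fields: Vec<(String, Vec<Option<f32>>)>,
}

impl StructColumn {
    /// Look a field up by name; an unknown name is an error.
    fn field_by_name(&self, name: &str) -> Result<&[Option<f32>], String> {
        self.fields
            .iter()
            .find(|(field, _)| field == name)
            .map(|(_, values)| values.as_slice())
            .ok_or_else(|| format!("field not found: {name}"))
    }
}

/// Hypothetical body for the answer's black box function.
fn my_black_box_function(a: f32, b: f32) -> f32 {
    a - b
}

/// Unpack the two fields and combine them row by row, propagating nulls.
fn apply_struct(s: &StructColumn) -> Result<Vec<Option<f32>>, String> {
    let a = s.field_by_name("col_a")?;
    let b = s.field_by_name("col_b")?;
    Ok(a.iter()
        .zip(b)
        .map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => Some(my_black_box_function(*a, *b)),
            _ => None,
        })
        .collect())
}

fn main() {
    let s = StructColumn {
        fields: vec![
            ("col_a".to_string(), vec![Some(1.0), Some(2.0), None]),
            ("col_b".to_string(), vec![Some(3.0), Some(0.5), Some(1.0)]),
        ],
    };
    println!("{:?}", apply_struct(&s));
}
```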
Uvulitis answered 26/5, 2022 at 6:8
Does the lazy API load rows when needed, or does it try to load the whole file at once? It seems to me that columns are loaded and converted to ChunkedArrays, which means that all rows are loaded into memory? (Cid)
I don't think concat_lst is defined anywhere. (Startle)
Not all feature flags are activated by default. It does exist, but you need to activate the list feature. (Uvulitis)
If the whole file is loaded into memory anyway, there's really no advantage to using the LazyFrame API in this case, right? (Cid)
Your question does not mention a file. That would be another question. (Uvulitis)
The second example returns an error: Err(SchemaMisMatch(Owned("Series of dtype: List(Float64) != Struct",),),) (Aden)
Instead of concat_lst you need to use as_struct. (Aden)
Yep, you need to use as_struct(["col_a", "col_b"]) and also enable the dtype-struct feature. (Threewheeler)
Probably the author meant the function concat_list (docs.rs/polars/0.35.4/polars/prelude/fn.concat_list.html), at least in polars version 0.35.4. It also does not work well when both original columns are lists, because they get flattened into a single list. (Execute)

The solution I found that works for me uses map_multiple (my understanding: use it when there is no groupby/agg) or apply_multiple (my understanding: use it whenever you have a groupby/agg). Alternatively, you could also use map_many or apply_many. See below.

use polars::prelude::*;
use polars::df;

fn main() {
    let df = df! [
        "names" => ["a", "b", "a"],
        "values" => [1, 2, 3],
        "values_nulls" => [Some(1), None, Some(3)],
        "new_vals" => [Some(1.0), None, Some(3.0)]
    ].unwrap();

    println!("{:?}", df);

    //df.try_apply("values_nulls", |s: &Series| s.cast(&DataType::Float64)).unwrap();

    let df = df
        .lazy()
        .groupby([col("names")])
        .agg([total_delta_sens().sum()]);

    println!("{:?}", df.collect());
}

pub fn total_delta_sens() -> Expr {
    let s: &mut [Expr] = &mut [col("values"), col("values_nulls"), col("new_vals")];

    // Cast every input to Float64, treat nulls as 0 and add the columns element-wise
    fn sum_fa(s: &mut [Series]) -> Result<Series> {
        let mut ss = s[0].cast(&DataType::Float64)?.fill_null(FillNullStrategy::Zero)?;
        for series in &s[1..] {
            ss = ss.add_to(&series.cast(&DataType::Float64)?.fill_null(FillNullStrategy::Zero)?)?;
        }
        Ok(ss)
    }

    let o = GetOutput::from_type(DataType::Float64);
    map_multiple(sum_fa, s, o)
}

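Here total_delta_sens is just a wrapper function for convenience; you don't have to use it. You can do this directly within your .agg([]) or .with_columns([]): lit::<f64>(0.0).map_many(sum_fa, &[col("norm"), col("uniform")], o). The lit(0.0) receiver is itself passed to sum_fa as the first Series, which is harmless here because it only adds zero.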

Inside sum_fa you can, as Ritchie already mentioned, downcast to a ChunkedArray and use .iter() or even .par_iter(). Hope that helps.
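The per-row effect of `sum_fa` followed by the grouped `.sum()` can be modeled without Polars. This sketch assumes the columns are already `Vec<Option<f64>>` (that is, after the cast to Float64). It treats nulls as zero, as `FillNullStrategy::Zero` does, adds the columns element-wise, and then sums per `names` key with a `BTreeMap` to mimic `groupby(...).agg([... .sum()])`. Both helper names are hypothetical:

```rust
use std::collections::BTreeMap;

/// Element-wise sum of several nullable columns, treating null as 0.0.
fn sum_fill_zero(columns: &[Vec<Option<f64>>]) -> Vec<f64> {
    let len = columns.first().map_or(0, |c| c.len());
    let mut total = vec![0.0; len];
    for column in columns {
        for (acc, value) in total.iter_mut().zip(column) {
            *acc += value.unwrap_or(0.0);
        }
    }
    total
}

/// Sum `values` per key, like `groupby([col("names")]).agg([... .sum()])`.
fn group_sum<'a>(keys: &[&'a str], values: &[f64]) -> BTreeMap<&'a str, f64> {
    let mut sums = BTreeMap::new();
    for (key, value) in keys.iter().zip(values) {
        *sums.entry(*key).or_insert(0.0) += value;
    }
    sums
}

fn main() {
    let columns = vec![
        vec![Some(1.0), Some(2.0), Some(3.0)], // "values"
        vec![Some(1.0), None, Some(3.0)],      // "values_nulls"
        vec![Some(1.0), None, Some(3.0)],      // "new_vals"
    ];
    let per_row = sum_fill_zero(&columns);
    println!("{:?}", group_sum(&["a", "b", "a"], &per_row));
}
```

With the answer's data, each row sums to 3, 2 and 9, so group "a" gets 12 and group "b" gets 2.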

Startle answered 27/5, 2022 at 7:46

This example uses Rust Polars version 0.30.

Apply the same function on the selected columns:

lazyframe
.with_columns([
    cols([col_name1, col_name2, ..., col_nameN])
        .apply(
            |series| some_function(series),
            GetOutput::from_type(DataType::Float64),
        )
]);

Or apply many functions to many columns with a much more flexible and powerful method:

lazyframe
.with_columns([
    some_function1(col_name1, col_name2, ..., col_nameN), 
    some_function2(col_name1, col_name2, ..., col_nameM),
    some_functionN(col_name1, col_name2, ..., col_nameZ),
]);

The Cargo.toml:

[dependencies]
polars = { version = "0.30", features = [
    "lazy", # Lazy API
    "round_series", # round underlying float types of Series
] }
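The `round_series` feature is what provides `Series::round`, which `round_float64_columns` calls in the code below. Here is a Polars-free model of that function. It assumes a hypothetical `Column` enum in place of `Series` and rounds by scaling, rounding half away from zero, and scaling back. That approach is an illustration and is not necessarily how Polars implements rounding internally. Only Float64 columns are rounded; every other column passes through unchanged:

```rust
/// Hypothetical stand-in for a type-erased Series.
#[derive(Debug, PartialEq)]
enum Column {
    Float64(Vec<f64>),
    Int32(Vec<Option<i32>>),
}

/// Round to `decimals` places: scale, round half away from zero, unscale.
fn round_to(x: f64, decimals: u32) -> f64 {
    let factor = 10f64.powi(decimals as i32);
    (x * factor).round() / factor
}

/// Dispatch on the dtype, like the `match series.dtype()` in the answer.
fn round_if_float64(column: Column, decimals: u32) -> Column {
    match column {
        Column::Float64(values) => {
            Column::Float64(values.into_iter().map(|x| round_to(x, decimals)).collect())
        }
        other => other,
    }
}

fn main() {
    let a = Column::Float64(vec![23.654, 0.319, 10.0049, 89.01999, -3.41501, 52.0766]);
    println!("{:?}", round_if_float64(a, 2));
    println!("{:?}", round_if_float64(Column::Int32(vec![Some(28), None]), 2));
}
```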

And the main() function:

use std::error::Error;

use polars::{
    prelude::*,
    datatypes::DataType,
};

fn main()-> Result<(), Box<dyn Error>> {

    let dataframe01: DataFrame = df!(
        "integers"  => &[1, 2, 3, 4, 5, 6],
        "float64 A" => [23.654, 0.319, 10.0049, 89.01999, -3.41501, 52.0766],
        "options"   => [Some(28), Some(300), None, Some(2), Some(-30), None],
        "float64 B" => [9.9999, 0.399, 10.0061, 89.0105, -3.4331, 52.099999],
    )?;

    println!("dataframe01: {dataframe01}\n");

    // let selected: Vec<&str> = vec!["float64 A", "float64 B"];

    // Example 1:
    // Format only the columns with float64
    // input: two columns --> output: two columns

    let lazyframe: LazyFrame = dataframe01
        .lazy()
        .with_columns([
            //cols(selected)
            all()
            .apply(|series| 
                round_float64_columns(series, 2),
                GetOutput::same_type()
             )
         ]);

    let dataframe02: DataFrame = lazyframe.clone().collect()?;

    println!("dataframe02: {dataframe02}\n");

    let series_a: Series = Series::new("float64 A", &[23.65, 0.32, 10.00, 89.02, -3.42, 52.08]);
    let series_b: Series = Series::new("float64 B", &[10.00,  0.4, 10.01, 89.01, -3.43, 52.1]);

    assert_eq!(dataframe02.column("float64 A")?, &series_a);
    assert_eq!(dataframe02.column("float64 B")?, &series_b);

    // Example 2:
    // input1: two columns --> output: one new column
    // input2: one column  --> output: one new column
    // input3: two columns --> output: one new column

    let lazyframe: LazyFrame = lazyframe
        .with_columns([
            apuracao1("float64 A", "float64 B", "New Column 1"),
            apuracao2("float64 A", "New Column 2"),
            (col("integers") * lit(10) + col("options")).alias("New Column 3"),
         ]);

    println!("dataframe03: {}\n", lazyframe.collect()?);

    Ok(())
}

pub fn round_float64_columns(series: Series, decimals: u32) -> Result<Option<Series>, PolarsError> {
    match series.dtype() {
        DataType::Float64 => Ok(Some(series.round(decimals)?)),
        _ => Ok(Some(series))
    }
}

fn apuracao1(name_a: &str, name_b: &str, new: &str) -> Expr {
    (col(name_a) * col(name_b) / lit(100))
    //.over("some_group")
    .alias(new)
}

fn apuracao2(name_a: &str, new: &str) -> Expr {
    (lit(10) * col(name_a) - lit(2))
    //.over("some_group")
    .alias(new)
}

The output:

dataframe01: shape: (6, 4)
┌──────────┬───────────┬─────────┬───────────┐
│ integers ┆ float64 A ┆ options ┆ float64 B │
│ ---      ┆ ---       ┆ ---     ┆ ---       │
│ i32      ┆ f64       ┆ i32     ┆ f64       │
╞══════════╪═══════════╪═════════╪═══════════╡
│ 1        ┆ 23.654    ┆ 28      ┆ 9.9999    │
│ 2        ┆ 0.319     ┆ 300     ┆ 0.399     │
│ 3        ┆ 10.0049   ┆ null    ┆ 10.0061   │
│ 4        ┆ 89.01999  ┆ 2       ┆ 89.0105   │
│ 5        ┆ -3.41501  ┆ -30     ┆ -3.4331   │
│ 6        ┆ 52.0766   ┆ null    ┆ 52.099999 │
└──────────┴───────────┴─────────┴───────────┘

dataframe02: shape: (6, 4)
┌──────────┬───────────┬─────────┬───────────┐
│ integers ┆ float64 A ┆ options ┆ float64 B │
│ ---      ┆ ---       ┆ ---     ┆ ---       │
│ i32      ┆ f64       ┆ i32     ┆ f64       │
╞══════════╪═══════════╪═════════╪═══════════╡
│ 1        ┆ 23.65     ┆ 28      ┆ 10.0      │
│ 2        ┆ 0.32      ┆ 300     ┆ 0.4       │
│ 3        ┆ 10.0      ┆ null    ┆ 10.01     │
│ 4        ┆ 89.02     ┆ 2       ┆ 89.01     │
│ 5        ┆ -3.42     ┆ -30     ┆ -3.43     │
│ 6        ┆ 52.08     ┆ null    ┆ 52.1      │
└──────────┴───────────┴─────────┴───────────┘

dataframe03: shape: (6, 7)
┌──────────┬───────────┬─────────┬───────────┬──────────────┬──────────────┬──────────────┐
│ integers ┆ float64 A ┆ options ┆ float64 B ┆ New Column 1 ┆ New Column 2 ┆ New Column 3 │
│ ---      ┆ ---       ┆ ---     ┆ ---       ┆ ---          ┆ ---          ┆ ---          │
│ i32      ┆ f64       ┆ i32     ┆ f64       ┆ f64          ┆ f64          ┆ i32          │
╞══════════╪═══════════╪═════════╪═══════════╪══════════════╪══════════════╪══════════════╡
│ 1        ┆ 23.65     ┆ 28      ┆ 10.0      ┆ 2.365        ┆ 234.5        ┆ 38           │
│ 2        ┆ 0.32      ┆ 300     ┆ 0.4       ┆ 0.00128      ┆ 1.2          ┆ 320          │
│ 3        ┆ 10.0      ┆ null    ┆ 10.01     ┆ 1.001        ┆ 98.0         ┆ null         │
│ 4        ┆ 89.02     ┆ 2       ┆ 89.01     ┆ 79.236702    ┆ 888.2        ┆ 42           │
│ 5        ┆ -3.42     ┆ -30     ┆ -3.43     ┆ 0.117306     ┆ -36.2        ┆ 20           │
│ 6        ┆ 52.08     ┆ null    ┆ 52.1      ┆ 27.13368     ┆ 518.8        ┆ null         │
└──────────┴───────────┴─────────┴───────────┴──────────────┴──────────────┴──────────────┘
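In `dataframe03`, rows 3 and 6 of `New Column 3` are null because `options` is null there: arithmetic involving a null yields null. If the nullable column is modeled as `Option<i32>` (an assumption for illustration, not Polars' internal representation), `col("integers") * lit(10) + col("options")` behaves like `Option::map`. The function name below is hypothetical:

```rust
/// Element-wise `integers * 10 + options`; a null in `options` stays null.
fn new_column_3(integers: &[i32], options: &[Option<i32>]) -> Vec<Option<i32>> {
    integers
        .iter()
        .zip(options)
        .map(|(i, opt)| opt.map(|o| i * 10 + o))
        .collect()
}

fn main() {
    let integers = [1, 2, 3, 4, 5, 6];
    let options = [Some(28), Some(300), None, Some(2), Some(-30), None];
    println!("{:?}", new_column_3(&integers, &options));
}
```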

Shigella answered 3/6, 2023 at 2:56

Here's another variant that I built based on @ritchie46's answer. It's less elegant and less memory-efficient (and probably also less compute-efficient), but it may help in understanding the big picture.

The UDF (user-defined function) concatenates a string column with an integer column.

use polars::prelude::*;

pub fn udf(series: Series) ->  Result<Option<Series>, PolarsError> {
    let mut result: Vec<String> = vec![];
    for struct_ in series.iter() {
        let mut iter = struct_._iter_struct_av();
        let first = iter.next().unwrap();
        let second = iter.next().unwrap();

        let a = first.get_str().unwrap();
        let b = second.try_extract::<i32>().unwrap();

        // Here you have your individual values
        println!("{:?}, {:?}",a, b);

        let concat = format!("{}-{}", a, b);
        result.push(concat);
    }
    let output: ChunkedArray<Utf8Type> = result.into_iter().collect();
    Ok(Some(output.into_series()))
}


fn main() {
    let stub_output_type = GetOutput::from_type(DataType::Utf8); // udf returns strings
    let mut df = df![
        "a" => ["x", "y"],
        "b" => [1, 2],
    ].unwrap();

    df = df.lazy().with_columns(
        [
            as_struct(vec![col("a"), col("b")])
                .apply(|s| udf(s), stub_output_type)
                .alias("new_column")
        ]
    ).collect().unwrap();

    println!("{:?}", df);
}

And to compile, use this Cargo.toml:

[package]
name = "explore-rust-polars"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
polars = {version = "0.35.4", features = ["lazy", "dtype-struct"]}

The output:

"x", 1
"y", 2
shape: (2, 3)
┌─────┬─────┬────────────┐
│ a   ┆ b   ┆ new_column │
│ --- ┆ --- ┆ ---        │
│ str ┆ i32 ┆ str        │
╞═════╪═════╪════════════╡
│ x   ┆ 1   ┆ x-1        │
│ y   ┆ 2   ┆ y-2        │
└─────┴─────┴────────────┘
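The `udf` above unwraps every field, so a null in `a` or `b` would make it panic. Here is a Polars-free sketch of a null-safe variant that returns a null instead. It assumes the struct's two fields are modeled as `Option<&str>` and `Option<i32>` slices and uses a hypothetical helper named `concat_fields`:

```rust
/// Concatenate a string field and an integer field as "a-b";
/// a null in either field gives a null result instead of a panic.
fn concat_fields(a: &[Option<&str>], b: &[Option<i32>]) -> Vec<Option<String>> {
    a.iter()
        .zip(b)
        .map(|(a, b)| match (a, b) {
            (Some(a), Some(b)) => Some(format!("{a}-{b}")),
            _ => None,
        })
        .collect()
}

fn main() {
    let out = concat_fields(&[Some("x"), Some("y"), None], &[Some(1), Some(2), Some(3)]);
    println!("{:?}", out);
}
```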
Execute answered 22/12, 2023 at 7:10

Your answers are all too complicated. Actually there is a very simple method.

[package]
name = "pol"
version = "0.1.0"
edition = "2021"
[dependencies]
polars = {version="0.43.0",features=["mode","polars-io","csv","polars-ops","lazy","docs-selection","streaming","regex","temporal","is_unique","is_between","dtype-date","dtype-datetime","dtype-time","dtype-duration","dtype-categorical","rows","is_in","pivot"]}
polars-io = "0.43.0"
polars-lazy = "0.43.0"

For DataFrame:

Just use df.group_by(["date"])?.apply(user_defined_function)?


    let employee_df: DataFrame = df!(
        "Name"=> ["老李", "老李", "老李", "老李", "老张", "老张", "老张", "老张", "老王", "老王", "老王", "老王"],
        "employee_ID"=> ["员工01", "员工01", "员工01", "员工01", "员工02", "员工02", "员工02", "员工02", "员工03", "员工03", "员工03", "员工03"],
        "date"=> ["8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月"],
        "score"=> [83, 24, 86, 74, 89, 59, 48, 79, 51, 71, 44, 90]
    )?;

    // Called once per "date" group with that group's sub-DataFrame
    let user_defined_function = |x: DataFrame| -> Result<DataFrame, PolarsError> {
        let _col1: &Series = x.column("Name")?;
        let _col2: &Series = x.column("employee_ID")?;
        let _col3: &Series = x.column("score")?;
        let group_id = x.column("date")?.str()?.get(0).unwrap();
        // Do something with the columns here; the results below are placeholders.
        // For each group you can return a two-dimensional result (several rows and
        // columns) rather than the single value of a simple aggregation.
        // Every group must return the same schema: field order, names and data types.
        let group_field = Series::new("group".into(), vec![group_id, group_id, group_id]);
        let res_field1 = Series::new("field1".into(), vec!["a1,1", "a2,1", "a3,1"]);
        let res_field2 = Series::new("field2".into(), vec!["a1,2", "a2,2", "a3,2"]);
        let res_field3 = Series::new("field3".into(), vec!["a1,3", "a2,3", "a3,3"]);
        DataFrame::new(vec![group_field, res_field1, res_field2, res_field3])
    };

    // For each group, the UDF returns a result with multiple rows and columns
    let res = employee_df.group_by(["date"])?.apply(user_defined_function)?;
    println!("{}", res);

The output:

shape: (12, 4)
┌───────┬────────┬────────┬────────┐
│ group ┆ field1 ┆ field2 ┆ field3 │
│ ---   ┆ ---    ┆ ---    ┆ ---    │
│ str   ┆ str    ┆ str    ┆ str    │
╞═══════╪════════╪════════╪════════╡
│ 8月   ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 8月   ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 8月   ┆ a3,1   ┆ a3,2   ┆ a3,3   │
│ 9月   ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 9月   ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ …     ┆ …      ┆ …      ┆ …      │
│ 10月  ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 10月  ┆ a3,1   ┆ a3,2   ┆ a3,3   │
│ 11月  ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 11月  ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 11月  ┆ a3,1   ┆ a3,2   ┆ a3,3   │
└───────┴────────┴────────┴────────┘
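The mechanics of `group_by(...)?.apply(...)` can be modeled without Polars: split the rows by key, call the UDF once per group, and concatenate whatever rows each call returns. In this sketch, `Row`, `group_apply` and `top_two` are hypothetical names. The data is a small subset of the scores above, with ASCII stand-ins for the names and months. Groups come out in sorted key order because of the `BTreeMap`; Polars itself may return groups in a different order:

```rust
use std::collections::BTreeMap;

/// Hypothetical input row: (name, employee_id, date, score).
type Row = (&'static str, &'static str, &'static str, i32);

/// Group rows by date and call `f` once per group. Each call may return any
/// number of output rows; the results are concatenated.
fn group_apply<T, F>(rows: &[Row], f: F) -> Vec<T>
where
    F: Fn(&str, &[Row]) -> Vec<T>,
{
    let mut groups: BTreeMap<&str, Vec<Row>> = BTreeMap::new();
    for row in rows {
        groups.entry(row.2).or_default().push(*row);
    }
    groups
        .iter()
        .flat_map(|(date, group)| f(*date, group.as_slice()))
        .collect()
}

/// Example per-group UDF returning several rows: the two best scores per date.
fn top_two(date: &str, group: &[Row]) -> Vec<(String, String, i32)> {
    let mut sorted = group.to_vec();
    sorted.sort_by(|x, y| y.3.cmp(&x.3)); // descending by score
    sorted
        .iter()
        .take(2)
        .map(|r| (date.to_string(), r.0.to_string(), r.3))
        .collect()
}

fn main() {
    let rows: Vec<Row> = vec![
        ("Li", "e01", "Aug", 83),
        ("Zhang", "e02", "Aug", 89),
        ("Wang", "e03", "Aug", 51),
        ("Li", "e01", "Sep", 24),
        ("Zhang", "e02", "Sep", 59),
        ("Wang", "e03", "Sep", 71),
    ];
    for row in group_apply(&rows, top_two) {
        println!("{row:?}");
    }
}
```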

For LazyFrame:

In a lazy().group_by(...).agg(...) context, just use an expression such as col("score").apply_many(...):

    use polars::prelude::*;
    let employee_df: DataFrame = df!(
        "Name"=> ["老李", "老李", "老李", "老李", "老张", "老张", "老张", "老张", "老王", "老王", "老王", "老王"],
        "employee_ID"=> ["员工01", "员工01", "员工01", "员工01", "员工02", "员工02", "员工02", "员工02", "员工03", "员工03", "员工03", "员工03"],
        "date"=> ["8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月"],
        "score"=> [83, 24, 86, 74, 89, 59, 48, 79, 51, 71, 44, 90]
    )?;

    let user_defined_function = |x: &mut [Series]| -> Result<Option<Series>, PolarsError> {
        // apply_many passes the receiver first: x[0] is col("score") itself,
        // followed by the expressions in the argument slice
        let _arg0 = &x[0];
        let _arg1 = &x[1];
        let _arg2 = &x[2];
        // Do something with the inputs; the results below are placeholders.
        let res_field1 = Series::new("rank".into(), vec!["field1,row[10]", "row[11]", "row[12]"]);
        let res_field2 = Series::new("rank2".into(), vec!["field2,row[20]", "row[21]", "row[22]"]);
        let res_field3 = Series::new("rank3".into(), vec![1, 2, 3]);
        // For each group you can return a two-dimensional result rather than a single
        // value. It must be nested in a StructChunked so that it fits in one Series,
        // and every group must produce the same struct schema (field order, names
        // and data types).
        let res = StructChunked::from_series("res".into(), &[res_field1, res_field2, res_field3])?
            .into_series();

        println!("res = {}", res);
        Ok(Some(res))
    };

    // let sc = DataType::Struct(vec![
    //     Field::new("f1".into(), DataType::String),
    //     Field::new("f2".into(), DataType::String),
    //     Field::new("f3".into(), DataType::Int32),
    // ]);

    // The API documentation example uses `GetOutput::from_type(DataType::Boolean)`; here it
    // should really be `GetOutput::from_type(sc)`. In practice, however, any `GetOutput` works.
    let output_type = GetOutput::from_type(DataType::Boolean);
    let res = employee_df
        .lazy()
        .group_by([col("date")])
        .agg([col("score").apply_many(
            user_defined_function,
            &[col("Name"), col("employee_ID"), col("score")],
            output_type,
        )])
        .collect()?;
    // explode and unnest unpack the list of StructChunked values into rows and columns
    println!("{}", res.explode(["score"])?.unnest(["score"])?);

The output:

shape: (12, 4)
┌──────┬────────────────┬────────────────┬───────┐
│ date ┆ rank           ┆ rank2          ┆ rank3 │
│ ---  ┆ ---            ┆ ---            ┆ ---   │
│ str  ┆ str            ┆ str            ┆ i32   │
╞══════╪════════════════╪════════════════╪═══════╡
│ 10月 ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 10月 ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 10月 ┆ row[12]        ┆ row[22]        ┆ 3     │
│ 8月  ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 8月  ┆ row[11]        ┆ row[21]        ┆ 2     │
│ …    ┆ …              ┆ …              ┆ …     │
│ 11月 ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 11月 ┆ row[12]        ┆ row[22]        ┆ 3     │
│ 9月  ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 9月  ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 9月  ┆ row[12]        ┆ row[22]        ┆ 3     │
└──────┴────────────────┴────────────────┴───────┘
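After the lazy `agg`, each `date` group holds one list of structs in the `score` column. `explode` gives every list element its own row (repeating `date`), and `unnest` splits the struct's fields into the `rank`, `rank2` and `rank3` columns. Below is a Polars-free model of those two steps, with a hypothetical `Rank` struct standing in for the StructChunked fields and hypothetical helper names:

```rust
/// Hypothetical stand-in for one struct value with the fields rank, rank2, rank3.
#[derive(Debug)]
struct Rank {
    rank: String,
    rank2: String,
    rank3: i32,
}

/// `explode` + `unnest`: one output row per list element, with the group key
/// repeated and the struct fields spread into separate columns.
fn explode_unnest(groups: Vec<(String, Vec<Rank>)>) -> Vec<(String, String, String, i32)> {
    groups
        .into_iter()
        .flat_map(|(date, items)| {
            items
                .into_iter()
                .map(move |r| (date.clone(), r.rank, r.rank2, r.rank3))
        })
        .collect()
}

/// The three placeholder struct values each group returns (simplified labels).
fn ranks() -> Vec<Rank> {
    (0..3)
        .map(|i| Rank {
            rank: format!("row[1{i}]"),
            rank2: format!("row[2{i}]"),
            rank3: i + 1,
        })
        .collect()
}

fn main() {
    let groups = vec![("Aug".to_string(), ranks()), ("Sep".to_string(), ranks())];
    for row in explode_unnest(groups) {
        println!("{row:?}");
    }
}
```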

Laurinelaurita answered 30/9 at 13:2

© 2022 - 2024 — McMap. All rights reserved.