
I'd like to apply a user-defined function, which takes a few inputs corresponding to columns of a polars DataFrame, to those columns in Rust. The pattern I'm using is shown below. Is this the best practice?

fn my_filter_func(col1: &Series, col2: &Series, col3: &Series) -> ReturnType {
    let it = (0..n).map(|i| {
        let col1 = match col1.get(i) {
            AnyValue::UInt64(val) => val,
            _ => panic!("Wrong type of col1!"),
        };
        // similar for col2 and col3
        // apply user-defined function to col1, col2 and col3
    });
    // convert it to a collection of the required type
}

5 Answers


You can downcast the Series to the proper type you want to iterate over, and then use Rust iterators to apply your logic.

fn my_black_box_function(a: f32, b: f32) -> f32 {
    // do something
    a
}

fn apply_multiples(col_a: &Series, col_b: &Series) -> Float32Chunked {
    match (col_a.dtype(), col_b.dtype()) {
        (DataType::Float32, DataType::Float32) => {
            let a = col_a.f32().unwrap();
            let b = col_b.f32().unwrap();

            a.into_iter()
                .zip(b.into_iter())
                .map(|(opt_a, opt_b)| match (opt_a, opt_b) {
                    (Some(a), Some(b)) => Some(my_black_box_function(a, b)),
                    _ => None,
                })
                .collect()
        }
        _ => panic!("unexpected dtypes"),
    }
}

Lazy API

You don't have to leave the lazy API to be able to access my_black_box_function.

We can collect the columns we want to apply in a Struct data type and then apply a closure over that Series.

fn apply_multiples() -> Result<DataFrame> {
    df![
        "col_a" => [1.0f32, 2.0, 3.0],
        "col_b" => [3.0f32, 5.1, 0.3]
    ]?
    .lazy()
    .select([concat_lst(["col_a", "col_b"]).map(
        |s| {
            let ca = s.struct_()?;

            let a = ca.field_by_name("col_a")?;
            let b = ca.field_by_name("col_b")?;
            let a = a.f32()?;
            let b = b.f32()?;

            let out: Float32Chunked = a
                .into_iter()
                .zip(b.into_iter())
                .map(|(opt_a, opt_b)| match (opt_a, opt_b) {
                    (Some(a), Some(b)) => Some(my_black_box_function(a, b)),
                    _ => None,
                })
                .collect();

            Ok(out.into_series())
        },
        GetOutput::from_type(DataType::Float32),
    )])
    .collect()
}

Comments

Does the lazy API load rows as needed, or does it try to load the whole file at once? It seems to me that columns are loaded and converted to ChunkedArrays, which means that all rows are loaded into memory?
I don't think concat_lst is defined anywhere
Not all feature flags are activated by default. It does exist, but you need to activate the list feature.
Instead of concat_lst you need to use as_struct
Yep, you need to use as_struct(["col_a", "col_b"]) and also enable the dtype-struct feature

The solution I found working for me is map_multiple (my understanding: to be used when there is no groupby/agg) or apply_multiple (my understanding: whenever you have a groupby/agg). Alternatively, you could also use map_many or apply_many. See below.

use polars::prelude::*;
use polars::df;

fn main() {
    let df = df! [
        "names" => ["a", "b", "a"],
        "values" => [1, 2, 3],
        "values_nulls" => [Some(1), None, Some(3)],
        "new_vals" => [Some(1.0), None, Some(3.0)]
    ].unwrap();

    println!("{:?}", df);

    //df.try_apply("values_nulls", |s: &Series| s.cast(&DataType::Float64)).unwrap();

    let df = df.lazy()
        .groupby([col("names")])
        .agg([
            total_delta_sens().sum()
        ]);

    println!("{:?}", df.collect());
}

pub fn total_delta_sens() -> Expr {
    let s: &mut [Expr] = &mut [col("values"), col("values_nulls"), col("new_vals")];

    fn sum_fa(s: &mut [Series]) -> Result<Series> {
        let mut ss = s[0].cast(&DataType::Float64).unwrap().fill_null(FillNullStrategy::Zero).unwrap();
        for i in 1..s.len() {
            ss = ss.add_to(&s[i].cast(&DataType::Float64).unwrap().fill_null(FillNullStrategy::Zero).unwrap()).unwrap();
        }
        Ok(ss)
    }

    let o = GetOutput::from_type(DataType::Float64);
    map_multiple(sum_fa, s, o)
}

Here total_delta_sens is just a wrapper function for convenience; you don't have to use it. You can do this directly within your .agg([]) or .with_columns([]): lit::<f64>(0.0).map_many(sum_fa, &[col("norm"), col("uniform")], o)

Inside sum_fa you can, as Richie already mentioned, downcast to a ChunkedArray and .iter() or even .par_iter(). Hope that helps.

Comments


With Rust Polars version = "0.46"

The Cargo.toml:

[dependencies]
polars = { version = "0.46", features = [
    "lazy", # Lazy API
    "round_series", # round underlying float types of Series
] }

polars_round.rs

use polars::prelude::*;

/// Formats a DataFrame by rounding all Float32 and Float64 columns to a specified number of decimal places.
///
/// Other column types remain unchanged.
///
/// The function uses lazy evaluation for efficiency.
pub fn format_dataframe_columns(
    dataframe: DataFrame,
    decimals: u32,
) -> Result<DataFrame, PolarsError> {
    dataframe
        .lazy()
        .with_columns([all() // Select all columns
            .map(
                // Apply the rounding function to each column.
                move |column| round_float_columns(column, decimals),
                // Indicate that the output type will be the same as the input type.
                GetOutput::same_type(),
            )])
        .collect() // Collect the results back into an eager DataFrame.
}

/// Rounds Float32 and Float64 Series to the specified number of decimal places.
///
/// Other Series types are returned unchanged.
pub fn round_float_columns(column: Column, decimals: u32) -> Result<Option<Column>, PolarsError> {
    match column.as_series() {
        // Attempt to get a Series from the Column
        Some(series) => {
            // Check if the series data type is a floating point type.
            if series.dtype().is_float() {
                // Round the Series to the specified number of decimals.
                series.round(decimals).map(|s| Some(s.into_column()))
            } else {
                // If it's not a floating-point series, return the original column.
                Ok(Some(column))
            }
        }
        None => Ok(Some(column)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_format_dataframe() -> Result<(), PolarsError> {
        // Create a DataFrame with various data types (string, integer, float, boolean, and optional float).
        let df = df!(
            "text_col" => &["a", "b", "c"],
            "int_col" => &[1, 2, 3],
            "float_col" => &[1.1234, 2.565001, 3.965000],
            "bool_col" => &[true, false, true],
            "opt_float" => &[Some(1.234), None, Some(3.45677)],
        )?;

        dbg!(&df);

        // Format the DataFrame to 2 decimal places.
        let formatted_df = format_dataframe_columns(df, 2)?;

        dbg!(&formatted_df);

        // Create expected columns for comparison.
        let column1 = Column::new("float_col".into(), [1.12, 2.57, 3.97]);
        let column2 = Column::new("opt_float".into(), [Some(1.23), None, Some(3.46)]);

        // Assert that the float columns are rounded correctly.
        assert_eq!(formatted_df["float_col"], column1);
        assert_eq!(formatted_df["opt_float"], column2);
        // Other columns remain unchanged

        Ok(())
    }
}

The output:

[src/polars.rs:59:9] &df = shape: (3, 5)
┌──────────┬─────────┬───────────┬──────────┬───────────┐
│ text_col ┆ int_col ┆ float_col ┆ bool_col ┆ opt_float │
│ ---      ┆ ---     ┆ ---       ┆ ---      ┆ ---       │
│ str      ┆ i32     ┆ f64       ┆ bool     ┆ f64       │
╞══════════╪═════════╪═══════════╪══════════╪═══════════╡
│ a        ┆ 1       ┆ 1.1234    ┆ true     ┆ 1.234     │
│ b        ┆ 2       ┆ 2.565001  ┆ false    ┆ null      │
│ c        ┆ 3       ┆ 3.965     ┆ true     ┆ 3.45677   │
└──────────┴─────────┴───────────┴──────────┴───────────┘
[src/polars.rs:64:9] &formatted_df = shape: (3, 5)
┌──────────┬─────────┬───────────┬──────────┬───────────┐
│ text_col ┆ int_col ┆ float_col ┆ bool_col ┆ opt_float │
│ ---      ┆ ---     ┆ ---       ┆ ---      ┆ ---       │
│ str      ┆ i32     ┆ f64       ┆ bool     ┆ f64       │
╞══════════╪═════════╪═══════════╪══════════╪═══════════╡
│ a        ┆ 1       ┆ 1.12      ┆ true     ┆ 1.23      │
│ b        ┆ 2       ┆ 2.57      ┆ false    ┆ null      │
│ c        ┆ 3       ┆ 3.97      ┆ true     ┆ 3.46      │
└──────────┴─────────┴───────────┴──────────┴───────────┘
test polars::tests::test_format_dataframe ... ok

Comments


Here's another variant that I built based on @ritchie46's answer. It's less elegant and less memory-efficient (and probably also less compute-efficient), but maybe it helps with understanding the big picture.

The UDF (user-defined function) concatenates a string column with an integer column.

use polars::prelude::*;

pub fn udf(series: Series) -> Result<Option<Series>, PolarsError> {
    let mut result: Vec<String> = vec![];
    for struct_ in series.iter() {
        let mut iter = struct_._iter_struct_av();
        let first = iter.next().unwrap();
        let second = iter.next().unwrap();

        let a = first.get_str().unwrap();
        let b = second.try_extract::<i32>().unwrap();

        // Here you have your individual values
        println!("{:?}, {:?}", a, b);

        let concat = format!("{}-{}", a, b);
        result.push(concat);
    }
    let output: ChunkedArray<Utf8Type> = result.into_iter().collect();
    Ok(Some(output.into_series()))
}


fn main() {
    let stub_output_type = GetOutput::from_type(DataType::Float64);
    let mut df = df![
        "a" => ["x", "y"],
        "b" => [1, 2],
    ].unwrap();

    df = df.lazy().with_columns(
        [
            as_struct(vec![col("a"), col("b")])
                .apply(|s| udf(s), stub_output_type)
                .alias("new_column")
        ]
    ).collect().unwrap();

    println!("{:?}", df);
}

And to compile, use this Cargo.toml:

[package]
name = "explore-rust-polars"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
polars = {version = "0.35.4", features = ["lazy", "dtype-struct"]}

The output:

"x", 1
"y", 2
shape: (2, 3)
┌─────┬─────┬────────────┐
│ a   ┆ b   ┆ new_column │
│ --- ┆ --- ┆ ---        │
│ str ┆ i32 ┆ str        │
╞═════╪═════╪════════════╡
│ x   ┆ 1   ┆ x-1        │
│ y   ┆ 2   ┆ y-2        │
└─────┴─────┴────────────┘

Comments


The answers above are all quite complicated. Actually, there is a very simple method.

[package]
name = "pol"
version = "0.1.0"
edition = "2021"

[dependencies]
polars = { version = "0.43.0", features = ["mode", "polars-io", "csv", "polars-ops", "lazy", "docs-selection", "streaming", "regex", "temporal", "is_unique", "is_between", "dtype-date", "dtype-datetime", "dtype-time", "dtype-duration", "dtype-categorical", "rows", "is_in", "pivot"] }
polars-io = "0.43.0"
polars-lazy = "0.43.0"

For DataFrame:

Just df.group_by(["date"])?.apply(user_defined_function)?


    let employee_df: DataFrame = df!(
        "Name" => ["老李", "老李", "老李", "老李", "老张", "老张", "老张", "老张", "老王", "老王", "老王", "老王"],
        "employee_ID" => ["员工01", "员工01", "员工01", "员工01", "员工02", "员工02", "员工02", "员工02", "员工03", "员工03", "员工03", "员工03"],
        "date" => ["8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月"],
        "score" => [83, 24, 86, 74, 89, 59, 48, 79, 51, 71, 44, 90]
    )?;
    let user_defined_function = |x: DataFrame| -> Result<DataFrame, PolarsError> {
        let col1: &Series = x.column("Name")?;
        let col2: &Series = x.column("employee_ID")?;
        let col3: &Series = x.column("score")?;
        let group_id = x.column("date")?.str()?.get(0).unwrap();
        // Do something with col1, col2 and col3; we return the results below.
        // For each group, you can return complex two-dimensional results,
        // rather than just a single value like a simple aggregation.
        // For each group, keep the "schema" of the returned DataFrame consistent:
        // the "schema" is the order, names and data types of all fields.
        let group_field = Series::new("group".into(), vec![group_id, group_id, group_id]);
        let res_field1 = Series::new("field1".into(), vec!["a1,1", "a2,1", "a3,1"]);
        let res_field2 = Series::new("field2".into(), vec!["a1,2", "a2,2", "a3,2"]);
        let res_field3 = Series::new("field3".into(), vec!["a1,3", "a2,3", "a3,3"]);
        let result = DataFrame::new(vec![group_field, res_field1, res_field2, res_field3])?;
        Ok(result)
    };
    // For each group, the aggregation returns results with multiple rows and columns.
    let res = employee_df.group_by(["date"])?.apply(user_defined_function)?;
    println!("{}", res);

The output:

shape: (12, 4)
┌───────┬────────┬────────┬────────┐
│ group ┆ field1 ┆ field2 ┆ field3 │
│ ---   ┆ ---    ┆ ---    ┆ ---    │
│ str   ┆ str    ┆ str    ┆ str    │
╞═══════╪════════╪════════╪════════╡
│ 8月   ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 8月   ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 8月   ┆ a3,1   ┆ a3,2   ┆ a3,3   │
│ 9月   ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 9月   ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ …     ┆ …      ┆ …      ┆ …      │
│ 10月  ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 10月  ┆ a3,1   ┆ a3,2   ┆ a3,3   │
│ 11月  ┆ a1,1   ┆ a1,2   ┆ a1,3   │
│ 11月  ┆ a2,1   ┆ a2,2   ┆ a2,3   │
│ 11月  ┆ a3,1   ┆ a3,2   ┆ a3,3   │
└───────┴────────┴────────┴────────┘

For LazyFrame

In a lazy() group_by/agg context, the expression is just col("score").apply_many:

    use polars::prelude::*;
    let employee_df: DataFrame = df!(
        "Name" => ["老李", "老李", "老李", "老李", "老张", "老张", "老张", "老张", "老王", "老王", "老王", "老王"],
        "employee_ID" => ["员工01", "员工01", "员工01", "员工01", "员工02", "员工02", "员工02", "员工02", "员工03", "员工03", "员工03", "员工03"],
        "date" => ["8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月", "8月", "9月", "10月", "11月"],
        "score" => [83, 24, 86, 74, 89, 59, 48, 79, 51, 71, 44, 90]
    )?;

    let user_defined_function = |x: &mut [Series]| -> Result<Option<Series>, PolarsError> {
        let arg0 = &x[0];
        let arg1 = &x[1];
        let arg2 = &x[2];
        // Do something with the arguments; we return the results below.
        let res_field1 = Series::new("rank".into(), vec!["field1,row[10]", "row[11]", "row[12]"]);
        let res_field2 = Series::new("rank2".into(), vec!["field2,row[20]", "row[21]", "row[22]"]);
        let res_field3 = Series::new("rank3".into(), vec![1, 2, 3]);
        // For each group, you can return complex two-dimensional results,
        // rather than just a single value like a simple aggregation.
        // Complex two-dimensional results must be nested in a StructChunked,
        // so that they can be stored in one Series.
        // For each group, keep the "schema" of the StructChunked consistent:
        // the "schema" is the order, names and data types of all its fields.
        let res = StructChunked::from_series("res".into(), &[res_field1, res_field2, res_field3])?.into_series();

        println!("res = {}", res);
        Ok(Some(res))
    };

    // let sc = DataType::Struct(vec![
    //     Field::new("f1".into(), DataType::String),
    //     Field::new("f2".into(), DataType::String),
    //     Field::new("f3".into(), DataType::Int32),
    // ]);

    // Per the API documentation, `GetOutput::from_type(DataType::Boolean)` should be
    // `GetOutput::from_type(sc)`; in practice, however, any `GetOutput` works here.
    let output_type = GetOutput::from_type(DataType::Boolean);
    let res = employee_df
        .lazy()
        .group_by([col("date")])
        .agg([
            // col("date"),
            col("score").apply_many(
                user_defined_function,
                &[col("Name"), col("employee_ID"), col("score")],
                output_type,
            ),
        ])
        .collect()?;
    // explode + unnest to unpack the StructChunked
    println!("{}", res.explode(["score"])?.unnest(["score"])?);

The output:

shape: (12, 4)
┌──────┬────────────────┬────────────────┬───────┐
│ date ┆ rank           ┆ rank2          ┆ rank3 │
│ ---  ┆ ---            ┆ ---            ┆ ---   │
│ str  ┆ str            ┆ str            ┆ i32   │
╞══════╪════════════════╪════════════════╪═══════╡
│ 10月 ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 10月 ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 10月 ┆ row[12]        ┆ row[22]        ┆ 3     │
│ 8月  ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 8月  ┆ row[11]        ┆ row[21]        ┆ 2     │
│ …    ┆ …              ┆ …              ┆ …     │
│ 11月 ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 11月 ┆ row[12]        ┆ row[22]        ┆ 3     │
│ 9月  ┆ field1,row[10] ┆ field2,row[20] ┆ 1     │
│ 9月  ┆ row[11]        ┆ row[21]        ┆ 2     │
│ 9月  ┆ row[12]        ┆ row[22]        ┆ 3     │
└──────┴────────────────┴────────────────┴───────┘

Comments
