Combine cross join between 2 dataframes efficiently
I am working with 2 datasets. One describes some time windows by their start and stop times. The second one contains a big list of events with their corresponding timestamps.

I want to combine this into a single dataframe that contains the start and stop time of each window, together with how many events happened during this time window.

I have managed to "solve" my problem with:

import polars as pl

actions = {
    "id": ["a", "a", "a", "a", "b", "b", "a", "a"],
    "action": ["start", "stop", "start", "stop", "start", "stop", "start", "stop"],
    "time": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
}

events = {
    "name": ["x", "x", "x", "y", "y", "z", "w", "w", "w"],
    "time": [0.0, 0.1, 0.5, 1.1, 2.5, 3.0, 4.5, 4.9, 5.5],
}

# one row per (id, start, stop) window
actions_df = (
    pl.DataFrame(actions)
    .group_by("id")
    .agg(
        start=pl.col("time").filter(pl.col("action") == "start"),
        stop=pl.col("time").filter(pl.col("action") == "stop"),
    )
    .explode(["start", "stop"])
)

# pair every window with every event, keep events inside the window,
# then count events per window per name and pivot the names into columns
df = (
    actions_df.join(pl.DataFrame(events), how="cross")
    .filter((pl.col("time") >= pl.col("start")) & (pl.col("time") <= pl.col("stop")))
    .group_by(["id", "start", "stop", "name"])
    .agg(count=pl.count("name"))
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

# left join back onto the windows so empty windows are kept
result_df = (
    actions_df.join(df, on=["id", "start", "stop"], how="left")
    .fill_null(0)
    .sort("start")
)

print(result_df)
"""
┌─────┬───────┬──────┬─────┬─────┬─────┬─────┐
│ id  ┆ start ┆ stop ┆ w   ┆ y   ┆ x   ┆ z   │
│ --- ┆ ---   ┆ ---  ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ f64   ┆ f64  ┆ u32 ┆ u32 ┆ u32 ┆ u32 │
╞═════╪═══════╪══════╪═════╪═════╪═════╪═════╡
│ a   ┆ 0.0   ┆ 1.0  ┆ 0   ┆ 0   ┆ 3   ┆ 0   │
│ a   ┆ 2.0   ┆ 3.0  ┆ 0   ┆ 1   ┆ 0   ┆ 1   │
│ b   ┆ 4.0   ┆ 5.0  ┆ 2   ┆ 0   ┆ 0   ┆ 0   │
│ a   ┆ 6.0   ┆ 7.0  ┆ 0   ┆ 0   ┆ 0   ┆ 0   │
└─────┴───────┴──────┴─────┴─────┴─────┴─────┘
"""

My issue is that this approach "explodes" in RAM and my process gets killed. I suspect that the join(..., how="cross") materializes a huge intermediate dataframe, only for the filter to discard most of it again.

Can I get some help/hints on a better way to solve this?

To give some orders of magnitude: my "actions" datasets have on the order of 100-500 time windows (~1 MB), and my "events" datasets have on the order of ~10 million events (~200 MB). A cross join therefore materializes up to 500 × 10 million = 5 billion intermediate rows, and my process gets killed even with 16 GB of RAM.

EDIT: In real data, my intervals can be overlapping. Thanks to @RomanPekar for bringing this up.

Avilla asked 25/8 at 20:14 Comments (3):
are your intervals in actions_df non-overlapping? I see it's the case in the test example, but is it also true in real production data? – Leadwort
@RomanPekar This is a very important point. My true data has overlapping intervals. Sorry I didn't point this out earlier; I didn't think about this edge case. Thanks for bringing it up. – Avilla
Just with regards to the cross join: Polars can optimize some of it away if you use the lazy streaming engine, i.e. actions_df.lazy().join(pl.LazyFrame(events), how="cross").filter(...).collect(streaming=True) - but it's still not an "optimal" approach. – Grenadines
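
Expanding that last comment into a minimal runnable sketch (assuming actions_df and events from the question; .pivot() is eager-only, so it runs after .collect(), and newer Polars versions spell the toggle .collect(engine="streaming")):

# lazy cross join + filter; the streaming engine can avoid
# materializing the full cross-join intermediate at once
df = (
    actions_df.lazy()
    .join(pl.LazyFrame(events), how="cross")
    .filter((pl.col("time") >= pl.col("start")) & (pl.col("time") <= pl.col("stop")))
    .group_by(["id", "start", "stop", "name"])
    .agg(count=pl.count("name"))
    .collect(streaming=True)
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)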
The mentioned non-equi joins PR has been merged as part of Polars 1.7.0.

It is called .join_where() and is just an inner join for now.

# assumes events_df = pl.DataFrame(events) from the question
(actions_df
  .join_where(events_df,
     pl.col.start <= pl.col.time,
     pl.col.stop >= pl.col.time
  )
  .pivot(
     on = "name",
     values = "time",
     aggregate_function = pl.len()
  )
)
shape: (3, 7)
┌─────┬───────┬──────┬──────┬──────┬──────┬──────┐
│ id  ┆ start ┆ stop ┆ x    ┆ y    ┆ z    ┆ w    │
│ --- ┆ ---   ┆ ---  ┆ ---  ┆ ---  ┆ ---  ┆ ---  │
│ str ┆ f64   ┆ f64  ┆ u32  ┆ u32  ┆ u32  ┆ u32  │
╞═════╪═══════╪══════╪══════╪══════╪══════╪══════╡
│ a   ┆ 0.0   ┆ 1.0  ┆ 3    ┆ null ┆ null ┆ null │
│ a   ┆ 2.0   ┆ 3.0  ┆ null ┆ 1    ┆ 1    ┆ null │
│ b   ┆ 4.0   ┆ 5.0  ┆ null ┆ null ┆ null ┆ 2    │
└─────┴───────┴──────┴──────┴──────┴──────┴──────┘
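
Since .join_where() is currently inner-only, the empty window ("a", 6.0, 7.0) is missing from the result above. A minimal sketch to restore it, mirroring the left join the question already uses (counted is just a hypothetical name for the pivoted frame above):

counted = (
  actions_df
  .join_where(events_df,
     pl.col.start <= pl.col.time,
     pl.col.stop >= pl.col.time
  )
  .pivot(
     on = "name",
     values = "time",
     aggregate_function = pl.len()
  )
)

# left join back onto the windows, then turn the nulls into zero counts
result_df = (
  actions_df
  .join(counted, on=["id", "start", "stop"], how="left")
  .fill_null(0)
  .sort("start")
)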

There is an open issue tracking additional join types.

Grenadines answered 12/9 at 9:06
Update: As of September 2024, non-equi joins (joins possibly containing inequality conditions) are supported with pl.DataFrame.join_where.

The creation of df then simply becomes the following.

df = (
    pl.DataFrame(events)
    .join_where(
        actions_df,
        pl.col("start") <= pl.col("time"),
        pl.col("time") <= pl.col("stop"),
    )
    .group_by(["id", "start", "stop", "name"])
    .agg(count=pl.count("name"))
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

You can replace the cross join with an asof join using pl.DataFrame.join_asof.

The result is identical to that of the cross join approach.

df = (
    pl.DataFrame(events)
    .sort("time")
    .join_asof(
        actions_df.sort("stop"),
        left_on="time",
        right_on="stop",
        strategy="forward"
    )
    .filter(
        pl.col("time") >= pl.col("start")
    )
    .group_by(["id", "start", "stop", "name"])
    .agg(count=pl.count("name"))
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

Note that the subsequent filter now only filters for "time" >= "start" as the asof join already ensures "time" <= "stop".

Lewes answered 25/8 at 20:51 Comments (5):
note that this doesn't return a row where all the names are 0, but it should be easy to change it around and join actions_df with events on how="left" instead (probably would require a slight change of the filter though) – Leadwort
@RomanPekar Exchanging actions_df and events does not work, as the asof join with strategy="backward" would select the last row in the right DataFrame whose on key is less than or equal to the left's key. In particular, not all events would be counted. – Lewes
@RomanPekar Note that this is just the definition of df, which OP further uses in a left join to create result_df and, hence, ensures that all expected rows show up in the result. – Lewes
ah yes, true, you need all events to be there. – Leadwort
@RomanPekar Honestly, I was surprised by this behaviour as I also would've expected all events to show up, but on second thought it makes sense, as we'll only select the first/last/nearest row in one of the dataframes. – Lewes
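
A hedged demo of that last point (assuming events from the question): an asof join returns at most one matching row per left-hand row, so joining actions_df against the events attaches only a single event to each window, which makes counting impossible.

# each window picks up at most one event, regardless of how many fall inside it
demo = (
    actions_df.sort("start")
    .join_asof(
        pl.DataFrame(events).sort("time"),
        left_on="start",
        right_on="time",
        strategy="forward",
    )
)
print(demo)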
Update: join_where() was added in version 1.7.0:

(
    pl.DataFrame(events)
    .join_where(
        actions_df,
        pl.col.time >= pl.col.start,
        pl.col.time <= pl.col.stop,
    )
    .group_by(["id", "start", "stop", "name"], maintain_order=True)
    .agg(count = pl.col.name.count())
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

┌─────┬───────┬──────┬─────┬─────┬─────┬─────┐
│ id  ┆ start ┆ stop ┆ w   ┆ z   ┆ y   ┆ x   │
│ --- ┆ ---   ┆ ---  ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ f64   ┆ f64  ┆ u32 ┆ u32 ┆ u32 ┆ u32 │
╞═════╪═══════╪══════╪═════╪═════╪═════╪═════╡
│ b   ┆ 4.0   ┆ 5.0  ┆ 2   ┆ 0   ┆ 0   ┆ 0   │
│ a   ┆ 2.0   ┆ 3.0  ┆ 0   ┆ 1   ┆ 1   ┆ 0   │
│ a   ┆ 0.0   ┆ 1.0  ┆ 0   ┆ 0   ┆ 0   ┆ 3   │
└─────┴───────┴──────┴─────┴─────┴─────┴─────┘

Previous answer: there are open issues for "Extend asof_join for between (interval join)" and "Join between Polars dataframes with inequality conditions".

For now, you could also try the DuckDB integration with Polars; the performance of inequality joins in DuckDB is usually quite good:

import duckdb

# duckdb resolves actions_df and events_df (here events_df = pl.DataFrame(events))
# from the surrounding Python scope
df = duckdb.sql("""
    select *
    from actions_df as a
        left join events_df as e on
            e.time between a.start and a.stop
""").pl()

(
    df
    .group_by("id", "start", "stop", "name", maintain_order=True)
    .agg(count = pl.col.name.count())
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

┌─────┬───────┬──────┬─────┬─────┬─────┬─────┬──────┐
│ id  ┆ start ┆ stop ┆ z   ┆ y   ┆ w   ┆ x   ┆ null │
│ --- ┆ ---   ┆ ---  ┆ --- ┆ --- ┆ --- ┆ --- ┆ ---  │
│ str ┆ f64   ┆ f64  ┆ u32 ┆ u32 ┆ u32 ┆ u32 ┆ u32  │
╞═════╪═══════╪══════╪═════╪═════╪═════╪═════╪══════╡
│ a   ┆ 2.0   ┆ 3.0  ┆ 1   ┆ 1   ┆ 0   ┆ 0   ┆ 0    │
│ b   ┆ 4.0   ┆ 5.0  ┆ 0   ┆ 0   ┆ 2   ┆ 0   ┆ 0    │
│ a   ┆ 0.0   ┆ 1.0  ┆ 0   ┆ 0   ┆ 0   ┆ 3   ┆ 0    │
│ a   ┆ 6.0   ┆ 7.0  ┆ 0   ┆ 0   ┆ 0   ┆ 0   ┆ 0    │
└─────┴───────┴──────┴─────┴─────┴─────┴─────┴──────┘

One downside of this approach is that you will not be able to use streaming.

Leadwort answered 26/8 at 6:51
Just another idea: if your intervals are non-overlapping (as in the test example), each event can be mapped to only one action, so I feel that an interval join or join_asof might be more work than needed.

One way to do this with a simple join is to use Categorical data and Expr.cut():

# assumes events_df = pl.DataFrame(events) from the question
with pl.StringCache():
    actions_df_cut = actions_df.with_columns(
        interval = pl.col.start.cut(
            actions_df["start"].unique().sort(),
            left_closed = True
        )
    )
    
    events_df_cut = events_df.with_columns(
        interval = pl.col.time.cut(
            actions_df["start"].unique().sort(),
            left_closed = True
        )
    )

print(actions_df_cut)
print(events_df_cut)
shape: (4, 4)
┌─────┬───────┬──────┬──────────┐
│ id  ┆ start ┆ stop ┆ interval │
│ --- ┆ ---   ┆ ---  ┆ ---      │
│ str ┆ f64   ┆ f64  ┆ cat      │
╞═════╪═══════╪══════╪══════════╡
│ a   ┆ 0.0   ┆ 1.0  ┆ [0, 2)   │
│ a   ┆ 2.0   ┆ 3.0  ┆ [2, 4)   │
│ a   ┆ 6.0   ┆ 7.0  ┆ [6, inf) │
│ b   ┆ 4.0   ┆ 5.0  ┆ [4, 6)   │
└─────┴───────┴──────┴──────────┘
shape: (9, 3)
┌──────┬──────┬──────────┐
│ name ┆ time ┆ interval │
│ ---  ┆ ---  ┆ ---      │
│ str  ┆ f64  ┆ cat      │
╞══════╪══════╪══════════╡
│ x    ┆ 0.0  ┆ [0, 2)   │
│ x    ┆ 0.1  ┆ [0, 2)   │
│ x    ┆ 0.5  ┆ [0, 2)   │
│ y    ┆ 1.1  ┆ [0, 2)   │
│ y    ┆ 2.5  ┆ [2, 4)   │
│ z    ┆ 3.0  ┆ [2, 4)   │
│ w    ┆ 4.5  ┆ [4, 6)   │
│ w    ┆ 4.9  ┆ [4, 6)   │
│ w    ┆ 5.5  ┆ [4, 6)   │
└──────┴──────┴──────────┘

And then you can join on the categorical column:

(
    actions_df_cut.join(events_df_cut, on="interval", how="left")
    .filter((pl.col.time <= pl.col.stop) | pl.col.time.is_null())
    .group_by("id", "start", "stop", "name")
    .agg(count = pl.col.name.count())
    .pivot("name", index=["id", "start", "stop"], values="count")
    .fill_null(0)
)

┌─────┬───────┬──────┬──────┬─────┬─────┬─────┬─────┐
│ id  ┆ start ┆ stop ┆ null ┆ w   ┆ y   ┆ z   ┆ x   │
│ --- ┆ ---   ┆ ---  ┆ ---  ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ f64   ┆ f64  ┆ u32  ┆ u32 ┆ u32 ┆ u32 ┆ u32 │
╞═════╪═══════╪══════╪══════╪═════╪═════╪═════╪═════╡
│ a   ┆ 6.0   ┆ 7.0  ┆ 0    ┆ 0   ┆ 0   ┆ 0   ┆ 0   │
│ b   ┆ 4.0   ┆ 5.0  ┆ 0    ┆ 2   ┆ 0   ┆ 0   ┆ 0   │
│ a   ┆ 2.0   ┆ 3.0  ┆ 0    ┆ 0   ┆ 1   ┆ 1   ┆ 0   │
│ a   ┆ 0.0   ┆ 1.0  ┆ 0    ┆ 0   ┆ 0   ┆ 0   ┆ 3   │
└─────┴───────┴──────┴──────┴─────┴─────┴─────┴─────┘
Leadwort answered 26/8 at 7:29