Multi-key GroupBy with shared data on one key

I am working with a large dataset that includes multiple unique groups of data identified by a date and a group ID. Each group contains multiple IDs, each with several attributes. Here’s a simplified structure of my data:

| date       | group_id | inner_id | attr_a | attr_b | attr_c |
|------------|----------|----------|--------|--------|--------|
| 2023-06-01 | A1       | 001      | val    | val    | val    |
| 2023-06-01 | A1       | 002      | val    | val    | val    |
...

Additionally, for each date, I have a large matrix associated with it:

| date       | matrix       |
|------------|--------------|
| 2023-06-01 | [[...], ...] |
...

I need to apply a function for each date and group_id that processes data using both the group attributes and the matrix associated with that date. The function looks like this:

def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
    # process data
    return processed_data

Here, group_data contains the attributes for a specific group:

| inner_id | attr_a | attr_b | attr_c |
|----------|--------|--------|--------|
| 001      | val    | val    | val    |
...
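
For concreteness, here is a minimal sketch of what run might look like. The actual computation is not specified anywhere in this question; the weighting, the output column names, and the 3x3 matrix shape below are all assumptions (per a comment below, the real function aggregates each group to a single row):

import numpy as np
import pandas as pd

def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
    # Hypothetical processing: weight the group's numeric attributes by the
    # date's matrix (assumed 3x3 here) and reduce the group to one summary row.
    attrs = group_data[["attr_a", "attr_b", "attr_c"]].to_numpy()
    weighted = attrs @ np.asarray(matrix)
    return pd.DataFrame([weighted.mean(axis=0)],
                        columns=["out_a", "out_b", "out_c"])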

Here is my current implementation. It works, but I can only run ~200 dates at a time because I am broadcasting all of the matrices to every worker (I have ~2k dates, ~100 groups per date, and ~150 inner elements per group):

import pandas as pd
from typing import Tuple
from pyspark.sql import DataFrame

# spark_context, custom_calculation_function and schema_of_result are defined elsewhere

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    # Convert matrices to a dictionary mapping each date to its matrix
    date_matrices = matrices.rdd.collectAsMap()

    # Broadcast the full dict of matrices to every worker
    broadcasted_matrices = spark_context.broadcast(date_matrices)

    # Look up the group's matrix in the broadcast and run the calculation
    def apply_calculation(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
        date = group_key[1]  # key is (group_id, date)
        return custom_calculation_function(broadcasted_matrices.value[date], data_group)

    # Apply the function once per (group_id, date) group
    return data.groupby('group_id', 'date').applyInPandas(apply_calculation, schema_of_result)

How can I optimize this computation to parallelize the processing effectively, while ensuring that the matrices are not loaded into memory more often than necessary?

Geodynamics asked 27/6/2024 at 20:52. Comments (3):
Do you have sample inputs and outputs for custom_calculation_function()? – Prud
The inputs would be the dataframe shown above, with columns like inner_id, attr_a, attr_b, etc. The outputs don't matter too much, but basically I return an aggregation of the input data as a dataframe with a single row. – Geodynamics
The notes in the applyInPandas documentation call out that it requires a full shuffle. This may be an ignorant question, but is there any way you could restructure your custom calculation to use the built-in GroupBy functions (perhaps chained together) instead of your own function? – Libya
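
As a point of reference for the last comment: if the custom calculation can be expressed with built-in aggregate functions, the Python UDF (and the broadcast) can be avoided entirely. A minimal sketch, using purely hypothetical aggregations since the real computation isn't shown:

from pyspark.sql import functions as F

# Illustrative only: replace these aggregates with whatever
# custom_calculation_function actually computes per group.
result = (data
          .groupBy("date", "group_id")
          .agg(F.avg("attr_a").alias("avg_a"),
               F.max("attr_b").alias("max_b"),
               F.count("inner_id").alias("n_inner")))

Whether the per-date matrix can be folded into such built-in aggregates depends entirely on what the calculation actually does.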

It seems like you don't want to broadcast all the matrices to all the workers, since that results in rather large overhead. This answer also seems to tackle a similar problem, where data accessed outside the function scope results in a large computation.

I'm not completely familiar with PySpark myself yet, but I would assume that Spark can handle a join (basically your lookup broadcasted_matrices.value[date]) and a groupby rather efficiently. Maybe you can try something like this:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import DataFrame
from typing import Tuple

# these were just used for my type hinting
spark_context = SparkContext()
schema = None

def custom_calculation_function(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
    # After the join, every row of the group carries the same matrix,
    # so take it once instead of keeping the whole repeated column.
    matrix = data_group["matrix"].iloc[0]
    rest_of_data = data_group.loc[:, data_group.columns != "matrix"]
    ...  # whatever you want to do

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    return (data
            .join(matrices, on="date", how="left")
            .groupby("date", "group_id", "inner_id")
            .applyInPandas(custom_calculation_function, schema)
            )
# maybe some additional work / an intermediate schema is needed to construct your final schema
Peccadillo answered 6/7/2024 at 9:32. Comments (1):
This would work, but then I believe I would be repeating the same matrix for each of the inner group elements, so past a certain number of inner group elements I'd still have the same memory issues (I tried something like this and memory usage increased). – Geodynamics
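
One pattern that may address that comment, assuming Spark 3.0+, is a cogrouped map: group both DataFrames by date and let applyInPandas receive a date's rows and its single matrix side by side, so each matrix is shipped once per date rather than broadcast everywhere or duplicated onto every row by the join. A sketch under those assumptions, not a tested solution:

import pandas as pd

def per_date(data_pdf: pd.DataFrame, matrix_pdf: pd.DataFrame) -> pd.DataFrame:
    # matrix_pdf has exactly one row for this date; data_pdf holds all
    # groups for the date, so iterate the ~100 group_ids in pandas.
    matrix = matrix_pdf["matrix"].iloc[0]
    results = [custom_calculation_function(matrix, g)
               for _, g in data_pdf.groupby("group_id")]
    return pd.concat(results)

result = (data.groupby("date")
          .cogroup(matrices.groupby("date"))
          .applyInPandas(per_date, schema_of_result))

The trade-off is that parallelism drops from (date, group_id) granularity to date granularity, which may be acceptable with ~2k dates.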
