I am working with a large dataset that includes multiple unique groups of data identified by a date and a group ID. Each group contains multiple IDs, each with several attributes. Here’s a simplified structure of my data:
| date | group_id | inner_id | attr_a | attr_b | attr_c |
|------------|----------|----------|--------|--------|--------|
| 2023-06-01 | A1 | 001 | val | val | val |
| 2023-06-01 | A1 | 002 | val | val | val |
...
Additionally, for each date, I have a large matrix associated with it:
| date | matrix |
|------------|--------------|
| 2023-06-01 | [[...], ...] |
...
I need to apply a function for each date and group_id that processes data using both the group attributes and the matrix associated with that date. The function looks like this:
    def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
        # process data
        return processed_data
Here, `group_data` contains the attributes for a specific group:
| inner_id | attr_a | attr_b | attr_c |
|----------|--------|--------|--------|
| 001 | val | val | val |
...
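To make the shape of `group_data` concrete, here is a minimal pandas-only sketch of how one frame per (date, group_id) pair can be cut out of the full table. The numeric values, the `B7` group ID, and the helper name `split_groups` are illustrative assumptions, not part of the real dataset.

```python
import pandas as pd

# Toy stand-in for the real table; the values are made up for illustration.
data = pd.DataFrame(
    {
        "date": ["2023-06-01", "2023-06-01", "2023-06-02"],
        "group_id": ["A1", "A1", "B7"],
        "inner_id": ["001", "002", "001"],
        "attr_a": [1.0, 2.0, 3.0],
        "attr_b": [4.0, 5.0, 6.0],
        "attr_c": [7.0, 8.0, 9.0],
    }
)


def split_groups(data: pd.DataFrame) -> dict:
    """Return {(date, group_id): group_data} with the key columns dropped."""
    key_cols = ["date", "group_id"]
    return {
        key: group.drop(columns=key_cols).reset_index(drop=True)
        for key, group in data.groupby(key_cols)
    }


groups = split_groups(data)
print(groups[("2023-06-01", "A1")])
```

Note that Spark's `applyInPandas` keeps the grouping columns in the frame it passes to the function, so in the Spark version `date` and `group_id` are still present unless they are dropped explicitly.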
Here is my current implementation. It works, but I can only run ~200 dates at a time because I am broadcasting all of the matrices to every worker (I have ~2k dates, ~100 groups per date, and ~150 inner elements per group):
    from typing import Tuple

    import pandas as pd
    from pyspark.sql import DataFrame

    # spark_context, schema_of_result and custom_calculation_function are defined elsewhere.

    def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
        # Convert matrices to a dictionary mapping dates to matrix
        date_matrices = matrices.rdd.collectAsMap()
        # Broadcast the matrices (every worker receives every date's matrix)
        broadcasted_matrices = spark_context.broadcast(date_matrices)

        # Function to apply calculations
        def apply_calculation(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
            # Keys arrive in groupby order: (group_id, date)
            date = group_key[1]
            return custom_calculation_function(broadcasted_matrices.value[date], data_group)

        # Apply the function to each group
        return data.groupby('group_id', 'date').applyInPandas(apply_calculation, schema_of_result)
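For scale, the figures quoted above work out as follows. The matrix size is not stated anywhere in this question, so `MATRIX_SIDE` below is a purely hypothetical placeholder (a dense square float64 matrix) used only to show how the broadcast footprint grows with the number of dates.

```python
# Figures taken from the question.
N_DATES = 2_000
GROUPS_PER_DATE = 100
ROWS_PER_GROUP = 150

# Hypothetical: the real matrix dimensions are not given in the question.
MATRIX_SIDE = 1_000
BYTES_PER_FLOAT64 = 8


def workload(n_dates: int, groups_per_date: int, rows_per_group: int) -> tuple:
    """Return (number of groups, number of rows) for the whole dataset."""
    groups = n_dates * groups_per_date
    rows = groups * rows_per_group
    return groups, rows


def broadcast_bytes(n_dates: int, matrix_side: int) -> int:
    """Bytes held by each worker if every date's dense matrix is broadcast."""
    return n_dates * matrix_side * matrix_side * BYTES_PER_FLOAT64


groups, rows = workload(N_DATES, GROUPS_PER_DATE, ROWS_PER_GROUP)
print(groups, rows)                         # 200000 30000000
print(broadcast_bytes(N_DATES, MATRIX_SIDE))  # 16000000000
print(broadcast_bytes(200, MATRIX_SIDE))      # 1600000000
```

Under that assumed matrix size, a full broadcast costs ten times as much memory per worker as a 200-date batch, even though each group only ever needs the one matrix for its own date.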
How can I parallelize this computation effectively while ensuring that each matrix is loaded into memory no more often than necessary?
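One direction that may avoid the broadcast altogether is a cogrouped pandas UDF: group both DataFrames by `date`, so each task receives one date's rows together with that date's matrix only. The following is a sketch under assumptions, not a benchmarked solution. It assumes the `matrix` column is an array of arrays of numbers, that every date's rows (~15k here) fit comfortably in one task, and that `run` has the signature shown earlier. The names `make_per_date_fn`, `process_date`, and `result_columns` are hypothetical.

```python
from typing import Callable, List

import numpy as np
import pandas as pd


def make_per_date_fn(
    run: Callable[[pd.DataFrame, np.ndarray], pd.DataFrame],
    result_columns: List[str],
):
    """Build the function handed to cogroup(...).applyInPandas."""

    def process_date(data_pdf: pd.DataFrame, matrix_pdf: pd.DataFrame) -> pd.DataFrame:
        # cogroup also calls this for dates that exist on one side only.
        if data_pdf.empty or matrix_pdf.empty:
            return pd.DataFrame(columns=result_columns)

        # Rebuild one dense 2-D array from the nested array cell.
        cell = matrix_pdf["matrix"].iloc[0]
        matrix = np.array([np.asarray(row, dtype=float) for row in cell])

        date = data_pdf["date"].iloc[0]
        parts = [
            run(group, matrix).assign(date=date, group_id=group_id)
            for group_id, group in data_pdf.groupby("group_id")
        ]
        return pd.concat(parts, ignore_index=True)[result_columns]

    return process_date


def calculate_metrics(data, matrices, run, result_schema, result_columns):
    # data and matrices are Spark DataFrames; type hints are omitted so the
    # sketch can be imported without pyspark installed.
    per_date = make_per_date_fn(run, result_columns)
    return (
        data.groupby("date")
        .cogroup(matrices.groupby("date"))
        .applyInPandas(per_date, schema=result_schema)
    )
```

The trade-off is coarser parallelism: the unit of work becomes one date (~2k tasks) rather than one (date, group_id) pair, in exchange for each matrix being shipped only to the task that handles its date.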
`custom_calculation_function()`? – Prud
`inner_id`, `attr_a`, `attr_b`, etc. Outputs don't matter too much, but basically I am returning an aggregation of the input data as a DataFrame with a single row. – Geodynamics