How to speed up groupby().sum() on a Dask DataFrame with 5 million rows and 500,000 groups?

I have a dataframe with:

  • 5 million rows,
  • a group_id column with 500,000 unique values,
  • thousands of other columns named var1, var2, etc., each containing only 0s and 1s.

I would like to group by group_id and sum every other column within each group. For better performance I use Dask, but this simple aggregation is still slow:

The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds

My real dataset contains up to 30,000 columns. I have read two answers (1 and 2) by @Divakar about using NumPy, but the former thread is about counting and the latter is about summing columns.

Could you please elaborate on some ways to speed up this aggregation?

import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy   # thread pool, used here only to build the test data faster
import dask.dataframe as dd

core = os.cpu_count()
P = dummy.Pool(processes=core)

n_docs = 500000          # number of distinct group ids
n_rows = n_docs * 10     # 5 million rows
data = {}

def create_col(i):
    # each var column contains only 0s and 1s
    name = 'var' + str(i)
    data[name] = np.random.randint(0, 2, n_rows)

n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype='int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))
df = dd.from_pandas(df, npartitions=3 * core)

start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')
Mithridatism asked on 11/12/2021 at 11:38. Comments (4):
Where is the real dataset going to come from? – Leibowitz
@Leibowitz It is generated by applying pd.get_dummies to an original dataframe with 25 columns. – Mithridatism
Does group_id run from 0 up to 499999? – Delastre
@Delastre It ranges from 1 to 500,000. – Mithridatism

(I misunderstood the OP in my original answer, so I have rewritten it.)

I got an improvement by:

  • switching to NumPy,
  • using the same dtype for the group ids and the data (np.int32),
  • using Numba with parallel mode.

import numba as nb

# group_and_data: 2-D int32 array whose first column is the group id (1 .. n_groups)
# and whose remaining columns are the 0/1 data columns
@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
    n_cols = group_and_data.shape[1] - 1
    # n_groups + 1 rows because the group ids start at 1, not 0
    counts = np.zeros((n_groups + 1, n_cols), dtype=np.int32)
    for idx in nb.prange(len(group_and_data)):   # parallel loop over the rows
        row = group_and_data[idx]
        counts[row[0]] += row[1:]                # add this row to its group's running sum
    return counts

# reuses data, n_docs and n_rows from the question's setup, but with int32 everywhere
df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)

# switching to numpy (line below) is costly;
# it would be faster to work with numpy alone (no pandas) -- see the sketch after the profile below
group_and_data = df.values
count_groups2(group_and_data, n_groups=n_docs)
op_method(df)   # the question's original df.groupby('group_id').sum() approach

Line #      Hits         Time  Per Hit   % Time  Line Contents
    72         1    1439807.0 1439807.0      7.0      group_and_data = df.values
    73         1    1341527.0 1341527.0      6.5      count_groups2(group_and_data, n_groups=500_000)
    74         1   12043334.0 12043334.0     58.5      op_method(df)
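
To make the "work with numpy alone (no pandas)" remark concrete, here is a minimal end-to-end sketch (not part of the original measurements). It assumes the count_groups2 function above and the n_docs / n_rows / n_cols sizes from the question; everything else is plain NumPy and pandas.

import numpy as np
import pandas as pd

n_docs = 500_000           # distinct group ids, 1 .. n_docs
n_rows = n_docs * 10
n_cols = 300

# build the inputs directly as int32 numpy arrays, never materialising the wide pandas frame
# (roughly 6 GB of int32 data at this size)
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
values = np.random.randint(0, 2, size=(n_rows, n_cols), dtype=np.int32)
group_and_data = np.hstack([group_id[:, None], values])   # first column = group id

sums = count_groups2(group_and_data, n_groups=n_docs)      # shape (n_docs + 1, n_cols)

# optionally wrap the result back into a labelled DataFrame
# (row 0 is unused because group ids start at 1)
result = pd.DataFrame(
    sums[1:],
    index=pd.RangeIndex(1, n_docs + 1, name='group_id'),
    columns=['var' + str(i) for i in range(1, n_cols + 1)],
)

This is the same call that the profile above measures; the only difference is that the arrays are created up front, so the costly df.values conversion disappears.
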
Delastre answered on 11/12/2021 at 12:07. Comments (13):
This is awesome! In your code there is a loop for idx in range(len(group_ids)). My computer has 28 cores and 56 threads. Is there any modification to parallelize the loop for idx in range(len(group_ids))? – Mithridatism
From the line sums = df_values[:, 1:].sum(axis=1), I think you misunderstood my goal :) I meant to sum the rows that belong to the same group, not to sum across columns. – Mithridatism
@Akura I edited and added parallelism, but it gives a significant boost only when using numpy arrays instead of dataframes (a short thread-count sketch follows these comments). – Delastre
@Mithridatism You mean you count them? Or do you want, for each group, the sum of each column? – Delastre
Exactly, I want the sum of all rows for each column. – Mithridatism
@Mithridatism I fixed the answer, take a look. – Delastre
I'm a bit confused. You have run three versions: op_method(df), faster4(group_and_data), and group_and_data = df.values, and you mention that switching to numpy is costly and that it would be faster to work with numpy alone (no pandas). I understand that op_method(df) is my original code. Can you explain the difference between the two versions faster4(group_and_data) and group_and_data = df.values? – Mithridatism
I have run your updated code here, but it returns an error :( Could you please include the full code of the fastest version faster4(group_and_data)? – Mithridatism
@Mithridatism Sorry, faster4 was just an old name I forgot to change. I guess you already got it working, but I updated the answer anyway. – Delastre
May I ask what the number 7.0 is in the line 72 1 1439807.0 1439807.0 7.0 group_and_data = df.values? – Mithridatism
@Mithridatism It is the percentage of time that line took within the profiled function. I use line_profiler, which reports results in this form: Line # | Hits | Time | Per Hit | % Time | Line Contents. – Delastre
Applying your code to my real dataset reduced the computation time from 1h 20min 54s to just 7min 22s. My CPU with 28 cores and 56 threads is fully utilized. This is very impressive. Thank you so much again. – Mithridatism
Glad to hear that. Thank you for letting me know :) – Delastre
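
A small, hypothetical follow-up to the parallelism discussion above (not from the original answer): the prange loop already uses all detected cores by default, but the thread count can be pinned explicitly with numba.set_num_threads. The 28-thread value below is only an example for a machine with 28 physical cores, and group_and_data is the array built in the answer.

import numba as nb

# parallel loops use all detected cores by default (NUMBA_NUM_THREADS);
# set_num_threads pins the count, e.g. one thread per physical core
nb.set_num_threads(28)

sums = count_groups2(group_and_data, n_groups=500_000)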
