Extracting a numpy array from a PySpark DataFrame

I have a DataFrame gi_man_df where group can take n values:

+------------------+-----------------+--------+--------------+
|           group  |           number|rand_int|   rand_double|
+------------------+-----------------+--------+--------------+
|          'GI_MAN'|                7|       3|         124.2|
|          'GI_MAN'|                7|      10|        121.15|
|          'GI_MAN'|                7|      11|         129.0|
|          'GI_MAN'|                7|      12|         125.0|
|          'GI_MAN'|                7|      13|         125.0|
|          'GI_MAN'|                7|      21|         127.0|
|          'GI_MAN'|                7|      22|         126.0|
+------------------+-----------------+--------+--------------+

and I am expecting a numpy ndarray, i.e., gi_man_array:

[[[124.2],[121.15],[129.0],[125.0],[125.0],[127.0],[126.0]]]

i.e., the rand_double values after applying the pivot.

I tried the following two approaches:
FIRST: I pivoted gi_man_df as follows:

gi_man_pivot = gi_man_df.groupBy("number").pivot('rand_int').sum("rand_double")

and the output I got is:

Row(number=7, group=u'GI_MAN', 3=124.2, 10=121.15, 11=129.0, 12=125.0, 13=125.0, 21=127.0, 23=126.0)

but the problem here is that, to get the desired output, I can't convert it to a matrix and then convert it again to a numpy array.
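
What I was hoping to do next would presumably look something like this sketch (assuming all pivoted columns other than number hold the values and that they fit on the driver, which may not be viable for my data size):

import numpy as np

value_cols = [c for c in gi_man_pivot.columns if c != "number"]
gi_man_array = np.array(gi_man_pivot.select(value_cols).collect())  # shape (1, 7) here
gi_man_array = gi_man_array[:, :, None]                             # -> [[[124.2], [121.15], ...]]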

SECOND: I created the vector in the dataframe itself using:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["rand_double"], outputCol="rand_dbl_Vect")

gi_man_vector = assembler.transform(gi_man_df)
gi_man_vector.show(7)

and I got the following output:

+----------------+-----------------+--------+--------------+--------------+
|           group|           number|rand_int|   rand_double| rand_dbl_Vect|
+----------------+-----------------+--------+--------------+--------------+
|          GI_MAN|                7|       3|         124.2|       [124.2]|
|          GI_MAN|                7|      10|        121.15|      [121.15]|
|          GI_MAN|                7|      11|         129.0|       [129.0]|
|          GI_MAN|                7|      12|         125.0|       [125.0]|
|          GI_MAN|                7|      13|         125.0|       [125.0]|
|          GI_MAN|                7|      21|         127.0|       [127.0]|
|          GI_MAN|                7|      22|         126.0|       [126.0]|
+----------------+-----------------+--------+--------------+--------------+

but the problem here is that I can't pivot on rand_dbl_Vect.

So my questions are:
1. Is either of the two approaches the correct way to achieve the desired output? If so, how can I proceed further to get the desired result?
2. What other way can I proceed with so that the code is optimal and the performance is good?

Krems answered 8/2, 2017 at 14:42 Comment(5)
I'm not at my spark console but can you use the .toArray() method? Df.select('rand_dbl').toArray(). Neither your number nor rand_int suggests that a groupby has any groups to work from to necessitate a groupby.Mareah
but the groups can be of n types, like GI_MAN and LI_MAN, and the corresponding values of the other columns change accordingly. I tried group by with pivot and it's working fine. Can you please elaborate on what you mean by "groupby has any groups to work from to necessitate a groupby"? I didn't quite get that.Krems
Your number vector in the example is all 7s. There's only one group. So why need groupby?Mareah
did my answer work for you? If so, please approve it.Mareah
Groups can be of n types, so I need a group by there.Krems

This

import numpy as np
np.array(gi_man_df.select('rand_double').collect())

produces

array([[ 124.2 ],
       [ 121.15],
       .........])
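
If the exact nested shape from the question ([[[124.2], [121.15], ...]]) is needed, the collected array can presumably be reshaped afterwards, e.g. (a sketch):

gi_man_array = np.array(gi_man_df.select('rand_double').collect())  # shape (7, 1)
gi_man_array = gi_man_array.reshape(1, -1, 1)                       # -> [[[124.2], [121.15], ...]]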
Mareah answered 12/2, 2017 at 1:11 Comment(5)
I can't use collect as the current data size is 20TB and it grows by ~5TB every month. So using collect won't be a viable option as it will need a lot of memory on the driver.Krems
any other way to do the same?Krems
@UdayShankarSingh where were you imagining holding this numpy array if not in memory on your driver? I ask because I am doing a similar thing and the only solution I can think of is to batch through the dataframe in chunks that are small enough to hold the resulting array in memory.Retool
@Retool I have the same problem, did you find an elegant solution? I would be happy to avoid reinventing a solution to a common problem.Aquacade
did you find a solution? can the data frame be split into chunks and processed async?Baronet
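
A rough sketch of the chunking idea raised in these comments, assuming the values can be streamed to the driver with toLocalIterator (which fetches one partition at a time) and consumed in fixed-size batches; the batch size and the process() step are placeholders:

from itertools import islice
import numpy as np

values = (row['rand_double'] for row in gi_man_df.select('rand_double').toLocalIterator())
while True:
    batch = np.fromiter(islice(values, 100000), dtype=float)  # at most 100k values per chunk
    if batch.size == 0:
        break
    process(batch)  # placeholder for whatever is done with each chunk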

To convert the Spark DataFrame to a numpy array, first convert it to pandas and then apply the to_numpy() function.

spark_df.select(<list of columns needed>).toPandas().to_numpy()
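
If this route is taken, enabling Arrow-based conversion can make toPandas() noticeably faster; the config key below is the Spark 3.x name (Spark 2.3+ uses spark.sql.execution.arrow.enabled instead), and spark is assumed to be the active SparkSession:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
gi_man_array = spark_df.select("rand_double").toPandas().to_numpy()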
Pickerel answered 5/11, 2022 at 6:37 Comment(1)
This would work but introduces pandas as an intermediate step, which brings a large memory overhead that is unnecessary if all we care about is getting a numpy array out in the end. @data_steve's answer is a really elegant way to get around this.Transmigrant

This solution, based on @data_steve's answer, is more memory efficient, though it takes a bit longer:

import numpy as np
np.fromiter((row['rand_double'] for row in gi_man_df.select('rand_double').toLocalIterator()), dtype=float)[:, None]

as it does not first create a local copy of the data and then another numpy array from it, but reads the values one by one to build the array. I observed the RAM consumption, and that seems to be exactly what is (not) happening.

You can probably specify a more appropriate dtype.
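
For example, a narrower dtype could be used if reduced precision is acceptable (a sketch, reusing the generator above):

import numpy as np

gi_man_array = np.fromiter(
    (row['rand_double'] for row in gi_man_df.select('rand_double').toLocalIterator()),
    dtype=np.float32,
)[:, None]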

Moulmein answered 20/2 at 9:13 Comment(0)
