How to solve pyspark `org.apache.arrow.vector.util.OversizedAllocationException` error by increasing spark's memory?
D

4

5

I'm running a job in pyspark where at one point I use a grouped aggregate Pandas UDF. This results in the following (here abbreviated) error:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer

I'm fairly sure this is because one of the groups the pandas UDF receives is huge, and if I reduce the dataset and remove enough rows I can run my UDF with no problems. However, I want to run with my original dataset, and even if I run this spark job on a machine with 192.0 GiB of RAM I still get the same error. (And 192.0 GiB should be enough to hold the whole dataset in memory.)

How can I give spark enough memory to be able to run grouped aggregate Pandas UDFs that require a lot of memory?

For example, is there some spark configuration I'm missing that gives more memory to Apache Arrow?
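
For context, here is a minimal sketch of the kind of grouped aggregate pandas UDF I'm talking about (the DataFrame and column names are made up, not from the real job):

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the real dataset; column names are invented.
df = spark.createDataFrame(
    [("dev1", 1.0), ("dev1", 2.0), ("dev2", 3.0)],
    ["device_id", "value"],
)

# Grouped aggregate pandas UDF (Spark 2.4-style): all values of one group are
# collected into a single pandas Series, which is what Arrow has to buffer;
# a single huge group is enough to trigger OversizedAllocationException.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_value(v):
    return v.mean()

df.groupBy("device_id").agg(mean_value(df["value"])).show()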

Longer error message

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in 
----> 1 device_attack_result.count()
      2 
      3 
      4 

/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
    520         2
    521         """
--> 522         return int(self._jdf.count())
    523 
    524     @ignore_unicode_prefix

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...

Full error message here.

Dominicadominical answered 7/10, 2019 at 12:29 Comment(2)
Did you ever resolve this issue? I am also experiencing this ...Ephesians
I believe @artem-vovsia is correct in that I hit Apache Arrow's internal limitation. So I "solved" the problem (by learning its cause), but that didn't result in an easy solution. The hard solution I had to go through was simply to send less data through Arrow. For example, I encoded all string columns as integers, among other hacks.Rothstein
I
4
  1. Have you tried setting the --executor-memory spark-submit option to 180g, so that Spark utilises all available memory? (A sketch of the equivalent session config follows this list.)
  2. Actually, it doesn't look like Spark is OOMing or like a typical data-skew issue. It looks like a rather weird situation where one of your data structures hits an Apache Arrow internal limitation: no buffer can be larger than Integer.MAX_VALUE bytes in size: https://github.com/apache/arrow/blob/157b179812adb8f29e5966682ff1937f85ce192a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java#L42 . I'm not sure how Arrow works internally, but to me it looks like one of your data points contains more than Integer.MAX_VALUE bytes (about 2 GB) of data
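
A minimal sketch of point 1 done through the session builder instead of spark-submit (the 180g figure is just the value mentioned above):

from pyspark.sql import SparkSession

# Equivalent of `spark-submit --executor-memory 180g ...` when the session is
# built in code; 180g is only the figure suggested in point 1.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "180g")
    .getOrCreate()
)
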
Imine answered 7/10, 2019 at 15:41 Comment(0)
D
6

Spark's pandas UDF functionality uses the Arrow framework to convert the Spark DataFrame to a pandas DataFrame, and the Arrow internal buffer limit is only 2 GB at this point, so your pandas UDF's group-by condition should not produce more than 2 GB of data uncompressed.

df.groupby('id').apply(function)

In other words, you can run your pandas UDF only if each group-by partition is smaller than 2 GB uncompressed.
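
One way to check whether any group comes close to that limit is to look at the row counts per grouping key before applying the UDF (df and the id column are the ones from the snippet above):

from pyspark.sql import functions as F

# A group whose row count (times the row width) approaches the 2 GB Arrow
# buffer limit is the one that will fail.
(df.groupBy("id")
   .count()
   .orderBy(F.desc("count"))
   .show(10))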

Here is the ticket for reference:

https://issues.apache.org/jira/browse/ARROW-4890

The above issue appears to be resolved in pyarrow >= 0.15, and only Spark 3.x uses that pyarrow version.

Deputize answered 11/12, 2019 at 13:27 Comment(0)
F
6

Arrow 0.16 changed the maximum buffer allocation size from Integer.MAX_VALUE to Long.MAX_VALUE (64 bits): https://issues.apache.org/jira/browse/ARROW-6112

As of July 2020, upstream Spark is still based on Arrow 0.15: https://github.com/apache/spark/blob/master/python/setup.py
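
A quick, generic way to see which versions your cluster actually runs (nothing here is specific to this job):

import pyspark
import pyarrow

# The 64-bit allocation limit only helps if both sides are new enough:
# pyarrow >= 0.16 on the workers, and a Spark build that supports it.
print("Spark  :", pyspark.__version__)
print("pyarrow:", pyarrow.__version__)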

The Netty backing buffers still don't support this, though, so chances are you will still hit that limit as a different exception.

So as of now this is still not possible due to the above restrictions.

This might get fixed on the Spark side: https://issues.apache.org/jira/browse/SPARK-32294. The idea is to feed the GroupedData into the pandas UDF in batches to solve this issue.

Update: PySpark on the Databricks platform doesn't have this issue; it requires DBR 7.4+.

Fool answered 10/7, 2020 at 7:15 Comment(1)
I experienced this issue as well and upgrading to DBR 7.6 fixed it, TYSMKnotting
B
2

As per my understanding, all data for a group is loaded into memory before the function is applied. This can lead to out-of-memory exceptions, especially if the group sizes are skewed. The maxRecordsPerBatch configuration is not applied to groups, and it is up to you to ensure that the grouped data fits into the available memory.
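
For reference, this is the setting being referred to; as noted, it only limits ordinary Arrow record batches and does not cap what a single group feeds into a grouped UDF:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Caps the rows per Arrow record batch for toPandas() and scalar pandas UDFs;
# each group of a grouped-map/aggregate UDF is still materialised in full,
# so this setting alone does not prevent the oversized-allocation error.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)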

You can try salting your data to make sure the groups are not skewed. The article below discusses salting for joins; the same concept can be applied here as well (a sketch follows the link).

https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
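
Here is a rough sketch of how salting could look for a grouped aggregate pandas UDF, assuming the aggregation can be split into partials (a plain sum here); the column names and number of salts are invented for illustration:

import pandas as pd
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: some ids carry far more rows than others.
df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 3.0), ("b", 4.0), ("b", 5.0)],
    ["id", "value"],
)

N_SALTS = 8  # how many sub-groups each id is split into (tuning knob)

# 1. Salt: each row lands in one of N_SALTS sub-groups, so no single group
#    handed to the pandas UDF carries an entire id's data.
salted = df.withColumn("salt", (F.rand() * N_SALTS).cast("int"))

# 2. Partial aggregation per (id, salt) with the pandas UDF.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def partial_sum(v):
    return float(v.sum())

partials = salted.groupBy("id", "salt").agg(partial_sum("value").alias("psum"))

# 3. Combine the partials per id with a plain Spark aggregation.
partials.groupBy("id").agg(F.sum("psum").alias("total")).show()

The trade-off is the second, ordinary aggregation needed to recombine the partial results per key.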

Betz answered 7/10, 2019 at 13:33 Comment(2)
That is my understanding too. My question was: Can I increase the available memory for Spark to accommodate my huge groups? One assumption here is that the 200GB RAM I have available is somehow not all used by spark.Rothstein
It depends on the configuration of your cluster. Can you please share the details of your cluster, like the number of executors, executor memory, cores, etc.?Betz
