PySpark pandas_udfs java.lang.IllegalArgumentException error
Asked Answered
N

2

8

Does anyone have experience using pandas UDFs on a local pyspark session running on Windows? I've used them on linux with good results, but I've been unsuccessful on my Windows machine.

Environment:

python==3.7
pyarrow==0.15
pyspark==2.3.4
pandas==0.24

java version "1.8.0_74"

Sample script:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())


out_df = df.groupby("id").apply(subtract_mean).toPandas()
print(out_df.head())

# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

After running for a loooong time (splits the toPandas stage into 200 tasks each taking over a second) it returns an error like this:

Traceback (most recent call last):
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 1953, in toPandas
    tables = self._collectAsArrow()
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\dataframe.py", line 2004, in _collectAsArrow
    sock_info = self._jdf.collectAsArrowToPython()
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\miniconda3\envs\pandas_udf\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o62.collectAsArrowToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 69 in stage 3.0 failed 1 times, most recent failure: Lost task 69.0 in stage 3.0 (TID 201, localhost, executor driver): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(Unknown Source)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNextMessage(MessageChannelReader.java:64)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeSchema(MessageSerializer.java:104)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:128)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:161)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.hasNext(ArrowConverters.scala:96)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.foreach(ArrowConverters.scala:94)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.to(ArrowConverters.scala:94)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.toBuffer(ArrowConverters.scala:94)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$2.toArray(ArrowConverters.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Narcose answered 19/2, 2020 at 18:5 Comment(0)
K
8

Your java.lang.IllegalArgumentException in pandas_udf has to do with pyarrow version, not with OS environment. See this issue for details.

You have two routs of action:

  1. Downgrade pyarrow to v.0.14, or
  2. Add environment variable ARROW_PRE_0_15_IPC_FORMAT=1 to SPARK_HOME/conf/spark-env.sh
Klinger answered 19/2, 2020 at 19:46 Comment(6)
Thanks @Sergey! I downgraded to pyarrow v0.14 and it ran the example producing the expected output; however, it took 5 minutes after initializing spark. Any thoughts on why it could be executing so slowly or where I could look to debug the performance?Narcose
You may have a look at localhost:4040 and see what stage is taking so long time.Klinger
On my Linux machine it took 7 secs for 2 stages with 202 tasks. Not surprisingly, it took longest to move data between executors.Klinger
Running on Windows I get 2 stages 201 tasks, with the second stage taking median duration of 2s/task.Narcose
I still don't understand why it takes 5 minutes on Windows vs 7 seconds on Linux. All additional metrics show very low time (1 ms) and the event timeline shows virtually all "Executor Computing Time". Seems like it's just taking a long time to map that function across the default of 200 partitions(?)Narcose
Try asking a new question, maybe somebody will come up with some meaningful advice.Klinger
C
5

Addendum to the answer of Sergey: if you prefer to build your own sparkSession in python and not change your config files, you'll need to set both spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT and the environment variable of the local executor spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT

spark_session = SparkSession.builder \
            .master("yarn") \
            .config('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT',1)\
            .config('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT',1)

spark = spark_session.getOrCreate()

Hope this helps!

Clairclairaudience answered 8/6, 2020 at 19:5 Comment(1)
I'm running my pyspark jobs on a cluster on Google DataProc and this is REALLY useful for me. Thank you so much!Amiamiable

© 2022 - 2024 — McMap. All rights reserved.