AWS EMR - ModuleNotFoundError: No module named 'pyarrow'

I am running into this problem w/ Apache Arrow Spark Integration.

Using AWS EMR w/ Spark 2.4.3

I tested the same code on both a local single-machine Spark instance and a Cloudera cluster, and everything works fine.

I set these in spark-env.sh:

export PYSPARK_PYTHON=python3
export PYSPARK_PYTHON_DRIVER=python3

and confirmed this in the Spark shell:

spark.version
2.4.3
sc.pythonExec
python3
sc.pythonVer
python3

Running a basic pandas_udf with Apache Arrow integration results in an error:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

Error on AWS EMR [doesn't error on Cloudera or my local machine]:

ModuleNotFoundError: No module named 'pyarrow'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Does anyone have an idea what is going on? Some possible ideas ...

Could PYTHONPATH be causing a problem because I am not using Anaconda?

Does it have to do with the Spark version and the Arrow version?

This is the strangest thing, because I am using the same versions across all 3 platforms [local desktop, Cloudera, EMR] and only EMR is not working ...

I logged into all 4 EMR EC2 data nodes and tested that I can import pyarrow; it works totally fine on its own, but not when trying to use it with Spark:

# test

import numpy as np
import pandas as pd
import pyarrow as pa
df = pd.DataFrame(
    {'one': [20, np.nan, 2.5],
     'two': ['january', 'february', 'march'],
     'three': [True, False, True]},
    index=list('abc'))
table = pa.Table.from_pandas(df)
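A quick way to see which interpreter the executors actually use (as opposed to the driver) is to probe them from the PySpark shell on the master. This is a minimal sketch, assuming sc is the active SparkContext; the probe function name and the 16 partitions are arbitrary choices for illustration:

import sys

def probe(_):
    # Report the executor-side interpreter and whether pyarrow is importable there
    try:
        import pyarrow
        pyarrow_version = pyarrow.__version__
    except ImportError:
        pyarrow_version = None
    return (sys.executable, pyarrow_version)

# Run a handful of tasks so the sample touches several executors
print(set(sc.parallelize(range(16), 16).map(probe).collect()))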
Lengthwise answered 1/8, 2019 at 18:28 Comment(8)
I guess EMR Spark uses a different Python interpreter. What's the result of echo $PYTHONPATH on that node? – Worker
It is empty when I echo it. – Lengthwise
Getting closer to finding the root cause @Worker ... when I run Spark in local mode on EMR, the pandas_udf code works, so it tells me something is wrong with the remaining nodes in the cluster not having the env vars mapped properly. – Lengthwise
Have you set PYSPARK_PYTHON on the slaves, and made sure pyarrow is there? Or you can add a parameter to ship a Python env (zip file) to every node. – Worker
This cluster was created by someone else so I am not sure ... I have PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON set in spark-env.sh on the master node. – Lengthwise
You need to set the PySpark Python on the slaves. – Worker
How is this accomplished in EMR? Because in Cloudera all I ever have to do is set PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON in spark-env.sh on the edge node. – Lengthwise
Yes, you need to copy it to all your slave nodes for the settings to take effect. – Worker

In EMR, python3 is not resolved by default; you have to make it explicit. One way to do this is to pass a config.json file as you create the cluster. It's available in the Edit software settings section of the AWS EMR UI. A sample JSON file looks something like this:

[
  {
    "Classification": "spark-env",
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ]
  },
  {
    "Classification": "yarn-env",
    "Properties": {},
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ]
  }
]

You also need to have the pyarrow module installed on all core nodes, not only on the master. For that you can use a bootstrap script when creating the cluster in AWS. Again, a sample bootstrap script can be as simple as this:

#!/bin/bash
sudo python3 -m pip install pyarrow==0.13.0
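For completeness, both pieces (the configuration JSON and the bootstrap script) can also be passed when creating the cluster from the AWS CLI. This is only a sketch: the cluster name, S3 path, instance type/count, and key name are placeholder assumptions, not values from the original setup; the release label follows the EMR 5.23.0 mentioned in the comments below.

aws emr create-cluster \
  --name "spark-pyarrow-cluster" \
  --release-label emr-5.23.0 \
  --applications Name=Spark \
  --configurations file://config.json \
  --bootstrap-actions Path=s3://my-bucket/install-pyarrow.sh,Name=InstallPyArrow \
  --instance-type m5.xlarge --instance-count 4 \
  --ec2-attributes KeyName=my-key --use-default-roles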
Memoirs answered 8/8, 2019 at 8:53 Comment(7)
Thanks for the info ... yes, the bootstrap has been applied and pyarrow is installed on all nodes ... same error ... will check the config file. – Lengthwise
This is the path where it is installed on every node: /home/hadoop/.local/lib/python3.6/site-packages – Lengthwise
This is a working solution for us; we're using it against EMR 5.23.0. Let me know if it's working for you too. – Memoirs
Won't be able to confirm in the next few days ... why are you using yarn-env? I don't see any documentation on this. – Lengthwise
Well, for Spark that's optional. Here's the official doc: aws.amazon.com/premiumsupport/knowledge-center/… – Memoirs
This worked! Specifically, it was the way Python was installing the packages ... the bootstrap was using the hadoop user via pip-3.6 and installing under ~/.local/lib/..., which I think was causing problems with the EC2 Linux defaults, whereas a root install goes under /usr/lib64/... Thank you!!! – Lengthwise
This did not work for me. When I run pyspark on the master, I can see it is being run with Python 3. I can also import pyarrow on any of the nodes. However, when I submit a job that uses pyarrow, I get ImportError: PyArrow >= 0.8.0 must be installed; however, it was not found. – Weidman

There are two options in your case:

One is to make sure the Python env is correct on every machine:

  • Set PYSPARK_PYTHON to a Python interpreter that has third-party modules such as pyarrow installed. You can use type -a python to check how many Python interpreters there are on your slave nodes.

  • If the Python interpreter path is the same on every node, you can set PYSPARK_PYTHON in spark-env.sh and then copy that file to every other node. Read this for more: https://spark.apache.org/docs/2.4.0/spark-standalone.html

Another option is to add arguments to spark-submit:
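(The command this answer refers to appears to have been cut off. Going by the answerer's own comment below, the idea is roughly the following sketch; my_job.py stands in for the user's script and is not from the original post:)

spark-submit \
  --conf "spark.yarn.executorEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" \
  my_job.py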

Worker answered 8/8, 2019 at 1:46 Comment(3)
Thank you ... this all makes sense ... I am familiar with --py-files and tried it with the zipped module, but it does not work ... same error ... spark-env.sh doesn't even exist on the slave nodes, so I'm not sure how/where to copy what I have on the master ... my cluster manager is YARN. – Lengthwise
If you want to set the Spark env with EMR, you'd better take a look at this: docs.aws.amazon.com/emr/latest/ReleaseGuide/… – Worker
Try adding a conf to spark-submit like this: spark-submit --conf "spark.yarn.executorEnv.PYSPARK_PYTHON=$PYSPARK_PYTHON" – Worker
