I am running PySpark on AWS EMR (4 r5.xlarge workers, each with one executor and 4 cores), and I get AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'. Below is a snippet of the code that throws this error:
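import sqlite3

import numpy as np
import pandas as pd
from pyspark.sql.functions import col, udf
from uszipcode import SearchEngine

# uszipcode fetches simple_db.sqlite into db_file_dir if it is not already there.
search = SearchEngine(db_file_dir="/tmp/db")
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
pdf_ = pd.read_sql_query('''select zipcode, lat, lng,
    bounds_west, bounds_east, bounds_north, bounds_south from
    simple_zipcode''', conn)
# Ship the zipcode table to the executors (spark is the active SparkSession).
brd_pdf = spark.sparkContext.broadcast(pdf_)
conn.close()

@udf('string')
def get_zip_b(lat, lng):
    pdf = brd_pdf.value
    # Keep the zipcodes whose bounding box contains the point.
    out = pdf[(np.array(pdf["bounds_north"]) >= lat) &
              (np.array(pdf["bounds_south"]) <= lat) &
              (np.array(pdf['bounds_west']) <= lng) &
              (np.array(pdf['bounds_east']) >= lng)]
    if len(out):
        # Among the candidates, pick the one whose centre is closest.
        min_index = np.argmin((np.array(out["lat"]) - lat)**2 + (np.array(out["lng"]) - lng)**2)
        zip_ = str(out["zipcode"].iloc[min_index])
    else:
        zip_ = 'bad'
    return zip_

df = df.withColumn('zipcode', get_zip_b(col("latitude"), col("longitude")))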
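The matching rule inside get_zip_b can also be stated on its own, outside Spark and pandas: keep the rows whose bounding box contains the point, then pick the one whose centre is closest. Below is a minimal sketch with plain tuples. The function name nearest_zip and the sample rows are hypothetical and made up for illustration; the tuple order simply mirrors the columns of the SQL query.

```python
def nearest_zip(rows, lat, lng):
    """rows: iterable of (zipcode, lat, lng, west, east, north, south) tuples."""
    best_zip, best_dist = "bad", None
    for zipcode, z_lat, z_lng, west, east, north, south in rows:
        # Same bounding-box filter as in the UDF.
        if south <= lat <= north and west <= lng <= east:
            # Squared distance to the zipcode centre, as in the UDF.
            dist = (z_lat - lat) ** 2 + (z_lng - lng) ** 2
            if best_dist is None or dist < best_dist:
                best_zip, best_dist = str(zipcode), dist
    return best_zip


# Made-up sample rows, not taken from simple_db.sqlite.
rows = [
    ("00001", 10.0, 10.0, 9.0, 11.0, 11.0, 9.0),
    ("00002", 10.4, 10.4, 9.5, 12.0, 12.0, 9.5),
]
print(nearest_zip(rows, 10.3, 10.3))  # both boxes match; the second centre is closer
print(nearest_zip(rows, 50.0, 50.0))  # no box matches
```

Since such rows are plain Python objects, broadcasting them would not require unpickling a pandas object on the workers. Whether that avoids the error in this job is untested.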
Below is the traceback, where "line 102, in get_zip_b" refers to pdf = brd_pdf.value:
21/08/02 06:18:19 WARN TaskSetManager: Lost task 12.0 in stage 7.0 (TID 1814, ip-10-22-17-94.pclc0.merkle.local, executor 6): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 605, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 597, in process
serializer.dump_stream(out_iter, outfile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
for obj in iterator:
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/serializers.py", line 212, in _batched
for item in iterator:
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
return lambda *a: f(*a)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/util.py", line 121, in wrapper
return f(*args, **kwargs)
File "/mnt/var/lib/hadoop/steps/s-1IBFS0SYWA19Z/Mobile_ID_process_center.py", line 102, in get_zip_b
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 146, in value
self._value = self.load_from_path(self._path)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 123, in load_from_path
return self.load(f)
File "/mnt/yarn/usercache/hadoop/appcache/application_1627867699893_0001/container_1627867699893_0001_01_000009/pyspark.zip/pyspark/broadcast.py", line 129, in load
return pickle.load(file)
AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks' from '/mnt/miniconda/lib/python3.9/site-packages/pandas/core/internals/blocks.py'>
Some observations and my thought process:
1. From searching online, this AttributeError in PySpark seems to be caused by mismatched pandas versions between the driver and the workers.
2. However, I ran the same code on two different datasets: one worked without any errors and the other did not. That seems strange and non-deterministic, and it suggests the error may not be caused by mismatched pandas versions; otherwise neither dataset would have succeeded.
3. I then ran the same code on the successful dataset again, this time with a different Spark configuration (spark.driver.memory changed from 2048M to 4192m), and it threw the AttributeError.
4. In conclusion, I think the AttributeError has something to do with the driver, but I can't tell from the error message how they are related or how to fix it: AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'.
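One way to test the version-mismatch hypothesis from point 1 directly is to ask the driver and each executor which interpreter and pandas version they actually import. The following is only a rough sketch: probe_env, collect_worker_envs and find_mismatches are hypothetical helper names, and it assumes an active SparkSession called spark. Including the hostname in the result should also show whether only some nodes differ, which could explain why a job fails on one run and not on another.

```python
import platform
import socket
import sys


def probe_env(_=None):
    """Report host, interpreter path, Python version and pandas version of this process."""
    try:
        import pandas
        pandas_version = pandas.__version__
    except ImportError:
        pandas_version = "not installed"
    return (
        socket.gethostname(),
        sys.executable,
        platform.python_version(),
        pandas_version,
    )


def collect_worker_envs(sc, num_tasks=64):
    """Run probe_env in many small tasks so that every executor is likely to be hit."""
    results = sc.parallelize(range(num_tasks), num_tasks).map(probe_env).collect()
    return sorted(set(results))


def find_mismatches(driver_env, worker_envs):
    """Return the worker entries whose pandas version differs from the driver's."""
    return [env for env in worker_envs if env[3] != driver_env[3]]


# On the cluster (assumes `spark` is an active SparkSession):
#   driver_env = probe_env()
#   worker_envs = collect_worker_envs(spark.sparkContext)
#   print(driver_env)
#   print(find_mismatches(driver_env, worker_envs))
```

An empty list from find_mismatches would argue against point 1; a non-empty one would show which hosts and interpreter paths disagree with the driver.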