Spark can't pickle method_descriptor
Asked Answered
C

2

7

I get this weird error message

15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
  File "/home/user/inverted-index.py", line 78, in <module>
    print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
  File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
    return self.mapPartitions(topIterator).reduce(merge)
  File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
  File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
    cp.dump(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj, modList)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 467, in save_global
    d),obj=obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 631, in save_reduce
    save(args)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
    raise pickle.PicklingError("Can't pickle builtin %s" % obj)
pickle.PicklingError: Can't pickle builtin <type 'method_descriptor'>

My update func returns a list of tuples of type (key, (value1, value2)) and all of them are strings as seen below:

def update(doc):
    doc_id  = doc[0][path_len:-ext_len] #actual file name
    content = doc[1].lower()

    new_fi = regex.split(content)
    old_fi = fi_table.row(doc_id)

    fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})

    if not old_fi:
        return [(term, ('add', doc_id)) for term in new_fi]
    else:
        new_fi = set(new_fi)
        old_fi = set(old_fi['cf:col'].split(','))
        return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
               [(term, ('del', doc_id)) for term in old_fi - new_fi]

EDIT: The problem lies on these 2 hbase functions, the row and the put. When I comment them both the code works (setting the old_fi as an empty dictionary) but if one of them runs, it produces the above error. I use happybase to operate hbase in python. Can someone explain me what goes wrong?

Chi answered 25/1, 2015 at 22:57 Comment(3)
Mind posting the code you used?Initiatory
How are you running your code?Are you using pyspark binary for executing it?Maxentia
i run it on cluster with submit job. I have no clue how to debug the map function as the print command doesn't execute inside itChi
C
7

Spark tries to serialize the connect object so it can be used inside the executors, which will surely fail because a deserialized db connect object can't grant read/write permission to another scope (or even computer). The problem can be reproduced by trying to broadcast the connect object. For this instance there was a problem on serializing an i/o object.

The problem was partly solved by connecting to the database inside the map functions. Since there will be too many connections for each RDD element in the map function, I had to switch to partition processing to reduce the db connections from 20k to about 8-64 (based on number of partitions). Spark developers should consider creating an initialization function/script for the executors to avoid these kind of dead end problems.

So let's say I got this init function executed by every node, then every node will be connected to the database (some conn pool, or separate zookeeper nodes) because the init function and the map functions will share the same scope, and then the problem is gone, so you write faster code than the workaround I found. At the end of the execution spark will free/unload these defined variables and the program will end.

Chi answered 9/3, 2015 at 13:10 Comment(4)
I've experienced a similar problem with custom Python modules distributed to the workers by zip file, where the method couldn't access a module nested inside another, imported at the top of the driver script (e.g. import A.B would mean A.B.foo would fail). Somehow, CloudPickle was trying to serialise the module. The solution was to either import all from the parent module (from A import *), or import the nested modules from inside the method itself.Andy
The Spark Streaming pages have some info on best practices that are strongly related to this answer: spark.apache.org/docs/latest/…Christensen
Can you please explain how to write the init functions inside the executor ? I am facing the same issues when I am trying to write my numpy arrays from memory to s3 directly using cottoncandy client. And I am passing the client to my map functionWine
@Wine you will have to create the clients and connect to s3 inside the executor function, if you spawn 1000 executors for 1000 objects, switch to batch (partitions) and run 10 executors that process 100 objects each so you create 10 clients isntead of 1000Chi
R
0

If it's really a pickling issue for a MethodDescriptorType, you could register how to pickle a MethodDescriptorType, with this:

def _getattr(objclass, name, repr_str):
    # hack to grab the reference directly
    try:
        attr = repr_str.split("'")[3]
        return eval(attr+'.__dict__["'+name+'"]')
    except:
        attr = getattr(objclass,name)
        if name == '__dict__':
            attr = attr[name]
        return attar


def save_wrapper_descriptor(pickler, obj):
    pickler = Pickler(file, protocol)
    pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
                                   obj.__repr__()), obj=obj)
    return

# register the following "type" with:
#     Pickler.dispatch[MethodDescriptorType] = save_wrapper_descriptor
MethodDescriptorType = type(type.__dict__['mro'])

Then, if you register the above to the pickling dispatch table that spark uses (as shown above, or with copy_reg), it may get past the pickling error.

Ruderal answered 26/1, 2015 at 7:31 Comment(6)
after some checks i found that the problem lies on happybase (hbase python client) function callsChi
i doubt that the problem is happybase. it doesn't appear in the stack trace, and happybase never pickles anything.Engenia
I do this to register a new type "t" to the dispatch table. Here func would be save_wrapper_descriptor and t would be MethodDescriptorType = type(type.__dict__['mro']). Then do: Pickler.dispatch[t] = func, or use copy_reg.Ruderal
I altrered your code to match the cloudpickler code style and I get this error: pickle.PicklingError: Could not pickle object as excessively deep recursion required. Try _fast_serialization=2 or contact PiCloud support Here is the altered version pastebin.com/sTCRSaHT code is under """Stack Overflow"""commentsChi
I got a <method 'reset' of 'cStringIO.StringO' objects> 80+ times printing the repr of the objectChi
I was looking at your pastebin. On line 121, you're returning "attar". Should that be "attr"?Idem

© 2022 - 2024 — McMap. All rights reserved.