Spark can't pickle method_descriptor

2019-02-17 15:43发布

问题:

I get this weird error message

15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
  File "/home/user/inverted-index.py", line 78, in <module>
    print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
  File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
    return self.mapPartitions(topIterator).reduce(merge)
  File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
  File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
    cp.dump(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj, modList)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 467, in save_global
    d),obj=obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 631, in save_reduce
    save(args)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 442, in save_global
    raise pickle.PicklingError("Can't pickle builtin %s" % obj)
pickle.PicklingError: Can't pickle builtin <type 'method_descriptor'>

My update func returns a list of tuples of type (key, (value1, value2)) and all of them are strings as seen below:

def update(doc):
    doc_id  = doc[0][path_len:-ext_len] #actual file name
    content = doc[1].lower()

    new_fi = regex.split(content)
    old_fi = fi_table.row(doc_id)

    fi_table.put(doc_id, {'cf:col': ",".join(new_fi)})

    if not old_fi:
        return [(term, ('add', doc_id)) for term in new_fi]
    else:
        new_fi = set(new_fi)
        old_fi = set(old_fi['cf:col'].split(','))
        return [(term, ('add', doc_id)) for term in new_fi - old_fi] + \
               [(term, ('del', doc_id)) for term in old_fi - new_fi]

EDIT: The problem lies on these 2 hbase functions, the row and the put. When I comment them both the code works (setting the old_fi as an empty dictionary) but if one of them runs, it produces the above error. I use happybase to operate hbase in python. Can someone explain me what goes wrong?

回答1:

Spark tries to serialize the connect object so it can be used inside the executors, which will surely fail because a deserialized db connect object can't grant read/write permission to another scope (or even computer). The problem can be reproduced by trying to broadcast the connect object. For this instance there was a problem on serializing an i/o object.

The problem was partly solved by connecting to the database inside the map functions. Since there will be too many connections for each RDD element in the map function, I had to switch to partition processing to reduce the db connections from 20k to about 8-64 (based on number of partitions). Spark developers should consider creating an initialization function/script for the executors to avoid these kind of dead end problems.

So let's say I got this init function executed by every node, then every node will be connected to the database (some conn pool, or separate zookeeper nodes) because the init function and the map functions will share the same scope, and then the problem is gone, so you write faster code than the workaround I found. At the end of the execution spark will free/unload these defined variables and the program will end.



回答2:

If it's really a pickling issue for a MethodDescriptorType, you could register how to pickle a MethodDescriptorType, with this:

def _getattr(objclass, name, repr_str):
    # hack to grab the reference directly
    try:
        attr = repr_str.split("'")[3]
        return eval(attr+'.__dict__["'+name+'"]')
    except:
        attr = getattr(objclass,name)
        if name == '__dict__':
            attr = attr[name]
        return attar


def save_wrapper_descriptor(pickler, obj):
    pickler = Pickler(file, protocol)
    pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
                                   obj.__repr__()), obj=obj)
    return

# register the following "type" with:
#     Pickler.dispatch[MethodDescriptorType] = save_wrapper_descriptor
MethodDescriptorType = type(type.__dict__['mro'])

Then, if you register the above to the pickling dispatch table that spark uses (as shown above, or with copy_reg), it may get past the pickling error.