Spark: How to “reduceByKey” when the keys are NumPy arrays

Published 2019-02-27 13:27

Question:

I have an RDD of (key,value) elements. The keys are NumPy arrays. NumPy arrays are not hashable, and this causes a problem when I try to do a reduceByKey operation.

Is there a way to supply the Spark context with my manual hash function? Or is there any other way around this problem (other than actually hashing the arrays "offline" and passing to Spark just the hashed key)?

Here is an example:

import numpy as np
from pyspark import SparkContext

sc = SparkContext()

data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()

The error is:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

...

TypeError: unhashable type: 'numpy.ndarray'

Answer 1:

The simplest solution is to convert it to an object that is hashable. For example:

from operator import add

reduced = sc.parallelize(data).map(
    lambda x: (tuple(x), x.sum())
).reduceByKey(add)

and convert it back later if needed.
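For example, once the reduction is done you can rebuild the array keys on the driver. This is just a minimal sketch that assumes the `reduced` RDD from the snippet above and an output small enough to collect:

import numpy as np

# Collect the reduced (tuple_key, sum) pairs and rebuild NumPy array keys locally.
# The list comprehension is simply one way to restore the original key type
# after the shuffle has already been done on hashable tuples.
result = [(np.array(k), v) for k, v in reduced.collect()]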

Is there a way to supply the Spark context with my manual hash function?

Not a straightforward one. The whole mechanism depends on the fact that the object implements a __hash__ method, and C extensions cannot be monkey-patched. You could try to use dispatching to override pyspark.rdd.portable_hash, but I doubt it is worth it, even when you consider the cost of conversions.
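If you would rather keep something array-like as the key instead of a plain tuple, a variation on the same "make the key hashable" idea is to wrap the array in a small key class before the shuffle. This is not from the original answer, just an illustrative sketch; HashableArray is a made-up helper name, and sc and data are assumed to be the objects from the question:

import numpy as np

class HashableArray:
    """Thin, read-only wrapper that makes a NumPy array usable as a dict/RDD key."""
    def __init__(self, arr):
        self.arr = np.array(arr)            # copy so the key cannot be mutated elsewhere
        self.arr.setflags(write=False)      # keys should stay immutable

    def __hash__(self):
        return hash(self.arr.tobytes())     # hash on the raw buffer

    def __eq__(self, other):
        return isinstance(other, HashableArray) and np.array_equal(self.arr, other.arr)

rd = sc.parallelize(data).map(
    lambda x: (HashableArray(x), np.sum(x))
).reduceByKey(lambda x, y: x + y)

result = [(k.arr, v) for k, v in rd.collect()]   # unwrap the arrays afterwards

Whether this beats the tuple conversion is mostly a matter of convenience; either way the key still has to be turned into something hashable before reduceByKey can group on it.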