I have an RDD of (key, value) elements. The keys are NumPy arrays. NumPy arrays are not hashable, and this causes a problem when I try to do a reduceByKey operation.
Is there a way to supply the Spark context with my manual hash function? Or is there any other way around this problem (other than actually hashing the arrays "offline" and passing to Spark just the hashed key)?
Here is an example:
import numpy as np
from pyspark import SparkContext
sc = SparkContext()
data = np.array([[1,2,3],[4,5,6],[1,2,3],[4,5,6]])
rd = sc.parallelize(data).map(lambda x: (x,np.sum(x))).reduceByKey(lambda x,y: x+y)
rd.collect()
The error is:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
...
TypeError: unhashable type: 'numpy.ndarray'
The simplest solution is to convert the key to an object that is hashable. For example:
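A minimal sketch, reusing sc and data from the question and using tuple() as the hashable stand-in (any immutable representation of the array would do):

from operator import add

# tuple(x) is hashable, so reduceByKey can group on it
rd = sc.parallelize(data).map(lambda x: (tuple(x), np.sum(x))).reduceByKey(add)
rd.collect()
# each distinct row appears twice, so its row sum is doubled,
# e.g. [((1, 2, 3), 12), ((4, 5, 6), 30)] (ordering may vary)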
You can convert the key back to a NumPy array later if needed (np.array(key)).
Not a straightforward one. The whole mechanism depends on the fact that an object implements a __hash__ method, and C extensions cannot be monkey-patched. You could try to use dispatching to override pyspark.rdd.portable_hash, but I doubt it is worth it even if you consider the cost of the conversions.
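For reference, a rough sketch of what that dispatching could look like with functools.singledispatch (purely illustrative; patched_hash is my own name and I have not tested this across Spark versions):

import numpy as np
from functools import singledispatch
import pyspark.rdd

# Wrap the original portable_hash in a generic function and register an
# ndarray-specific implementation that hashes a tuple of the values.
patched_hash = singledispatch(pyspark.rdd.portable_hash)

@patched_hash.register(np.ndarray)
def _(value):
    # Delegate to the original portable_hash on a hashable tuple.
    return patched_hash.registry[object](tuple(value))

pyspark.rdd.portable_hash = patched_hash

Even then, the patch may not reach every code path: the default partitionFunc argument is bound when pyspark.rdd is imported, so reassigning the module attribute afterwards may not affect it, and the keys likely still pass through ordinary Python dicts during local combining, where they have to be hashable anyway.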