Spark with Cython

Posted 2019-05-06 17:58

Question:

I recently wanted to use Cython with Spark, and I followed this reference to set it up.

I wrote the following programs as described there, but I am getting a TypeError:

TypeError: fib_mapper_cython() takes exactly 1 argument (0 given)

spark_tools.py

def spark_cython(module, method):
    def wrapped(*args, **kwargs):
        global cython_function_
        try:
            return cython_function_(*args, **kwargs)
        except:
            import pyximport
            pyximport.install()
            cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
    return wrapped()

fib.pyx

def fib_mapper_cython(n):
    '''
    Return the first Fibonacci number >= n.
    '''
    cdef int a = 0
    cdef int b = 1
    cdef int j = int(n)
    while b<j:
        a, b  = b, a+b
    return b, 1

main.py

from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()


os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))

sc = SparkContext(conf=conf)
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')

mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency

I get the TypeError whenever I run the program. Any ideas?

Answer 1:

This is neither a Cython nor a PySpark issue. The definition of spark_cython contains an extra function call: the function that wraps the call to cython_function_ is itself called, with no arguments, in the return statement:

return wrapped()  # call made, no args supplied.

As a result, spark_cython does not return the wrapped function. It calls wrapped immediately with empty *args and **kwargs, and wrapped in turn calls fib_mapper_cython with no arguments, hence the TypeError. This happens as soon as main.py executes spark_cython('fib', 'fib_mapper_cython'), before Spark runs any task.
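The behaviour can be reproduced without Spark or Cython. The following is only a minimal sketch: fib_mapper and make_wrapper_buggy are hypothetical pure-Python stand-ins for fib_mapper_cython and spark_cython, used to show that the trailing parentheses trigger the call while the wrapper is still being built.

```python
def fib_mapper(n):
    # Hypothetical pure-Python stand-in for fib_mapper_cython.
    a, b = 0, 1
    j = int(n)
    while b < j:
        a, b = b, a + b
    return b, 1


def make_wrapper_buggy(func):
    # Mirrors the structure of spark_cython in the question.
    def wrapped(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapped()  # calls wrapped right away, with no arguments


try:
    make_wrapper_buggy(fib_mapper)
    error = None
except TypeError as exc:
    # Raised while building the wrapper; the exact message wording
    # depends on the Python version.
    error = exc
```

Here error ends up holding a TypeError even though no mapper was ever applied to any data, which matches what the question reports.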

Instead, return the function object itself, without calling it:

return wrapped

With that change, the TypeError should no longer occur.
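For illustration, here is one way the corrected wrapper could look. This is a sketch under assumptions, not the code from the question: lazy_function is a hypothetical name, the global variable is replaced by a small cache dictionary in the closure, and the bare except is replaced by an explicit check. It is demonstrated with math.sqrt from the standard library rather than fib.pyx, and it has not been tested on a Spark cluster.

```python
import importlib


def lazy_function(module, method):
    # Hypothetical variant of spark_cython: the import is deferred until
    # the first call, so it happens in the process that runs the function.
    cache = {}

    def wrapped(*args, **kwargs):
        if "func" not in cache:
            try:
                # Assumption: pyximport is only needed when the module is
                # a .pyx file; skip it if Cython is not installed.
                import pyximport
                pyximport.install()
            except ImportError:
                pass
            cache["func"] = getattr(importlib.import_module(module), method)
        return cache["func"](*args, **kwargs)

    return wrapped  # no parentheses: return the function, do not call it


# Demonstration with a standard-library function instead of fib.pyx.
mapper = lazy_function("math", "sqrt")
values = list(map(mapper, [4, 9, 16]))
```

Because wrapped is returned rather than called, nothing is imported or executed until the mapper is first applied to an element.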