I recently wanted to use Cython
with Spark, for which I followed the following reference.
I wrote the following programs as mentioned but I am getting a:
TypeError:
fib_mapper_cython() takes exactly 1 argument (0 given)
spark-tools.py
def spark_cython(module, method):
def wrapped(*args, **kwargs):
global cython_function_
try:
return cython_function_(*args, **kwargs)
except:
import pyximport
pyximport.install()
cython_function_ = getattr(__import__(module), method)
return cython_function_(*args, **kwargs)
return wrapped()
fib.pyx
def fib_mapper_cython(n):
'''
Return the first fibonnaci number > n.
'''
cdef int a = 0
cdef int b = 0
cdef int j = int(n)
while b<j:
a, b = b, a+b
return b, 1
main.py
from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()
os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))
sc = SparkContext()
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')
mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency
I get a TypeError
whenever I run the program. Any Ideas?