Spark与Cython

7
我最近想在Spark中使用Cython,遵循以下参考文献。我按照文中所述编写了以下程序,但出现了以下错误:
TypeError:
fib_mapper_cython() takes exactly 1 argument (0 given)

spark-tools.py

def spark_cython(module, method):
    def wrapped(*args, **kwargs):
        global cython_function_
        try:
            return cython_function_(*args, **kwargs)
        except:
            import pyximport
            pyximport.install()
            cython_function_ = getattr(__import__(module), method)
        return cython_function_(*args, **kwargs)
    return wrapped()

fib.pyx

def fib_mapper_cython(n):
    '''
     Return the first fibonnaci number > n.
    '''
    cdef int a = 0
    cdef int b = 0
    cdef int j = int(n)
    while b<j:
        a, b  = b, a+b
    return b, 1

main.py

from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()


os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))

sc = SparkContext()
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')

mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency

每次运行程序时,我都会收到一个“TypeError”错误。有什么想法吗?

fib_mapper_cython的初始值会无限循环。将b = 1更改应该可以解决这个问题。 - MrChristine
1个回答

6
这不是 CythonPySpark 的问题,很遗憾,在定义 spark_cython 时您添加了额外的函数调用。具体来说,包装对 cython_function 调用的函数在返回时没有传递参数:
return wrapped()  # call made, no args supplied.

因此,当您执行此调用时,不会返回包装函数。您所做的是使用没有*args**kwargs的参数调用wrapped。由于未提供*args, **kwargs,因此wrapped将不带任何参数调用fib_mapper_cython,导致TypeError

您应该改为:

return wrapped

并且这个问题现在不应该再存在了。

1
感谢您的帮助。 - Arnab

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接