如何在Spark的map函数中使用数据框(DataFrames)?

6

定义:

  • sampleDF 是一个具有查找目的列表记录的示例数据帧。
  • sampleDS 是一个包含元素列表的 RDD。
  • mappingFunction 用于查找 sampleDS 中的元素,并将它们映射为1,如果这些元素存在于 sampleDF 中,则映射为0。

我有一个映射函数如下:

def mappingFunction(element):
    # The dataframe lookup!
    lookupResult = sampleDF.filter(sampleDF[0] == element).collect()
    if len(lookupResult) > 0:
        print lookupResult
        return 1
    return 0

问题:

在映射函数外部访问 sampleDF 运行正常,但一旦我在函数内部使用它,就会出现以下错误:

py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:744)

其他我尝试过的:

我尝试保存一个临时表并在map函数中使用sqlContext选择,但仍然无法使其工作。这是我收到的错误:

  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/spark/python/pyspark/cloudpickle.py", line 542, in save_reduce
    save(state)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable

我的请求:

我已经尝试通过简单的示例来简化我的问题。非常感谢任何关于如何在映射函数内部使用数据框的帮助。

1个回答

3

很抱歉,这是不可能的。Spark不支持对分布式数据结构(RDDsDataFramesDatasets)进行嵌套操作。即使它支持,执行大量任务也不是一个好主意。鉴于你展示的代码,你可能想将RDD转换为DataFrame并使用join进行操作。

(rdd.map(x => (x, )).toDF(["element"])
  .join(sampleDF, sampleDF[0] == df[0])
  .groupBy("element")
  .agg(count("element") > 0))

顺便提一句,在map中进行打印完全没有意义,更不用说它增加了额外的IO开销。


谢谢。我简化了我要做的事情。根据一些条件,RDD中的数据必须映射到不同的内容中。那么,使用Python内存字典怎么样?也就是把DataFrame中的数据加载到Python字典中,然后在map函数中使用它。 - AmirHd
1
是的,你可以。你也可以使用广播变量。你只是不能使用分布式数据结构。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接