Spark DataFrame 方法 `toPandas` 实际上在做什么?

52

我是Spark-DataFrame API的初学者。

我使用以下代码将csv以制表符分隔的格式加载到Spark Dataframe中:

lines = sc.textFile('tail5.csv')
parts = lines.map(lambda l : l.strip().split('\t'))
fnames = *some name list*
schemaData = StructType([StructField(fname, StringType(), True) for fname in fnames])
ddf = sqlContext.createDataFrame(parts,schemaData)

假设我使用Spark从新文件创建DataFrame,并使用内置方法toPandas()将其转换为pandas DataFrame,

  • 它会将Pandas对象存储在本地内存中吗?
  • Pandas的底层计算是否全部由Spark处理?
  • 它公开了所有pandas DataFrame的功能吗?(我猜是)
  • 我能够将其转换为pandas DataFrame并完成操作,而无需过多地接触DataFrame API吗?
2个回答

69
使用Spark来读取CSV文件到pandas是一个非常绕弯的方法,以实现将CSV文件读入内存的最终目标。
看起来你可能误解了这里涉及的技术的用例。
Spark用于分布式计算(虽然也可以在本地使用)。它通常过于庞大,不适合仅用于读取CSV文件。
在你的例子中,sc.textFile方法只会给你一个Spark RDD,它实际上是一个文本行的列表。这可能不是你想要的。不会进行类型推断,所以如果你想对CSV文件中的一列数字求和,你将无法做到,因为在Spark看来它们仍然是字符串。
只需使用pandas.read_csv将整个CSV文件读入内存即可。Pandas会自动推断每列的类型。而Spark不会这样做。
现在来回答你的问题:
它将Pandas对象存储到本地内存中吗:
是的。toPandas()将把Spark DataFrame转换为Pandas DataFrame,当然是在内存中。
Pandas的低级计算是否由Spark处理:
不,Pandas运行自己的计算,Spark和Pandas之间没有相互作用,只是有一些API兼容性。
如果是Spark dataframe,不会暴露所有的Pandas dataframe功能。例如,Series对象有一个在PySpark Column对象中不可用的插值方法。在Pandas API中有很多方法和函数,在PySpark API中没有。
如果运行toPandas(),那么是的,因为你实际上是将其转换为Pandas dataframe。
当然可以。实际上,在这种情况下,你可能根本不应该使用Spark。除非你处理的是大量数据,否则pandas.read_csv可能已经满足你的需求。
尝试使用简单、低技术、易于理解的库来解决问题,只有在需要时才转向更复杂的技术。很多时候,你不需要更复杂的技术。

我没有在单台机器上使用Hadoop。我可能需要从HDFS中使用Hive加载数据。如果我将其转换为Pandas,那么我能否在分布式系统内使用Pandas? - Napitupulu Jon
1
啊,我明白了。Spark DataFrames和Pandas DataFrames没有共享计算基础设施。 Spark DataFrames在有意义的地方模拟pandas DataFrames的API。如果您正在寻找一种类似于pandas的操作方式,并且能够让您使用pandas DataFrame进入内存的Hadoop生态系统,请查看[blaze](http://blaze.pydata.org/en/latest/)。 - Phillip Cloud
2
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - fanfabbb
1
我能先使用 Pandas DataFrame 读取 CSV 文件,然后将其转换成 Spark DataFrame 吗? - luthfianto
是的,您可以将 pandas DataFrame 传递给 HiveContext.createDataFrame - Phillip Cloud
显示剩余6条评论

6
使用一些Spark Context或Hive Context方法(sc.textFile()hc.sql())将数据“读入内存”会返回一个RDD,但是RDD仍然存在于分布式内存(工作节点上的内存),而不是主节点上的内存。所有RDD方法(rdd.map()rdd.reduceByKey()等)都设计为在工作节点上并行运行,但有一些例外情况。例如,如果运行rdd.collect()方法,则会将来自所有工作节点的rdd内容复制到主节点内存中。因此,您失去了分布式计算的优势(但仍可以运行rdd方法)。
同样地,在Pandas中,当您运行toPandas()时,将数据帧从分布式(工作节点)内存复制到本地(主节点)内存,并且失去大部分分布式计算功能。因此,可能使用的一种工作流程(我经常使用)是使用分布式计算方法预处理数据到合理大小,然后转换为Pandas数据帧以获得丰富的特性集。希望这可以帮到您。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接