如何将Spark DataFrame转换为Polars DataFrame?

8

我想知道如何将Spark DataFrame转换为Polars DataFrame。

假设我有以下PySpark代码:

df = spark.sql('''select * from tmp''')

我可以使用 .toPandas 轻松将其转换为 pandas dataframe。在 polars 中是否有类似的方法,因为我需要获取一个 polars dataframe 以供进一步处理?


据我所知,根据文档显示,Spark目前尚不支持polars。 - samkart
4个回答

24

背景

Pyspark使用Arrow将数据转换为Pandas。Polars是对Arrow内存的抽象。因此,我们可以劫持Spark在内部使用的API来创建Arrow数据,并使用它来创建Polars DataFrame

简述

给定一个Spark上下文,我们可以编写:

import pyarrow as pa
import polars as pl

sql_context = SQLContext(spark)

data = [('James',[1, 2]),]
spark_df = sql_context.createDataFrame(data=data, schema = ["name","properties"])

df = pl.from_arrow(pa.Table.from_batches(spark_df._collect_as_arrow()))

print(df)
shape: (1, 2)
┌───────┬────────────┐
│ name  ┆ properties │
│ ---   ┆ ---        │
│ str   ┆ list[i64]  │
╞═══════╪════════════╡
│ James ┆ [1, 2]     │
└───────┴────────────┘

序列化步骤

这实际上比spark自带的toPandas()更快,因为它避免了额外的复制。

toPandas()会导致以下序列化/复制步骤:

spark-memory -> arrow-memory -> pandas-memory

使用所提供的查询,我们有:

spark-memory -> arrow/polars-memory


嘿,当我不使用.toPandas()而是按照这个路径时,我遇到了一个IndexOutOfBoundsException的Pyspark异常。我正在处理的文件只有大约16k列和20列左右。我将spark.driver.memory设置为24g,spark.driver.executor设置为8g,但仍然出现相同的错误。有什么想法吗?有趣的是,使用.toPandas()却完全正常。这是我最初的SO问题-https://stackoverflow.com/questions/77114720/indexoutofboundsexception-when-converting-pyspark-dataframe-to-polars-dataframe/77115565#77115565 - undefined
最好使用pl.DataFrame._from_arrow,它提供了更具体的类型提示。 - undefined

7

Polars不支持分布式,而Spark支持

请注意,Polars是一个单机多线程的DataFrame库,而Spark则是一个多机多线程的DataFrame库。所以 Spark 可以将 DataFrame 分布到多台机器上。

使用 Polars 代码扩展转换 Spark DataFrame

如果你的数据集需要这个功能,因为 DataFrame 无法适应单台机器,则 _collect_as_arrowto_dictfrom_pandas 将不能为你提供服务。

如果你想使用一些 Polars 代码对 Spark DataFrame 进行转换(Spark -> Polars -> Spark),你可以使用mapInArrow进行分布式和可扩展处理:

import pyarrow as pa
import polars as pl

from typing import Iterator


# The example data as a Spark DataFrame
data = [(1, 1.0), (2, 2.0)]
spark_df = spark.createDataFrame(data=data, schema = ['id', 'value'])
spark_df.show()


# Define your transformation on a Polars DataFrame
# Here we multply the 'value' column by 2
def polars_transform(df: pl.DataFrame) -> pl.DataFrame:
  return df.select([
    pl.col('id'),
    pl.col('value') * 2
  ])


# Converts a part of the Spark DataFrame into a Polars DataFrame and call `polars_transform` on it
def arrow_transform(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
  # Transform a single RecordBatch so data fit into memory
  # Increase spark.sql.execution.arrow.maxRecordsPerBatch if batches are too small
  for batch in iter:
    polars_df = pl.from_arrow(pa.Table.from_batches([batch]))
    polars_df_2 = polars_transform(polars_df)
    for b in polars_df_2.to_arrow().to_batches():
      yield b


# Map the Spark DataFrame to Arrow, then to Polars, run the the `polars_transform` on it,
# and transform everything back to Spark DataFrame, all distributed and scalable
spark_df_2 = spark_df.mapInArrow(arrow_transform, schema='id long, value double')
spark_df_2.show()

2

你不能直接将Spark转换为Polars。但是,你可以先从Spark转到Pandas,然后将Pandas数据转换成字典,最后像这样传递给Polars:

pandas_df = df.toPandas()
data = pandas_df.to_dict('list')
pl_df = pl.DataFrame(data)

正如 @ritchie46 指出的那样,您可以使用 pl.from_pandas() 而不是创建一个字典:

pandas_df = df.toPandas()
pl_df = pl.from_pandas(pandas_df)

另外,正如@DataPsycho的回答中提到的那样,这可能会导致大型数据集的内存溢出异常。这是因为toPandas()将首先将数据收集到驱动程序中。在这种情况下,最好将其写入csv或parquet文件,然后再读取。但要避免使用repartition(1),因为这也会将数据移动到驱动程序。

我提供的代码适用于适合驱动程序内存的数据集。如果您有增加驱动程序内存的选项,可以通过增加spark.driver.memory的值来实现。


你永远不应该通过 Python 字典来访问 Polars。Polars 应该作为 pl.from_pandas 的参数。这将节省大量堆分配并确保类型正确性。 - ritchie46
是的,我考虑过先将我的数据转换成pandas dataframe,但我认为这在我处理的大量数据上行不通:( 希望Spark能尽快添加极地支持。 - s1nbad

0

了解您的使用情况会很有帮助。如果需要进行大量转换,应该使用Spark或Polars。不应混合使用两个数据框架。Polars能做到的,Spark都能做到。因此,您应该使用Spark进行所有转换。然后将文件写入csv或parquet格式。然后,您应该使用Polars读取转换后的文件,这样一切都会运行得非常快。但是,如果您对绘图感兴趣,则可以直接将其读入pandas并使用matplotlib。因此,如果您有一个Spark数据框架,可以将其写入csv:

(transformed_df
    .repartition(1)
    .write
    .option("header",true)
    .option("delimiter",",") # by default it is ,
    .csv("<your_path>")
)

现在用polars或pandas的read_csv函数来读取它。如果你在spark的驱动节点上的内存很小,那么transformed_df.toPandas()可能会因为内存不足而失败。

我主要使用Spark,但有时需要创建Pandas dataframe进行额外的分析/绘制图形。 因此,我想知道在使用polar时是否存在这样的可能性 :) - s1nbad
很遗憾,你应该继续使用 pandas 进行绘图。 - DataPsycho

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接