我想知道如何将Spark DataFrame转换为Polars DataFrame。
假设我有以下PySpark代码:
df = spark.sql('''select * from tmp''')
我可以使用 .toPandas
轻松将其转换为 pandas dataframe。在 polars 中是否有类似的方法,因为我需要获取一个 polars dataframe 以供进一步处理?
我想知道如何将Spark DataFrame转换为Polars DataFrame。
假设我有以下PySpark代码:
df = spark.sql('''select * from tmp''')
我可以使用 .toPandas
轻松将其转换为 pandas dataframe。在 polars 中是否有类似的方法,因为我需要获取一个 polars dataframe 以供进一步处理?
Pyspark使用Arrow将数据转换为Pandas。Polars是对Arrow内存的抽象。因此,我们可以劫持Spark在内部使用的API来创建Arrow数据,并使用它来创建Polars DataFrame
。
给定一个Spark上下文,我们可以编写:
import pyarrow as pa
import polars as pl
sql_context = SQLContext(spark)
data = [('James',[1, 2]),]
spark_df = sql_context.createDataFrame(data=data, schema = ["name","properties"])
df = pl.from_arrow(pa.Table.from_batches(spark_df._collect_as_arrow()))
print(df)
shape: (1, 2)
┌───────┬────────────┐
│ name ┆ properties │
│ --- ┆ --- │
│ str ┆ list[i64] │
╞═══════╪════════════╡
│ James ┆ [1, 2] │
└───────┴────────────┘
这实际上比spark
自带的toPandas()
更快,因为它避免了额外的复制。
toPandas()
会导致以下序列化/复制步骤:
spark-memory -> arrow-memory -> pandas-memory
使用所提供的查询,我们有:
spark-memory -> arrow/polars-memory
.toPandas()
而是按照这个路径时,我遇到了一个IndexOutOfBoundsException
的Pyspark异常。我正在处理的文件只有大约16k列和20列左右。我将spark.driver.memory设置为24g,spark.driver.executor设置为8g,但仍然出现相同的错误。有什么想法吗?有趣的是,使用.toPandas()
却完全正常。这是我最初的SO问题-https://stackoverflow.com/questions/77114720/indexoutofboundsexception-when-converting-pyspark-dataframe-to-polars-dataframe/77115565#77115565 - undefinedpl.DataFrame._from_arrow
,它提供了更具体的类型提示。 - undefined请注意,Polars是一个单机多线程的DataFrame库,而Spark则是一个多机多线程的DataFrame库。所以 Spark 可以将 DataFrame 分布到多台机器上。
如果你的数据集需要这个功能,因为 DataFrame 无法适应单台机器,则 _collect_as_arrow
、to_dict
和 from_pandas
将不能为你提供服务。
如果你想使用一些 Polars 代码对 Spark DataFrame 进行转换(Spark -> Polars -> Spark),你可以使用mapInArrow
进行分布式和可扩展处理:
import pyarrow as pa
import polars as pl
from typing import Iterator
# The example data as a Spark DataFrame
data = [(1, 1.0), (2, 2.0)]
spark_df = spark.createDataFrame(data=data, schema = ['id', 'value'])
spark_df.show()
# Define your transformation on a Polars DataFrame
# Here we multply the 'value' column by 2
def polars_transform(df: pl.DataFrame) -> pl.DataFrame:
return df.select([
pl.col('id'),
pl.col('value') * 2
])
# Converts a part of the Spark DataFrame into a Polars DataFrame and call `polars_transform` on it
def arrow_transform(iter: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
# Transform a single RecordBatch so data fit into memory
# Increase spark.sql.execution.arrow.maxRecordsPerBatch if batches are too small
for batch in iter:
polars_df = pl.from_arrow(pa.Table.from_batches([batch]))
polars_df_2 = polars_transform(polars_df)
for b in polars_df_2.to_arrow().to_batches():
yield b
# Map the Spark DataFrame to Arrow, then to Polars, run the the `polars_transform` on it,
# and transform everything back to Spark DataFrame, all distributed and scalable
spark_df_2 = spark_df.mapInArrow(arrow_transform, schema='id long, value double')
spark_df_2.show()
你不能直接将Spark转换为Polars。但是,你可以先从Spark转到Pandas,然后将Pandas数据转换成字典,最后像这样传递给Polars:
pandas_df = df.toPandas()
data = pandas_df.to_dict('list')
pl_df = pl.DataFrame(data)
正如 @ritchie46 指出的那样,您可以使用 pl.from_pandas()
而不是创建一个字典:
pandas_df = df.toPandas()
pl_df = pl.from_pandas(pandas_df)
另外,正如@DataPsycho的回答中提到的那样,这可能会导致大型数据集的内存溢出异常。这是因为toPandas()
将首先将数据收集到驱动程序中。在这种情况下,最好将其写入csv或parquet文件,然后再读取。但要避免使用repartition(1)
,因为这也会将数据移动到驱动程序。
我提供的代码适用于适合驱动程序内存的数据集。如果您有增加驱动程序内存的选项,可以通过增加spark.driver.memory
的值来实现。
pl.from_pandas
的参数。这将节省大量堆分配并确保类型正确性。 - ritchie46了解您的使用情况会很有帮助。如果需要进行大量转换,应该使用Spark或Polars。不应混合使用两个数据框架。Polars能做到的,Spark都能做到。因此,您应该使用Spark进行所有转换。然后将文件写入csv或parquet格式。然后,您应该使用Polars读取转换后的文件,这样一切都会运行得非常快。但是,如果您对绘图感兴趣,则可以直接将其读入pandas并使用matplotlib。因此,如果您有一个Spark数据框架,可以将其写入csv:
(transformed_df
.repartition(1)
.write
.option("header",true)
.option("delimiter",",") # by default it is ,
.csv("<your_path>")
)
read_csv
函数来读取它。如果你在spark的驱动节点上的内存很小,那么transformed_df.toPandas()
可能会因为内存不足而失败。