如何使用Pandas 2.0.0从Pandas数据框创建Pyspark数据框?

3

我通常使用spark.createDataFrame(),在早期版本的Pandas中,它会向我抛出关于iteritems()调用的弃用警告。但是在Pandas 2.0.0中,它根本无法工作,导致以下错误:

AttributeError                            Traceback (most recent call last)
File <command-2209449931455530>:64
     61 df_train_test_p.loc[df_train_test_p.is_train=='N','preds']=preds_test
     63 # save the original table and predictions into spark dataframe
---> 64 df_test = spark.createDataFrame(df_train_test_p.loc[df_train_test_p.is_train=='N'])
     65 df_results = df_results.union(df_test)
     67 # saving all relevant data

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/session.py:1211, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
   1207     data = pd.DataFrame(data, columns=column_names)
   1209 if has_pandas and isinstance(data, pd.DataFrame):
   1210     # Create a DataFrame from pandas DataFrame.
-> 1211     return super(SparkSession, self).createDataFrame(  # type: ignore[call-overload]
   1212         data, schema, samplingRatio, verifySchema
   1213     )
   1214 return self._create_dataframe(
   1215     data, schema, samplingRatio, verifySchema  # type: ignore[arg-type]
   1216 )

File /databricks/spark/python/pyspark/sql/pandas/conversion.py:478, in SparkConversionMixin.createDataFrame(self, data, schema, samplingRatio, verifySchema)
    476             warn(msg)
    477             raise
--> 478 converted_data = self._convert_from_pandas(data, schema, timezone)
    479 return self._create_dataframe(converted_data, schema, samplingRatio, verifySchema)

File /databricks/spark/python/pyspark/sql/pandas/conversion.py:516, in SparkConversionMixin._convert_from_pandas(self, pdf, schema, timezone)
    514 else:
    515     should_localize = not is_timestamp_ntz_preferred()
--> 516     for column, series in pdf.iteritems():
    517         s = series
    518         if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-fefe10af-04b7-4277-b395-2f16b77bd90b/lib/python3.9/site-packages/pandas/core/generic.py:5981, in NDFrame.__getattr__(self, name)
   5974 if (
   5975     name not in self._internal_names_set
   5976     and name not in self._metadata
   5977     and name not in self._accessors
   5978     and self._info_axis._can_hold_identifiers_and_holds_name(name)
   5979 ):
   5980     return self[name]
-> 5981 return object.__getattribute__(self, name)

AttributeError: 'DataFrame' object has no attribute 'iteritems'

我该如何解决这个问题?我使用的是Spark 3.3.2。我发现了一个看起来更新的代码,已经替换了有问题的调用,链接在这里:https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py但不确定它是哪个版本以及是否可用。

编辑:下面是重现问题的示例代码:

import pandas as pd
from pyspark.sql import SparkSession

# create a sample pandas dataframe
data = {'name': ['John', 'Mike', 'Sara', 'Adam'], 'age': [25, 30, 18, 40]}
df_pandas = pd.DataFrame(data)

# convert pandas dataframe to PySpark dataframe
spark = SparkSession.builder.appName('pandasToSpark').getOrCreate()
df_spark = spark.createDataFrame(df_pandas)

# show the PySpark dataframe
df_spark.show()

请详细添加您的用例,如果可能,请附上样本输入和期望输出。 - Dipanjan Mallick
sample code added - shiftyscales
我遇到了同样的问题。目前似乎存在pyspark和pandas 2.0.0版本之间的软件包冲突(该版本已于2023年4月3日在PyPi上发布为默认版本)。希望pyspark开发人员能尽快解决这个问题。 - K.S.
3个回答

3

目前存在一个损坏的依赖关系。问题最近已经合并。它将在pyspark==3.4中发布。

不幸的是,目前只有pyspark==3.3.2在pypi上可用。但由于pandas==2.0.0刚刚在pypi上发布(截至2023年4月3日),当前的pyspark似乎暂时无法使用。

唯一的解决方法是按建议固定到旧版本的pandas,直到下一个pyspark版本发布。或者,您可以尝试从发行候选版中获取新的pyspark。


0
一个可行的方法是直接将Pandas数据框转换为Parquet格式,然后使用PySpark读取Parquet文件。
import pandas as pd
from pyspark.sql import SparkSession

# create a sample pandas dataframe
data = {'name': ['John', 'Mike', 'Sara', 'Adam'], 'age': [25, 30, 18, 40]}
df_pandas = pd.DataFrame(data)

parquet_file = 'temp_data.parquet'
df_pandas.to_parquet(parquet_file)

# convert pandas dataframe to PySpark dataframe
spark = SparkSession.builder.appName('pandasToSpark').getOrCreate()
df_spark = spark.read.parquet(parquet_file)

# show the PySpark dataframe
df_spark.show()

-1
你遇到的问题是因为在Pandas 2.0中已经移除了iteritems()方法。相反,你可以使用items()方法来完成同样的操作。
要解决这个问题,你需要更新使用iteritems()方法的代码。一种方法是将iteritems()替换为items()。你可以更新以下代码行:
import pandas as pd

data = {
    "Name": ["Alice", "Bob", "Charlie", "David", "Emily"],
    "Age": [25, 32, 18, 47, 21],
    "City": ["New York", "San Francisco", "Chicago", "Los Angeles", "Boston"],
}

pdf = pd.DataFrame(data)

for column, series in pdf.items():
    print(column, series)

输出

Name 0      Alice
1        Bob
2    Charlie
3      David
4      Emily
Name: Name, dtype: object
Age 0    25
1    32
2    18
3    47
4    21
Name: Age, dtype: int64
City 0         New York
1    San Francisco
2          Chicago
3      Los Angeles
4           Boston
Name: City, dtype: object

或者,如果你仍然需要使用 iteritems(),你可以将 Pandas 版本降级到仍支持 iteritems() 方法的版本。你可以运行以下命令实现:
!pip install pandas==1.2.5

这并不能解决我的问题,因为iteritems()是在Spark中调用的。我知道我可以通过降级pandas来解决这个问题,但我特别关注使用Pandas 2.0.0的解决方案。 - shiftyscales
你可以在Spark中调用items()方法。这个方法可以在pyspark.pandas数据帧方法中使用。请参阅此官方文档spark - Dipanjan Mallick
1
我正在使用 spark.createDataFrame() 方法从 pandas dataframe 创建 pyspark dataframe。该方法使用了已弃用的 iteritems() 调用。 - shiftyscales

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接