从pandas Series创建Spark DataFrame

3
我有一个Pandas Series对象。
dates = pd.Series(pd.date_range(start_date,end_date))/
.dt.strftime('%y%m%d')/
.astype(int)/

我希望直接从Series对象创建Spark DF,而不需要中间的Pandas dataframe。
    _schema = StructType([
     StructField("date_id", IntegerType(), True),
])

    dates_rdd = sc.parallelize(dates)
    self.date_table = spark.createDataFrame(dates_rdd, _schema)

错误:

Error: raise TypeError("StructType can not accept object %r in type %s" % 
(obj, type(obj)))
TypeError: StructType can not accept object 160101 in type <class 
'numpy.int64'>

如果我将Series对象更改为以下内容:
    dates = pd.Series(pd.date_range(start_date,end_date))/
    .dt.strftime('%y%m%d')/
    .astype(int).values.tolist()

错误变成了:

 raise TypeError("StructType can not accept object %r in type %s" % (obj, 
 type(obj)))
 TypeError: StructType can not accept object 160101 in type <class 'int'>

如何正确地将日期列表/ rdd 中包含的 Int 值映射到可从 Spark Dataframes 接受的 Python 本机整数?


@Suresh 仍然是相同的错误。 - balalaika
请提供开始日期和结束日期的值。 - Suresh
2个回答

3
这将起作用:
dates_rdd = sc.parallelize(dates).map(lambda x: tuple([int(x)]))
date_table = spark.createDataFrame(dates_rdd, _schema)

在定义dates_rdd时,额外的映射目的是使rdd的格式与模式匹配。


1
好的,基本上和下面的答案一样,你比我快了20秒 ;) - ags29
是的,基本上他之前已经发表了评论,所以我认为接受他的答案是公平的。 - balalaika

2

相信,您错过了为每个系列值创建元组的步骤。

>>> dates = pd.Series(pd.date_range(start='1/1/1980', end='1/11/1980')).dt.strftime('%y%m%d').astype(int).values.tolist()
>>> rdd = sc.parallelize(dates).map(lambda x:(x,))
>>> _schema = StructType([StructField("date_id", IntegerType(), True),])
>>> df = spark.createDataFrame(rdd,schema=_schema)
>>> df.show()
+-------+
|date_id|
+-------+
| 800101|
| 800102|
| 800103|
| 800104|
| 800105|
| 800106|
| 800107|
| 800108|
| 800109|
| 800110|
| 800111|
+-------+

>>> df.printSchema()
root
 |-- date_id: integer (nullable = true)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接