如何高效地创建一个新的DataFrame(PySpark)?

4

I have a dataframe like:

+---------------+-------+
|  date  |  ID  | count |
+--------+------+-------+
|20170101| 258  |  1003 |
|20170102| 258  |  13   |
|20170103| 258  |  1    |
|20170104| 258  |  108  |
|20170109| 258  |  25   |
|  ...   | ...  |  ...  |
|20170101| 2813 |  503  |
|20170102| 2813 |  139  |
|  ...   | ...  |  ...  |
|20170101| 4963 |  821  |
|20170102| 4963 |  450  |
|  ...   | ...  |  ...  |
+--------+------+-------+

在我的数据框中,有一些数据是不存在的。

例如,这里编号为 258 的数据在日期区间 20170105 ~ 20170108 中缺失。

缺失的数据指的是没有出现过(即计数为0)。

但是我希望也能添加计数为0的缺失数据,就像这样:

+---------------+-------+
|  date  |  ID  | count |
+--------+------+-------+
|20170101| 258  |  1003 |
|20170102| 258  |  13   |
|20170103| 258  |  1    |
|20170104| 258  |  108  |
|20170105| 258  |  0    |
|20170106| 258  |  0    |
|20170107| 258  |  0    |
|20170108| 258  |  0    |
|20170109| 258  |  25   |
|  ...   | ...  |  ...  |
|20170101| 2813 |  503  |
|20170102| 2813 |  139  |
|  ...   | ...  |  ...  |
|20170101| 4963 |  821  |
|20170102| 4963 |  450  |
|  ...   | ...  |  ...  |
+--------+------+-------+

数据框是不可变的,所以如果我想向该数据框中添加零计数的数据,则必须创建一个新的数据框。

但即使我有一个持续时间(20170101 ~ 20171231)和ID列表,我也不能使用for loop来操作数据框。

我应该如何创建一个新的数据框呢?

顺便说一句,我已经尝试过创建正确的数据框,然后比较两个数据框,创建另一个只包含0计数数据的数据框,最后合并“原始数据框”和“0计数数据框”。我认为这不是一个好的长期解决方案。请推荐其他高效的解决方案。

1个回答

3
from pyspark.sql.functions import unix_timestamp, from_unixtime, struct, datediff, lead, col, explode, lit, udf
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DateType
from datetime import timedelta

#sample data
df = sc.parallelize([
    ['20170101', 258, 1003],
    ['20170102', 258, 13],
    ['20170103', 258, 1],
    ['20170104', 258, 108],
    ['20170109', 258, 25],
    ['20170101', 2813, 503],
    ['20170102', 2813, 139],
    ['20170101', 4963, 821],
    ['20170102', 4963, 450]]).\
    toDF(('date', 'ID', 'count')).\
    withColumn("date", from_unixtime(unix_timestamp('date', 'yyyyMMdd')).cast('date'))
df.show()

def date_list_fn(d):
    return [d[0] + timedelta(days=x) for x in range(1, d[1])]
date_list_udf = udf(date_list_fn, ArrayType(DateType()))

w =  Window.partitionBy('ID').orderBy('date')

#dataframe having missing date
df_missing = df.withColumn("diff", datediff(lead('date').over(w), 'date')).\
                filter(col("diff") > 1).\
                withColumn("date_list", date_list_udf(struct("date", "diff"))).\
                withColumn("date_list", explode(col("date_list"))).\
                select(col("date_list").alias("date"), "ID", lit(0).alias("count"))

#final dataframe by combining sample data with missing date dataframe
final_df = df.union(df_missing).sort(col("ID"), col("date"))
final_df.show()

示例数据:

+----------+----+-----+
|      date|  ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258|   13|
|2017-01-03| 258|    1|
|2017-01-04| 258|  108|
|2017-01-09| 258|   25|
|2017-01-01|2813|  503|
|2017-01-02|2813|  139|
|2017-01-01|4963|  821|
|2017-01-02|4963|  450|
+----------+----+-----+

输出结果为:

+----------+----+-----+
|      date|  ID|count|
+----------+----+-----+
|2017-01-01| 258| 1003|
|2017-01-02| 258|   13|
|2017-01-03| 258|    1|
|2017-01-04| 258|  108|
|2017-01-05| 258|    0|
|2017-01-06| 258|    0|
|2017-01-07| 258|    0|
|2017-01-08| 258|    0|
|2017-01-09| 258|   25|
|2017-01-01|2813|  503|
|2017-01-02|2813|  139|
|2017-01-01|4963|  821|
|2017-01-02|4963|  450|
+----------+----+-----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接