Filling missing dates in a Spark dataframe column

I have a Spark dataframe with a column "date" of type timestamp and a column "quantity" of type long. For each date there is some quantity value. The dates are sorted in increasing order, but some dates are missing. For example, the current df:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
14-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
20-09-2016  |    2

As you can see, the df has some missing dates, such as 12-09-2016 and 13-09-2016. I want to put 0 in the quantity field for those missing dates, so that the resulting df looks like this:

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
12-09-2016  |    0
13-09-2016  |    0
14-09-2016  |    0
15-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
18-09-2016  |    0
19-09-2016  |    0
20-09-2016  |    2

Any help or suggestions would be greatly appreciated. Please note that I am coding in Scala.

3 Answers

I have written this answer in a fairly verbose way so that the code is easier to follow. It can be optimized; a more compact sketch follows the final result below.

Required imports

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}

A UDF to convert the string into a valid date format

val date_transform = udf((date: String) => {
  val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
  val dt = LocalDate.parse(date, dtFormatter)
  // "%04d-%02d-%02d" zero-pads directly, avoiding the space-to-zero replaceAll
  "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
})

The UDF code below is taken from Iterating a range of dates.

def fill_dates = udf((start: String, excludedDiff: Int) => {
  val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val fromDt = LocalDateTime.parse(start, dtFormatter)
  // generate the dates strictly between the two known dates
  (1 to (excludedDiff - 1)).map { day =>
    val dt = fromDt.plusDays(day)
    "%04d-%02d-%02d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
  }
})

Setting up the sample dataframe (df)

val df = Seq(
      ("10-09-2016", 1),
      ("11-09-2016", 2),
      ("14-09-2016", 0),
      ("16-09-2016", 1),
      ("17-09-2016", 0),
      ("20-09-2016", 2)).toDF("date", "quantity")
      .withColumn("date", date_transform($"date").cast(TimestampType))
      .withColumn("quantity", $"quantity".cast(LongType))

df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- quantity: long (nullable = false)


df.show()    
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

Create a temporary dataframe (tempDf) to union with df:

val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // keep only gaps of more than one day, which need filling
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit(0L)) // 0 as a long, so the union schema matches df
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false)
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+

Now union the two dataframes:

val result = df.union(tempDf.select("date", "quantity"))
  .orderBy("date")

result.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+
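
As noted at the top, this step-by-step version can be tightened. A minimal sketch of one alternative, assuming Spark 2.4+ (where the built-in sequence function is available): generate every date between the min and max date, left-join the original data, and fill the resulting nulls with 0. This sketch is not part of the original answer.

// Sketch, assuming Spark 2.4+.
val bounds = df.agg(min($"date").as("start"), max($"date").as("end"))
val allDates = bounds.select(
  explode(sequence($"start", $"end", expr("interval 1 day"))).as("date"))
val filled = allDates
  .join(df, Seq("date"), "left") // missing dates get a null quantity
  .na.fill(0, Seq("quantity"))   // turn those nulls into 0
  .orderBy("date")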

Hi! Could you please post this answer in PySpark? - pissall

Building on the excellent answer from @mrsrinivas, here is the PySpark version.

Required imports

from typing import List
import datetime
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, lit, udf, datediff, lead, explode
from pyspark.sql.types import DateType, ArrayType

UDF to create the range of next dates

def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
    return [start_date + datetime.timedelta(days=days) for days in range(1, diff)]
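
For example, for the gap between 11-09-2016 and 14-09-2016 in the question (a diff of 3 days), it returns exactly the two missing dates in between:

# Quick check of the helper above, using the question's sample data.
assert _get_next_dates(datetime.date(2016, 9, 11), 3) == [
    datetime.date(2016, 9, 12),
    datetime.date(2016, 9, 13),
]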

Function to create the DataFrame that fills in the dates, with support for "grouping" columns:

def _get_fill_dates_df(df: DataFrame, date_column: str, group_columns: List[str], fill_column: str) -> DataFrame:
    get_next_dates_udf = udf(_get_next_dates, ArrayType(DateType()))

    # partition by the group columns so that lead() never crosses group boundaries
    window = Window.partitionBy(*group_columns).orderBy(date_column)

    return df.withColumn("_diff", datediff(lead(date_column, 1).over(window), date_column)) \
        .filter(col("_diff") > 1) \
        .withColumn("_next_dates", get_next_dates_udf(date_column, "_diff")) \
        .withColumn(fill_column, lit(0)) \
        .withColumn(date_column, explode("_next_dates")) \
        .drop("_diff", "_next_dates")

Usage of the function:

fill_df = _get_fill_dates_df(df, "Date", [], "Quantity")
df = df.union(fill_df)

It assumes that the date column is already of date type.
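
If it is not, the column can be cast first; a minimal sketch, assuming the dd-MM-yyyy strings from the question:

# Sketch: parse the original dd-MM-yyyy strings into a proper DateType column.
from pyspark.sql.functions import to_date
df = df.withColumn("Date", to_date(col("Date"), "dd-MM-yyyy"))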


Here is a small modification to use this function with months, taking as input the measure columns (the columns that should be set to zero) instead of the group columns:

from typing import List
import datetime
from dateutil import relativedelta
import math
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, Window
from pyspark.sql.types import DateType, ArrayType, IntegerType

def fill_time_gaps_date_diff_based(df: DataFrame, measure_columns: List[str], date_column: str):
    
    group_columns = [col for col in df.columns if col not in [date_column]+measure_columns]
    
    # save measure sums for qc
    qc = df.agg({col: 'sum' for col in measure_columns}).collect()

    # convert month to date
    convert_int_to_date = f.udf(lambda mth: datetime.datetime(year=math.floor(mth/100), month=mth%100, day=1), DateType())
    df = df.withColumn(date_column, convert_int_to_date(date_column))

    # sort values
    df = df.orderBy(group_columns)

    # get_fill_dates_df (instead of months_between also use date_diff for days)
    window = Window.orderBy(*group_columns, date_column)

    # calculate diff column
    fill_df = df.withColumn(
        "_diff", 
        f.months_between(f.lead(date_column, 1).over(window), date_column).cast(IntegerType())
    ).filter(
        f.col("_diff") > 1
    )

    # generate next dates
    def _get_next_dates(start_date: datetime.date, diff: int) -> List[datetime.date]:
        return [
            start_date + relativedelta.relativedelta(months=months)
            for months in range(1, diff)
        ]

    get_next_dates_udf = f.udf(_get_next_dates, ArrayType(DateType()))

    fill_df = fill_df.withColumn(
        "_next_dates",
        get_next_dates_udf(date_column, "_diff")
    )

    # set measure columns to 0
    for col in measure_columns:
        fill_df = fill_df.withColumn(col, f.lit(0))

    # explode next_dates column
    fill_df = fill_df.withColumn(date_column, f.explode('_next_dates'))

    # drop unnecessary columns
    fill_df = fill_df.drop(
        "_diff",
        "_next_dates"
    )
    
    # union df with fill_df
    df = df.union(fill_df)
    
    # qc: should be removed for productive runs
    if qc != df.agg({col: 'sum' for col in measure_columns}).collect():
        raise ValueError('Sums before and after run do not fit.')
    
    return df
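
Usage mirrors the function above; a sketch, where the "Month" and "Quantity" column names are assumptions rather than part of the original answer:

# Fill monthly gaps, zeroing the measure column for the inserted months.
df = fill_time_gaps_date_diff_based(df, measure_columns=['Quantity'], date_column='Month')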

Note that I assume the months are given as integers in YYYYMM form. This can easily be adjusted by modifying the "convert month to date" part; see the sketch below.
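
For example, if the months arrived as "YYYY-MM" strings instead, the conversion step could look roughly like this (a sketch, not part of the original answer):

# Sketch: parse "YYYY-MM" strings instead of YYYYMM integers.
convert_str_to_date = f.udf(
    lambda mth: datetime.datetime.strptime(mth, '%Y-%m').date(), DateType()
)
df = df.withColumn(date_column, convert_str_to_date(date_column))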
