用pySpark / Python遍历数据框列,检查条件并填充另一列。

4
我正在Jupyter Notebook中使用python/pySpark进行工作,并尝试弄清楚以下问题:
我有一个类似于数据框的东西。
MainDate      Date1        Date2        Date3         Date4
2015-10-25    2015-09-25   2015-10-25   2015-11-25    2015-12-25
2012-07-16    2012-04-16   2012-05-16   2012-06-16    2012-07-16
2005-03-14    2005-07-14   2005-08-14   2005-09-14    2005-10-14

我需要将MainDate与Date1-Date4中的每个日期进行比较,如果MainDate == Date#,则创建一个新列REAL = Date#,如果没有匹配,则REAL =“None”,所有日期都以日期格式给出。另外,真实数据框具有Date1到Date72,可能只有一个匹配,如果有任何匹配。

最终结果:
MainDate      Date1        Date2        Date3         Date4        REAL
2015-10-25    2015-09-25   2015-10-25   2015-11-25    2015-12-25   Date2
2012-07-16    2012-04-16   2012-05-16   2012-06-16    2012-07-16   Date4
2005-03-14    2005-07-14   2005-08-14   2005-09-14    2005-10-14   None

提前感谢您。
2个回答

6

我会使用 coalesce

from pyspark.sql.functions import col, when, coalesce, lit

df = spark.createDataFrame([
    ("2015-10-25", "2015-09-25", "2015-10-25", "2015-11-25", "2015-12-25"),
    ("2012-07-16", "2012-04-16", "2012-05-16", "2012-06-16", "2012-07-16"),
    ("2005-03-14", "2005-07-14", "2005-08-14", "2005-09-14", "2005-10-14"),],
    ("MainDate", "Date1", "Date2", "Date3", "Date4")
)


df.withColumn("REAL", 
    coalesce(*[when(col(c) == col("MainDate"), lit(c)) for c in df.columns[1:]])
).show()


+----------+----------+----------+----------+----------+-----+
|  MainDate|     Date1|     Date2|     Date3|     Date4| REAL|
+----------+----------+----------+----------+----------+-----+
|2015-10-25|2015-09-25|2015-10-25|2015-11-25|2015-12-25|Date2|
|2012-07-16|2012-04-16|2012-05-16|2012-06-16|2012-07-16|Date4|
|2005-03-14|2005-07-14|2005-08-14|2005-09-14|2005-10-14| null|
+----------+----------+----------+----------+----------+-----+

位置在哪里

when(col(c) == col("MainDate"), lit(c))

如果有匹配项,则返回列名 (lit(c)),否则返回 NULL

这比使用 udf 或转换为 RDD 要快得多。


2
你可以将数据框转换为RDD,通过检查与MainDate匹配的日期列,在每个Row中添加一个新字段:
df = spark.read.option("header", True).option("inferSchema", True).csv("test.csv")
from pyspark.sql import Row
from pyspark.sql.types import StringType

# get the list of columns you want to compare with MainDate
dates = [col for col in df.columns if col.startswith('Date')]

# for each row loop through the dates column and find the match, if nothing matches, return None
rdd = df.rdd.map(lambda row: row + Row(REAL = next((col for col in dates if row[col] == row['MainDate']), None)))

# recreate the data frame from the rdd
spark.createDataFrame(rdd, df.schema.add("REAL", StringType(), True)).show()
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|            MainDate|               Date1|               Date2|               Date3|               Date4| REAL|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|2015-10-25 00:00:...|2015-09-25 00:00:...|2015-10-25 00:00:...|2015-11-25 00:00:...|2015-12-25 00:00:...|Date2|
|2012-07-16 00:00:...|2012-04-16 00:00:...|2012-05-16 00:00:...|2012-06-16 00:00:...|2012-07-16 00:00:...|Date4|
|2005-03-14 00:00:...|2005-07-14 00:00:...|2005-08-14 00:00:...|2005-09-14 00:00:...|2005-10-14 00:00:...| null|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接