PySpark: collect_list over a window() with a condition

I have the following test data:
import pandas as pd
import datetime

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])

The following code returns, for each row, the list of names belonging to the same customer ID within the preceding two days:
import pandas as pd
import datetime

from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .config("spark.ui.showConsoleProgress", "false")\
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', 50000)
spark.sparkContext.setLogLevel("OFF")


data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],
     'customerid': [2, 2, 2, 3, 4, 3], 'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'], 'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI']}
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark_data= spark.createDataFrame(data)

win = Window().partitionBy('customerid').orderBy((F.col('date')).cast("long")).rangeBetween(
        -(2*86400), Window.currentRow)

result_frame = spark_data.withColumn("names_array", F.collect_list('names').over(win)).sort(F.col("date").asc())

pd_result_frame = result_frame.toPandas()
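The window works because casting the timestamp column to `long` yields epoch seconds, so the two-day lookback becomes the numeric range `-2 * 86400` up to the current row. A quick sanity check of that arithmetic in plain Python (no Spark needed; note that Spark's cast to `long` depends on the session time zone, UTC is assumed here):

```python
from datetime import datetime, timezone

SECONDS_PER_DAY = 86400

# Epoch seconds for two of the sample dates, mimicking date.cast("long") under UTC.
d1 = int(datetime(2014, 1, 1, tzinfo=timezone.utc).timestamp())
d3 = int(datetime(2014, 1, 3, tzinfo=timezone.utc).timestamp())

# 2014-01-01 sits exactly at the lower edge of the window ending on 2014-01-03,
# so it is still included by rangeBetween(-2 * 86400, Window.currentRow).
print(d3 - d1 == 2 * SECONDS_PER_DAY)  # → True
```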

The data:

<pre>
date      |customerid|PaymentType|names 
2014-01-01|2         |OI         |Andrew
2014-01-02|2         |CC         |Pete 
2014-01-03|2         |CC         |Sean
2014-01-04|3         |OI         |Steve
2014-01-05|4         |OI         |Ray
2014-01-06|3         |OI         |Stef
</pre>

Resulting table:

<pre>
date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew', 'Pete'] 
2014-01-03|2         |CC         |['Andrew', 'Pete', 'Sean']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']
</pre>

Now I would like to add a condition to F.collect_list: only names whose PaymentType == 'OI' should be collected into the list.
In the end, the table should look like this:
<pre>
date      |customerid|PaymentType|names_array|
2014-01-01|2         |OI         |['Andrew']
2014-01-02|2         |CC         |['Andrew'] 
2014-01-03|2         |CC         |['Andrew']
2014-01-04|3         |OI         |['Steve']
2014-01-05|4         |OI         |['Ray']
2014-01-06|3         |OI         |['Steve', 'Stef']
</pre>

Thanks!

1 Answer

You can put a when/otherwise conditional inside your collect_list: a name is collected only when PaymentType is 'OI', and None is produced otherwise. Since collect_list ignores nulls, the non-matching rows simply drop out of the resulting array.
spark_data.withColumn("names_array",\
                      F.collect_list(F.when(F.col("PaymentType")=='OI',F.col("names"))\
                      .otherwise(F.lit(None))).over(win)).sort(F.col("date").asc()).show()

#+-------------------+----------+------+-----------+-------------+
#|               date|customerid| names|PaymentType|  names_array|
#+-------------------+----------+------+-----------+-------------+
#|2014-01-01 00:00:00|         2|Andrew|         OI|     [Andrew]|
#|2014-01-02 00:00:00|         2|  Pete|         CC|     [Andrew]|
#|2014-01-03 00:00:00|         2|  Sean|         CC|     [Andrew]|
#|2014-01-04 00:00:00|         3| Steve|         OI|      [Steve]|
#|2014-01-05 00:00:00|         4|   Ray|         OI|        [Ray]|
#|2014-01-06 00:00:00|         3|  Stef|         OI|[Steve, Stef]|
#+-------------------+----------+------+-----------+-------------+
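For reference, the same conditional range window can be reproduced in plain pandas, which makes the logic easy to verify without a Spark session. This is just an illustrative sketch (the helper `names_in_window` is not part of the original code); it scans the frame once per row, so it only suits small data:

```python
import pandas as pd

data = pd.DataFrame({
    'date': pd.to_datetime(['2014-01-01', '2014-01-02', '2014-01-03',
                            '2014-01-04', '2014-01-05', '2014-01-06']),
    'customerid': [2, 2, 2, 3, 4, 3],
    'names': ['Andrew', 'Pete', 'Sean', 'Steve', 'Ray', 'Stef'],
    'PaymentType': ['OI', 'CC', 'CC', 'OI', 'OI', 'OI'],
})

def names_in_window(row, frame, days=2):
    # Rows for the same customer whose date falls in [date - days, date]
    # and whose PaymentType is 'OI' -- the condition from the question.
    mask = (
        (frame['customerid'] == row['customerid'])
        & (frame['date'] >= row['date'] - pd.Timedelta(days=days))
        & (frame['date'] <= row['date'])
        & (frame['PaymentType'] == 'OI')
    )
    return frame.loc[mask, 'names'].tolist()

data['names_array'] = data.apply(lambda r: names_in_window(r, data), axis=1)
print(data['names_array'].tolist())
# → [['Andrew'], ['Andrew'], ['Andrew'], ['Steve'], ['Ray'], ['Steve', 'Stef']]
```

The output matches the Spark result above row for row.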
