基于列值复制Pyspark行

3

我希望能根据每行的某一列的值,复制整个数据框中的所有行,并为每个新行建立索引。假设我有以下数据框:

Column A Column B
T1       3
T2       2

I want the result to be:

Column A Column B Index
T1       3        1
T1       3        2
T1       3        3
T2       2        1
T2       2        2

我能够使用固定值来做类似的事情,但是不能使用列上找到的信息。我目前使用的固定值工作代码如下:

idx = [lit(i) for i in range(1, 10)]
df = df.withColumn('Index', explode(array( idx ) ))

我试图进行更改:

lit(i) for i in range(1, 10) 

to

lit(i) for i in range(1, df['Column B'])

并将其添加到我的array()函数中:

df = df.withColumn('Index', explode(array( lit(i) for i in range(1, df['Column B']) ) ))

但是它不起作用(TypeError:'Column'对象无法被解释为整数)。

我应该如何实现它?

2个回答

3

很遗憾,你不能像那样迭代列。你可以随时使用udf,但是如果你使用的是Spark版本2.1或更高版本,则我有一个非hack的解决方案适用于你。

诀窍在于利用pyspark.sql.functions.posexplode()来获取索引值。我们通过创建一个由逗号重复Column B次的字符串来实现这一点。然后我们在逗号上拆分此字符串,并使用posexplode获取索引。

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = 'SELECT '\
    '`Column A`,'\
    '`Column B`,'\
    'pos AS Index '\
    'FROM ( '\
        'SELECT DISTINCT '\
        '`Column A`,'\
        '`Column B`,'\
        'posexplode(split(repeat(",", `Column B`), ",")) '\
        'FROM df) AS a '\
    'WHERE a.pos > 0'
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+

注意:由于列名中包含空格,因此需要使用反引号将列名括起来,如在此帖子中所解释的那样:如何在Spark SQL中表示名称包含空格的列

不错的黑客技术@pault。但是我有一个疑问,为什么Repeat中的“B列”被视为原始列而非SQL表达式中的列?在API表单中它被视为列,对吧? - Ramesh Maharjan
@Ramesh 我曾试过使用DataFrame函数来实现这个,但是一直无法成功。我不知道为什么这个查询却能成功。 - pault
@RameshMaharjan 我发布了一个问题,询问这种行为。 - pault

0
You can try this:

    from pyspark.sql.window import Window
    from pyspark.sql.functions import *
    from pyspark.sql.types import ArrayType, IntegerType
    from pyspark.sql import functions as F
    df = spark.read.csv('/FileStore/tables/stack1.csv', header = 'True', inferSchema = 'True')

    w = Window.orderBy("Column A")
    df = df.select(row_number().over(w).alias("Index"), col("*"))

    n_to_array = udf(lambda n : [n] * n ,ArrayType(IntegerType()))
    df2 = df.withColumn('Column B', n_to_array('Column B'))
    df3= df2.withColumn('Column B', explode('Column B'))
    df3.show()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接