如何在Pyspark中从Spark数据框创建边缘列表?

3

我正在使用 graphframes 在 pyspark 中进行一些图形类型的分析,想知道从顶点数据框创建边数据框的最佳方法是什么。

例如,下面是我的顶点数据框。我有一个 ID 列表,它们属于不同的组。

+---+-----+
|id |group|
+---+-----+
|a  |1    |
|b  |2    |
|c  |1    |
|d  |2    |
|e  |3    |
|a  |3    |
|f  |1    |
+---+-----+

我的目标是创建一个边缘列表数据框,以指示在共同组中出现的id。 请注意,一个id可能出现在多个组中(例如,上面的id a出现在组1和3中)。 下面是我想要得到的边缘列表数据框:

+---+-----+-----+
|src|dst  |group|
+---+-----+-----+
|a  |c    |1    |
|a  |f    |1    |
|c  |f    |1    |
|b  |d    |2    |
|a  |e    |3    |
+---+-----+-----+

感谢您的提前预约!

1
如果再添加一行 (id='f', group=1),我们如何知道哪个 id 是 src,哪个是 dst?是否有其他列来为每个组的 id 进行排序? - jxc
@jxc 这是一个很好的观点。请参见上面的新示例,包括id = 'f'和group = 1。在我的情况下,src和dst的顺序不一定要固定。只要同一组中的2个id可以显示在同一行中,就可以满足需求。 - MAMS
@jxc 我正在使用 Spark 2.3。 - MAMS
2
只需执行自连接操作:df.alias('d1').join(df.alias('d2'), ['group']).filter("d1.id < d2.id").toDF("group", "src", "dst") - jxc
@jxc 我认为你应该把这个作为答案发布。它比其他两个答案更直接了当。你的解决方案只是在最后缺少distinct()(如果我们有两个实例(1,a),它会给我们重复的行)。 - pegah
2个回答

5

编辑1

不确定这是否是更好的解决方法,但我做了一个变通:

import pyspark.sql.functions as f

df = df.withColumn('match', f.collect_set('id').over(Window.partitionBy('group')))

df = df.select(f.col('id').alias('src'),
               f.explode('match').alias('dst'),
               f.col('group'))

df = df.withColumn('duplicate_edges', f.array_sort(f.array('src', 'dst')))
df = (df
      .where(f.col('src') != f.col('dst'))
      .drop_duplicates(subset=['duplicate_edges'])
      .drop('duplicate_edges'))

df.sort('group', 'src', 'dst').show()

输出

+---+---+-----+
|src|dst|group|
+---+---+-----+
|  a|  c|    1|
|  a|  f|    1|
|  c|  f|    1|
|  b|  d|    2|
|  e|  a|    3|
+---+---+-----+

原始回答

尝试这个:

import pyspark.sql.functions as f

df = (df
      .groupby('group')
      .agg(f.first('id').alias('src'),
           f.last('id').alias('dst')))

df.show()

输出:

+-----+---+---+
|group|src|dst|
+-----+---+---+
|    1|  a|  c|
|    3|  e|  a|
|    2|  b|  d|
+-----+---+---+

3
@Kafels 的建议完全正确。不过,在您的代码开头不要忘记包含以下内容:import pyspark.sql.functions as f - Marioanzas
谢谢你们两个的回答,这是一个很好的方法!唯一缺少的是当我在同一组中有超过2个id时,只有第一个和最后一个id会显示为src和dst,但其他的会被忽略。例如,正如@jxc在评论中提到的那样,如果我们有另一个记录id='f'和group=1,我希望在结果数据框中出现组1中的a、c、f。而src和dst的顺序并不重要。我已经更新了我的问题示例,你能想到处理它的方法吗?谢谢! - MAMS

3

您可以进行自连接:

df = df.toDF('src', 'group')
df2 = df.toDF('dst', 'group2')

result = df.join(
    df2,
    (df.group == df2.group2) & (df.src < df2.dst)
).select('src', 'dst', 'group').distinct().orderBy('group', 'src', 'dst')

result.show()
+---+---+-----+
|src|dst|group|
+---+---+-----+
|  a|  c|    1|
|  a|  f|    1|
|  c|  f|    1|
|  b|  d|    2|
|  a|  e|    3|
+---+---+-----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接