每次运行结果不同(pyspark)

6

我有一个数据框,是多个连接的结果。我想要查找重复项。但每次我进行调查时,数据框看起来都不同。特别是下面的命令会导致不同的IDs,但结果数量保持不变。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as f
from pyspark.sql.functions import lit

# Create a Spark session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# User input for number of rows
n_a = 10
n_a_c = 5
n_a_c_d = 3
n_a_c_e = 4

# Define the schema for the DataFrame
schema_a = StructType([StructField("id1", StringType(), True)])
schema_a_b = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id2", StringType(), True),
        StructField("extra", StringType(), True),
    ]
)
schema_a_c = StructType(
    [
        StructField("id1", StringType(), True),
        StructField("id3", StringType(), True),
    ]
)
schema_a_c_d = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id4", StringType(), True),
    ]
)
schema_a_c_e = StructType(
    [
        StructField("id3", StringType(), True),
        StructField("id5", StringType(), True),
    ]
)

# Create a list of rows with increasing integer values for "id1" and a constant value of "1" for "id2"
rows_a = [(str(i),) for i in range(1, n_a + 1)]
rows_a_integers = [str(i) for i in range(1, n_a + 1)]
rows_a_b = [(str(i), str(1), "A") for i in range(1, n_a + 1)]


def get_2d_list(ids_part_1: list, n_new_ids: int):
    rows = [
        [
            (str(i), str(i) + "_" + str(j))
            for i in ids_part_1
            for j in range(1, n_new_ids + 1)
        ]
    ]
    return [item for sublist in rows for item in sublist]


rows_a_c = get_2d_list(ids_part_1=rows_a_integers, n_new_ids=n_a_c)
rows_a_c_d = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_d)
rows_a_c_e = get_2d_list(ids_part_1=[i[1] for i in rows_a_c], n_new_ids=n_a_c_e)

# Create the DataFrame
df_a = spark.createDataFrame(rows_a, schema_a)
df_a_b = spark.createDataFrame(rows_a_b, schema_a_b)
df_a_c = spark.createDataFrame(rows_a_c, schema_a_c)
df_a_c_d = spark.createDataFrame(rows_a_c_d, schema_a_c_d)
df_a_c_e = spark.createDataFrame(rows_a_c_e, schema_a_c_e)

# Join everything
df_join = (
    df_a.join(df_a_b, on="id1")
    .join(df_a_c, on="id1")
    .join(df_a_c_d, on="id3")
    .join(df_a_c_e, on="id3")
)

# Nested structure
# show
df_nested = df_join.withColumn("id3", f.struct(f.col("id3")))

for i, index in enumerate([(5, 3), (4, 3), (3, None)]):
    remaining_columns = list(set(df_nested.columns).difference(set([f"id{index[0]}"])))
    df_nested = (
        df_nested.groupby(*remaining_columns)
        .agg(f.collect_list(f.col(f"id{index[0]}")).alias(f"id{index[0]}_tmp"))
        .drop(f"id{index[0]}")
        .withColumnRenamed(
            f"id{index[0]}_tmp",
            f"id{index[0]}",
        )
    )

    if index[1]:
        df_nested = df_nested.withColumn(
            f"id{index[1]}",
            f.struct(
                f.col(f"id{index[1]}.*"),
                f.col(f"id{index[0]}"),
            ).alias(f"id{index[1]}"),
        ).drop(f"id{index[0]}")

# Investigate for duplicates in id3 (should be unique)
df_test = df_nested.select("id2", "extra", f.explode(f.col("id3")["id3"]).alias("id3"))

for i in range(5):
    df_test.groupby("id3").count().filter(f.col("count") > 1).show()

最后一个命令在我的两种情况下打印出不同的结果。有时候:

+---+-----+
|id3|count|
+---+-----+
|6_4|    2|
+---+-----+

有时候

+---+-----+
|id3|count|
+---+-----+
|9_3|    2|
+---+-----+

如果有帮助的话,我使用的是Databricks Runtime版本11.3 LTS(包括Apache Spark 3.3.0、Scala 2.12)。
此外,根据代码的设计,据我所知不应该有任何重复项。找到的重复似乎是一个错误!?
也许这可以作为一个潜在的证明,说明连接不会导致任何重复项:
df_join.groupby("id3", "id4", "id5").count().filter(f.col("count") > 1).show()

为空


1
你能分享一下 df 是如何定义的吗?Pyspark 是惰性求值的,所以如果在此之前有任何不确定性的因素,可能会导致每次得到不同的结果。 - s_pike
3
洗牌总是发生在groupBy时 - 这不是你选择的。如果你想每次得到相同的结果,可以尝试放置一个orderBy。但如果它们存在,我不确定为什么它不给你更多。 - s_pike
1
这是一个有趣的问题。你能在你的问题中分享一些可重现的案例吗? - glory9211
1
你代码片段中唯一奇怪的地方是你使用了 f.col('count') 而不是 df.col('count'),但我从未因此遇到过无法重现的行为。df.groupby('id').count().filter(f.col('count')>1).explain(True) 输出了什么? - Bernhard Stadler
2
对我来说,这段代码是确定性的,应该始终返回相同的结果。我怀疑问题出现在这行代码之前,Spark 中的某些函数不是确定性的,可能会给出不同的结果,例如 first、ranks 等。你应该提供可重现的代码/数据并将其添加到此处。 - Abdennacer Lachiheb
显示剩余12条评论
1个回答

3

"id3"的构建方式是随机的,因此每次执行都会得到不同的结果,您需要定义一个orderBy()来获得相同的结果,因此在该列上添加一个orderBy(),如下所示:

df_nested = df_join.withColumn("id3", f.struct(f.col("id3"))).orderBy("id3")

现在,您将始终获得相同的结果,进行多次执行。

请记住,Spark评估是惰性的,因此Dag将在每个操作中重新构建,在本例中是show()。

因此,如果您的代码不确定性,它将每次给出不同的输出。


这有助于第一个问题,现在结果是一致的。然而,结果表明id3 ==“8_3”存在两次。根据代码的设计,这不应该可能。 - Lazloo Xp
你认为我的代码中哪一部分是“随意构建”的? - Lazloo Xp
我应该开一个新的问题来处理重复的内容吗? - Lazloo Xp

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接