Apache Spark中的Dataframe示例 | Scala

30
我试图从两个数据框中提取样本,其中我需要维护计数比率。例如:
df1.count() = 10
df2.count() = 1000

noOfSamples = 10

我希望以这样的方式对数据进行抽样,以获得每个大小为101的样本10个,其中1个来自df1,100个来自df2。

在这样做时,

var newSample = df1.sample(true, df1.count() / noOfSamples)
println(newSample.count())

这里的分数意味着什么?它可以大于1吗?我查阅了这个这个链接,但无法完全理解。

此外,我们是否可以指定要抽样的行数?

7个回答

43

fraction 参数表示将返回的数据集的大约分数。例如,如果将其设置为0.1,则会返回10%(1/10)的行。针对您的情况,我认为您想要执行以下操作:

val newSample = df1.sample(true, 1D*noOfSamples/df1.count)

然而,您可能会注意到每次运行 newSample.count 时会返回不同的数字,这是因为 fraction 将成为一个随机生成值的阈值(如此处所示),因此生成的数据集大小可能会有所变化。一种解决方法是:

val newSample = df1.sample(true, 2D*noOfSamples/df1.count).limit(df1.count/noOfSamples)

一些可扩展性观察

您可能会注意到执行 df1.count 可能会很昂贵,因为它会评估整个 DataFrame,并且您将失去第一次采样的部分好处。

因此,根据您的应用程序上下文,您可能希望使用已知的总样本数量或其近似值。

val newSample = df1.sample(true, 1D*noOfSamples/knownNoOfSamples)

或者假设你的DataFrame非常大,我仍然会使用fraction并使用limit来强制取样数量。

val guessedFraction = 0.1
val newSample = df1.sample(true, guessedFraction).limit(noOfSamples)

关于你的问题:

可以大于1吗?

不行。它代表0和1之间的分数。如果将其设置为1,它会获取100%的行,因此将其设置为大于1的数字是没有意义的。

另外,我们有没有办法指定要抽样的行数?

您可以指定一个比所需行数更大的分数,然后使用limit,就像我在第二个示例中展示的那样。也许还有其他方法,但这是我使用的方法。


虽然我更清楚分数的作用,但你提出的解决方案可能在当前情况下不起作用(请参见编辑)。df2.sample(true,noOfSample / df2.count).limit(df2.count/noOfSamples)会给我1%的数据,即10行,这将少于我需要的100行。对此有什么想法吗? - hbabbar
1
@hbabbar 所以你总是想要每个数据集的10%吗?如果是这样,你可以使用 df2.sample(true, 0.15).limit(0.1*df2.count) - Daniel de Paula
很棒的解决方案! - matanster
我正在运行Spark 3.0.2,分数可以大于1(假设采样时有放回),不确定这个什么时候发生了变化。下面的答案也清楚地解释了这一点。 - A Person

5
为了回答你的问题,我们有没有办法指定要抽样的行数?
我最近需要从Spark数据框架中抽取一定数量的行。我按照以下步骤进行:
1. 将Spark数据框转换为RDD。 例如:df_test.rdd 2. RDD具有称为takeSample的功能,允许您提供所需的样本数量和种子号。 例如:df_test.rdd.takeSample(withReplacement, Number of Samples, Seed) 3. 使用sqlContext.createDataFrame()将RDD转换回Spark数据框。
以上过程合并为单个步骤:
要从中抽样的数据框(或人口)约有8,000条记录: df_grp_1
df_grp_1
test1 = sqlContext.createDataFrame(df_grp_1.rdd.takeSample(False,125,seed=115))

test1数据框将有125个样本记录。


3

回答是否可以有大于1的分数。是的,如果我们将replace设置为yes,它可以超过1。如果提供一个大于1的值,并且replace为false,则会出现以下异常:

java.lang.IllegalArgumentException: requirement failed: Upper bound (2.0) must be <= 1.0.

2
以下代码适用于对数据框df进行70%和30%的随机拆分:
val Array(trainingDF, testDF) = df.randomSplit(Array(0.7, 0.3), seed = 12345)

2

我也发现 缺乏按计数的样本 功能令人不安。如果您对创建临时视图不挑剔,我认为下面的代码很有用(df是您的数据框,count是样本大小):

val tableName = s"table_to_sample_${System.currentTimeMillis}"
df.createOrReplaceTempView(tableName)
val sampled = sqlContext.sql(s"select *, rand() as random from ${tableName} order by random limit ${count}")
sqlContext.dropTempTable(tableName)
sampled.drop("random")

只要您的当前行数不小于样本大小,它将返回准确计数。

如果我没记错的话,您可以使用DataFrame API执行完全相同的查询,无需创建临时视图并使用SQL查询。但是,这将触发排序操作,我认为它比使用样本要慢得多。 - Daniel de Paula

0

当需要精确数量的记录进行随机抽样时,我使用这个函数:

def row_count_sample (df, row_count, with_replacement=False, random_seed=113170):

    ratio = 1.08 * float(row_count) / df.count()  # random-sample more as dataframe.sample() is not a guaranteed to give exact record count
                                                  # it could be more or less actual number of records returned by df.sample()

    if ratio>1.0:
        ratio = 1.0

    result_df = (df
                    .sample(with_replacement, ratio, random_seed)
                    .limit(row_count)                                   # since we oversampled, make exact row count here
                )

    return result_df 

-3

也许你想尝试下面的代码...

val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接