复杂数据集分割 - 分层组随机分割

27
我有一个包含约2百万条观测数据的数据集,需要按照60:20:20的比例将其分成训练集、验证集和测试集。我的数据集简化后的摘录如下:
+---------+------------+-----------+-----------+
| note_id | subject_id | category  |   note    |
+---------+------------+-----------+-----------+
|       1 |          1 | ECG       | blah ...  |
|       2 |          1 | Discharge | blah ...  |
|       3 |          1 | Nursing   | blah ...  |
|       4 |          2 | Nursing   | blah ...  |
|       5 |          2 | Nursing   | blah ...  |
|       6 |          3 | ECG       | blah ...  |
+---------+------------+-----------+-----------+

有多个类别,它们不是平衡的,因此我需要确保训练、验证和测试集中所有类别的比例与原始数据集中的比例相同。这部分很好解决,我可以使用来自sklearn库的StratifiedShuffleSplit。
然而,我还需要确保每个主题的观察结果没有分散在训练、验证和测试数据集中。为了确保我的训练模型在验证/测试时从未见过该主题,给定主题中的所有观测结果都需要在同一个集合中。例如,主题ID 1的每个观测结果应该在训练集中。
我无法想出一种方法来确保通过类别分层拆分,防止跨数据集污染(欠缺更好的说法),确保60:20:20的拆分并确保数据集以某种方式被混洗。感谢任何帮助!

编辑:

我现在已经学会了通过sklearnGroupShuffleSplit函数按类别分组并在数据集拆分时保持组在一起。因此,我需要的是一个结合了分层和分组洗牌拆分的方法,即不存在的StratifiedGroupShuffleSplit。Github问题:https://github.com/scikit-learn/scikit-learn/issues/12076

7个回答

6
这在scikit-learn 1.0中使用的StratifiedGroupKFold已经解决。
在此示例中,您可以在洗牌后生成3个折叠,保持组在一起并进行分层(尽可能地)。
import numpy as np
from sklearn.model_selection import StratifiedGroupKFold

X = np.ones((30, 2))
y = np.array([0, 0, 1, 1, 1, 1, 1, 1, 0, 0,
              0, 0, 0, 0, 0, 0, 0, 1, 1, 1,
              1, 1, 1, 0, 0, 0, 0, 1, 1, 1,])
groups = np.array([1, 1, 2, 2, 3, 3, 3, 4, 5, 5,
                   5, 5, 6, 6, 7, 8, 8, 9, 9, 9,
                   10, 11, 11, 12, 12, 12, 13, 13,
                   13, 13])
print("ORIGINAL POSITIVE RATIO:", y.mean())
cv = StratifiedGroupKFold(n_splits=3, shuffle=True)
for fold, (train_idxs, test_idxs) in enumerate(cv.split(X, y, groups)):
    print("Fold :", fold)
    print("TRAIN POSITIVE RATIO:", y[train_idxs].mean())
    print("TEST POSITIVE RATIO :", y[test_idxs].mean())
    print("TRAIN GROUPS        :", set(groups[train_idxs]))
    print("TEST GROUPS         :", set(groups[test_idxs]))

在输出结果中,您可以看到各折的阳性病例比例接近原始阳性比例,同一组从未同时出现在两个集合中。当然,您拥有的小/大组数越少(即您的班级不平衡程度越高),就越难保持接近原始班级分布。

ORIGINAL POSITIVE RATIO: 0.5
Fold : 0
TRAIN POSITIVE RATIO: 0.4375
TEST POSITIVE RATIO : 0.5714285714285714
TRAIN GROUPS        : {1, 3, 4, 5, 6, 7, 10, 11}
TEST GROUPS         : {2, 8, 9, 12, 13}
Fold : 1
TRAIN POSITIVE RATIO: 0.5
TEST POSITIVE RATIO : 0.5
TRAIN GROUPS        : {2, 4, 5, 7, 8, 9, 11, 12, 13}
TEST GROUPS         : {1, 10, 3, 6}
Fold : 2
TRAIN POSITIVE RATIO: 0.5454545454545454
TEST POSITIVE RATIO : 0.375
TRAIN GROUPS        : {1, 2, 3, 6, 8, 9, 10, 12, 13}
TEST GROUPS         : {11, 4, 5, 7}

4
这非常接近我需要的,但仅适用于交叉验证(即每个拆分的大小相同,而不能配置为例如60:20:20)。 - amin_nejad
4
是的,这是用于交叉验证。但你仍然可以通过一些滥用来获得所需比例的单个分割:设置 n_splits=int(1/desired_test_ratio)。之后,你可以随机选择任何一个创建的折叠。如果你还想要训练-验证分割,可以在相应的训练分割中重复该过程,并使用相应的 desired_val_ratio - Juan Manuel Ortiz

5

这已经超过一年了,但我发现自己处于一个类似的情况,我有标签和分组,由于分组的特性,数据点中的一个组可以是仅在测试集中或仅在训练集中,我编写了一个小算法,使用pandas和sklearn,希望这可以帮到你。

from sklearn.model_selection import GroupShuffleSplit
groups = df.groupby('label')
all_train = []
all_test = []
for group_id, group in groups:
    # if a group is already taken in test or train it must stay there
    group = group[~group['groups'].isin(all_train+all_test)]
    # if group is empty 
    if group.shape[0] == 0:
        continue
    train_inds, test_inds = next(GroupShuffleSplit(
        test_size=valid_size, n_splits=2, random_state=7).split(group, groups=group['groups']))

    all_train += group.iloc[train_inds]['groups'].tolist()
    all_test += group.iloc[test_inds]['groups'].tolist()



train= df[df['groups'].isin(all_train)]
test= df[df['groups'].isin(all_test)]

form_train = set(train['groups'].tolist())
form_test = set(test['groups'].tolist())
inter = form_train.intersection(form_test)

print(df.groupby('label').count())
print(train.groupby('label').count())
print(test.groupby('label').count())
print(inter) # this should be empty

4
基本上我需要一个不存在的 StratifiedGroupShuffleSplit 函数 (Github issue)。这是因为这样一个函数的行为不清楚,并且实现既分组又分层以产生数据集并不总是可能的 (也在此讨论过) - 特别是对于像我这样严重失衡的数据集。在我的情况下,我想要严格执行分组以确保没有任何组之间的重叠,同时分层和数据集比例拆分为 60:20:20,即尽可能好。

正如 Ghanem 所提到的,我别无选择,只能自己构建一个函数来拆分数据集,我已经在下面完成了:

def StratifiedGroupShuffleSplit(df_main):

    df_main = df_main.reindex(np.random.permutation(df_main.index)) # shuffle dataset

    # create empty train, val and test datasets
    df_train = pd.DataFrame()
    df_val = pd.DataFrame()
    df_test = pd.DataFrame()

    hparam_mse_wgt = 0.1 # must be between 0 and 1
    assert(0 <= hparam_mse_wgt <= 1)
    train_proportion = 0.6 # must be between 0 and 1
    assert(0 <= train_proportion <= 1)
    val_test_proportion = (1-train_proportion)/2

    subject_grouped_df_main = df_main.groupby(['subject_id'], sort=False, as_index=False)
    category_grouped_df_main = df_main.groupby('category').count()[['subject_id']]/len(df_main)*100

    def calc_mse_loss(df):
        grouped_df = df.groupby('category').count()[['subject_id']]/len(df)*100
        df_temp = category_grouped_df_main.join(grouped_df, on = 'category', how = 'left', lsuffix = '_main')
        df_temp.fillna(0, inplace=True)
        df_temp['diff'] = (df_temp['subject_id_main'] - df_temp['subject_id'])**2
        mse_loss = np.mean(df_temp['diff'])
        return mse_loss

    i = 0
    for _, group in subject_grouped_df_main:

        if (i < 3):
            if (i == 0):
                df_train = df_train.append(pd.DataFrame(group), ignore_index=True)
                i += 1
                continue
            elif (i == 1):
                df_val = df_val.append(pd.DataFrame(group), ignore_index=True)
                i += 1
                continue
            else:
                df_test = df_test.append(pd.DataFrame(group), ignore_index=True)
                i += 1
                continue

        mse_loss_diff_train = calc_mse_loss(df_train) - calc_mse_loss(df_train.append(pd.DataFrame(group), ignore_index=True))
        mse_loss_diff_val = calc_mse_loss(df_val) - calc_mse_loss(df_val.append(pd.DataFrame(group), ignore_index=True))
        mse_loss_diff_test = calc_mse_loss(df_test) - calc_mse_loss(df_test.append(pd.DataFrame(group), ignore_index=True))

        total_records = len(df_train) + len(df_val) + len(df_test)

        len_diff_train = (train_proportion - (len(df_train)/total_records))
        len_diff_val = (val_test_proportion - (len(df_val)/total_records))
        len_diff_test = (val_test_proportion - (len(df_test)/total_records)) 

        len_loss_diff_train = len_diff_train * abs(len_diff_train)
        len_loss_diff_val = len_diff_val * abs(len_diff_val)
        len_loss_diff_test = len_diff_test * abs(len_diff_test)

        loss_train = (hparam_mse_wgt * mse_loss_diff_train) + ((1-hparam_mse_wgt) * len_loss_diff_train)
        loss_val = (hparam_mse_wgt * mse_loss_diff_val) + ((1-hparam_mse_wgt) * len_loss_diff_val)
        loss_test = (hparam_mse_wgt * mse_loss_diff_test) + ((1-hparam_mse_wgt) * len_loss_diff_test)

        if (max(loss_train,loss_val,loss_test) == loss_train):
            df_train = df_train.append(pd.DataFrame(group), ignore_index=True)
        elif (max(loss_train,loss_val,loss_test) == loss_val):
            df_val = df_val.append(pd.DataFrame(group), ignore_index=True)
        else:
            df_test = df_test.append(pd.DataFrame(group), ignore_index=True)

        print ("Group " + str(i) + ". loss_train: " + str(loss_train) + " | " + "loss_val: " + str(loss_val) + " | " + "loss_test: " + str(loss_test) + " | ")
        i += 1

    return df_train, df_val, df_test

df_train, df_val, df_test = StratifiedGroupShuffleSplit(df_main)

我创建了一个基于两个方面的任意损失函数:

  1. 每个类别的百分比表示与整个数据集相比的平均平方差
  2. 数据集的比例长度与提供的比率(60:20:20)应该是什么之间的平方差

通过静态超参数hparam_mse_wgt对这两个输入进行加权处理。对于我的特定数据集,值为0.1效果很好,但如果您使用此功能,我建议您尝试一下。将其设置为0将仅优先维护拆分比率并忽略分层。将其设置为1将反之。

使用此损失函数,然后迭代每个主题(组),并根据具有最高损失函数的适当数据集(训练、验证或测试)将其附加到其中。

它并不特别复杂,但对我来说已经足够了。它不一定适用于每个数据集,但它越大,成功的机会就越大。希望其他人也能发现它有用。


2

我刚刚解决了同样的问题。在我的文档处理用例中,我希望来自同一页的单词可以黏在一起(分组),而文档类别应该在训练和测试集中均匀分布(分层)。对于我的问题,对于一个组的所有实例,我们都有相同的分层类别,即来自同一页的所有单词属于同一类别。因此,我发现直接在组上执行分层拆分最容易,然后使用拆分组选择实例。但是,如果这个假设不成立,这个解决方案就不适用。

from typing import Tuple

import pandas as pd
from sklearn.model_selection import train_test_split


def stratified_group_train_test_split(
    samples: pd.DataFrame, group: str, stratify_by: str, test_size: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    groups = samples[group].drop_duplicates()
    stratify = samples.drop_duplicates(group)[stratify_by].to_numpy()
    groups_train, groups_test = train_test_split(groups, stratify=stratify, test_size=test_size)

    samples_train = samples.loc[lambda d: d[group].isin(groups_train)]
    samples_test = samples.loc[lambda d: d[group].isin(groups_test)]

    return samples_train, samples_test

0

我认为在这种情况下,您必须构建自己的函数来拆分数据。 以下是我的实现:

def split(df, based_on='subject_id', cv=5):
    splits = []
    based_on_uniq = df[based_on]#set(df[based_on].tolist())
    based_on_uniq = np.array_split(based_on_uniq, cv)
    for fold in based_on_uniq:
        splits.append(df[df[based_on] == fold.tolist()[0]])
    return splits


if __name__ == '__main__':
    df = pd.DataFrame([{'note_id': 1, 'subject_id': 1, 'category': 'test1', 'note': 'test1'},
                       {'note_id': 2, 'subject_id': 1, 'category': 'test2', 'note': 'test2'},
                       {'note_id': 3, 'subject_id': 2, 'category': 'test3', 'note': 'test3'},
                       {'note_id': 4, 'subject_id': 2, 'category': 'test4', 'note': 'test4'},
                       {'note_id': 5, 'subject_id': 3, 'category': 'test5', 'note': 'test5'},
                       {'note_id': 6, 'subject_id': 3, 'category': 'test6', 'note': 'test6'},
                       {'note_id': 7, 'subject_id': 4, 'category': 'test7', 'note': 'test7'},
                       {'note_id': 8, 'subject_id': 4, 'category': 'test8', 'note': 'test8'},
                       {'note_id': 9, 'subject_id': 5, 'category': 'test9', 'note': 'test9'},
                       {'note_id': 10, 'subject_id': 5, 'category': 'test10', 'note': 'test10'},
                       ])
    print(split(df))

我觉得你可能是对的。感谢提供返回分割点的起始代码。我想问题在于使用库提供的函数能让多少工作变得更简单。我会尝试看看自己能做到什么。 - amin_nejad

0

正如其他人之前所评论的那样:StratifiedGroupShuffleSplit不存在,因为您可能无法保证分组拆分将具有每个类别的相似实例数量。

但是,您可以选择一个愚蠢但痛苦易行的解决方案,最终提供足够好的解决方案:

  1. 使用设置了随机状态的GroupShuffleSplit(例如:GroupShuffleSplit(n_splits=1, test_size=0.3, random_state=0)
  2. 计算每个拆分中类别之间的平衡。
  3. 如果不满意,只需再次运行并将random_state设置为另一个值。
  4. 继续直到拆分足够好。

这种方法显然最适合少量拆分和二进制标签。


0
在我的情况下,我假设同一组中的样本具有相同的标签。因此,我将StratifiedShuffleSplitGroupShuffleSplit结合起来使用,就像这样。
class StratifiedGroupShuffleSplit(StratifiedShuffleSplit):
    """
    Note there is an assumption that the samples in a same group have a same label.
    """
    def __init__(
        self, n_splits = 10, *, test_size = None, 
        train_size = None, random_state = None
    ):
        super().__init__(
            n_splits = n_splits,
            test_size = test_size,
            train_size = train_size,
            random_state = random_state,
        )
        self._default_test_size = 0.1

    def _iter_indices(self, X, y, groups = None):
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")
        groups = check_array(groups, input_name = "groups", ensure_2d = False, dtype = None)
        classes, group_indices = np.unique(groups, return_inverse = True)
        stratify = np.array([y[indices[0]] for indices in group_indices])

        for group_train, group_test in super()._iter_indices(X = classes, y = stratify):
            # these are the indices of classes in the partition
            # invert them into data indices

            train = np.flatnonzero(np.in1d(group_indices, group_train))
            test = np.flatnonzero(np.in1d(group_indices, group_test))

            yield train, test

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接