时间序列数据交叉验证中使用的验证窗口前向步进

10
我希望能够在我的时间序列数据上执行前向验证。有大量文档介绍如何执行“滚动窗口”:

enter image description here

扩展窗口

enter image description here

但是这种验证并不符合我生产系统的要求:我希望每天重新训练一个模型,该模型将预测未来14天的情况。因此,我只需要在前一训练周期添加 一天 的数据(而其他方法则会在下一次训练中将整个长度为test_size的数据集添加到后续的训练折叠中; 在我的情况下为14天)。因此,我希望使用滑动窗口验证我的模型:

enter image description here

我的问题是我找不到一个能够完成这项工作的Python库。 sklearn中的TimeSeriesSplit没有这种选项。基本上,我想提供:
test_sizen_foldmin_train_size

如果n_fold > (n_samples - min_train_size) % test_size,那么下一个training_set将从前一个折叠test_set中提取数据。


我的回答解决了你的问题吗? - Venkatachalam
2个回答

8

看起来你的需求是将测试集大小增加到1个fold以上。要进行此更改,你需要调整这些行。

我已经进行了这些更改,并添加了一个名为n_test_folds的新参数,以便可以自定义它。

from sklearn.model_selection._split import TimeSeriesSplit
from sklearn.utils.validation import _deprecate_positional_args

from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples

class WindowedTestTimeSeriesSplit(TimeSeriesSplit):
    """
    parameters
    ----------
    n_test_folds: int
        number of folds to be used as testing at each iteration.
        by default, 1.
    """
    @_deprecate_positional_args
    def __init__(self, n_splits=5, *, max_train_size=None, n_test_folds=1):
        super().__init__(n_splits, 
                         max_train_size=max_train_size)
        self.n_test_folds=n_test_folds

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where n_samples is the number of samples
            and n_features is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        Yields
        ------
        train : ndarray
            The training set indices for that split.
        test : ndarray
            The testing set indices for that split.
        """
        X, y, groups = indexable(X, y, groups)
        n_samples = _num_samples(X)
        n_splits = self.n_splits
        n_folds = n_splits + self.n_test_folds
        if n_folds > n_samples:
            raise ValueError(
                ("Cannot have number of folds ={0} greater"
                 " than the number of samples: {1}.").format(n_folds,
                                                             n_samples))
        indices = np.arange(n_samples)
        fold_size = (n_samples // n_folds)
        test_size = fold_size * self.n_test_folds # test window
        test_starts = range(fold_size + n_samples % n_folds,
                            n_samples-test_size+1, fold_size) # splits based on fold_size instead of test_size
        for test_start in test_starts:
            if self.max_train_size and self.max_train_size < test_start:
                yield (indices[test_start - self.max_train_size:test_start],
                       indices[test_start:test_start + test_size])
            else:
                yield (indices[:test_start],
                       indices[test_start:test_start + test_size])

例子:

import numpy as np
X = np.array([[1, 2], [3, 4], [1, 2], [3, 4], [1, 2], [3, 4]])
y = np.array([1, 2, 3, 4, 5, 6])
tscv = WindowedTestTimeSeriesSplit(n_splits=4, n_test_folds=2)
print(tscv)

for train_index, test_index in tscv.split(X):
    print("TRAIN:", train_index, "TEST:", test_index)
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y[train_index], y[test_index]

# WindowedTestTimeSeriesSplit(max_train_size=None, n_splits=4, n_test_folds=2)
# TRAIN: [0] TEST: [1 2]
# TRAIN: [0 1] TEST: [2 3]
# TRAIN: [0 1 2] TEST: [3 4]
# TRAIN: [0 1 2 3] TEST: [4 5]

注意: TRAIN: [0 1 2 3 4] TEST: [5] 没有生成,因为它不满足测试折数的要求。
使用此函数,我们可以可视化CV的不同分割方式。请参考此处
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
np.random.seed(1338)
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 4


# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)

percentiles_classes = [.1, .3, .6]
y = np.hstack([[ii] * int(100 * perc)
               for ii, perc in enumerate(percentiles_classes)])

# Evenly spaced groups repeated once
groups = np.hstack([[ii] * 10 for ii in range(10)])

fig, ax = plt.subplots()
cv = WindowedTestTimeSeriesSplit(n_splits=n_splits, n_test_folds=2)
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()


谢谢你的回答!确实帮了我很多,特别是可视化方面。我想出了一个稍微不同的解决方案,满足了我的要求,特别是关于min_training_sizetest_size,我想对它们进行控制。 - Roger
很高兴它有所帮助。我认为,test_size已经被n_test_fold捕获了。你的min_training_size在滑动窗口可视化中没有被捕获。因此,我没有添加它。 - Venkatachalam

2

这是我的解决方案,允许用户指定测试时间和训练的最小数据样本:

from sklearn.model_selection import TimeSeriesSplit
from sklearn.utils import indexable
from sklearn.utils.validation import _num_samples

class TimeSeriesSplitCustom(TimeSeriesSplit):
    def __init__(self, n_splits=5, max_train_size=None,
                 test_size=1,
                 min_train_size=1):
        super().__init__(n_splits=n_splits, max_train_size=max_train_size)
        self.test_size = test_size
        self.min_train_size = min_train_size

    def overlapping_split(self, X, y=None, groups=None):
        min_train_size = self.min_train_size
        test_size = self.test_size

        n_splits = self.n_splits
        n_samples = _num_samples(X)

        if (n_samples - min_train_size) / test_size >= n_splits:
            print('(n_samples -  min_train_size) / test_size >= n_splits')
            print('default TimeSeriesSplit.split() used')
            yield from super().split(X)

        else:
            shift = int(np.floor(
                (n_samples - test_size - min_train_size) / (n_splits - 1)))

            start_test = n_samples - (n_splits * shift + test_size - shift)

            test_starts = range(start_test, n_samples - test_size + 1, shift)

            if start_test < min_train_size:
                raise ValueError(
                    ("The start of the testing : {0} is smaller"
                     " than the minimum training samples: {1}.").format(start_test,
                                                                        min_train_size))

            indices = np.arange(n_samples)

            for test_start in test_starts:
                if self.max_train_size and self.max_train_size < test_start:
                    yield (indices[test_start - self.max_train_size:test_start],
                           indices[test_start:test_start + test_size])
                else:
                    yield (indices[:test_start],
                           indices[test_start:test_start + test_size])

而且还有可视化效果:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from ModelEvaluation import TimeSeriesSplitCustom
np.random.seed(1338)
cmap_data = plt.cm.Paired
cmap_cv = plt.cm.coolwarm
n_splits = 13

# Generate the class/group data
n_points = 100
X = np.random.randn(100, 10)

percentiles_classes = [.1, .3, .6]
y = np.hstack([[ii] * int(100 * perc)
               for ii, perc in enumerate(percentiles_classes)])

# Evenly spaced groups repeated once
groups = np.hstack([[ii] * 10 for ii in range(10)])

fig, ax = plt.subplots()

cv = TimeSeriesSplitCustom(n_splits=n_splits, test_size=20, min_train_size=12)
plot_cv_indices(cv, X, y, groups, ax, n_splits)
plt.show()

enter image description here

为了获得相同的结果,请确保在plot_cv_indices函数中更改以下内容:
for ii, (tr, tt) in enumerate(**cv.overlapping_split**(X=X, y=y, groups=group)):
祝好!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接