虽然@mozway已经提供了一个非常聪明的解决方案,但我也想分享一下我的方法,这个方法是受到this post的启发。
你可以创建自己的对象来比较一个序列和一个滚动序列。比较可以通过典型的运算符进行,即>、<或==
。如果至少有一个比较成立,该对象将返回预定义的值(在列表returns_tf
中给出,如果比较为真,则返回第一个元素,如果为假,则返回第二个元素)。
可能的代码:
import numpy as np
import pandas as pd
df = pd.DataFrame([1, 2, 3, 2, 5, 4, 3, 6, 7])
check_df = pd.DataFrame([3, 2, 5, 4, 3, 6, 4, 2, 1])
class RollingComparison:
def __init__(self, comparing_series: pd.Series, rolling_series: pd.Series, window: int):
self.comparing_series = comparing_series.values[:-1*window]
self.rolling_series = rolling_series.values
self.window = window
def rolling_window_mask(self, option: str = "smaller"):
shape = self.rolling_series.shape[:-1] + (self.rolling_series.shape[-1] - self.window + 1, self.window)
strides = self.rolling_series.strides + (self.rolling_series.strides[-1],)
rolling_window = np.lib.stride_tricks.as_strided(self.rolling_series, shape=shape, strides=strides)[:-1]
rolling_window_mask = (
self.comparing_series.reshape(-1, 1) < rolling_window if option=="smaller" else (
self.comparing_series.reshape(-1, 1) > rolling_window if option=="greater" else self.comparing_series.reshape(-1, 1) == rolling_window
)
)
return rolling_window_mask.any(axis=1)
def assign(self, option: str = "rolling", returns_tf: list = [1, -1]):
mask = self.rolling_window_mask(option)
return np.concatenate((np.where(mask, returns_tf[0], returns_tf[1]), self.rolling_series[-1*self.window:]))
可以按照以下方式完成作业:
roller = RollingComparison(check_df[0], df[0], 3)
check_df["rolling_smaller_checking"] = roller.assign(option="smaller")
check_df["rolling_greater_checking"] = roller.assign(option="greater")
check_df["rolling_equals_checking"] = roller.assign(option="equal")
输出(列rolling_smaller_checking
等于您所需的输出):
0 rolling_smaller_checking rolling_greater_checking rolling_equals_checking
0 3 -1 1 1
1 2 1 -1 1
2 5 -1 1 1
3 4 1 1 1
4 3 1 -1 1
5 6 -1 1 1
6 4 3 3 3
7 2 6 6 6
8 1 7 7 7
N
是什么? ;)) - Lucian