如何在Pandas时间序列中高效地计算滚动唯一计数?

15
我有一系列的时间序列数据,记录了到访某建筑的人员,每个人都有唯一的ID。对于时间序列中的每条记录,我想知道在过去的 365 天内到访该建筑的独立人数(即一个滚动的、窗口为 365 天的唯一人数计数)。

pandas 没有似乎没有内置的方法可以进行这种计算。当存在大量独立访问者和/或大窗口时,计算变得计算密集。(实际数据比此示例更大。)

是否有比我下面做的更好的计算方法?我不确定我制作的快速方法 windowed_nunique("Speed test 3" 下)为什么会少 1。

感谢任何帮助!

相关链接:

初始化

In [1]:

# Import libraries.
import pandas as pd
import numba
import numpy as np

In [2]:

# Create data of people visiting a building.

np.random.seed(seed=0)
dates = pd.date_range(start='2010-01-01', end='2015-01-01', freq='D')
window = 365 # days
num_pids = 100
probs = np.linspace(start=0.001, stop=0.1, num=num_pids)

df = pd\
    .DataFrame(
        data=[(date, pid)
              for (pid, prob) in zip(range(num_pids), probs)
              for date in np.compress(np.random.binomial(n=1, p=prob, size=len(dates)), dates)],
        columns=['Date', 'PersonId'])\
    .sort_values(by='Date')\
    .reset_index(drop=True)

print("Created data of people visiting a building:")
df.head() # 9181 rows × 2 columns

Out[2]:

Created data of people visiting a building:

|   | Date       | PersonId | 
|---|------------|----------| 
| 0 | 2010-01-01 | 76       | 
| 1 | 2010-01-01 | 63       | 
| 2 | 2010-01-01 | 89       | 
| 3 | 2010-01-01 | 81       | 
| 4 | 2010-01-01 | 7        | 

速度参考

In [3]:

(在第3行输入代码)
%%timeit
# This counts the number of people visiting the building, not the number of unique people.
# Provided as a speed reference.
df.rolling(window='{:d}D'.format(window), on='Date').count()

3.32毫秒±124微秒每次循环(平均值±7次运行的标准差,每个循环100次)

速度测试1

In [4]:

%%timeit
df.rolling(window='{:d}D'.format(window), on='Date').apply(lambda arr: pd.Series(arr).nunique())

每次循环的平均时间为2.42秒,标准偏差为282毫秒(7次运行的平均值和标准差,每次运行1次)

In [5]:

# Save results as a reference to check calculation accuracy.
ref = df.rolling(window='{:d}D'.format(window), on='Date').apply(lambda arr: pd.Series(arr).nunique())['PersonId'].values

速度测试 2

在 [6]:

# Define a custom function and implement a just-in-time compiler.
@numba.jit(nopython=True)
def nunique(arr):
    return len(set(arr))

In [7]:

%%timeit
df.rolling(window='{:d}D'.format(window), on='Date').apply(nunique)

每次循环平均需要430毫秒±31.1毫秒(7次运行,每次循环1次)

In [8]:

# Check accuracy of results.
test = df.rolling(window='{:d}D'.format(window), on='Date').apply(nunique)['PersonId'].values
assert all(ref == test)

速度测试 3

In [9]:

# Define a custom function and implement a just-in-time compiler.
@numba.jit(nopython=True)
def windowed_nunique(dates, pids, window):
    r"""Track number of unique persons in window,
    reading through arrays only once.

    Args:
        dates (numpy.ndarray): Array of dates as number of days since epoch.
        pids (numpy.ndarray): Array of integer person identifiers.
        window (int): Width of window in units of difference of `dates`.

    Returns:
        ucts (numpy.ndarray): Array of unique counts.

    Raises:
        AssertionError: Raised if `len(dates) != len(pids)`

    Notes:
        * May be off by 1 compared to `pandas.core.window.Rolling`
            with a time series alias offset.

    """

    # Check arguments.
    assert dates.shape == pids.shape

    # Initialize counters.
    idx_min = 0
    idx_max = dates.shape[0]
    date_min = dates[idx_min]
    pid_min = pids[idx_min]
    pid_max = np.max(pids)
    pid_cts = np.zeros(pid_max, dtype=np.int64)
    pid_cts[pid_min] = 1
    uct = 1
    ucts = np.zeros(idx_max, dtype=np.int64)
    ucts[idx_min] = uct
    idx = 1

    # For each (date, person)...
    while idx < idx_max:

        # If person count went from 0 to 1, increment unique person count.
        date = dates[idx]
        pid = pids[idx]
        pid_cts[pid] += 1
        if pid_cts[pid] == 1:
            uct += 1

        # For past dates outside of window...
        while (date - date_min) > window:

            # If person count went from 1 to 0, decrement unique person count.
            pid_cts[pid_min] -= 1
            if pid_cts[pid_min] == 0:
                uct -= 1
            idx_min += 1
            date_min = dates[idx_min]
            pid_min = pids[idx_min]

        # Record unique person count.
        ucts[idx] = uct
        idx += 1

    return ucts

In [10]:

# Cast dates to integers.
df['DateEpoch'] = (df['Date'] - pd.to_datetime('1970-01-01'))/pd.to_timedelta(1, unit='D')
df['DateEpoch'] = df['DateEpoch'].astype(int)

输入[11]:

%%timeit
windowed_nunique(
    dates=df['DateEpoch'].values,
    pids=df['PersonId'].values,
    window=window)

107微秒±63.5微秒每个循环(平均值±7次运行的标准差,每个循环1次)

In [12]:

# Check accuracy of results.
test = windowed_nunique(
    dates=df['DateEpoch'].values,
    pids=df['PersonId'].values,
    window=window)
# Note: Method may be off by 1.
assert all(np.isclose(ref, np.asarray(test), atol=1))

在 [13] 中:

# Show where the calculation doesn't match.
print("Where reference ('ref') calculation of number of unique people doesn't match 'test':")
df['ref'] = ref
df['test'] = test
df.loc[df['ref'] != df['test']].head() # 9044 rows × 5 columns

Out[13]:

Where reference ('ref') calculation of number of unique people doesn't match 'test':

|    | Date       | PersonId | DateEpoch | ref  | test | 
|----|------------|----------|-----------|------|------| 
| 78 | 2010-01-19 | 99       | 14628     | 56.0 | 55   | 
| 79 | 2010-01-19 | 96       | 14628     | 56.0 | 55   | 
| 80 | 2010-01-19 | 88       | 14628     | 56.0 | 55   | 
| 81 | 2010-01-20 | 94       | 14629     | 56.0 | 55   | 
| 82 | 2010-01-20 | 48       | 14629     | 57.0 | 56   | 

抱歉如果这是一个愚蠢的评论,但是一个365天的滚动计数唯一ID不是很简单吗:df.rolling(365)['PersonId'].apply(lambda x: len(set(x)))??? - Woody Pride
@WoodyPride 谢谢,这就是我在“速度测试2”下所做的,但使用了即时编译器(请参见函数nunique)。计算是正确的,但效率低下,因为set每次窗口计算执行时都会对窗口中的每个元素进行操作。更有效的方法是保持每个元素的运行总数,就像“速度测试3”中一样(在示例数据上比较“速度测试2”和“速度测试3”,效率提高了约4000倍)。然而,我的实现windowed_nunique有误差1,我想知道是否有人能帮忙找到问题。 - Samuel Harrold
明白了!我觉得我没有读完整个问题。 - Woody Pride
1
干得好!我尝试应用你的速度测试3,但一直收到下面的错误,有什么想法吗?TypingError:在nopython模式管道中失败(步骤:nopython前端) 非精确类型数组(pyobject,1d,C) 期间:<ipython-input-379-072a9c819fa1>(24)处的参数输入文件“<ipython-input-379-072a9c819fa1>”,第24行: def windowed_nunique(dates, pids, window): <source elided> # 检查参数。 assert dates.shape == pids.shape ^ - Rajko Radovanovic
这里的 personId 是一个数字。但是如果我们尝试将其应用于非数字类型,我们会得到 DataError: No numeric types to aggregate 的错误。有什么办法可以让它工作吗? - David Davó
3个回答

5

在快速方法windowed_nunique中我发现了两个错误,现已在下面的 windowed_nunique_corrected 中进行了更正:

  1. 用于存储窗口内每个人ID的唯一计数的记忆化数组pid_cts的大小过小。
  2. 由于窗口的前缘和后缘包括整数天数,所以当(date - date_min + 1) > window时应更新date_min

相关链接:

In [14]:

# Define a custom function and implement a just-in-time compiler.
@numba.jit(nopython=True)
def windowed_nunique_corrected(dates, pids, window):
    r"""Track number of unique persons in window,
    reading through arrays only once.

    Args:
        dates (numpy.ndarray): Array of dates as number of days since epoch.
        pids (numpy.ndarray): Array of integer person identifiers.
            Required: min(pids) >= 0
        window (int): Width of window in units of difference of `dates`.
            Required: window >= 1

    Returns:
        ucts (numpy.ndarray): Array of unique counts.

    Raises:
        AssertionError: Raised if not...
            * len(dates) == len(pids)
            * min(pids) >= 0
            * window >= 1

    Notes:
        * Matches `pandas.core.window.Rolling`
            with a time series alias offset.

    """

    # Check arguments.
    assert len(dates) == len(pids)
    assert np.min(pids) >= 0
    assert window >= 1

    # Initialize counters.
    idx_min = 0
    idx_max = dates.shape[0]
    date_min = dates[idx_min]
    pid_min = pids[idx_min]
    pid_max = np.max(pids) + 1
    pid_cts = np.zeros(pid_max, dtype=np.int64)
    pid_cts[pid_min] = 1
    uct = 1
    ucts = np.zeros(idx_max, dtype=np.int64)
    ucts[idx_min] = uct
    idx = 1

    # For each (date, person)...
    while idx < idx_max:

        # Lookup date, person.
        date = dates[idx]
        pid = pids[idx]

        # If person count went from 0 to 1, increment unique person count.
        pid_cts[pid] += 1
        if pid_cts[pid] == 1:
            uct += 1

        # For past dates outside of window...
        # Note: If window=3, it includes day0,day1,day2.
        while (date - date_min + 1) > window:

            # If person count went from 1 to 0, decrement unique person count.
            pid_cts[pid_min] -= 1
            if pid_cts[pid_min] == 0:
                uct -= 1
            idx_min += 1
            date_min = dates[idx_min]
            pid_min = pids[idx_min]

        # Record unique person count.
        ucts[idx] = uct
        idx += 1

    return ucts

在[15]中:

# Cast dates to integers.
df['DateEpoch'] = (df['Date'] - pd.to_datetime('1970-01-01'))/pd.to_timedelta(1, unit='D')
df['DateEpoch'] = df['DateEpoch'].astype(int)

在[16]中:

%%timeit
windowed_nunique_corrected(
    dates=df['DateEpoch'].values,
    pids=df['PersonId'].values,
    window=window)

98.8微秒±41.3微秒每次循环(平均值±7次运行的标准差,每次循环1次)

In [17]:

# Check accuracy of results.
test = windowed_nunique_corrected(
    dates=df['DateEpoch'].values,
    pids=df['PersonId'].values,
    window=window)
assert all(ref == test)

1
非常接近您在种子测试二中的时间,但作为一行代码,在一年内重新采样。
 df.resample('AS',on='Date')['PersonId'].expanding(0).apply(lambda x: np.unique(x).shape[0])

时间结果
1 loop, best of 3: 483 ms per loop

这与速度测试2接近,但np.unique在窗口中操作每个元素。像“Speed test 3”一样,保持每个元素的运行总数更有效率。 (请参见我的Woody Pride评论。)我的运行总数实现windowed_nunique有1个偏差。还有其他想法吗?谢谢。 - Samuel Harrold

-1

如果您只想知道过去365天进入建筑物的独立人数,您可以首先使用.loc将数据集限制在最近的365天:

df = df.loc[df['date'] > '2016-09-28',:]

使用groupby,您将获得与进入的唯一人数相同的行数,如果按计数方式执行,则还将获得他们进入的次数:

df = df.groupby('PersonID').count()

看起来对你的问题有用,但也许我理解错了。祝你有美好的一天。


谢谢,但我正在寻找一种高效的滚动唯一计数方法。输出必须与输入具有相同的长度(例如len(df) == len(ref) == 9181),并且比“速度测试2”更快。 - Samuel Harrold
@SamuelHarrold,你说的“滚动唯一计数”是什么意思?你在一年内滚动的周期是多久? - DJK
@djk47463 示例滚动唯一计数(类似于上面“速度测试2”中定义的函数nunique):df.rolling(window='365D', on='Date').apply(lambda arr: len(set(arr)))。挑战在于使其更高效(比较“速度测试2”和“速度测试3”)。我几乎成功了,但我的解决方案windowed_nunique有1个错误,我想知道是否有人能找到我的错误。 - Samuel Harrold
@djk47463 在记录78处(例如中的Out [13]),windowed_nunique会出现偏差1,该记录对应的日期为“2010-01-19”,早于任何额外的闰日。 - Samuel Harrold

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接