使用Python计算用户的并发会话数

3

我有一个按用户分类的登录和注销表格。

表格长得像这样,但它包含几十万行:

data = [['aa', '2020-05-31 00:00:01', '2020-05-31 00:00:31'],
        ['bb','2020-05-31 00:01:01', '2020-05-31 00:02:01'],
        ['aa','2020-05-31 00:02:01', '2020-05-31 00:06:03'],
        ['cc','2020-05-31 00:03:01', '2020-05-31 00:04:01'],
        ['dd','2020-05-31 00:04:01', '2020-05-31 00:34:01'],
        ['aa', '2020-05-31 00:05:01', '2020-05-31 00:07:31'],
        ['bb','2020-05-31 00:05:01', '2020-05-31 00:06:01'],
        ['aa','2020-05-31 00:05:01', '2020-05-31 00:08:03'],
        ['cc','2020-05-31 00:10:01', '2020-05-31 00:40:01'],
        ['dd','2020-05-31 00:20:01', '2020-05-31 00:35:01']]


df_test = pd.DataFrame(data,  columns=['user_id','login', 'logout'], dtype='datetime64[ns]')

我能用一个for循环的hacky方法解决这个问题。它在较小数据集上运行良好,但在30万行上需要数小时。

基本上,这段代码计算了每个会话(即每一行)中同时登录的用户数量。

这是我的解决方案。它提供了我需要的结果。我还能通过写一个带有apply的lambda表达式来实现相同的功能,但这样做需要更长的时间。

# create a new column for simultaneous
df_test['simultaneous'] = 0

start_time = time.time()

# loop through dataframe and check condition
for i in df_test.index:
    login, logout = df_test.loc[i,'login'], df_test.loc[i,'logout']
    this_index = df_test.index.isin([i])
    df_test.loc[i, 'simultaneous'] = int(sum(
        (df_test[~this_index]['login'] <= logout) & (df_test[~this_index]['logout'] >= login)
    ))
print("--- %s seconds ---" % (time.time() - start_time))

您能否看一下并告诉我是否有更好的方法可以得到相同的结果。也许我漏掉了一些显而易见的东西。

提前致谢!

3个回答

1
该算法采用流式处理方式,基于数据按登录时间排序的事实。对于每个会话,它会跟踪所有未过期的注销时间的会话数量(通过将注销时间存储在列表中,并在每次检查新会话时从该列表中删除过期条目)。我决定将sess1.logout == sess2.login计为同时发生,但如果您不同意,可以将">="更改为">"。
算法在calculate函数中。
#!/usr/bin/python

import datetime
import random
import time
from statistics import mean, stdev


def calculate(data):
    active_sessions = []
    simultaneous_sessions = []
    for user_id, login, logout in data:
        active_sessions = [ts for ts in active_sessions if ts >= login]
        simultaneous_sessions.append(len(active_sessions))
        active_sessions.append(logout)
    return simultaneous_sessions


def generate_data(numsessions):
    start_time = datetime.datetime(2020, 5, 13, 0, 0, 1)
    data = []
    while len(data) < numsessions:
        for cnt in range(random.choice([0, 0, 0, 1, 1, 2, 3])):
            user_id = chr(ord("a") + cnt) * 2
            duration = random.choice([30, 30, 60, 90, 90, 900, 1800])
            logout_time = start_time + datetime.timedelta(seconds=duration)
            data.append(
                (
                    user_id,
                    start_time.strftime("%Y-%m-%d %H:%M:%S"),
                    logout_time.strftime("%Y-%m-%d %H:%M:%S"),
                )
            )

        start_time += datetime.timedelta(minutes=1)
    return data


start_time = time.time()
num_sessions = 3 * 1e5  # 300,000
print(f"generating data for {num_sessions:.0f} sessions")
data = generate_data(num_sessions)
print(f"sample data=[{data[0]}]")
print("--- %.2f seconds ---" % (time.time() - start_time))
start_time = time.time()
print("calculating simultaneous sessions")
simultaneous_sessions = calculate(data)
print(
    "for {} sessions have max={} min={}, mean={:.2f} stdev={:.2f}".format(
        len(simultaneous_sessions),
        max(simultaneous_sessions),
        min(simultaneous_sessions),
        mean(simultaneous_sessions),
        stdev(simultaneous_sessions),
    )
)
print("--- %.2f seconds ---" % (time.time() - start_time))

从性能的角度来看,我只需要遍历一次列表,并且在不断重新创建活动会话列表的同时,只要活动会话数量较少,这将非常快。您可以通过拥有更高效的活动会话列表进行其他优化,但是与为每个会话搜索所有数据相比,这应该要快得多。即使数据没有按登录时间排序,我认为按登录时间排序,然后使用此算法要比为每个会话扫描所有会话更有效率。
更新:我已经添加了一个合成数据生成器,它基于一些随机变量创建了一堆会话。这表明,对于300k行,此算法需要不到一秒钟。
对于300k个会话,大约需要0.4秒钟:
generating data for 300000 sessions
sample data=[('aa', '2020-05-13 00:02:01', '2020-05-13 00:03:31')]
--- 1.99 seconds ---
calculating simultaneous sessions
for 300001 sessions have max=26 min=0, mean=7.42 stdev=2.76
--- 0.35 seconds ---

当处理3百万个会话时,需要大约4秒钟:

generating data for 3000000 sessions
sample data=[('aa', '2020-05-13 00:00:01', '2020-05-13 00:01:31')]
--- 19.35 seconds ---
calculating simultaneous sessions
for 3000001 sessions have max=26 min=0, mean=7.43 stdev=2.77
--- 3.93 seconds ---

O(N)


0
如果您重组数据,可以通过一次遍历完成。这是 pandas.melt 的一个很好的应用例子:
# use a session id, as opposed to a user id, as a single user can log in multiple times:
df_test['sid'] = df_test.user_id + "-" + df_test.index.astype(str)

#df_test 
#  user_id               login              logout   sid
#0      aa 2020-05-31 00:00:01 2020-05-31 00:00:31  aa-0
#1      bb 2020-05-31 00:01:01 2020-05-31 00:02:01  bb-1
#2      aa 2020-05-31 00:02:01 2020-05-31 00:06:03  aa-2
#3      cc 2020-05-31 00:03:01 2020-05-31 00:04:01  cc-3
#4      dd 2020-05-31 00:04:01 2020-05-31 00:34:01  dd-4
#5      aa 2020-05-31 00:05:01 2020-05-31 00:07:31  aa-5
#6      bb 2020-05-31 00:05:01 2020-05-31 00:06:01  bb-6
#7      aa 2020-05-31 00:05:01 2020-05-31 00:08:03  aa-7
#8      cc 2020-05-31 00:10:01 2020-05-31 00:40:01  cc-8
#9      dd 2020-05-31 00:20:01 2020-05-31 00:35:01  dd-9

# restructure the data, and sort it
df_chrono = pd.melt(df_test.set_index('sid'), value_vars=['login', 'logout'], ignore_index=False)

df_chrono = df_chrono.sort_values(by='value').reset_index()
#df_chrono:
#     sid variable               value
#0   aa-0    login 2020-05-31 00:00:01
#1   aa-0   logout 2020-05-31 00:00:31
#2   bb-1    login 2020-05-31 00:01:01
#3   aa-2    login 2020-05-31 00:02:01
#4   bb-1   logout 2020-05-31 00:02:01
#5   cc-3    login 2020-05-31 00:03:01
#6   dd-4    login 2020-05-31 00:04:01
#7   cc-3   logout 2020-05-31 00:04:01
#8   aa-5    login 2020-05-31 00:05:01
#9   bb-6    login 2020-05-31 00:05:01
#10  aa-7    login 2020-05-31 00:05:01
#11  bb-6   logout 2020-05-31 00:06:01
#12  aa-2   logout 2020-05-31 00:06:03
#13  aa-5   logout 2020-05-31 00:07:31
#14  aa-7   logout 2020-05-31 00:08:03
#15  cc-8    login 2020-05-31 00:10:01
#16  dd-9    login 2020-05-31 00:20:01
#17  dd-4   logout 2020-05-31 00:34:01
#18  dd-9   logout 2020-05-31 00:35:01
#19  cc-8   logout 2020-05-31 00:40:01

有了时间顺序数据,我们可以轻松地通过并跟踪每次迭代中谁已登录(注意:请参见下面的更新,以获取以下循环的更优化版本)

# keep track of the current logins in simul_tracker, allowing for a single pass through the data

simul_track = {}
results = {"sid": [], "simul":[]}

for i,row in df_chrono.iterrows():
    
    if row.variable=='login':
        for sid in simul_track:
            simul_track[sid] += 1
        
        if row.sid not in simul_track:
            simul_track[row.sid] = len(simul_track)  # number of current logins
    
    else:
        results['simul'].append(simul_track.pop(row.sid))
        results['sid'].append (row.sid)

#results 
#{'sid': ['aa-0',
#  'bb-1',
#  'cc-3',
#  'bb-6',
#  'aa-2',
#  'aa-5',
#  'aa-7',
#  'dd-4',
#  'dd-9',
#  'cc-8'],
# 'simul': [0, 1, 2, 4, 6, 4, 4, 7, 2, 2]}

您可以使用结果字典更新原始数据帧(请注意,结果键'sid'对于对齐非常重要)

pd.merge(df_test, pd.DataFrame(results), on='sid') 
#  user_id               login              logout   sid  simul
#0      aa 2020-05-31 00:00:01 2020-05-31 00:00:31  aa-0      0
#1      bb 2020-05-31 00:01:01 2020-05-31 00:02:01  bb-1      1
#2      aa 2020-05-31 00:02:01 2020-05-31 00:06:03  aa-2      6
#3      cc 2020-05-31 00:03:01 2020-05-31 00:04:01  cc-3      2
#4      dd 2020-05-31 00:04:01 2020-05-31 00:34:01  dd-4      7
#5      aa 2020-05-31 00:05:01 2020-05-31 00:07:31  aa-5      4
#6      bb 2020-05-31 00:05:01 2020-05-31 00:06:01  bb-6      4
#7      aa 2020-05-31 00:05:01 2020-05-31 00:08:03  aa-7      4
#8      cc 2020-05-31 00:10:01 2020-05-31 00:40:01  cc-8      2
#9      dd 2020-05-31 00:20:01 2020-05-31 00:35:01  dd-9      2

更新

如果有大量用户同时登录,上述字典更新 (for sid in simul_track: simul_track[sid] += 1) 可能会成为瓶颈。为了解决这个问题,可以采用以下方案:

import numpy as np
import time

t = time.time()
results = {"sid": [], "simul":[]}
n_records = len(df_chrono)
n_active = 0  # we will track the number of active logins here

# create an array for quick incremental updates
# Each session id gets a unique element in tracker
n_session = len(df_test)
tracker = np.zeros(n_session, dtype=np.uint)
# we create a 1-to-1 mapping from session id to the tracker array
idx_from_sid = {sid:i for i,sid in zip(df_test.index, df_test.sid)}

for i,row in df_chrono.iterrows():
    idx = idx_from_sid[row.sid]  # position in data array corresonding to this particular session id
    
    # print progress
    if i % 100==0:
        perc_done = i / n_records * 100.
        print("prog=%.2f%% (rt=%.3fsec)."% (perc_done, time.time()-t), flush=True, end='\r' )
    
    if row.variable=='login':
        # We track two quantities
        # The first is how many additional users log in after current sid starts
        tracker += 1  # never mind that we increment all values here; on the next line we override this particular sessions value

        # the second is how many active users there are when this session id starts log in
        tracker[idx] = n_active
        n_active += 1
    else:
        n_active = n_active - 1
        count = tracker[idx]
        results['simul'].append(count)
        results['sid'].append(row.sid)
print("")

类似于另一个答案,我在data*30000上计时以模拟300,000行的扩展,并能够在约110秒内计算出同时的值。


现在,根据我的答案,您可能仍然对原始解决方案感兴趣,并且有几个优化可以进行。特别是,df_test.loc[~this_index]:每次迭代只需要执行一次。此外,df.loc[this_index]是数据帧中的单行,(df_test[this_index]['login'] <= logout) & (df_test[this_index]['logout'] >= login)将始终为True,因此无需进行切片:

df_test.reset_index(drop=True) # just in case
for i, row in df_test.iterrows():
    
    df_test.loc[i, 'simultaneous'] = int(np.sum(
        (df_test.login <= row.logout) & (df_test.logout >= row.login)
    )) -1  # note the subtraction by one saves us from having to do df.loc[~this_index]

    # alternatively, you can try to use numexpr to speed up the element wise comparisons
    #in_lt_out = pd.eval('df_test.login <= row.logout', engine='numexpr')
    #out_gt_in = pd.eval('df_test.logout >= row.login', engine='numexpr')
    #simul = np.sum(pd.eval('in_lt_out & out_gt_in', engine='numexpr'))
    #df_test.loc[i, 'simultaneous'] = int(simul-1)



注意,我很好奇你在使用 .isin 时想要做什么,这让我觉得你对“同时”的定义可能是针对唯一用户的,然而,在这里和你的解决方案中,情况并非如此。这可能是你需要更加明确的事情。我相信在我发布的解决方案中,如果你想要同时反映唯一登录,你可以简单地将“sid”替换为“user_id”,但我还没有测试过。祝你好运,这是一个有趣的问题。

0

尝试这个解决方案,在您的data * 30_000上,计算结果需要约1900秒(AMD 3700X/Python 3.9.7)- 但我不确定它在真实数据上的表现如何:

mn = df_test["login"].min()
mx = df_test["logout"].max()
tmp = pd.Series(0, index=pd.date_range(mn, mx, freq="S"), dtype=object)


def fn1(x):
    tmp[x["login"] : x["logout"]] = [
        v | (1 << x.name) for v in tmp[x["login"] : x["logout"]]
    ]


def fn2(x):
    out = 0
    for v in tmp[x["login"] : x["logout"]]:
        out |= v

    # If you use Python 3.10+ you can use this answer
    # https://dev59.com/rmkw5IYBdhLWcg3wiq-s#64848298
    # which should be ~6x faster instead of this:
    return bin(out).count("1") - 1


df_test.apply(fn1, axis=1)
df_test["sim"] = df_test.apply(fn2, axis=1)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接