Contact tracing in Python - working with time series

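Say I have time-series data (time on the x-axis, coordinates in the y-z plane).
Given a seed set of infected users, I want to find all users who come within distance d of a point in the seed set within time t. This is basically contact tracing.
Is there a smart way to do this?
The naive approach is roughly: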
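points_at_end_of_iteration = []
for p in seed_set:
    # candidate points recorded within time t of p
    other_ps = find_points_t_time_away(p, t)
    # keep only the candidates within distance d of p
    points_at_end_of_iteration += find_points_d_distance_away_from_set(p, other_ps, d)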

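How should I go about this in a smarter way, ideally keeping all the data in RAM (although I'm not sure that is feasible)? Is Pandas a good choice? I have also looked at Bandicoot, but it doesn't seem to do this for me.
If my question is too broad, please tell me how to improve it.

Edit: I think the algorithm I proposed above is flawed.
Is this better?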
for user,time,pos in infected_set:
    info = get_next_info(user, time) # info will be a tuple: (t, pos)
    intersecting_users = find_intersecting_users(user, time, delta_t, pos, delta_pos) # intersect if close enough to the user's pos/time
    infected_set.add(intersecting_users)
    update_infected_set(user, info) # change last_time and last_pos (described below)

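I think infected_set should actually be a hashmap: {user_id: {last_time: ..., last_pos: ...}, user_id2: ...}

One potential problem is that users are processed independently, so for user 2 the next timestamp may be hours or days later.

If I interpolated so that every user has a record at every time point (say, once per hour), contact tracing would probably be easier, although it would greatly increase the amount of data.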
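
To make the interpolation idea concrete, here is a minimal sketch that resamples one user's track onto whole hours. It assumes straight-line (linear) interpolation of latitude and longitude between consecutive samples, which may not suit every dataset; the function name `interpolate_hourly` and the sample values are made up for illustration.

```python
from datetime import datetime, timedelta


def interpolate_hourly(samples):
    """Linearly interpolate (timestamp, lat, lon) samples onto whole hours.

    `samples` must be sorted by timestamp. Returns {datetime: (lat, lon)}
    for every whole hour between the first and the last sample.
    """
    result = {}
    step = timedelta(hours=1)
    for (t0, lat0, lon0), (t1, lat1, lon1) in zip(samples, samples[1:]):
        # First whole hour at or after t0.
        t = t0.replace(minute=0, second=0, microsecond=0)
        if t < t0:
            t += step
        span = (t1 - t0).total_seconds()
        while t <= t1:
            frac = (t - t0).total_seconds() / span if span else 0.0
            result[t] = (lat0 + frac * (lat1 - lat0),
                         lon0 + frac * (lon1 - lon0))
            t += step
    return result


# Hypothetical track: two samples three hours apart.
samples = [
    (datetime(2015, 5, 1, 5, 30), 12.0, -12.0),
    (datetime(2015, 5, 1, 8, 30), 12.3, -12.3),
]
hourly = interpolate_hourly(samples)
```

Every user then has a position at the same hourly timestamps, at the cost of one stored record per user per hour.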

Data format / example

user_id = 123
timestamp = 2015-05-01 05:22:25
position = 12.111,-12.111 # lat,long

There is a CSV file containing all the records:

uid1,timestamp1,position1
uid1,timestamp2,position2
uid2,timestamp3,position3

There is also a directory of files (same format), one file per user.

records/uid1.csv
records/uid2.csv
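
A sketch of how records in this format could be read with the standard library. It assumes each row has four columns (uid, timestamp, lat, long), i.e. that the position is stored as two comma-separated numbers; the function name `load_records` and the sample rows are illustrative only.

```python
import csv
import io
from collections import defaultdict
from datetime import datetime


def load_records(lines):
    """Group rows of uid,timestamp,lat,long by user, sorted by time."""
    by_user = defaultdict(list)
    for uid, stamp, lat, lon in csv.reader(lines):
        when = datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
        by_user[uid].append((when, float(lat), float(lon)))
    for track in by_user.values():
        track.sort()  # tuples sort by timestamp first
    return dict(by_user)


# In-memory stand-in for the CSV file; an open file object works the same way.
sample = io.StringIO(
    "123,2015-05-01 05:22:25,12.111,-12.111\n"
    "123,2015-05-01 04:00:00,12.100,-12.100\n"
    "456,2015-05-01 05:20:00,12.112,-12.110\n"
)
records = load_records(sample)
```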


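@toasteez Sure, I'll update. - pushkin
How do you define contact between two users? Do they have to be at exactly the same position at the same time, or is there a threshold on position and/or time? - Valentin Lorentz
@matjazzz144 Yes. One problem, though, is that I have too much data to load into memory at once, so I'm doing a lot of file I/O (I should edit my question to mention this). - pushkin
How much data? A good idea is to sort the individual files by time, if that isn't done already. You probably want to start at the earliest time and compute at each time increment, loading files into memory as you go. Also, if someone's time increments are large, you may need to interpolate, otherwise the probability of infection will be relatively low :) - matjazzz144
Once your data is structured, you're almost done. pytables comes to mind. - matjazzz144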
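
The suggestion above (sort each file by time, then move forward from the earliest record) could be sketched with `heapq.merge`, which lazily merges already-sorted inputs. Here the per-user tracks are small in-memory lists with made-up values; in practice each one would presumably be a generator reading one per-user file, so only the current record of each file is held in memory. The name `merge_by_time` is hypothetical.

```python
import heapq
from datetime import datetime


def merge_by_time(*tracks):
    """Lazily merge per-user record streams (each sorted by time) into one."""
    # Each record is (uid, timestamp, position); order by the timestamp.
    return heapq.merge(*tracks, key=lambda record: record[1])


user_a = [
    ("uid1", datetime(2015, 5, 1, 5, 0), (12.0, -12.0)),
    ("uid1", datetime(2015, 5, 1, 7, 0), (12.1, -12.1)),
]
user_b = [
    ("uid2", datetime(2015, 5, 1, 6, 0), (12.0, -12.0)),
]

order = [uid for uid, _, _ in merge_by_time(user_a, user_b)]
```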
1 Answer


First solution, using interpolation:

# I would use a shelf (a persistent, dictionary-like object
# included with Python).
import shelve

# hashmap of clean users indexed by timestamp:
# { timestamp1: {uid1: (lat11, long11), uid2: (lat12, long12), ...},
#   timestamp2: {uid1: (lat21, long21), uid2: (lat22, long22), ...},
#   ...
# }
#
clean_users = shelve.open("clean_users.dat")

# load data into clean_users from the csv (a shelf uses the same syntax as
# a hashmap). You will interpolate the data (only the data for a given
# timestamp will be in memory at any one time). Note: the timestamp must be a string

# hashmap of infected users indexed by timestamp (same format as clean_users)
infected_users = shelve.open("infected_users.dat")

# for each iteration
for iteration in range(1, N):

    # compute the current timestamp; because we interpolate, every user
    # has a location at this timestamp
    current_timestamp = timestamp_from_iteration(iteration)

    # get clean users for this iteration (in memory)
    current_clean_users = clean_users[current_timestamp]

    # get infected users for this iteration (in memory)
    current_infected_users = infected_users[current_timestamp]

    # new infected user for this iteration
    new_infected_users = dict()

    # compute new infected_users for this iteration from current_clean_users and
    # current_infected_users then store the result in new_infected_users

    # remove user in new_infected_users from clean_users

    # add user in new_infected_users to infected_users

# close the shelves
infected_users.close()
clean_users.close()
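
The "load data from csv" step is only described in a comment above. As a rough sketch, and assuming rows of (uid, timestamp string, lat, long), filling a shelf keyed by timestamp could look like the following. The temporary directory and the sample rows are illustrative. One detail worth knowing: with the default `writeback=False`, changing a dict that was read from the shelf does not persist, so the whole per-timestamp bucket is read, modified, and assigned back.

```python
import os
import shelve
import tempfile

# Hypothetical rows: (uid, timestamp as string, lat, long)
rows = [
    ("uid1", "2015-05-01 05:00:00", 12.111, -12.111),
    ("uid2", "2015-05-01 05:00:00", 12.112, -12.110),
    ("uid1", "2015-05-01 06:00:00", 12.120, -12.120),
]

workdir = tempfile.mkdtemp()
path = os.path.join(workdir, "clean_users")

with shelve.open(path) as clean_users:
    for uid, stamp, lat, lon in rows:
        bucket = clean_users.get(stamp, {})  # read the whole bucket
        bucket[uid] = (lat, lon)             # modify the in-memory copy
        clean_users[stamp] = bucket          # assign back so it is stored

# Reopen the shelf: only the requested timestamp is loaded into memory.
with shelve.open(path) as clean_users:
    five_oclock = clean_users["2015-05-01 05:00:00"]
    stored_keys = sorted(clean_users.keys())
```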

Second solution, without interpolation:

# I would use a shelf (a persistent, dictionary-like object
# included with Python).
import shelve

# hashmap of clean users indexed by timestamp:
# { timestamp1: {uid1: (lat11, long11), uid2: (lat12, long12), ...},
#   timestamp2: {uid1: (lat21, long21), uid2: (lat22, long22), ...},
#   ...
# }
#
clean_users = shelve.open("clean_users.dat")

# load data into clean_users from the csv (a shelf uses the same syntax as
# a hashmap). Note: the timestamp must be a string

# hashmap of infected users indexed by timestamp (same format as clean_users)
infected_users = shelve.open("infected_users.dat")


# for each iteration (not tied to time, unlike the previous version);
# could also stop as soon as an iteration finds no new infected users
for iteration in range(1, N):

    # new infected users for this iteration
    new_infected_users = dict()

    # get timestamps from infected_users (list() takes a snapshot of the
    # keys, so the shelf can be modified inside the loop)
    for an_infected_timestamp in list(infected_users.keys()):

        # get infected users for this timestamp
        current_infected_users = infected_users[an_infected_timestamp]

        # get relevant timestamps from clean users
        for a_clean_timestamp in list(clean_users.keys()):
            if time_stamp_in_delta(an_infected_timestamp, a_clean_timestamp):

                # get clean users for this clean timestamp
                current_clean_users = clean_users[a_clean_timestamp]

                # compute infected users from current_clean_users and
                # current_infected_users then append the result to
                # new_infected_users

        # remove users in new_infected_users from clean_users

        # add users in new_infected_users to infected_users

# close the shelves
infected_users.close()
clean_users.close()
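
The helper `time_stamp_in_delta` is not defined in the code above. One possible sketch follows, assuming the shelf keys are strings in the `2015-05-01 05:22:25` format from the question and that "in delta" means the two timestamps differ by at most a fixed window in either direction. The 30-minute default and the `delta` parameter are arbitrary choices for illustration.

```python
from datetime import datetime, timedelta

STAMP_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed format of the shelf keys


def time_stamp_in_delta(infected_stamp, clean_stamp,
                        delta=timedelta(minutes=30)):
    """Return True if the two string timestamps are at most `delta` apart."""
    infected_time = datetime.strptime(infected_stamp, STAMP_FORMAT)
    clean_time = datetime.strptime(clean_stamp, STAMP_FORMAT)
    return abs(clean_time - infected_time) <= delta


close_in_time = time_stamp_in_delta("2015-05-01 05:22:25", "2015-05-01 05:40:00")
far_in_time = time_stamp_in_delta("2015-05-01 05:22:25", "2015-05-01 07:00:00")
```

If infection should only propagate forward in time, the `abs()` could be replaced by a one-sided comparison.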

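"shelve" looks really useful, thanks! One problem, though, is that I don't interpolate (I mentioned that I might, but it would produce too much data and, more importantly, the way to interpolate in my context isn't that simple). - pushkin
I've added a version without interpolation. I think it could work. - Olivier Pellier-Cuit
"compute infected users from current_clean_users" - does that mean checking whether their coordinates (infected users and current users) are close? - pushkin
Yes, "compute infected users from current_clean_users" means checking whether their coordinates (infected users and current users) are close. The timestamps have already been handled at that stage. - Olivier Pellier-Cuit
Thanks a lot for your help. I haven't tested it yet, but it looks promising. - pushkin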
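
The proximity check discussed in these comments could be sketched as follows. It assumes positions are (lat, long) pairs in degrees, as in the question, and uses the haversine great-circle distance with a mean Earth radius. The names `haversine_m` and `find_new_infected` and the 50 m threshold are illustrative, and the comparison is a brute-force pairwise check that may be too slow for large groups.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6371000.0  # mean Earth radius, in metres


def haversine_m(a, b):
    """Great-circle distance in metres between two (lat, long) pairs in degrees."""
    lat1, lon1 = map(radians, a)
    lat2, lon2 = map(radians, b)
    h = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * asin(sqrt(h))


def find_new_infected(current_clean_users, current_infected_users, max_distance_m):
    """Return the clean users lying within max_distance_m of any infected user."""
    new_infected_users = {}
    for uid, pos in current_clean_users.items():
        if any(haversine_m(pos, infected_pos) <= max_distance_m
               for infected_pos in current_infected_users.values()):
            new_infected_users[uid] = pos
    return new_infected_users


# Made-up positions: uid2 is a few metres from uid1, uid3 is far away.
clean = {"uid2": (12.1111, -12.1111), "uid3": (13.0, -13.0)}
infected = {"uid1": (12.1110, -12.1110)}
newly = find_new_infected(clean, infected, max_distance_m=50)
```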