如何以线程安全的方式快速更新一个列表的列表?

3
我正在编写一个脚本,以每秒500次的频率向Python列表中添加一个“列”。以下是生成测试数据并将其通过单独的线程传递的代码:
fileA:
import random, time, threading

data = [[] for _ in range(4)]  # list with 4 empty lists (4 rows)
column = [random.random() for _ in data]  # synthetic column of data

def synthesize_data():
    while True:
        for x,y in zip(data,column):
            x.append(y)
        time.sleep(0.002)  # equivalent to 500 Hz

t1 = threading.Thread(target=synthesize_data).start()
# example of data
# [[0.61523098235, 0.61523098235, 0.61523098235, ... ],
# [0.15090349809, 0.15090349809, 0.15090349809, ... ],
# [0.92149878571, 0.92149878571, 0.92149878571, ... ],
# [0.41340918409, 0.41340918409, 0.41340918409, ... ]]

Jupyter Notebook 中的 fileB:

[1] import fileA, copy

[2] # get a copy of the data at this instant.
    data = copy.deepcopy(fileA.data)
    for row in data:
        print len(row)

如果您在fileB中运行cell [2],您应该会看到"data"中的“行”长度不相等。以下是我运行脚本时的示例输出:
8784
8786
8787
8787

我原以为可能是在 for 循环的中间获取数据,但这只会导致长度最多偏差1。随着时间的推移,差异变得更加严重。我的问题是:为什么快速向列表添加列不稳定? 是否有可能使此过程更加稳定?

你可能会建议我使用 Pandas 等工具,但我想使用 Python 列表,因为它们具有速度优势(代码需要尽可能快)。我测试了 for 循环、map() 函数和 Pandas 数据框架。以下是我的测试代码(在 Jupyter Notebook 中):

import pandas as pd
import random

channels = ['C3','C4','C5','C2']
a = [[] for _ in channels]
b = [random.random() for _ in a]

def add_col((x,y)):
    x.append(y)

df = pd.DataFrame(index=channels)
b_pandas = pd.Series(b, index=df.index)

%timeit for x,y in zip(a,b): x.append(y)  # 1000000 loops, best of 3: 1.32 µs per loop
%timeit map(add_col, zip(a,b))  # 1000000 loops, best of 3: 1.96 µs per loop
%timeit df[0] = b  # 10000 loops, best of 3: 82.8 µs per loop
%timeit df[0] = b_pandas  # 10000 loops, best of 3: 58.4 µs per loop

您还可以建议我将示例附加到data作为行,然后在分析时进行转置。出于速度考虑,我也不想这样做。此代码将用于脑机接口,其中分析在循环中发生。转置也必须在循环中发生,随着数据的增长,这会变得缓慢。
1个回答

7
deepcopy()操作会复制列表,并在另一个线程修改它们时进行修改,每次复制操作需要一小段时间(随着列表增长而变长)。因此,在复制第一个列表和复制第二个列表之间,其他线程添加了2个元素,表明复制8784个元素的列表需要花费0.002到0.004秒。

这是因为没有防止线程在执行synthesize_data()deepcopy.copy()调用之间切换。换句话说,你的代码不是线程安全的。

你需要协调你的两个线程;例如使用锁:

fileA中:

# ...
datalock = threading.RLock()
# ...

def synthesize_data():
    while True:
        with datalock:
            for x,y in zip(data,column):
                x.append(y)
            time.sleep(0.002)  # equivalent to 500 Hz

并且在fileB中:

with fileA.datalock:
    data = copy.deepcopy(fileA.data)
    for row in data:
        print len(row)

这样可以确保只有当fileA中的线程不再试图向列表中添加内容时才进行复制。
使用锁会使操作变慢;我怀疑pandas分配操作已经受到锁的限制以保持线程安全

如果我有足够的声望,我会点赞的...非常感谢。回答清晰、简洁、有帮助。顺便说一下,我修改了我的代码,使用列表推导式从头开始构建列表[[item for item in row] for row in fileA.data],而不是使用copy.deepcopy()。在一个包含500000列的嵌套列表上,copy.deepcopy()花费了1.73秒,而用列表推导式重新构建列表只需要216毫秒。感谢您提醒我深拷贝的缓慢! - jkr
@Jakub:你也可以使用[row[:] for row in fileA.data];这样你就创建了嵌套列表的浅拷贝。深拷贝必须逐个复制嵌套列表中的每个值,而且它不知道什么程度才算足够深。请注意,列表推导式不能保护您免受线程问题的影响,只能使其发生的可能性稍微降低一些。 - Martijn Pieters
♦:感谢你进行的修改。你的代码比我的快15倍! - jkr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接