线程锁 - 何时真正需要使用?

4

我一直在密集地使用线程(并行处理)和锁(防止共享对象的同时操作)。由于我正在编写具有非常高并行线程处理速度的代码,接收数据并填充共享数据缓冲区,因此我想知道何时真正需要锁定?

  • 编写共享对象
  • 读取共享对象
  • 根据其内容更新共享对象

我了解到主要是第三种情况很关键(给出线程和锁定的著名“增量计数器”示例)。但是我是否应该在其他情况下也使用锁?

在我的特定情况中,它是关于用作数据缓冲区的pandas数据帧。我想要:

  • 向其中添加新数据
  • 从中获取数据
  • 删除其中的数据(创建一个循环缓冲区)

下面的最小工作示例(MWE)显示了此过程与线程一起进行,但为简单起见在此按顺序处理,并在进程之间进行了密集的锁定。虽然这是一种超级谨慎的方法,但我猜想可以省略一些获取/释放锁步骤?但由于pandas在附加对象时会复制对象,因此我不能百分之百确定是否要删除这些锁。

有没有人对此进行了密集测试或有任何经验?


MWE:

import pandas as pd
import threading

thread_lock = threading.Lock()

df_data_buffer = pd.DataFrame({"key" : []})

def add_data_to_buffer(df_data_ingestion):        
    global df_data_buffer
    thread_lock.acquire()
    df_data_buffer = df_data_buffer.append(df_data_ingestion)
    thread_lock.release()

def get_data_from_buffer(key):
    thread_lock.acquire()
    df_data_buffer.reset_index(inplace=True, drop=True) #required for proper dropping by index
    df_extracted = df_data_buffer.loc[df_data_buffer["key"] == key].copy()
    thread_lock.release()
    drop_data(df_extracted.index)
    return df_extracted

def drop_data_from_buffer(df_index):
    global df_data_buffer
    thread_lock.acquire()
    df_data_buffer.drop(df_index, inplace=True)
    thread_lock.release()
    return True


df_data1 = pd.DataFrame({"key" : [1]})
t_add_data1 = threading.Thread(target=add_data, args=[df_data1])
t_add_data1.start()
t_add_data1.join()
print "*"*10, 1, "*"*10
print df_data_buffer

df_data2 = pd.DataFrame({"key" : [2]})
t_add_data2 = threading.Thread(target=add_data, args=[df_data2])
t_add_data2.start()
t_add_data2.join()
print "*"*10, 2, "*"*10
print df_data_buffer

key=1
df_data_extracted = get_data(key)
print "*"*10, "extract", "*"*10
print "df_data_extracted\n", df_data_extracted
print "df_data_buffer\n", df_data_buffer

print "*"*10, 3, "*"*10
df_data3 = pd.DataFrame({"key" : [3]})
t_add_data3 = threading.Thread(target=add_data, args=[df_data3])
t_add_data3.start()
t_add_data3.join()
print df_data_buffer

输出:

********** 1 **********
   key
0  1.0
********** 2 **********
   key
0  1.0
1  2.0
********** extract **********
df_data_extracted
   key
0  1.0
df_data_buffer
   key
1  2.0
********** 3 **********
   key
0  2.0
1  3.0

1
每当一个线程可能会写入共享对象时,您都需要一个锁。唯一不需要锁的情况是对只读对象进行并发访问。 - chepner
但这意味着我的示例始终需要锁定。add_data过程通过添加行肯定会写入它,drop_data也通过删除行来操作对象,并且get_data需要更新索引,因此也修改了分片对象? - HeXor
是的,可变数据是使并发处理变得困难的原因。 - chepner
1个回答

0

你选择什么锁定策略取决于你对数据一致性的重视程度。在执行读取操作时,你是否需要最新的数据。例如,你可以选择在读取数据框时不包括锁定,但只在某些计算后重新分配时才进行锁定。然而,看起来你想要完全的一致性保证,你当前的锁定策略可以很好地实现这一点。此外,当从多个线程写入时,pandas DataFrame没有内部一致性保证,因此在执行此操作时必须进行锁定。

然而,你也必须意识到cpython实现使用GIL或全局解释器锁,只允许一个Python“线程”在任何给定时间执行。要获得实际的并行性,您必须使用多个进程,这些进程不受GIL限制。我怀疑由于这个事实,上面的代码执行速度是否比在单个线程中运行这些操作更快。


是的,我知道“线程”中的GIL/cpython问题。我处于并行执行不同进程的情况下(在线程中监听MQTT代理的消息(mqtt模块循环),在各个线程中添加接收到的消息数据到缓冲区,主线程在给定时间间隔内处理数据)。由于它“伪造”了并行处理,所以我很满意。到目前为止,它似乎能够以O(50ms)间隔跟上传入消息的速度。但我想减少由于锁定而导致的死时间,因此尝试减少我的锁获取/释放过程。 - HeXor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接