我一直在密集地使用线程(并行处理)和锁(防止共享对象的同时操作)。由于我正在编写具有非常高并行线程处理速度的代码,接收数据并填充共享数据缓冲区,因此我想知道何时真正需要锁定?
- 编写共享对象
- 读取共享对象
- 根据其内容更新共享对象
我了解到主要是第三种情况很关键(给出线程和锁定的著名“增量计数器”示例)。但是我是否应该在其他情况下也使用锁?
在我的特定情况中,它是关于用作数据缓冲区的pandas数据帧。我想要:
- 向其中添加新数据
- 从中获取数据
- 删除其中的数据(创建一个循环缓冲区)
下面的最小工作示例(MWE)显示了此过程与线程一起进行,但为简单起见在此按顺序处理,并在进程之间进行了密集的锁定。虽然这是一种超级谨慎的方法,但我猜想可以省略一些获取/释放锁步骤?但由于pandas在附加对象时会复制对象,因此我不能百分之百确定是否要删除这些锁。
有没有人对此进行了密集测试或有任何经验?
MWE:
import pandas as pd
import threading
thread_lock = threading.Lock()
df_data_buffer = pd.DataFrame({"key" : []})
def add_data_to_buffer(df_data_ingestion):
global df_data_buffer
thread_lock.acquire()
df_data_buffer = df_data_buffer.append(df_data_ingestion)
thread_lock.release()
def get_data_from_buffer(key):
thread_lock.acquire()
df_data_buffer.reset_index(inplace=True, drop=True) #required for proper dropping by index
df_extracted = df_data_buffer.loc[df_data_buffer["key"] == key].copy()
thread_lock.release()
drop_data(df_extracted.index)
return df_extracted
def drop_data_from_buffer(df_index):
global df_data_buffer
thread_lock.acquire()
df_data_buffer.drop(df_index, inplace=True)
thread_lock.release()
return True
df_data1 = pd.DataFrame({"key" : [1]})
t_add_data1 = threading.Thread(target=add_data, args=[df_data1])
t_add_data1.start()
t_add_data1.join()
print "*"*10, 1, "*"*10
print df_data_buffer
df_data2 = pd.DataFrame({"key" : [2]})
t_add_data2 = threading.Thread(target=add_data, args=[df_data2])
t_add_data2.start()
t_add_data2.join()
print "*"*10, 2, "*"*10
print df_data_buffer
key=1
df_data_extracted = get_data(key)
print "*"*10, "extract", "*"*10
print "df_data_extracted\n", df_data_extracted
print "df_data_buffer\n", df_data_buffer
print "*"*10, 3, "*"*10
df_data3 = pd.DataFrame({"key" : [3]})
t_add_data3 = threading.Thread(target=add_data, args=[df_data3])
t_add_data3.start()
t_add_data3.join()
print df_data_buffer
输出:
********** 1 **********
key
0 1.0
********** 2 **********
key
0 1.0
1 2.0
********** extract **********
df_data_extracted
key
0 1.0
df_data_buffer
key
1 2.0
********** 3 **********
key
0 2.0
1 3.0
add_data
过程通过添加行肯定会写入它,drop_data
也通过删除行来操作对象,并且get_data
需要更新索引,因此也修改了分片对象? - HeXor