Python多线程共享变量

3
我正在尝试并行执行我的工作,但我对多线程编程不熟悉,因此对具体实现感到困惑。
我有一个套接字监听器,将数据保存到缓冲区。当缓冲区达到容量时,需要将其数据保存到数据库中。 我想在一个线程上启动套接字监听器,在并行任务中检查缓冲区状态。
BufferQueue只是Python列表的扩展,具有允许检查列表是否达到指定大小的方法。
SocketManager是我正在监听的STREAM_URL的流数据提供程序。它使用回调函数处理消息。
但是,由于我使用回调来检索数据,因此我不确定使用共享变量是否是正确且最优的决策。
buffer = BufferQueue(buffer_size=10000)

def start_listening_to_sokcet(client):
    s = SocketManager(client)
    s.start_socket(cb_new)
    s.start()

def cb_new(message):
    print("New message")
    global buffer
    for m in message:
        #save data to buffer

def is_buffer_ready(buffer):
    global buffer
    print("Buffer state")
    if buffer.ready():
         #save buffer data to db

如果您能帮我解决这个问题,我将不胜感激。


你可以使用共享缓冲区,但是你需要一些方法来控制对它的访问,以便只有一个线程可以修改它。例如,你可以使用 Lock - PM 2Ring
如果不知道这些BufferQueueSocketManager等类型来自哪里,或者至少它们的作用是什么,那么很难提供任何不太模糊的东西。但我会对使用调用者必须定期检查(或更糟的是在自旋循环中)的is_buffer_ready函数的任何API持谨慎态度;通常你会想要一些可以阻塞的东西。 - abarnert
如果您能提供一个最小可复现示例(MCVE),我们可能会比“您可能想在这里使用锁”和“通常您需要某种阻止方式”更具体地建议一些想法... - abarnert
@abarnert 谢谢您的建议。我编辑了我的问题。 - Daniel Chepenko
2个回答

4
我认为你需要的是 queue 模块。 queue.Queue 是一个自我同步队列,专门用于在线程之间传递对象。
默认情况下,在队列上调用 get 会阻塞,直到有一个对象可用,这通常是您想要做的——在网络应用程序中使用线程进行并发的关键在于,您的线程看起来都像正常的同步代码,但当它们没有任何事可做时,它们会花费大部分时间等待套接字、文件、队列或其他内容。但是,您可以使用 block=False 进行非阻塞检查,或者在等待时放置 timeout
您还可以在构造队列时指定 maxsize。然后,默认情况下,put 会阻塞,直到队列不太满以接受新对象。但是,如果太满,则可以使用 blocktimeout 进行尝试和失败。
所有同步都在 getput 内部处理,因此您不需要 Lock 来保证线程安全或 Condition 来通知等待者。
队列甚至可以为您处理关闭。生产者只需在 put 上放置一个特殊值,告诉消费者在 get 上看到它时退出即可。
对于优雅的关闭,其中生产者需要等待消费者完成后,您可以在消费者处理每个排队的对象后使用可选的 task_done 方法,并在 join 方法上阻塞生产者。但是,如果您不需要此功能——或者有其他等待关闭的方法,例如加入消费者线程——则可以跳过此部分。

2
"多线程"使您可以共享资源(变量)的状态。不要使用全局变量,只需将缓冲区作为参数传递给您的方法,并从中读取/写入即可。
您仍然需要控制对缓冲区资源的访问,以便两个线程不会同时读取/写入。您可以使用来自"threading"模块的"Lock"来实现这一点:
lock = threading.Lock()

def cb_new(buffer_, lock_, message):
    print("New message")
    with lock_():
        for m in message:
            #save data to buffer
            buffer.add(m)

def is_buffer_ready(buffer_, lock_):
    print("Buffer state")
    with lock_():
        if buffer_.ready():
             #save buffer data to db

请注意,如果您正在使用多进程而不是线程,则此解决方案将无法工作。
顺便说一下,正如@abarnert所评论的那样,有更好的机制来检查缓冲区是否已准备好(有数据可读/有空闲空间可写),而不是调用检查它的函数。请查看{{link1:select.select()}},该函数会阻塞您,直到缓冲区实际上已经准备好。
在使用 select 时,你需要将调用放在一个 while True 循环中,并检查缓冲区是否准备好进行读取。你可以在一个线程中启动此函数,传递一个标志变量和缓冲区。如果你想停止线程,则将传递的标志变量更改为 False。对于缓冲区对象,请使用 Queue.Queue() 或类似的数据结构。
def read_select(flag, buff):
    flag = 1
    while flag:
        r, _, _ = select.select([buff], [], [])
        if r:
            data = s.read(BUFFSIZE)
            # process data

P.S - select函数也适用于套接字。你可以传递一个套接字对象而不是缓冲区,它会检查套接字上的缓冲区是否可以读取。

我猜你是对的。在那种情况下,select.select() 看起来非常合理。你能给我一个提示如何开始吗? - Daniel Chepenko
我已经添加了一个如何使用它的示例。 - Chen A.
2
我不认为这是一个好建议。你不能在队列上进行select操作,也不需要这样做;你只需在get调用上阻塞即可。而且,虽然你可以在套接字上进行select操作,但同样不需要这样做;你只需在recv调用上阻塞即可。select的主要作用是可以同时在多个套接字上阻塞,而无需为每个套接字创建一个线程。它是网络并发的一种替代方案,你不需要同时使用多线程和select - abarnert
1
你绝对不想在回调 API 上使用 select。你可能想要使用它来_实现_该回调 API,但同样,这是作为线程的替代方案(而且有更好的选择,比如 asyncio 或第三方库 Twisted)。无论哪种方式,一旦回调被调用,数据要么已经准备好读取,要么已经被读取,所以你不想在任何东西上调用 select - abarnert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接