当第三个Python线程执行任务时,如何暂停两个线程(使用锁?)

4

我是一名新手并发编程。

我想要重复执行三个任务。前两个任务应该一直运行,第三个任务每小时左右运行一次。前两个任务可以并行运行,但当第三个任务正在运行时,我总是想暂停它们。

这是我尝试过的框架:

import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

我希望这段代码每秒钟输出一个f和一个g,大约每五秒钟输出一个h。然而,当我运行它时,我需要等到12个f和12个g之后才会开始看到一些h。看起来前两个线程不断释放并重新获取它们的锁,而第三个线程则被留在了循环之外。
1. 为什么?当第三个线程尝试获取当前持有的锁时,如果它被释放,难道不应该立即成功获取,而不是第一/二个线程立即重新获取吗?我可能理解有误。
2. 如何才能实现我想要的效果?
注意:将time.sleep(1)调用移出with flock/glock块可在这个简单的示例中解决问题,但对于我的实际应用似乎不起作用,因为线程大部分时间都在执行实际操作。当前面两个线程在循环体执行后每次睡眠一秒钟并释放锁时,第三个任务仍然永远不会被执行。

使用您的确切代码,我在Python 2.7和Python 3.2中每5-6秒钟看到'h'。 我甚至尝试完全删除休眠,并将其从锁定中取出。您使用的是什么操作系统?(我在Windows上)。 - Mark Tolonen
我尝试了 Linux 2.6.24-1-amd64 #1 SMP x86_64 GNU/Linux,使用Python 2.7.1 和 Linux 2.6.32-5-686 #1 SMP i686 GNU/Linux,使用Python 2.6.6 - ke.
5个回答

5

线程事件来做怎么样:

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()

产出

[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt

(回复评论中的问题)此代码尝试测量线程从其他工作线程获取每个锁所需的时间。
似乎表明,即使正在等待获取锁,其他工作线程也可能以相当高的概率释放并重新获取锁。 没有给予优先权,仅仅因为它等待的时间更长。
David Beazley在PyCon上介绍了与线程和GIL相关的问题。这里是幻灯片的pdf。这是一篇引人入胜的阅读,也许可以帮助解释这个问题。
import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()

我仍然想了解为什么我的原始代码不能按预期工作。是否确实存在这样一种情况,即一个线程的新lock.acquire()可能会立即成功,而另一个线程的lock.acquire()已经被阻塞? - ke.
我对线程的理解主要是经验性的。我不认为我能够深入地解释这个问题。我会发布一些代码,我认为可能会让人们对正在发生的事情有一点了解。对于h-thread来说,要获取flock,h-thread必须获取GIL,并且在f-thread释放flock时获取。这是如何发生的机制并不是我可以详细解释的。但是代码(我将在上面发布)表明它没有以所需的频率发生。 - unutbu
进一步思考我的问题,当然可能发生这种情况——毕竟,无法保证获取锁的请求的顺序。我实际上假设的是,在线程T中释放锁并且某些请求当前正在阻塞时,至少有一个这些请求将在T中发生任何其他事情之前得到满足。但显然,这也不能保证。 - ke.

1

使用通信进行同步:

#!/usr/bin/env python
import threading
import time
from Queue import Empty, Queue

def f(q, c):
    while True:
        try: q.get_nowait(); q.get() # get PAUSE signal      
        except Empty: pass  # no signal, do our thing
        else: q.get()       # block until RESUME signal
        print c,
        time.sleep(1)

def h(queues):
    while True:
        for q in queues:
            q.put_nowait(1); q.put(1) # block until PAUSE received
        print 'h'
        for q in queues:
            q.put(1) # put RESUME
        time.sleep(5)

queues = [Queue(1) for _ in range(2)]
threading.Thread(target=f, args=(queues[0], 'f')).start()
threading.Thread(target=f, args=(queues[1], 'g')).start()
threading.Thread(target=h, args=(queues,)).start()

从性能角度来看,这可能不是最优的选择,但我发现这样更容易理解。

输出

f g
f g h
f g f g g f f g g f g f f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h
f g f g f g f g f g f g f g h

这难道不会冒并发执行 print c,print 'h' 的风险吗?比如,当 H 线程将暂停信号放入队列后继续执行其工作时,另外两个线程可能仍在工作,对吧?我想避免这种情况。 - ke.
@ke:是的,这是可能的。你说“h”线程每小时启动一次。我允许线程在一个周期内重叠。(由于Queue()不能有maxsize=0,因为它意味着无限制,所以您需要使用put()两次来阻止具有maxsize=1的队列。我已经更新了答案。 - jfs

1

最简单的方法是使用3个Python进程。如果您在Linux上进行此操作,则每小时的进程可以发送信号以使其他任务暂停,甚至可以杀死它们,然后在每小时任务完成后重新启动。不需要线程。

但是,如果您决定使用线程,请尽量不要在线程之间共享任何数据,只需来回发送消息(也称为数据复制而非数据共享)。线程很难正确处理。

但是,多个进程强制您不共享任何内容,因此更容易正确执行。如果您使用像0MQ http://www.zeromq.org这样的库来进行消息传递,则很容易从线程模型转换为多进程模型。


很抱歉,我的线程确实需要共享一些数据,这就是我一开始决定使用线程而不是进程的原因。 - ke.
你确定没有其他方式可以分享所需的数据吗?一种方法是通过消息发送副本,但我使用的另一种方法是将共享数据放入memcache中。这是一种重构程序使编写可靠高性能代码更容易的情况。 - Michael Dillon

0
这种方法怎么样(虽然这种方式具有争议性,因为我知道在多线程编程中“全局”变量被认为是大忌(我是新手,仍在学习)...)
import threading, time


import threading, time

def f():
    global BL
    while True:
        BL = 'f' if BL == -1 else BL
        if BL == 'f':
            print('f')
            BL = -1
            ss(0.1)

def g():
    global BL
    while True:
        BL = 'g' if BL == -1 else BL
        if BL == 'g':
            print('g')
            BL = -1
            ss(0.1)

def h():
    global BL
    while True:
        BL = 'h' if BL == -1 and (tt() - start) % delay_3rd <= 0.1 and (tt()-start) > 1 else BL
        if (BL == 'h'):
           print('h')
           print(f' seconds: {round(tt() - start,None)}!!'*100)
           BL = -1
           ss(0.1)


BL, delay_3rd, [ss], [tt]  = -1, 5, [time.sleep], [time.time]
start = tt()

第三个将每秒运行(您可以将delay_3rd设置为3600以进行每小时间隔;而前两个始终运行(根据您的请求/意图)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()

(运行约4-5秒后的输出...)

f

h 秒:5!!

g

f

g

f

f

g

f

h

g

f

g

h 秒:6!!

f

g

f

g

f

g

f

g

f

g

f

g

f

h

秒数:7!!

g

f

g

(注意,h 只会每隔一段时间出现;f 和 g 会间歇性地出现...)


(即最接近您原始提交的“代码”-将flock / glock替换为全局变量,然后就这样了..噢,是的,我包括一个“if语句”,即如果BL == -1,则BL ='f / g / h' if BL ... - JB-007

0

一个初始值为2的信号量怎么样?F和G等待并发出一个单位的信号,H等待并发出两个单位的信号。


你的意思是像上面的例子一样,但将所有对flock和glock的引用替换为对单个信号量对象的引用,使用threading.Semaphore(2)构造它?恐怕结果程序会出现相同的问题。 - ke.
嗯...我原本期待因为H等待任何一个释放,而不只是一个,所以有改进的性能。接下来我会尝试降低F、G的优先级或提高H的优先级,这样当一个单位变为空闲时,H就会运行并获取该单位,而不是F或G。H应该在两个循环中等待其两个单位,即两次调用一个单位,而不是一次调用两个单位。 - Martin James
在Python中设置线程优先级没有简单的方法,是吗? - ke.
另外,我不确定你所说的“在两个循环中等待其2个单位”的意思是什么。我尝试的是这样的:with semaphore: with semaphore: print 'h'这是你所要表达的意思吗? - ke.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接