如何减少Python中的线程切换延迟

5

我有一个Python 2.7的应用程序,其中有3个生产者线程和1个消费者线程连接到Queue.queue。我使用getput,生产者线程大部分时间都被阻塞在IO(从串行端口读取)上,基本上什么也不做,就是调用serial.read()

然而,我似乎有一个我认为很高的延迟时间,即生产者线程放入队列到消费者线程获取队列的时间,大约25毫秒(我在Angstrom Linux上运行一个1 GHz的1处理器Beagle Bone Black)。

我认为,如果所有进程都被阻塞,那么putget之间的经过时间应该非常小,只有几微秒左右,而不是几十毫秒,除非消费者线程实际上很忙(这里不是这种情况)。

我读了一些在线文章,有些文章提到Python有业务自旋的问题,Python中的GIL是罪魁祸首。我想我不想知道原因,只是想得到更快响应的东西。实际的串行传输延迟(大约1-2毫秒)我可以接受。

代码基本上看起来像这样:

q = Queue.queue

def a1(): 
   while True:
      p = read_serial_packet("/dev/ttyO1")
      p.timestamp = time.time()
      q.put(p)

def a2(): 
   while True:
      p = read_serial_packet("/dev/ttyO2")
      p.timestamp = time.time()
      q.put(p)

def a3(): 
   while True:
      p = read_serial_packet("/dev/ttyO3")
      p.timestamp = time.time()
      q.put(p)

def main():
   while True:
      p = q.get()
      d = time.time() - p.timestamp
      print str(d)

现在有4个线程在运行,分别是a1a2a3main

以下是一些示例时间:

0.0119640827179
0.0178141593933
0.0154139995575
0.0192430019379
0.0185649394989
0.0225830078125
0.018187046051
0.0234098434448
0.0208261013031
0.0254039764404
0.0257620811462

这个问题在Python 3中是否得到了“解决”?

@roippi - 感谢您的建议,但似乎并没有对答案产生太大影响。我在 q.put() 后插入了 time.sleep(0)。延迟仍然是 0.013 到 0.028 秒。它可能只削减了几毫秒。 - Mark Lakata
1
你尝试过计时time.time()吗?有时仅仅是询问时间就会浪费几毫秒(这取决于时间函数和操作系统)。去除粒度计时信息并执行整体基准测试可能是值得的。此外,也许可以考虑使用Python的'multiprocess'模块,而不是使用线程。 - fileoffset
@Alp - 在我做时序实验的同时,消费者线程只是在执行 p.get() 。我的测量对象是 put()get() 之间的时间,而不是 get()get() 之间的时间,后者包括处理接收到的对象的时间。 - Mark Lakata
在这个平台上,time.time()似乎是合理的。它可能不是纳秒级别精确的,但连续调用会在微秒级别上产生差异。我始终得到10毫秒左右的差值。 - Mark Lakata
@Alp - 我明白你的意思。我忘了在这个例子中提到,其他线程(a2、a3)没有做任何事情。我只是在示例中放置了这些线程来激发对线程安全队列的需求,但实际上,这些线程是空闲的。真正起作用的只有 a1main 提供数据。 - Mark Lakata
显示剩余4条评论
2个回答

2

正如 @fileoffset 暗示的那样,答案似乎是从使用 threading(由于Python GIL实际上并没有进行“真正”的线程处理而受到影响)切换到使用 multiprocessing,后者使用多个Python进程而不是线程。

从线程转换为多进程看起来像这样:

useMP = True  # or False if you want threading

if useMP:
    import multiprocessing
    import multiprocessing.queues
    import Queue # to import Queue.Empty exception, but don't use Queue.Queue
else:
    import threading
    import Queue

...


    if useMP:
        self.event_queue = multiprocessing.queues.Queue()
        t1 = multiprocessing.Process(target=self.upstream_thread)
        t2 = multiprocessing.Process(target=self.downstream_thread)
        t3 = multiprocessing.Process(target=self.scanner_thread)
    else :
        self.event_queue = Queue.Queue()
        t1 = threading.Thread(target=self.upstream_thread)
        t2 = threading.Thread(target=self.downstream_thread)
        t3 = threading.Thread(target=self.scanner_thread)

其余的API看起来都一样。

然而,还有一个重要的问题不容易迁移,需要留作练习。这个问题是捕获Unix信号,例如SIGINT或Ctrl-C处理程序。以前,主线程捕获信号,所有其他线程忽略它。现在,信号被发送到所有进程。因此,您必须小心捕获KeyboardInterrupt并安装信号处理程序。我认为我没有用正确的方式做到这一点,所以我不打算详细说明... :)


0
你可以尝试调整"检查间隔"的值。
sys.setcheckinterval(50)

关于这个概念的简要解释可以在这些幻灯片中找到,大约从第10页开始。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接