如何在Python中实现非阻塞无限循环

4
我有一个无限循环,从网络摄像头读取视频帧,并且每个帧都会通过高计算力的复杂函数。因此,在显示帧时,由于阻塞代码,程序会感到有些卡顿。
现在我想要做的是:
- 仅在目标对象出现时收集前几帧 - 将它们放入单独的线程中以避免代码阻塞
我已经测量了摄像头每秒捕获的帧数,约为28帧。因此,while循环每秒只会收集前5帧并在另一个线程中处理所有帧,等到所有5个函数完成后再返回结果。
我尝试使用“Pool”和“Queue”,但无法让它工作,while循环仍然被阻塞。下面的代码模糊地表示了我的程序现在的样子,我回家后会进行编辑,现在使用手机发布。
def detect(frame):
    # detect target object from images
    pass

def nn(frame):
    # some heavy processing code
    pass

count = 0
stack = []

while True:
    frame = cv2.imread(0)

    detected = detect(frame)

    # stop collecting images when collected 5
    if detected and count <= 5:
        stack.append(frame)
        count += 1

    # start processing
    if len(stack) == 5:
        p = Pool(4)
        results = p.map(nn, frame)
        p.close()
        p.join()

        # reset
        stack = []
        count = 0

我理解正确吗?还是需要像协程一样做些其他事情?


尝试使用多进程。这是我使用ping制作的一个简单示例(请阅读我的注释中关于CPU绑定任务的说明): https://gist.github.com/ltpitt/6cdcb5377c462445a315703c5062397c - Pitto
我明白了,我会更新帖子的 :) - Evhz
1个回答

1
我使用 rq 解决了这个问题。
这是一个适用于 Python 的简单消息队列。 首先,你需要异步运行所需的方法实现。
它将运行你的 nn 函数,
然后,为消息队列设置一个简单的配置, 我使用 redis 包中的 connectionPool。
基本上,你将整个任务发送到由 rq 工作进程执行的并行进程中。
def nn(frame):
    # some heavy processing code
    pass

def asynch_call(frame):
   p = Pool(4)
   results = p.map(nn, frame)
   p.close()
   p.join()

pool = redis.ConnectionPool(
  host=HOST, 
  port=PORT, 
  password=PASS, 
  db=0)

r = redis.Redis(connection_pool=pool)  
q = Queue('nn_queue', connection=r)

count = 0
stack = []

while True:
    frame = cv2.imread(0)

    detected = detect(frame)

    # stop collecting images when collected 5
    if detected and count <= 5:
        stack.append(frame)
        count += 1

    # start processing
    if len(stack) == 5:

        job = q.enqueue(asynch_call, frame, timeout=any_long_timeout )

        if job.status=='queued':
            print("Job submission ok")

            # reset
            stack = []
            count = 0

为了启动处理异步调用的工作线程,您有几个选项:自己编写 Worker 代码或只需在单独的终端中运行以下命令。
rq worker nn_queue

请查看上面使用的带有队列名称的命令以发送作业。
希望这能帮到您。


我稍微修正了一下答案。祝好运! - Evhz
谢谢。有点偏离原问题,但是rq的队列和Python自身的队列有什么区别? - Moore Tech
总的来说,可能是一样的。Rq在Redis之上。与纯Python队列的实现相比,Rq可能只是您当前设计的更简单的集成。 - Evhz
首先,你的答案非常好用!我已经通过你的实现实现了非阻塞代码。其次,使用队列,这是否意味着所有我的帧都将按顺序处理?如果是的话,我该如何通过并行化此过程来加快速度? - Moore Tech
你仍需要同时使用Python队列和多线程。不过要记住,大量的线程共享内存会引起很多问题。你的做法是正确的。为了更好的速度,应该运行独立的进程(并行)将帧注入消息队列中,然后消费者通过异步调用从队列中提取帧,并使用多线程p.map(nn, frame)处理它们。这与你现在所做的并没有太大差别。 - Evhz
@Evhz 你好!我的while循环每180秒就会超时并停止。有没有办法让循环无限进行下去?或者我只需要设置timeout=5000h之类的东西。此外,在RQ中有没有一种方法可以杀死当前正在运行的作业?我尝试使用start_registry = StartedJobRegistry(queue=q)和在for循环中删除作业,但这并没有起作用。 - Mike Sandstrom

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接