如何给发电机穿线。

3
我有一个生成器对象,它加载了大量数据并占用了系统的I/O。数据太大无法一次性全部放入内存,因此使用了生成器。 我还有一个消费者,它利用CPU处理生成器产生的数据。它不会占用太多其他资源。是否可能使用线程交替执行这些任务? 例如,我猜可以在11秒钟内运行下面简化的代码。
import time, threading
lock = threading.Lock()
def gen():
    for x in range(10):
        time.sleep(1)
        yield x
def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return x+1

然而,最简单的线程应用程序并不在那个时间内运行。它确实加快了速度,但我认为这是由于生成器和工作线程之间的并行性,而不是由于工作线程之间的并行性。

import joblib
%time joblib.Parallel(n_jobs=2,backend='threading',pre_dispatch=2)((joblib.delayed(con)(x) for x in gen()))
# CPU times: user 0 ns, sys: 0 ns, total: 0 ns
# Wall time: 16 s

如果你的问题是“是否可能...?”,那么简单的回答是是。Python中的线程不能并行运行,你需要使用多个进程。如果你的数据可以分块处理,你可以使用线程来进行输入/输出绑定的数据获取和分发,每个线程将数据传递给一个或多个进程,以便获取数据。你需要找到一种方法来限制资源使用。Python提供了许多内置工具:线程、多进程、concurrent.futures、子进程、asyncio等。 - undefined
没有更多的细节,很难推荐一种策略。在SO上推荐策略可能与主题无关 - 但我会让社区决定。这里有很多问题和答案涉及到你的问题,也许不断优化搜索可以帮助你定义策略。concurrent.futures源代码的模块文档字符串中有一个很好的图示,展示了他们如何使用线程来提供进程。 - undefined
嗯,你的理解“Is it possible ...?”是可以的。然而,标题是“How to ... ?”。这个问题提供了一个具体的例子和关于该例子的问题。 - undefined
这是我写的一个回答,其中提到了一些关于内存消耗问题的负面反馈 - 它使用concurrent.futures来向进程提供数据。这是另一个回答。如果你的生成器有效,似乎你并不真正需要线程,只需要一种将数据提供给多个进程的方法。再次强调,你没有明确说明,但我们可以假设数据可以按块进行处理吗?也许我误解了你的问题 - 这是我有的一个坏习惯。 - undefined
@wwii 这个问题涉及到线程。一个单独的消费者使用所有的 CPU,主要使用已释放 GIL 的单个块进行处理。第一段特别写作是为了将问题集中在线程而不是进程上。 - undefined
我用线程的解决方案编辑了我的答案。 - undefined
2个回答

2
将您的数据发送到独立进程。我使用 concurrent.futures,因为我喜欢简单的 接口
在我的电脑上运行大约需要11秒钟。
from concurrent.futures import ThreadPoolExecutor
import concurrent
import threading
lock = threading.Lock()

def gen():
    for x in range(10):
        time.sleep(1)
        yield x

def con(x):
    lock.acquire()
    time.sleep(1)
    lock.release()
    return f'{x+1}'

if __name__ == "__main__":

    futures = []
    with ThreadPoolExecutor() as executor:
        t0 = time.time()
        for x in gen():
            futures.append(executor.submit(con,x))
    results = []
    for future in concurrent.futures.as_completed(futures):
        results.append(future.result())
    print(time.time() - t0)
    print('\n'.join(results))

使用100个生成器迭代(def gen(): for x in range(100):)大约需要102秒。


你的 进程 可能需要跟踪已发送到尚未完成任务的数据量,以防止过多消耗内存资源。

con 添加一些诊断打印似乎显示可能有至少两个数据块同时存在。

def con(x):
    print(f'{x} received payload at t0 + {time.time()-t0:3.3f}')
    lock.acquire()
    time.sleep(1)
    lock.release()
    print(f'{x} released lock at t0 + {time.time()-t0:3.3f}')
    return f'{x+1}'

2
我创建了这个问题来寻找一个习惯用语的for循环模式替代方案。虽然wwii的答案解决了问题,但它有一个警告,即如果生成器的输出很大,它可能会超过消费者线程并占用内存。我也更喜欢joblib。
问题在于问题文本中的joblib代码在主线程中迭代gen,因此它花费时间等待gen而不是分派作业。当输入生成器与joblib慢时,我已经放弃尝试理解调度是如此奇怪。但是,在将生产者和消费者都移动到延迟函数内后,我确实设法使其正确执行。
当可迭代长度实际上是预先知道的(例如要逐个处理的文件列表)时,代码很简单。下面的代码确保只有一个线程同时进行数据生成和数据消耗。
sync_gen,sync_con = threading.Lock(), threading.Lock()
@joblib.delayed
def work(iterable):
    with sync_gen:
        x = next(iterable)
    with sync_con:
        return con(x)

N=10
iterable = gen()
res1 = joblib.Parallel(2,'threading')(work(iterable) for x in range(N))
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

如果发电机的长度未知,则线程工作者最好累积他们的结果,而不是处理单个输入。
sync_gen,sync_con = threading.Lock(), threading.Lock()
def thread_safe(gen):
    try:
        while True:
            with sync_gen:
                x = next(gen)
            yield x
    except StopIteration:
        pass

def work2(safe_iterable):
    res = []
    for x in safe_iterable:
        with sync_con:
            res.append(con(x))
    return res

iterable = gen()
de_work2= joblib.delayed(work2)
res2 = joblib.Parallel(2,'threading')(de_work2(thread_safe(iterable)) for x in range(2))
#[[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]

或者使用ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
iterable = gen()
with ThreadPoolExecutor() as e:
    futures = [e.submit(work2,thread_safe(iterable)) for x in range(2)]
res = [future.result() for future in futures]

我认为你可以接受自己的答案。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接