使用sys.stdout.write与pool.map进行多进程 - 共享sys.stdout

3

我可能遗漏了一些非常简单的东西。

为什么我不能使用pool.map(sys.stdout.write, iterable)

我可以使用pool.map(len, iterable)使用相同的可迭代对象,但是当使用sys.stdout.write时,我会收到以下异常:

TypeError: expected string or Unicode object, NoneType found

这是跟踪信息:

Traceback (most recent call last):
  File "/home/reut/python/print_mult.py", line 19, in <module>
    pool.map(sys.stdout.write, messages)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
TypeError: expected string or Unicode object, NoneType found

完整代码:

#!/usr/bin/env python

import multiprocessing
import sys

# pool of 10 workers
pool = multiprocessing.Pool(10)
messages = ["message #%d\n" % i for i in range(100)]
print messages
pool.map(sys.stdout.write, messages) # doesn't work - error
# print pool.map(len, messages) # works

编辑 #1 - 线程池可行:

当我使用来自multiprocessing.poolThreadPool时,它是可行的,因此我认为它与无法在进程间共享sys.stdout流有关。

编辑 #2 - 手动进程也可行:

from multiprocessing import Process
import sys

# pool of 10 workers
processes = []
for i in range(10):
    processes.append(Process(target=sys.stdout.write, args=("I am process %d" % i, )))

for p in processes:
    p.start()

for p in processes:
    p.join()

现在我感到困惑的原因是,我知道常规进程和map进程之间的区别在于它分叉的点。但我不确定它在这里的相关性。唯一能想到的是,map会在内部存储target并且无法像Process的手动构造函数一样与工作进程共享。


我成功解决了它。 - Reut Sharabani
2
请写下答案,因为我对此感到惊讶。 - pcurry
@pcurry 我也感到惊讶。只需要确认这是否是实际原因。我该如何确保?(添加编辑) - Reut Sharabani
2个回答

3
真正的错误被隐藏了。您只能传递一个可以直接从模块命名空间引用的函数。但是,在某些情况下,有方法可以绕过此限制。Unix具有特殊功能,可以分叉进程并复制其所有内存。这就是实例方法可以被“传递”到子进程的方式--实际上没有任何东西被传递。在Windows平台上,进程无法分叉,而必须生成。这意味着启动了一个新的解释器。为了运行给定的函数,解释器会发送要运行的函数名称和所在的模块名称。解释器导入模块并查找函数,最后运行函数。
对于作为池一部分的进程,该进程已经启动,因此不能通过分叉来接收要运行的适当函数/方法的副本。相反,它必须使用与生成新进程时相同的技术。这就是为什么第二次编辑可以正常工作,但池不能正常工作的原因。
解决问题的最简单方法是将打印语句更改为打印函数。
from __future__ import print_function

import multiprocessing
import sys

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    messages = ["message #%d\n" % i for i in range(5)]
    print(messages) # <- notice the brackets around the arguments to print
    pool.map(print, messages)

如果无法实现上述方式,您可以定义一个函数来代替打印操作,并将其用作map的函数参数。
import multiprocessing 
import sys

def stdout_write(arg):
    sys.stdout.write(arg)

def stdout_print(arg):
    print arg

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    messages = ["message #%d\n" % i for i in range(5)]
    print messages
    pool.map(stdout_print, messages)

这如何与第二个编辑相吻合?你可以将它(传递实例方法...我想)作为进程执行。还是将其设置为目标不同? - Reut Sharabani
1
给出一个更加详细的答案,解释幕后到底发生了什么。 - Dunes
我认为在你代码的第二部分中不需要使用stdout_write。明天我会更深入地研究你的答案,这是目前为止最好的答案。 - Reut Sharabani

2

我不确定为什么,但是pool.map()要求函数返回一个字符串。

对你的程序进行这个简单的更改可以正确运行。

#!/usr/bin/env python

import multiprocessing
import sys

def prn(s):
    sys.stdout.write(s)
    return ''

# pool of 10 workers
pool = multiprocessing.Pool(10)
messages = ["message #%d\n" % i for i in range(100)]
print messages
pool.map(prn, messages) # doesn't work - error
# print pool.map(len, messages) # works

我查阅了文档,没有发现这个要求,所以我不知道为什么要执行它。

你能说出 ProcessPool 之间的实现差异吗?(我知道它们完全不同,但即使是一个想法也可以教会很多东西...)。我试着查看了源代码,在调用 task_queue.put(...) 的部分放弃了。 - Reut Sharabani
我建议您发布一个新问题,标题为“为什么pool.map()强制要求函数返回值?”可能已经有人不用阅读源代码就知道答案了。今天我没有时间挖掘源代码来回答这个问题,否则我会好好看看……这很有趣。 - steveha
我知道map()函数期望函数返回一个值,但我不明白为什么这个值不能是None。如果你只使用简单的map()(Python 2.x内置,Python 3.x从模块导入),你肯定可以使用返回None的函数。 - steveha
我认为那个由None组成的列表与此无关。你可以这样做:pool.map(return_none, range(10))(显然,return_none是在定义池之前定义的返回None的函数)。 - Reut Sharabani

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接