Python多进程:在异步调用中调用方法和传递对象

3

我正在尝试使用 apply_async (https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult) 方法实现两个目标:

(i) 调用类方法

(ii) 传递对象作为参数

到目前为止,我有以下基线代码:

import multiprocessing as mp

class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

def foo(new_val):  # foo is outside myClass
    print ('foo passed with', new_val)
    c1.val = new_val
    return c1

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(foo, args=(2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

foo passed with 2.0
c1.val: 2.0

尝试使用下面的代码去实现(i),但是得到的输出结果与之前不同。
class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

  def foo(self, new_val):  # foo is inside myClass
      print ('foo passed with', new_val)
      self.val = new_val
      return self

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

c1.val: 1.0

同样地,当我尝试完成(ii)时,foo不会再次被调用。
class myClass():
  def __init__(self, id):
    self.id = id
    self.val = 1.0
    self.pool = None

  def callback(self, obj):
    self.val = obj.val

def foo(obj, new_val):  # foo is outside myClass
    print ('foo passed with', new_val)
    obj.val = new_val
    return obj

if __name__ == '__main__':
  c1 = myClass('c1')
  c1.pool = mp.Pool(processes=1)
  c1.pool.apply_async(foo, args=(c1, 2.0, ), callback=c1.callback).wait()  
  c1.pool.close()
  c1.pool.join()
  print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

c1.val: 1.0

有什么想法可以改变上面的代码来实现(i)和(ii)?

1个回答

3
调用未能完成而引发了异常。您可以使用multiprocessing.pool.AsyncResult.successful方法来检查:
import multiprocessing as mp


class myClass():
    def __init__(self, id):
        self.id = id
        self.val = 1.0
        self.pool = None

    def callback(self, obj):
        self.val = obj.val

    def foo(self, new_val):
        print ('foo passed with', new_val)
        self.val = new_val
        return self

if __name__ == '__main__':
    c1 = myClass('c1')
    c1.pool = mp.Pool(processes=1)
    async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback)
    async_result.wait()
    print(async_result.successful())  # this is printing False!!!
    c1.pool.close()
    c1.pool.join()
    print ('c1.val:', c1.val)

现在你可以定义一个error_callback函数来查看发生了什么:
...
async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback, error_callback=lambda x: print(x))
...

这是该函数打印的错误信息:
pool objects cannot be passed between processes or pickled

这个SO问题中,您可以找到更多关于这种情况的信息。问题是代码必须pickle发送到它启动的子进程的东西,而pickler不会执行实例方法。


1
"pickler不支持实例方法"。这在Python 3+中不是问题,实例方法非常可被pickle(事实上,它们甚至不会被pickle,只有它们相关的名称)。错误实际上是因为c1.pool = mp.Pool(processes=1)这行代码引起的。将pool对象(一个无法pickle的对象)作为实例属性进行存储才导致了错误,因为如果目标函数是实例方法或者参数包括实例本身,所有实例属性都会隐式地进行pickle。因此,不要将池存储在实例中就可以解决这个问题。 - Charchit Agarwal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接