What is a Pythonic way to create a non-blocking version of an object?

5
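I often use Python objects with blocking methods and want non-blocking versions of those methods. I find myself repeating the following pattern:
  1. Define the object.
  2. Define a function that creates an instance of the object and parses commands to call the object's methods.
  3. Define a "parent" object that creates a subprocess to run the function from step 2 and that duplicates the methods of the original object.
This gets the job done, but it involves a lot of tedious code repetition and doesn't feel very Pythonic to me. Is there a standard, better way to do this?
Here is a highly simplified example of the pattern I've been using: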
import ctypes
import Queue
import multiprocessing as mp

class Hardware:
    def __init__(
        self,
        other_init_args):
        self.dll = ctypes.cdll.LoadLibrary('hardware.dll')
        self.dll.Initialize(other_init_args)

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        This command takes a long time to execute, and blocks while it
        executes. However, while it's executing, we have to coordinate
        other pieces of hardware too, so blocking is bad.
        """
        self.dll.Takes_A_Long_Time(arg_1, arg_2, arg_3)

    def change_settings(self, arg_1, arg_2):
        """
        Realistically, there's tons of other functions in the DLL we
        want to expose as methods. For this example, just one.
        """
        self.dll.Change_Settings(arg_1, arg_2)

    def close(self):
        self.dll.Quit()

def hardware_child_process(
    commands,
    other_init_args):
    hw = Hardware(other_init_args)
    while True:
        cmd, args = commands.recv()
        if cmd == 'start':
            hw.blocking_command(**args)
        elif cmd == 'change_settings':
            hw.change_settings(**args)
        elif cmd == 'quit':
            break
    hw.close()

class Nonblocking_Hardware:
    """
    This class (hopefully) duplicates the functionality of the
    Hardware class, except now Hardware.blocking_command() doesn't
    block other execution.
    """
    def __init__(
        self,
        other_init_args):
        self.commands, self.child_commands = mp.Pipe()
        self.child = mp.Process(
            target=hardware_child_process,
            args=(self.child_commands,
                  other_init_args))
        self.child.start()

    def blocking_command(self, arg_1, arg_2, arg_3):
        """
        Doesn't block any more!
        """
        self.commands.send(
            ('start',
             {'arg_1': arg_1,
              'arg_2': arg_2,
              'arg_3': arg_3}))

    def change_settings(self, arg_1, arg_2):
        self.commands.send(
            ('change_settings',
             {'arg_1': arg_1,
              'arg_2': arg_2}))

    def close(self):
        self.commands.send(('quit', {}))
        self.child.join()
        return None
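Background: I use Python to control hardware, usually through closed-source DLLs that I call using ctypes. I frequently want to call DLL functions that block until execution completes, but I don't want my control code to block. For example, I might be synchronizing a camera with illumination using an analog-out card. The camera DLL's "snap" function must be called before I can send a trigger pulse to the camera, but the "snap" command blocks, which prevents me from activating the analog-out card.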

1
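ctypes releases the GIL, so the standard way to make the call non-blocking is to use threads: Twisted's deferToThread(), asyncio's run_in_executor(), or some other way of running a function in a thread pool (multiprocessing.pool.ThreadPool, concurrent.futures.ThreadPoolExecutor). - jfs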
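The thread-pool suggestion in this comment can be sketched as follows. This is a minimal illustration, not the asker's real code: `time.sleep` stands in for the blocking DLL call, the `Hardware` class here is a stripped-down stand-in, and the choice of a single worker thread is an assumption made so that calls to one device stay serialized. Whether the real DLL tolerates being called from a worker thread is also an assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class Hardware:
    """Stand-in for the ctypes-backed class; sleep() mimics the blocking DLL call."""

    def blocking_command(self, arg_1, arg_2, arg_3):
        time.sleep(0.5)  # placeholder for self.dll.Takes_A_Long_Time(...)
        return arg_1 + arg_2 + arg_3


hw = Hardware()
# One worker thread: commands sent to this device run one at a time, in order.
executor = ThreadPoolExecutor(max_workers=1)

start = time.time()
future = executor.submit(hw.blocking_command, 1, 2, 3)  # returns right away
submit_time = time.time() - start

# ... other hardware could be coordinated here while the call runs ...

result = future.result()  # block only when the result is actually needed
executor.shutdown()
```

With this shape no command parser or child process is needed, because the bound method itself is submitted to the pool. It only helps if the blocking call releases the GIL, as the comment says ctypes calls do.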
2 Answers

2

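I've done something similar by using a metaclass to create non-blocking versions of an object's blocking functions. It lets you create a non-blocking version of a class just by writing this: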

class NB_Hardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']

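I took my original implementation, which targeted Python 3 and used a concurrent.futures.ThreadPoolExecutor (I was wrapping blocking I/O calls to make them non-blocking in an asyncio context*), and adapted it to use Python 2 and a concurrent.futures.ProcessPoolExecutor. Here is the implementation of the metaclass and its helper classes: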

from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor

def runner(self, cb, *args, **kwargs):
    return getattr(self, cb)(*args, **kwargs)

class _ExecutorMixin():
    """ A Mixin that provides asynchronous functionality.

    This mixin provides methods that allow a class to run
    blocking methods in a ProcessPoolExecutor.
    It also provides methods that attempt to keep the object
    picklable despite having a non-picklable ProcessPoolExecutor
    as part of its state.

    """
    pool_workers = cpu_count()

    def run_in_executor(self, callback, *args, **kwargs):
        """  Runs a function in an Executor.

        Returns a concurrent.futures.Future

        """
        if not hasattr(self, '_executor'):
            self._executor = self._get_executor()

        return self._executor.submit(runner, self, callback, *args, **kwargs)

    def _get_executor(self):
        return ProcessPoolExecutor(max_workers=self.pool_workers)

    def __getattr__(self, attr):
        if (self._obj and hasattr(self._obj, attr) and
            not attr.startswith("__")):
            return getattr(self._obj, attr)
        raise AttributeError(attr)

    def __getstate__(self):
        self_dict = self.__dict__.copy()  # copy, so pickling does not clear the live _executor
        self_dict['_executor'] = None
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._executor = self._get_executor()

class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.  

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])
        existing_nbfuncs = set()

        def find_existing_nbfuncs(d):
            for attr in d:
                if attr.startswith("nb_"):
                    existing_nbfuncs.add(attr)

        # Determine if any bases include the nb_funcs attribute, or
        # if either this class or a base class provides an actual
        # implementation for a non-blocking method.
        find_existing_nbfuncs(dct)
        for b in bases:
            b_dct = b.__dict__
            nbfunc_list.extend(b_dct.get('nb_funcs', []))
            find_existing_nbfuncs(b_dct)

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add non-blocking funcs to dct, but only if a definition
        # is not already provided by dct or one of our bases.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            if nb_name not in existing_nbfuncs:
                dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        delegate = dct.get('delegate')
        old_init = dct.get('__init__')
        # Search bases for values we care about, if we didn't
        # find them on the child class.
        for b in bases:
            if b is object:  # Skip object
                continue
            b_dct = b.__dict__
            if not pool_workers:
                pool_workers = b_dct.get('pool_workers')
            if not delegate:
                delegate = b_dct.get('delegate')
            if not old_init:
                old_init = b_dct.get('__init__')

        cls.delegate = delegate

        # If we found a value for pool_workers, set it. If not,
        # ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            # Be sure to call the original __init__, if there
            # was one.
            if old_init:
                old_init(self, *args, **kwargs)

            if self.delegate:
                self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func

Usage:

from nb_helper import NonBlockBuilder
import time


class Hardware:
    def __init__(self, other_init_args):
        self.other = other_init_args

    def blocking_command(self, arg_1, arg_2, arg_3):
        print("start blocking")
        time.sleep(5)
        return "blocking"

    def normal_command(self):
        return "normal"


class NBHardware(object):
    __metaclass__ = NonBlockBuilder
    delegate = Hardware
    nb_funcs = ['blocking_command']


if __name__ == "__main__":
    h = NBHardware("abc")
    print "doing blocking call"
    print h.blocking_command(1,2,3)
    print "done"
    print "doing non-block call"
    x = h.nb_blocking_command(1,2,3)  # This is non-blocking and returns concurrent.futures.Future
    print h.normal_command()  # You can still use the normal functions, too.
    print x.result()  # Waits for the result from the Future

Output:

doing blocking call
start blocking
< 5 second delay >
blocking
done
doing non-block call
start blocking
normal
< 5 second delay >
blocking

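One tricky piece you need to watch out for is making sure Hardware is picklable. You can probably do that by having __getstate__ delete the dll object and recreating it in __setstate__, similar to what _ExecutorMixin does.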
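The pickling advice above might look like the sketch below. `FakeDll` and `_load_dll` are hypothetical names introduced only so the example runs without a real DLL; in real code `_load_dll` would call `ctypes.cdll.LoadLibrary('hardware.dll')`. Whether the device can safely be initialized a second time in another process depends on the hardware and is not addressed here.

```python
import pickle


class FakeDll:
    """Hypothetical stand-in for a DLL handle, which cannot be pickled."""

    def __reduce__(self):
        raise TypeError("DLL handles cannot be pickled")


class Hardware:
    def __init__(self, other_init_args):
        self.other_init_args = other_init_args
        self.dll = self._load_dll()

    def _load_dll(self):
        # Real code would load and initialize the DLL here.
        return FakeDll()

    def __getstate__(self):
        state = self.__dict__.copy()  # copy, so the live object keeps its handle
        del state['dll']              # drop the unpicklable handle
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.dll = self._load_dll()   # recreate the handle on the receiving side


# Round trip: the clone carries the plain attributes and a fresh handle.
clone = pickle.loads(pickle.dumps(Hardware("abc")))
```

Copying `__dict__` before removing the handle matters: deleting from the live dictionary would strip the handle from the object that is still in use in the parent process.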
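You'll also need the Python 2.x backport of concurrent.futures.
Note that there is a fair amount of complexity in the metaclass so that it works properly with inheritance and supports custom implementations of __init__ and the nb_* methods. For example, something like this is supported: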
from multiprocessing import Lock, RLock
from multiprocessing.util import register_after_fork

class AioBaseLock(object):
    __metaclass__ = NonBlockBuilder
    pool_workers = 1
    nb_funcs = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        self._threaded_acquire = False
        def _after_fork(obj):
            obj._threaded_acquire = False
        register_after_fork(self, _after_fork)

    # Custom implementation: the metaclass will not generate nb_acquire.
    def nb_acquire(self, *args, **kwargs):
        def lock_acquired(fut):
            if fut.result():
                self._threaded_acquire = True

        # run_in_executor expects the method name, not the bound method.
        out = self.run_in_executor('acquire', *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out

class AioLock(AioBaseLock):
    delegate = Lock


class AioRLock(AioBaseLock):
    delegate = RLock

If you don't need that kind of flexibility, you can simplify the implementation:
class NonBlockBuilder(type):
    """ Metaclass for adding non-blocking versions of methods to a class.  

    Expects to find the following class attributes:
    nb_funcs - A list containing methods that need non-blocking wrappers
    delegate - The class to wrap (add non-blocking methods to)
    pool_workers - (optional) how many workers to put in the internal pool.

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritance
    hierarchy of cls. This mixin provides methods that allow
    the non-blocking wrappers to do their work.

    """
    def __new__(cls, clsname, bases, dct, **kwargs):
        nbfunc_list = dct.get('nb_funcs', [])

        # Add _ExecutorMixin to bases.
        if _ExecutorMixin not in bases:
            bases += (_ExecutorMixin,)

        # Add a non-blocking wrapper to dct for every name in nb_funcs.
        for func in nbfunc_list:
            nb_name = 'nb_{}'.format(func)
            dct[nb_name] = cls.nbfunc_maker(func)

        return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct)

    def __init__(cls, name, bases, dct):
        """ Properly initialize a non-blocking wrapper.

        Sets pool_workers and delegate on the class, and also
        adds an __init__ method to it that instantiates the
        delegate with the proper context.

        """
        super(NonBlockBuilder, cls).__init__(name, bases, dct)
        pool_workers = dct.get('pool_workers')
        cls.delegate = dct['delegate']

        # If we found a value for pool_workers, set it. If not,
        # ExecutorMixin sets a default that will be used.
        if pool_workers:
            cls.pool_workers = pool_workers

        # Here's the __init__ we want every wrapper class to use.
        # It just instantiates the delegate object.
        def init_func(self, *args, **kwargs):
            self._obj = self.delegate(*args, **kwargs)
        cls.__init__ = init_func

    @staticmethod
    def nbfunc_maker(func):
        def nb_func(self, *args, **kwargs):
            return self.run_in_executor(func, *args, **kwargs)
        return nb_func

* The original code is here, for reference.
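The code in this answer is written for Python 2. In Python 3 the `__metaclass__` class attribute has no effect and the `metaclass=` keyword is used instead. The following is a rough Python 3 sketch of the simplified idea, using a ThreadPoolExecutor as the answer says the original did. It is not the author's original code: the wrapper's `__init__` and `__getattr__` are written by hand here rather than injected through a mixin, and `time.sleep` stands in for the blocking call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class NonBlockBuilder(type):
    """Adds an nb_<name> method for every name listed in nb_funcs."""

    def __new__(mcls, clsname, bases, dct):
        for name in dct.get('nb_funcs', []):
            dct['nb_' + name] = mcls.nbfunc_maker(name)
        return super().__new__(mcls, clsname, bases, dct)

    @staticmethod
    def nbfunc_maker(name):
        def nb_func(self, *args, **kwargs):
            # Submit the delegate's bound method to the thread pool.
            return self._executor.submit(
                getattr(self._obj, name), *args, **kwargs)
        return nb_func


class Hardware:
    def blocking_command(self, arg_1):
        time.sleep(0.2)  # placeholder for the blocking DLL call
        return "blocking"

    def normal_command(self):
        return "normal"


class NBHardware(metaclass=NonBlockBuilder):
    delegate = Hardware
    nb_funcs = ['blocking_command']

    def __init__(self, *args, **kwargs):
        self._obj = self.delegate(*args, **kwargs)
        self._executor = ThreadPoolExecutor(max_workers=1)

    def __getattr__(self, attr):
        # Only reached for names not found normally; forward public
        # names to the wrapped object and avoid recursing on _obj.
        if attr.startswith('_'):
            raise AttributeError(attr)
        return getattr(self._obj, attr)


h = NBHardware()
future = h.nb_blocking_command(1)  # returns a Future immediately
normal = h.normal_command()        # regular methods still work
result = future.result()           # wait for the blocking call
```

Because threads share memory, this variant does not require the wrapped object to be picklable, which is the main practical difference from the ProcessPoolExecutor version.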


2

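One approach I've used to launch class methods asynchronously is to create a pool and call a few function aliases with apply_async, instead of calling the class methods directly.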

Say you have an even simpler version of your class:

class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff
        return

    def blocking_command(self, arg1):
        self.stuff.call_function(arg1)
        return

At the top level of your module, define a new function that looks like this:
def _blocking_command(Hardware_obj, arg1):
    # Call through the instance; self is bound automatically.
    return Hardware_obj.blocking_command(arg1)

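Since the class and this "alias" function are both defined at the top level of the module, they are picklable, and you can launch the call using the multiprocessing library: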

import multiprocessing

hw_obj = Hardware(stuff)
pool = multiprocessing.Pool()

results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1))

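The result of your function call will be available through the results object (call results_obj.get() to retrieve it). I like this approach because it only requires adding a few two-line functions, with no extra classes or imports, to make parallelization much easier.
Caveats:
  1. Don't use this for methods that need to modify the object's attributes. It works fine if used after all of the attributes have been set, effectively treating them as "read-only".
  2. You can also use this approach inside a class method to launch other class methods; you just have to pass "self" explicitly. This would let you move the free-floating "hardware_child_process" function into the class. It would still act as a dispatcher for a bunch of asynchronous processes, but it would centralize that functionality in your Hardware class.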
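Caveat 1 follows from how a process pool passes arguments: the object is pickled in the parent and unpickled in the worker, so the worker operates on a copy. The sketch below imitates that step with an explicit pickle round trip instead of a real pool, which keeps it deterministic. The `call_count` attribute and the integer `stuff` are hypothetical, added only to make the effect visible.

```python
import pickle


class Hardware:
    def __init__(self, stuff):
        self.stuff = stuff
        self.call_count = 0

    def blocking_command(self, arg1):
        self.call_count += 1  # attribute change made on the worker's copy
        return self.stuff + arg1


def _blocking_command(hardware_obj, arg1):
    return hardware_obj.blocking_command(arg1)


hw_obj = Hardware(10)

# Imitate what the pool does with the arguments:
# pickle in the parent, unpickle in the worker.
worker_copy = pickle.loads(pickle.dumps(hw_obj))
result = _blocking_command(worker_copy, 5)

# The return value travels back, but the attribute change does not:
# hw_obj.call_count is untouched, only worker_copy.call_count changed.
```

Anything the parent needs to know about should therefore be returned from the method rather than stored on the object.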
