这个多线程函数是异步的吗?

4

尽管查看了其他线程,但我仍然有点困惑,以下两种情况是否成立:

  • 所有异步代码都是多线程的
  • 所有多线程函数都是异步的

我的初步猜测是两者都不是,并且适当的异步代码应该能够在一个线程中运行-但是可以通过添加线程来改进,例如:

enter image description here

因此,我构建了这个玩具示例:

from threading import *
from queue import Queue
import time

def do_something_with_io_lag(in_work):
    out = in_work
    # Imagine we do some work that involves sending
    # something over the internet and processing the output
    # once it arrives
    time.sleep(0.5) # simulate IO lag
    print("Hello, bee number: ",
          str(current_thread().name).replace("Thread-",""))

class WorkerBee(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.q = q

    def run(self):
        while True:
            # Get some work from the queue
            work_todo = self.q.get()
            # This function will simiulate I/O lag
            do_something_with_io_lag(work_todo)
            # Remove task from the queue
            self.q.task_done()

if __name__ == '__main__':
    def time_me(nmbr):
        number_of_worker_bees = nmbr
        worktodo = ['some input for work'] * 50

        # Create a queue
        q = Queue()
        # Fill with work
        [q.put(onework) for onework in worktodo]
        # Launch processes
        for _ in range(number_of_worker_bees):
            t = WorkerBee(q)
            t.start()
        # Block until queue is empty
        q.join()

    # Run this code in serial mode (just one worker)
    %time time_me(nmbr=1)
    # Wall time: 25 s
    # Basically 50 requests * 0.5 seconds IO lag
    # For me everything gets processed by bee number: 59

    # Run this code using multi-tasking (launch 50 workers)
    %time time_me(nmbr=50)
    # Wall time: 507 ms
    # Basically the 0.5 second IO lag + 0.07 seconds it took to launch them
    # Now everything gets processed by different bees

它是异步的吗?

对我来说,这段代码似乎不是异步的,因为它是我的示例图中的第3个图。I/O调用会阻塞线程(尽管我们没有感觉到,因为它们在并行阻塞)。

然而,如果是这种情况,我很困惑为什么requests-futures被认为是异步的,因为它只是ThreadPoolExecutor的包装器:

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()

这个功能能在单线程上运行吗?

特别是与asyncio相比,这意味着它可以单线程运行。

有两种方法可以让单处理器上的程序“同时做多件事情”。多线程编程是最简单和最流行的方法,但还有另一种非常不同的技术,可以让你几乎获得多线程的所有优势,而不实际使用多个线程。如果您的程序主要是I/O绑定,则这确实是唯一实用的方法。如果您的程序是处理器绑定的,则抢占式调度线程可能是您真正需要的。然而,网络服务器很少是处理器绑定的。


2
代码不是异步的,代码执行是异步的。函数不是多线程的,函数调用是多线程的。在这种情况下,你的问题是错误/不清楚的。 - Ulrich Eckhardt
2个回答

4
首先,需要注意的是:concurrent.futures.Futureasyncio.Future不同。基本上它只是一个抽象 - 一个对象,在您分配作业之后但完成之前,允许您引用作业结果(或异常,这也是结果)在程序中。这类似于将常规函数的结果分配给某些变量。
多线程方面:关于您的示例,使用多个线程时,可以说您的代码是“异步”的,因为多个操作在不等待彼此完成的情况下在不同线程中同时执行,并且可以在时间结果中看到它。并且您是正确的,由于sleep,您的功能是阻塞的,它会阻塞工作线程指定的时间量,但是当您使用多个线程时,这些线程是并行阻塞的。因此,如果您有一个带有sleep和另一个没有的作业,并运行多个线程,则没有sleep的作业将执行计算,而另一个则会休眠。当您使用单个线程时,作业按顺序一次性执行。因此,当一个作业休眠时,其他作业会等待它,实际上它们只是不存在,直到轮到它们。所有这些都几乎由您的时间测试证明了。print发生的事情与“线程安全”有关,即打印使用标准输出,这是一个单一共享的资源。因此,当多个线程尝试同时打印时,切换发生在内部,您会得到奇怪的输出。(这也显示了多线程示例的“异步性”)。为了防止这种错误,有锁定机制,例如锁定、信号量等。
异步方面:要更好地理解目的,请注意“IO”部分,它不是“异步计算”,而是“异步输入/输出”。谈论asyncio时,通常不会首先考虑线程。 Asyncio是关于事件循环和生成器(协同程序)的。事件循环是仲裁者,管理已注册到循环中的协同程序(及其回调)的执行。协程实现为生成器,即允许迭代执行某些操作并在每次迭代时保存状态和返回结果或无结果的函数,并在下一次调用时继续使用保存的状态。因此,基本上事件循环是while True:循环,按顺序调用所有分配给它的协同程序/生成器,它们在每个这样的调用中提供结果或无结果-这提供了“异步性”的可能性。(这是一种简化,因为有优化此行为的调度机制。)此情况下的事件循环可以在单个线程中运行,如果协同程序是非阻塞的,则会给您真正的“异步性”,但如果它们是阻塞的,那么基本上就是线性执行。
您可以使用显式多线程来实现相同的功能,但线程是昂贵的——它们需要分配内存,切换线程需要时间等。另一方面,asyncio API允许您抽象出实际的实现,只需考虑异步执行作业即可。它的实现可能不同,包括调用操作系统API和操作系统决定要做什么,例如DMA、附加线程、某些特定的微控制器使用等。问题在于,由于较低级别机制、硬件等原因,它对IO的处理效果很好。另一方面,执行计算将需要明确地将计算算法分解成片段以用作asyncio协程,因此单独的线程可能是更好的决策,因为您可以在那里启动整个计算。 (我不是在谈论专门用于并行计算的算法)。但是,asyncio事件循环可以明确设置为使用单独的线程来运行协程,因此这将是具有多线程的asyncio。
关于您的示例,如果使用sleep作为asyncio协程来实现函数,将50个任务单线程进行调度和运行,那么您将获得类似于第一次测试的时间,即大约25秒,因为它是阻塞的。如果将其更改为类似于yield from [asyncio.sleep](0.5)(它本身是一个协程)的内容,将50个任务单线程进行调度和运行,则会异步调用。因此,当一个协程休眠时,另一个协程将启动,依此类推。作业将在时间上完成,类似于您的第二个多线程测试,即接近0.5秒。如果在此处添加print,则输出将以串行方式由单个线程使用,但输出可能与将协程分配给循环的顺序不同,因为协程可能以不同的顺序运行。如果使用多个线程,则结果显然将接近最后一个。
简化:多线程和asyncio的区别在于阻塞/非阻塞,因此基本上阻塞式多线程将接近非阻塞式asyncio,但存在许多差异。
多线程用于计算(即CPU绑定代码)
Asyncio用于输入/输出(即I / O绑定代码)
关于您最初的声明:
所有异步代码都是多线程的
所有多线程函数都是异步的
我希望我能够证明:
异步代码可以是单线程和多线程的
所有多线程函数都可以称为“异步”

@Nitika。非常感谢您的详细回复!我希望您能查看我的编辑后的第一篇帖子,因为我已经包含了一个图表来说明为什么我很难看到所有多线程函数都是异步的。 - mptevsion
@mptevsion,你的图表非常准确,只需注意一点,在asyncio的图表中,任务在单线程和多线程场景下可能以任何顺序运行,因为它们首先被调度,然后执行,这取决于实现和特定任务(例如,它们都打开连接,而任务3立即收到连接响应,而任务1和任务2仍在等待)。 - Nikita
@mptevsion,关于您对多线程的担忧,您自己展示了任务1、任务2和任务3是并行运行的,因此彼此异步。想象一下,如果它们完成所需的时间不同,那么它们将在不同的时间点结束。如果您的担忧在于“阻塞”这个词,那么有3个线程,当其中一个线程被阻塞时,它只会阻塞自己,其他线程会继续执行它们的工作。只是在您的例子中它们完全相同,在现实生活中这种情况很少见——尽管执行相同的工作,但每个线程处理不同的输入数据需要不同的时间。 - Nikita
注意@Ulrich Eckhardt的评论 - 异步执行的是任务,而不是任务本身。在任何情况下,任务1对自身都不是异步的,但对于多线程和asyncio,任务1与任务2之间是异步的,任务3也是如此。此外,对于多线程,通常有一个“主”进程来管理其他线程,称为“工作线程”,并对其结果进行某些操作。任何异步性或多个线程都有一个同步点,例如获取任务或返回结果以进行未来的组合处理,但这并不使这些部分的计算同步化。 - Nikita

0

我认为主要的困惑来自于异步的含义。根据计算机自由在线词典,“一个进程[...]其执行可以独立进行”是异步的。现在,将这个应用到你的蜜蜂所做的事情:

  • 从队列中检索一个项目。只有一个蜜蜂可以一次执行该操作,而它们获取项目的顺序是未定义的。我不会称之为异步。
  • 睡眠。每个蜜蜂都是独立的,即睡眠持续时间对所有蜜蜂都适用,否则时间不会随着多个蜜蜂而减少。我会称之为异步。
  • 调用print()。虽然调用是独立的,但在某个时刻,数据会汇集到相同的输出目标中,并且在那个时刻强制执行一个序列。我不会称之为异步。但请注意,print()的两个参数以及尾随换行符是独立处理的,这就是为什么它们可以交错的原因。
  • 最后,调用q.join()。当然,在队列为空之前,调用线程会被阻塞,因此需要实施某种形式的同步。我不明白为什么这对你来说“似乎是错误的”。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接