Swift await/async - 如何同步等待异步任务完成?

44

我正在用Swift桥接同步/异步世界,并进行渐进式采用异步/等待。我正在尝试从非异步函数调用返回值的异步函数。我理解明确使用Task是这样做的方式,例如在这里所述。

示例并不真正适合,因为该任务没有返回值。

经过多番搜索,我仍然找不到任何描述我认为是相当常见的请求:同步调用异步任务(是的,我明白那可能会冻结主线程)。

理论上,我想在我的同步函数中编写如下内容:

let x = Task {
  return await someAsyncFunction()
}.result

然而,当我尝试这样做时,由于尝试访问result,我得到了这个编译器错误:

'async'属性在不支持并发的函数中进行访问

我找到的一个替代方法是类似于:

Task.init {
  self.myResult = await someAsyncFunction()
}

myResult必须作为@State成员变量来归属。

然而,这并不是我想要的方式,因为在Task.init()完成并移到下一个语句之前,无法保证完成该任务。那么,我该如何同步等待该任务完成?


9
我理解您的请求,但如果您原谅我说这句话的话,我建议完全不采用这种模式。 如果您已经有现有的异步API,在过渡期间引入同步模式是向后迈出的一步。 毫无疑问,有更好的过渡策略。也许您可以编辑上述内容,并附带说明您当前代码的简化示例,我们可以提供更好的方法。顺便提一下,WWDC视频Swift并发:更新示例应用程序展示了用于逐步转换现有代码库的技术。 - Rob
2
“因为无法保证在Task.init()完成并移动到下一条语句之前完成该任务”--如果这是您要寻找的内容,为什么不将下一条语句包含在Task.init闭包内呢?话虽如此,我认为上述建议是很好的。 - jnpdx
@Rob - 感谢您的评论。我基本上同意引入同步行为的问题。然而,我仍然对并发框架中如何实现我所描述的感兴趣。即使出于更好地了解系统的原因。如果不能轻松完成此操作,则在同步和异步之间完成整个循环绝对是缺失的组件。 - gds
7
我建议您观看Swift并发:幕后花絮,它介绍了Swift并发系统的一个核心原则:“这意味着使用Swift并发编写的代码可以保持运行时合同,即线程始终能够向前推进。”但您问如何阻止向前推进并阻塞线程,这与此核心设计原则背道而驰。新并发系统的目标是允许我们编写镜像传统同步模式的代码,但完全异步执行。 - Rob
2
这是关于结构化并发最常见的问题,简短的答案是你不能这样做,如果你能够这样做,那么它将违背SC的目的。你可以通过Combine来欺骗系统https://dev59.com/oW0NtIcB2Jgan1znH0uE#70350527,https://www.swiftbysundell.com/articles/calling-async-functions-within-a-combine-pipeline/,但这可能是一个坏主意。 - Jano
显示剩余2条评论
5个回答

26

您不应该同步等待异步任务。

可能有人会提出类似于以下的解决方案:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
    let semaphore = DispatchSemaphore(value: 0)

    Task {
        await asyncUpdateDatabase()
        semaphore.signal()
    }

    semaphore.wait()
}

尽管在某些简单条件下可以使用,但根据WWDC 2021 Swift Concurrency: Behind the scenes,这是不安全的。原因在于系统期望您遵守运行时合同。该合同要求:

线程始终能够向前推进。

这意味着线程永远不会被阻塞。当异步函数到达挂起点(例如await表达式)时,函数可以被挂起,但线程不会被阻塞,而是可以进行其他工作。基于此合同,新的协作线程池只能生成与CPU核心数量相同的线程,避免过多的线程上下文切换。这也是演员不会导致死锁的关键原因。

以上的信号量模式违反了这个合同。semaphore.wait()函数会阻塞线程。这可能会引起问题。例如:

func testGroup() {
    Task {
        await withTaskGroup(of: Void.self) { group in
            for _ in 0 ..< 100 {
                group.addTask {
                    syncFunc()
                }
            }
        }
        NSLog("Complete")
    }
}

func syncFunc() {
    let semaphore = DispatchSemaphore(value: 0)
    Task {
        try? await Task.sleep(nanoseconds: 1_000_000_000)
        semaphore.signal()
    }
    semaphore.wait()
}

testGroup函数中,我们添加了100个并发的子任务,但不幸的是,任务组永远无法完成。在我的 Mac 上,系统会生成 4 个协同线程,只需添加 4 个子任务就足以无限期地阻塞所有 4 个线程。这是因为,在所有 4 个线程都被wait函数阻塞之后,没有更多的线程可用于执行内部任务来发出信号量。

另一个不安全使用的例子是 actor 死锁:

func testActor() {
    Task {
        let d = Database()
        await d.updateSettings()
        NSLog("Complete")
    }
}

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
    let semaphore = DispatchSemaphore(value: 0)

    Task {
        await asyncUpdateDatabase()
        semaphore.signal()
    }

    semaphore.wait()
}

actor Database {
    func updateSettings() {
        updateDatabase {
            await self.updateUser()
        }
    }

    func updateUser() {

    }
}

在这里调用updateSettings函数将会死锁。因为它会同步等待updateUser函数完成,而updateUser函数被隔离到同一个actor中,所以它也在等待updateSettings函数先完成。

以上两个示例使用了DispatchSemaphore。基于类似的原因,在NSCondition中以类似的方式进行等待是不安全的。基本上,同步等待意味着阻塞当前线程。除非您只想要临时解决方案并完全理解风险,否则避免使用此模式。


3
你写道这很危险,因为“系统期望你遵守运行时合约”。有趣的是,我正在将旧代码从旧同步API转换为现在由苹果提供的更新方法。在这种情况下,我别无选择,根据你的说法,我实际上正在遵守合约。我的旧代码被广泛使用且没有错误,在过渡到新架构时,我们需要使用正好这种方式:以阻塞方式调用异步方法。 - Andreas Pardeike
2
我使用了这个信号量策略来维护一个旧的 API,该 API 现在在内部使用新的异步 API。在我的单元测试中,任务从未触发,信号量最终一直等待。但是通过将优先级设置为除 .default 或 .medium(它们是相同的)以外的任何其他值,单元测试就成功了。我唯一的解释是,在测试环境中,.medium 优先级尝试使用与正在运行测试的线程相同的线程,该线程正在等待信号量,但其他优先级使用不同的线程。这可能因不同的环境而异,显示了其不安全性。 - Richard Venable
对于那些寻找有关为何在 Swift 并发中信号量不安全的参考资料的人,请参阅 WWDC 2021 的 Swift concurrency: Behind the scenes,并跳至 25:58。 - Rob
我使用信号量来现代化NSFileCoordinator().coordinate,并使用async/await进行封装,但我还在其中加入了withTaskCancellationHandler以支持任务取消。 - malhal
1
运行时合约适用于Swift并发中的异步任务。如果您是从Swift并发之外调用(就像您的示例一样),那就没有问题。 (虽然有很多潜在的陷阱,但“系统期望您遵守运行时合约”不是其中之一) - undefined

7

除了使用信号量,你还可以像这里一样将异步任务封装在操作中。一旦底层异步任务完成,您可以发出操作完成的信号,并使用waitUntilFinished()等待操作完成:

let op = TaskOperation {
   try await Task.sleep(nanoseconds: 1_000_000_000)
}
op.waitUntilFinished()

请注意,在使用 semaphore.wait()op.waitUntilFinished() 的过程中,会阻塞当前线程,而阻塞线程可能会导致现代并发性能下的未定义运行时行为。 如果您打算仅在不使用现代并发性能的上下文中使用此方法,则可以在 Swift 5.7 中提供属性标记方法在异步上下文中不可用:
@available(*, noasync, message: "this method blocks thread use the async version instead")
func yourBlockingFunc() {
    // do work that can block thread
}

通过使用这个属性,你只能在非异步上下文中调用这个方法。但是需要注意的是,如果那个方法没有指定noasync可用性,你可以从异步上下文中调用调用此方法的非异步方法。


2
我相当确定这个链接库正在以更间接的方式做着苹果/ Swift 团队所说的不应该做的事情。虽然你似乎已经意识到了这一点,但我仍然强烈建议谨慎使用这种试图将新的 async/await 范式弯曲成适合传统 async 范式的代码。 - mredig
为什么op.waitUntilFinished()不会阻塞线程?如果它不会阻塞线程,那么它必须与await相同,只是没有在其前面加上await这样的关键字,这对于这种用例来说有点无意义,因为原始代码块实际上不会是同步的。waitUntilFinished()的文档中写道:“阻塞当前线程的执行,直到操作对象完成其任务。”请查看我的答案,以了解可能是原始问题的正确解决方案。 - oxygen

1
我编写了简单的函数,可以像Kotlin一样将异步代码作为同步代码运行,您可以在这里看到代码。虽然它仅供测试目的,但不要在生产中使用 ,因为异步代码必须仅以异步方式运行。
let result = runBlocking {
    try? await Task.sleep(nanoseconds: 1_000_000_000)
    return "Some result"
}
print(result) // prints "Some result"

1
你可以使用Task.synchronous将一个异步调用包装在同步上下文中,并像这样使用它:
Task.synchronous { try await myAsyncFunc() }

使用这个简单的扩展:
extension Task where Failure == Error {
    /// Performs an async task in a sync context.
    ///
    /// - Note: This function blocks the thread until the given operation is finished. The caller is responsible for managing multithreading.
    static func synchronous(priority: TaskPriority? = nil, operation: @escaping @Sendable () async throws -> Success) {
        let semaphore = DispatchSemaphore(value: 0)

        Task(priority: priority) {
            defer { semaphore.signal() }
            return try await operation()
        }

        semaphore.wait()
    }
}

这是一个用法示例:
func synchronousContext() { //  No `async` keyword here

    let duration: UInt64 = 2_000_000_000 /* 2 seconds */
    var seconds: UInt64 { duration/1_000_000_000 }

    print("Start: Should wait for \(seconds) seconds...")

    Task.synchronous { try await Task.sleep(nanoseconds: duration) } //  No `await` keyword for calling the `Task.synchronous `

    print("...Ended after \(seconds) seconds")
}

-9

我也一直在思考这个问题。例如,如何在主线程中启动一个任务(或多个任务)并等待它们完成?这可能是C++的思维方式,但在Swift中肯定也有方法可以实现。不管好坏,我想到了使用全局变量来检查工作是否完成:

import Foundation

var isDone = false

func printIt() async {
    try! await Task.sleep(nanoseconds: 200000000)
    print("hello world")
    isDone = true
}

Task {
    await printIt()
}

while !isDone {
    Thread.sleep(forTimeInterval: 0.1)
}

5
不要这样做。像这样繁忙地等待没有意义,会损耗电池和性能。在异步任务上同步等待是一个不好的想法。但如果你真的需要这样做,虽然你很可能不需要,你至少可以使用信号量或调度组,在这种情况下等待是由内核完成的,而不会浪费能量进行无意义的循环操作。 - Alexander
3
无论好坏,甚至更糟。更坏的是,甚至是最糟的。永远不要那样做。 - vadian
1
只是一个玩具示例,模拟在等待工作任务完成时在主线程中执行某些操作。无论如何,它也不适用于多个工作任务。感谢指向DispatchGroup的指针。我想我找到了一种方法,通过创建DispatchGroup,在启动每个任务之前调用dispatchGroup.enter(),在异步函数结束时调用dispatchGroup.leave(),然后在同步线程中调用dispatchGroup.wait()(等待所有任务完成)。 - omegahelix
1
永远不要在主线程上使用 sleep()。如果需要等待某个事件发生,请使用运行循环函数。 - Cristik

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接