如何将异步任务放入队列中的 Swift 5.6

10

假设我有以下代码:

class Duck{
    
    func walk() async {
        //do something
        print("walk start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("walk end")
    }
    
    func quack() async {
        //do something...
        print("quack start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("quack end")
    }
    
    func fly() async{
        //do something
        print("fly start")
        try? await Task.sleep(nanoseconds: UInt64(2e9))
        print("fly end")
    }
    
}

let duck = Duck()

Task{
    await duck.walk()
}

Task{
    await duck.quack()
}

Task{
    await duck.fly()
}

这将会打印:
walk start
quack start
fly start
walk end
quack end
fly end

我理解并预期了这一点。但是如果我想让这3个任务按顺序运行怎么办?假设每个任务都是由用户按下按钮创建的。我希望这些任务在后台排队并依次运行。是否有像你可以在DispatchQueue中排队DispatchWorkItem,但是一个Task版本的方法?


编辑:

我想出了一个解决方案,但我不确定这是否是一种好的实现方式。由于这个实现可能会创建许多级联的任务,我想知道是否存在堆栈溢出或内存泄漏的风险?

class TaskQueue{
    private var currentTask : Task<Void,Never> = Task{}
    
    func dispatch(block:@escaping () async ->Void){
        
        let oldTask = currentTask
        currentTask = Task{
            _ = await oldTask.value
            await block()
        }
    }
}

taskQueue.dispatch {
    await duck.walk()
}
taskQueue.dispatch {
    await duck.quack()
}
taskQueue.dispatch {
    await duck.fly()
}

只需将这3个await放在同一个任务中即可。 - Ptit Xav
@PtitXav 这只是一个简化的例子。请假设这3个任务在程序的不同部分分别创建,例如通过用户按下按钮。 - Ricky Mo
如果您将任务放在同一个调度队列中,这些任务应该按顺序处理(参见苹果官方文档)。 - Ptit Xav
@PtitXav 任务在调度队列上的序列化方式与块不同。它们可以在任何时候被中断,只要它们await并且其他任务可能会在同一队列上被调度。 - Rob Napier
1
@RobNapier - 我理解为什么我们的Actor需要可重入性,但我真的希望它们能够按照非可重入的“未来方向”(或其他控制并发度的方式)继续前进,而不只是通过协作线程池限制并发数量。 - Rob
2
@Rob 绝对没错。我只是说他们没有。我目前正在考虑新的异步序列算法是否会有所帮助。我们确实需要像下面的 TaskQueue 这样的东西(传统的"actor邮箱",也是大多数习惯于actors的人所期望的;Swift关于actors的看法非常有趣,我认为真正具有创新性,但它也与其他使用"actor"这个词的方式截然不同)。我认为像TaskQueue这样的数据结构比更多神奇的注释要好。当代码可能被遥远地注释时,很难判断代码是否正确。 - Rob Napier
4个回答

10

我曾经支持无结构任务的方法,每个任务都会等待前一个任务完成。回顾起来,这种方式对我来说有些脆弱。逐渐地(要感谢Rob Napier推动我走向这个方向),我现在使用异步序列,具体来说是苹果的AsyncChannel库,这更加健壮,并且与现代Swift并发中的异步序列更加一致。

在我们来看你的例子之前,先考虑一下这个串行下载器,其中一个过程(用户点击按钮)将URL对象发送给另一个过程,后者在一个for-await-in循环中监控通道中的URL:

struct DownloadView: View {
    @StateObject var viewModel = DownloadViewModel()

    var body: some View {
        VStack {
            Button("1") { Task { await viewModel.appendDownload(1) } }
            Button("2") { Task { await viewModel.appendDownload(2) } }
            Button("3") { Task { await viewModel.appendDownload(3) } }
        }
        .task {
            await viewModel.monitorDownloadRequests()
        }
    }
}

@MainActor
class DownloadViewModel: ObservableObject {
    private let session: URLSession = 
    private let baseUrl: URL = 
    private let folder: URL = 
    private let channel = AsyncChannel<URL>()   // note, we're sending URLs on this channel

    func monitorDownloadRequests() async {
        for await url in channel {
            await download(url)
        }
    }

    func appendDownload(_ index: Int) async {
        let url = baseUrl.appending(component: "\(index).jpg")
        await channel.send(url)
    }

    func download(_ url: URL) async {
        do {
            let (location, _) = try await session.download(from: url)
            let fileUrl = folder.appending(component: url.lastPathComponent)
            try? FileManager.default.removeItem(at: fileUrl)
            try FileManager.default.moveItem(at: location, to: fileUrl)
        } catch {
            print(error)
        }
    }
}

我们开始 monitorDownloadRequests,然后将下载请求 append 到通道中。

这会按顺序执行请求(因为 monitorDownloadRequests 有一个 for-await 循环)。例如,在 Instruments 的“Points of Interest”工具中,我添加了一些 Ⓢ 标志,显示了请求发生的时间间隔,您可以看到这三个请求是按顺序发生的。

enter image description here

但是通道的奇妙之处在于它们提供了串行行为,而不会引入非结构化并发的问题。它们还可以自动处理取消(如果您需要这种行为)。如果您取消for-await-in循环(在SwiftUI中,当视图被解除时,.task {…}视图修饰符会自动为我们执行此操作),如果您有一堆非结构化并发,其中一个Task等待前一个任务,那么处理取消会很快变得混乱。


现在,对于您的情况,您正在询问一个更通用的队列,可以等待任务。那么,您可以拥有一个闭包的AsyncChannel
typealias AsyncClosure = () async -> Void

let channel = AsyncChannel<AsyncClosure>()

E.g.:

typealias AsyncClosure = () async -> Void

struct ExperimentView: View {
    @StateObject var viewModel = ExperimentViewModel()

    var body: some View {
        VStack {
            Button("Red")   { Task { await viewModel.addRed() } }
            Button("Green") { Task { await viewModel.addGreen() } }
            Button("Blue")  { Task { await viewModel.addBlue() } }
        }
        .task {
            await viewModel.monitorChannel()
        }
    }
}

@MainActor
class ExperimentViewModel: ObservableObject {
    let channel = AsyncChannel<AsyncClosure>()

    func monitorChannel() async {
        for await task in channel {
            await task()
        }
    }

    func addRed() async {
        await channel.send { await self.red() }
    }

    func addGreen() async {
        await channel.send { await self.green() }
    }

    func addBlue() async {
        await channel.send { await self.blue() }
    }

    func red() async {  }

    func green() async {  }

    func blue() async {  }
}

这将产生:

enter image description here

在这里,我再次使用Instruments来可视化正在发生的事情。我快速地连续点击了“红色”、“绿色”和“蓝色”按钮两次。然后,我观察了这三个三秒任务的六个相应时间间隔。接着,我第二次重复了这个六次点击的过程,但这一次在它们完成之前取消了相关视图,中途取消了第二组按钮点击的绿色任务,展示了AsyncChannel(以及异步序列)无缝取消的能力。

现在,希望你原谅我,因为我省略了创建所有这些“兴趣点”标志和时间间隔的代码,因为它增加了很多不相关的内容,与我们所关心的问题无关(但如果你感兴趣,可以参考this)。但是,希望这些可视化能够帮助说明正在发生的事情。

最重要的信息是,AsyncChannel(以及它的姊妹AsyncThrowingChannel)是保持结构化并发性的好方法,但同时也能获得串行(或受限制的行为,如我们在answer的结尾所展示的),这是我们以前通过队列获得的,但是使用异步任务。

我必须承认,尽管这个AsyncClosure示例有望回答你的问题,但在我看来有点勉强。 我现在已经使用AsyncChannel几个月了,并且个人始终有一个更具体的对象由通道处理(例如URL、GPS位置、图像标识符等)。 这个闭包示例似乎试图过于努力地重现老式的调度/操作队列行为。


谢谢你推荐swift-async-alogrithm包!我在Android上工作时,使用Kotlin Coroutine进行了类似的实现。我还注意到Swift有AsyncSequenceAsyncStream API,并开始将我的Kotlin代码翻译成Swift。但是,Swift缺少像Kotlin一样方便的Channel API,因此我使用AsyncStream编写了它。AsyncChannel似乎是我一直在寻找的缺失环节!我感觉自己快要从头开始编写自己的AsyncChannel。不知道它位于需要手动添加的包中。谢谢。 - Ricky Mo
如果你需要从闭包中返回一些东西,比如let result = await viewModel.addRed(),那么AsyncChannel在这种情况下还能工作吗? - aheze
很遗憾,不行。正如文档所述,“通道发送的每个值都会等待迭代对该值的消耗”(已加重点)。因此,它等待对该值的“消耗”,但不管例程随后对已消耗值做什么都无关。简而言之,send不能(也不会)return任何东西。如果你想返回东西,你需要使用其他机制,比如另一个异步序列(!)的发出值。 - Rob
我承认,当我第一次偶然发现这个时,我有一个本能的“他们在想什么”反应,但是在思考双向通道的实现可能会是什么样子(尤其是在消费者端),我可以理解为什么他们会这样做。如何将双向通道很好地融入AsyncSequence模式并不是显而易见的。当然,我们可以使用一些带有完成处理程序闭包版本的next来创建一些东西,但那很快就变得非常丑陋。 - Rob

4

更新:

对于未来发现这篇文章有用的人,我已经创建了一个Swift包,具有更好的实现,并添加了对排队AsyncThrowingStream的支持。

https://github.com/rickymohk/SwiftTaskQueue


这是我更新的实现,我认为比我在问题中发布的更安全。 TaskQueueActor 部分完成所有工作,我将其包装在外部类中,只是为了在非异步上下文中调用时使代码更清晰。
class TaskQueue{
    
    private actor TaskQueueActor{
        private var blocks : [() async -> Void] = []
        private var currentTask : Task<Void,Never>? = nil
        
        func addBlock(block:@escaping () async -> Void){
            blocks.append(block)
            next()
        }
        
        func next()
        {
            if(currentTask != nil) {
                return
            }
            if(!blocks.isEmpty)
            {
                let block = blocks.removeFirst()
                currentTask = Task{
                    await block()
                    currentTask = nil
                    next()
                }
            }
        }
    }
    private let taskQueueActor = TaskQueueActor()
    
    func dispatch(block:@escaping () async ->Void){
        Task{
            await taskQueueActor.addBlock(block: block)
        }
    }
}

1
任务不会按照创建的顺序执行。因此,您添加的“dispatch”方法实际上破坏了队列功能。 - Yauheni Liauchuk

4
我在 Github 上找到了这个: https://github.com/gshahbazian/playgrounds/blob/main/AsyncAwait.playground/Sources/TaskQueue.swift 通过

https://forums.swift.org/t/how-do-you-use-asyncstream-to-make-task-execution-deterministic/57968/18

import Foundation

public actor TaskQueue {
    private let concurrency: Int
    private var running: Int = 0
    private var queue = [CheckedContinuation<Void, Error>]()

    public init(concurrency: Int) {
        self.concurrency = concurrency
    }

    deinit {
        for continuation in queue {
            continuation.resume(throwing: CancellationError())
        }
    }

    public func enqueue<T>(operation: @escaping @Sendable () async throws -> T) async throws -> T {
        try Task.checkCancellation()

        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
            queue.append(continuation)
            tryRunEnqueued()
        }

        defer {
            running -= 1
            tryRunEnqueued()
        }
        try Task.checkCancellation()
        return try await operation()
    }

    private func tryRunEnqueued() {
        guard !queue.isEmpty else { return }
        guard running < concurrency else { return }

        running += 1
        let continuation = queue.removeFirst()
        continuation.resume()
    }
}

看起来可以工作

@StateObject var taskQueue = TaskQueue(concurrency: 1)

            .task {
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 1")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 2")
                }
                try? await taskQueue.enqueue {
                //Task{
                    try? await Task.sleep(for: .seconds(1))
                    print("Done 3")
                }

0

另一种方法是使用SwiftPMAsyncFifo https://github.com/sugarbaron/async-fifo。 它允许您像DispatchQueue一样以async/await的方式排队任务:

let background: AsyncFifo = .init()

background.enqueue { await someFunc1() }
background.enqueue { someSynchFunc() }
background.enqueue { await whatever() }

// executes sequentally
// someFunc1() -> then someSycnhFunc() -> then whatever()

或者你可以只是将AsyncFifo源代码添加到你的项目中: (大量代码即将到来)


import Foundation

// MARK: constructor
public extension Async {
    
    final class Fifo {
        
        private var queue: [Scheduled]
        private let access: NSRecursiveLock
        private var executing: Bool
        
        public init() {
            self.queue = [ ]
            self.access = NSRecursiveLock()
            self.executing = false
        }
        
    }
    
}

// MARK: interface
public extension Async.Fifo {
    
    func enqueue(_ coroutine: @Sendable @escaping () async throws -> Void,
                 catch: @escaping (Error) -> Void = { print("[x][Async.Fifo] coroutine throws: \($0)") }) {
        schedule(coroutine, `catch`)
        inBackground { [weak self] in await self?.executeSequentally() }
    }
    
    var isBusy: Bool {
        access.lock()
        let isBusy: Bool = executing || !(queue.isEmpty)
        access.unlock()
        return isBusy
    }
    
    var queueSize: Int {
        access.lock()
        let size: Int = queue.count + (executing ? 1 : 0)
        access.unlock()
        return size
    }
    
    func cancelAll() {
        access.lock()
        queue = [ ]
        access.unlock()
    }
    
}

// MARK: tools
private extension Async.Fifo {
    
    func schedule(_ coroutine: @Sendable @escaping () async throws -> Void, _ catch: @escaping (Error) -> Void) {
        access.lock()
        queue.append((coroutine, `catch`))
        access.unlock()
    }
    
    func executeSequentally() async {
        if alreadyExecuting { return }
        while let next: Scheduled {
            do    { try await next.coroutine() }
            catch { next.catch(error) }
        }
    }
    
    var next: Scheduled? {
        access.lock()
        if queue.isEmpty { executing = false; access.unlock(); return nil }
        let next: Scheduled = queue.removeFirst()
        access.unlock()
        return next
    }
    
    var alreadyExecuting: Bool {
        access.lock()
        let executing = self.executing
        if executing == false { self.executing = true }
        access.unlock()
        return executing
    }
    
    typealias Scheduled = (coroutine: () async throws -> Void, catch: (Error) -> Void)
    
}

/// namespace class
public final class Async { }

public extension Async {  typealias Task = _Concurrency.Task }

@inlinable public func concurrent<T>(function: String = #function, _ callback: (CheckedContinuation<T, Error>) -> Void)
async throws -> T {
    try await withCheckedThrowingContinuation(function: function, callback)
}

@discardableResult
public func inBackground<T:Sendable>(_ coroutine: @Sendable @escaping () async throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached(priority: .low, operation: coroutine)
}

@discardableResult
public func onMain<T:Sendable>(_ coroutine: @MainActor @Sendable @escaping () throws -> T) -> Async.Task<T, Error> {
    Async.Task.detached { try await MainActor.run { try coroutine() } }
}

@discardableResult
public func onMain<T:Sendable>(after delay: TimeInterval, _ coroutine: @MainActor @Sendable @escaping () throws -> T)
-> Async.Task<T, Error> {
    Async.Task.detached { await idle(delay); return try await MainActor.run { try coroutine() } }
}

public func idle(_ duration: TimeInterval) async {
    do    { try await Task.sleep(nanoseconds: UInt64(duration * 1e9)) }
    catch { print("[x][Async] sleep interrupted: \(error)") }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接