Swift 5.5并发编程: 如何序列化异步任务以替换最大并发操作数为1的OperationQueue?

10

我目前正在将我的应用程序迁移到使用Swift中的并发模型。我希望序列化任务以确保它们一个接一个地执行(没有并行)。在我的用例中,我想要监听由NotificationCenter发布的通知,并在发布新通知时执行任务。但我想确保没有先前的任务正在运行。这相当于使用maxConcurrentOperationCount = 1的OperationQueue。

例如,我在我的应用程序中使用CloudKit和Core Data,并使用持久历史跟踪来确定存储中发生了哪些更改。在这个同步本地存储到云端示例代码中,Apple使用操作队列处理历史处理任务(在CoreDataStack中)。此OperationQueue的最大操作数设置为1。

private lazy var historyQueue: OperationQueue = {
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1
    return queue
}()

当收到Core Data通知时,将在此串行操作队列中添加一个新任务。因此,如果收到许多通知,则它们将一个接一个地按顺序执行。

@objc
func storeRemoteChange(_ notification: Notification) {
    // Process persistent history to merge changes from other coordinators.
    historyQueue.addOperation {
        self.processPersistentHistory()
    }
}

在这个加载和显示大数据源的示例代码中,苹果公司使用Tasks来处理历史更改(在QuakesProvider中)。
// Observe Core Data remote change notifications on the queue where the changes were made.
notificationToken = NotificationCenter.default.addObserver(forName: .NSPersistentStoreRemoteChange, object: nil, queue: nil) { note in
    Task {
        await self.fetchPersistentHistory()
    }
}

我觉得第二个项目有些问题,因为任务可以以任何顺序发生,不一定是串行的(与第一个项目相反,那里使用的OperationQueue的maxConcurrentOperationCount = 1)。

我们应该在某个地方使用actor来确保方法被串行调用吗?

我考虑过这样的实现,但我还不太熟悉:

actor PersistenceStoreListener {
    let historyTokenManager: PersistenceHistoryTokenManager = .init()
    private let persistentContainer: NSPersistentContainer

    init(persistentContainer: NSPersistentContainer) {
        self.persistentContainer = persistentContainer
    }

    func processRemoteStoreChange() async {
        print("\(#function) called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
    }
}

当收到新通知(AsyncSequence)时,processRemoteStoreChange方法将被调用:

notificationListenerTask = Task {
   let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
   for await _ in notifications {
        print("notificationListenerTask called on \(Date.now.formatted(date: .abbreviated, time: .standard)).")
        await self.storeListener?.processRemoteStoreChange()
    }
}

@vadian,你的意思是说苹果第二个项目中使用Task(https://developer.apple.com/documentation/coredata/loading_and_displaying_a_large_data_feed)的代码是以串行方式执行任务的吗?这样做没有风险,后续计划的任务在已经计划好的任务之前被执行了吗? - alpennec
是的,正如“AsyncSequence”名称所示,它是一个序列,并且序列是按顺序执行的。但是请随意尝试一下。 - vadian
“我们是否应该在某个地方使用Actor来确保方法被串行调用?” 是的,这确实是Actor的主要目的之一。 - matt
1
@matt 你是对的,我道歉。TaskGroup按完成顺序返回项目。但AsyncSequence按顺序返回项目。 - vadian
正确。我们需要做相同的傻事,以按原始顺序重新组装结果。 - Rob
显示剩余8条评论
1个回答

13

在我的原始答案中,我回答了如何在Swift并发中实现独立任务的顺序行为的一般问题。

但是,您正在提出一个更具体的问题,即如何从异步事件序列中获取串行行为。如果您有一个AsyncSequence,例如{{link1:notifications}},那么您在答案末尾考虑的for-await-in方法是一个很好的解决方案:

notificationListenerTask = Task {
    let notifications = NotificationCenter.default.notifications(named: .NSPersistentStoreRemoteChange, object: container.persistentStoreCoordinator)
   
    for await _ in notifications {
        await self.storeListener?.processRemoteStoreChange()
    }
}

因为你在循环内使用了 await,所以在前一个 processRemoteStoreChange 返回并继续执行循环之前,它不会到达下一个迭代的 notifications AsyncSequence

总之,AsyncSequence(无论是 notifications 还是你自己的 AsyncStreamAsyncChannel)是从异步事件序列中获得串行行为的绝佳方式。WWDC 2021 视频 Meet AsyncSequence 是异步序列的入门者的绝佳指南。


在我的原始答案中,我解决了一个更一般的问题,即如何从一系列独立的Swift并发任务中获得串行行为:

如果您想获得一个 maxConcurrentOperationCount1OperationQueue 的行为(即“串行”操作队列),可以使用一个 actor 来实现。

有两种模式适用于串行的 OperationQueue

  1. 队列中的操作本身是同步的。

    如果您正在使用标准的 OperationQueue(即您没有子类化手动进行 isFinished KVO 等操作的 Operation),那么一个简单的 actor 就可以实现我们想要的效果。Actor 将防止并发执行。

    关键在于,这仅适用于同步方法(即那些没有 await 暂停点的方法)。

  2. 队列中的操作是异步的。

    操作队列的更高级用例之一是处理它们本身是异步的任务之间的依赖关系。这是操作队列中更复杂的场景,需要自定义 Operation 子类,在其中手动处理 isFinished 等的 KVO。(有关该模式的示例,请参见 this answer。)

    使用 Swift 并发来完成此操作的挑战在于,Actor 是可重入的(请参见 SE-0306 中的可重入性讨论)。如果 Actor 的方法是异步的(即带有 async-await),则会引入暂停点,即一个调用中的 await 将允许另一个 async 方法在该 Actor 上运行。

    要在单独的 async 方法之间实现串行执行,您有几个选项:


考虑以下内容(使用操作系统标志,以便我可以在Instruments中图形化地说明行为):
import os.signpost

private let pointsOfInterest = OSLog(subsystem: "log", category: .pointsOfInterest)

class ViewController: UIViewController {

    let example = Example()
    let taskSerializer = SerialTasks<Void>()

    @IBAction func didTapSync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        startSynchronous()
    }

    @IBAction func didTapAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startAsynchronous() }
    }

    @IBAction func didTapSerializedAsync(_ sender: Any) {
        os_signpost(.event, log: pointsOfInterest, name: #function)
        Task { try await startSerializedAsynchronous() }
    }

    func startSynchronous() {
        Task {
            await example.synchronousExample("1. synchronous")
        }
    }

    func startAsynchronous() async throws {
        try await example.asynchronousExample("2. asynchronous")
    }

    func startSerializedAsynchronous() async throws {
        try await taskSerializer.add {
            try await self.example.asynchronousExample("3. serial async")
        }
    }
}

actor Example {
    func asynchronousExample(_ name: StaticString) async throws {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        try await Task.sleep(for: .seconds(2))
    }

    func synchronousExample(_ name: StaticString) {
        let id = OSSignpostID(log: pointsOfInterest)
        os_signpost(.begin, log: pointsOfInterest, name: name, signpostID: id)
        defer { os_signpost(.end, log: pointsOfInterest, name: name, signpostID: id) }

        Thread.sleep(forTimeInterval: 2)
    }
}

actor SerialTasks<Success> {
    private var previousTask: Task<Success, Error>?

    func add(block: @Sendable @escaping () async throws -> Success) async throws -> Success {
        let task = Task { [previousTask] in
            let _ = await previousTask?.result
            return try await block()
        }
        previousTask = task
        return try await task.value
    }
}

在同步任务(场景1)中,startSynchronous是最简单的。只需调用演员的同步方法,即可获得串行执行。

在异步任务(场景2)中,startAsynchronous如果您有await挂起点,则由于演员重入而失去了顺序行为。

但是,您可以通过拥有一个演员SerialTasks来细化异步任务模式(场景3),该演员在上述代码中跟踪先前的任务,在启动下一个任务之前等待它。微妙的一点是,add方法本身是同步的(尽管它所采取的闭包是异步的)。这避免了如果添加多个任务时出现微妙的竞争。

在Instruments中运行上述内容,我们可以以图形方式查看执行情况,其中标志着任务的启动,并显示任务执行的时间间隔:

enter image description here

简而言之,如果您的actor仅执行同步任务(这是您的情况),那么actor会自动产生maxConcurrentOperationCount = 1的行为。如果任务是异步的,则只需在开始下一个任务之前await先前的任务即可。

感谢@Rob提供这个好的解释。它巩固了我对Actor的知识。方法'processPersistentHistory'是异步的,因为它需要在NSManagedObjectContext线程上运行。所以我们需要'await context.perform { }'来确保执行体在正确的线程上执行。当然,如果我们想使用新的async/await,因为我仍然可以使用performAndWait方法来同步运行执行体。 - alpennec
不要混淆在另一个线程上运行它(例如,在您的操作队列上)与processPersistentHistory本身是否异步的问题。它没有完成处理程序并且您只是进行了简单的addOperation,这表明它是一个同步方法,您只是在后台线程上异步运行它。但是,如果该方法本身是异步的,则您分享的addOperation示例将无法工作。 - Rob
如果您想了解有关processPersistentHistory方法的更多详细信息,可以在此文章中找到:https://www.avanderlee.com/swift/persistent-history-tracking-core-data/。您将看到它创建了一个新的Core Data backgroundContext并执行一些操作,但以同步方式(.performAndWait)。使用新的Swift并发模型和Core Data,这个performAndWait现在是await context.perform {。但是,如果我们使用Task { await context.perform { ... sync work ... } },我仍然不清楚如何将它们序列化,因为任务可以在任意线程上运行。 - alpennec
嗨@Rob,你能否提供一个使用SerialTasks.add()方法添加实际返回值的异步方法的示例,声明类似于SerialTasks<String>的例子吗? - Sagar D
1
@SagarD - 我已经修改了SerialTasks,使其异步返回闭包的结果。请参见https://github.com/robertmryan/SerialTaskDemo.git作为示例。 - Rob
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接