如何使用Combine在后台响应CoreData更改

13

我希望实现以下功能:每当有人触发 CoreData 保存(即发送NSManagedObjectContextDidSave通知)时,我想基于更改的NSManagedObject执行一些后台计算。具体例子:假设在一个笔记应用中,我想异步计算所有笔记中单词总数。

目前问题在于NSManagedObject上下文明确绑定到线程,并且不建议在此线程之外使用NSManagedObject

我已经在我的SceneDelegate中设置了两个NSManagedObjectContext

let context = (UIApplication.shared.delegate as! AppDelegate).persistentContainer.viewContext
let backgroundContext = (UIApplication.shared.delegate as! AppDelegate).persistentContainer.newBackgroundContext()

我还通过NotificationCenter.default.publisher(for: .NSManagedObjectContextDidSave)订阅了通知,并在触发一个managedObjectContext.save()后收到了两次保存通知。但是,这两个通知都是从同一个线程(即UIThread)发送的,用户字典中所有的NSManagedObjects都有一个.managedObjectContext,它是viewContext而不是backgroundContext

我的想法是基于相关的NSManagedObjectContext是否为后台上下文过滤通知,因为我认为通知也会在(私有)DispatchQueue上发送,但似乎所有通知都在UIThread上发送,而后台上下文从未使用。

你有什么解决方法吗?这是一个bug吗?如何根据backgroundContext检索通知,同时在关联的DispatchQueue上运行下游任务?


1
publisher(for:)有第二个名为object的参数,默认值为nil。尝试将对象参数设置为您想要观察的上下文。文档 - Rob C
不要保留背景上下文,否则它会充满不必要的对象。 - malhal
3个回答

17
您可以创建一个发布者,当Core Data中涉及到您的相关内容发生变化时通知您。 我已经写了一篇关于这个主题的文章。Combine、发布者和Core Data。
import Combine
import CoreData
import Foundation

class CDPublisher<Entity>: NSObject, NSFetchedResultsControllerDelegate, Publisher where Entity: NSManagedObject {
    typealias Output = [Entity]
    typealias Failure = Error

    private let request: NSFetchRequest<Entity>
    private let context: NSManagedObjectContext
    private let subject: CurrentValueSubject<[Entity], Failure>
    private var resultController: NSFetchedResultsController<NSManagedObject>?
    private var subscriptions = 0

      init(request: NSFetchRequest<Entity>, context: NSManagedObjectContext) {
        if request.sortDescriptors == nil { request.sortDescriptors = [] }
        self.request = request
        self.context = context
        subject = CurrentValueSubject([])
        super.init()
    }

      func receive<S>(subscriber: S)
        where S: Subscriber, CDPublisher.Failure == S.Failure, CDPublisher.Output == S.Input {
        var start = false

        synchronized(self) {
            subscriptions += 1
            start = subscriptions == 1
        }

        if start {
            let controller = NSFetchedResultsController(fetchRequest: request, managedObjectContext: context, 
                                                        sectionNameKeyPath: nil, cacheName: nil)
            controller.delegate = self

            do {
                try controller.performFetch()
                let result = controller.fetchedObjects ?? []
                subject.send(result)
            } catch {
                subject.send(completion: .failure(error))
            }
            resultController = controller as? NSFetchedResultsController<NSManagedObject>
        }
        CDSubscription(fetchPublisher: self, subscriber: AnySubscriber(subscriber))
    }

      func controllerDidChangeContent(_ controller: NSFetchedResultsController<NSFetchRequestResult>) {
        let result = controller.fetchedObjects as? [Entity] ?? []
        subject.send(result)
    }

      private func dropSubscription() {
        objc_sync_enter(self)
        subscriptions -= 1
        let stop = subscriptions == 0
        objc_sync_exit(self)

        if stop {
            resultController?.delegate = nil
            resultController = nil
        }
    }

    private class CDSubscription: Subscription {
        private var fetchPublisher: CDPublisher?
        private var cancellable: AnyCancellable?

        @discardableResult
        init(fetchPublisher: CDPublisher, subscriber: AnySubscriber<Output, Failure>) {
            self.fetchPublisher = fetchPublisher

            subscriber.receive(subscription: self)

            cancellable = fetchPublisher.subject.sink(receiveCompletion: { completion in
                subscriber.receive(completion: completion)
            }, receiveValue: { value in
                _ = subscriber.receive(value)
            })
        }

        func request(_ demand: Subscribers.Demand) {}

        func cancel() {
            cancellable?.cancel()
            cancellable = nil
            fetchPublisher?.dropSubscription()
            fetchPublisher = nil
        }
    }

}

2
真的很喜欢这段代码,是一个很好的编写自定义发布者的示例。我刚刚注意到过时的synchronized()objc_sync_enter()/..exit()可以用串行队列或类似的东西来替代 :) - smat88dd
所提出的方法的一个缺点是,NSFetchedResultsController不会观察关系变化。如果您需要观察关系变化,则需要采取不同的方法。 - TheoK
是的,它只会获取NSFetchResultController报告的更改。但这不是真正的问题。想象一下你有一个名为Shop的类,它有用户。如果你需要一个商店,你会为这家商店创建一个“CDPublisher”。如果你需要这家商店的用户,你可以创建一个CDPublisher来获取这家商店的用户。 - Apostolos

8
您可以将要观察的对象传递给 publisher(for:)
NotificationCenter.default
  .publisher(for: .NSManagedObjectContextDidSave, object: backgroundMoc)
  .sink(receiveValue: { notification in
    // handle changes
  })

这只会监听与后台管理的对象上下文相关的通知,这意味着您可以在该上下文的队列上安全地进行处理。


3
我会接受这个答案,因为它确实过滤了正确的上下文,并对正确的线程执行了后续操作。但我想在这里留下一些信息,在我的情况下,您不会收到针对后台线程的“NSManagedObjectContextDidSave”通知(可能是因为它没有保存,而只是得到了更新)。因此,您必须改为监听“NSManagedObjectContextObjectsDidChange”。 - Nicolas

-1

如果您没有保存上下文两次,那么您必须添加观察者两次。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接