如何在Combine中限制flatMap的并发性,同时仍然处理所有源事件?

5
如果我指定了maxPublishers参数,则第一个maxPublishers事件后的源事件将不会被扁平化映射。然而,我只想限制并发性,也就是说,在某些第一个maxPublishers扁平映射发布者完成后继续处理下一个事件。
Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
        .compactMap { $0 }
        .flatMap(maxPublishers: .max(3)) { self.addImage($0) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

我也尝试通过OperationQueue来限制并发,但maxConcurrentOperationCount似乎没有效果。

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage($0) }
        .compactMap { $0 }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage($0) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

扁平化映射发布者看这里:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}

@matt谢谢您的回复。我已尝试推迟我的futures,但仍然只有三张图片被处理。您说我在切换线程时使用的方式不合适,应该使用receiveOn吗? - Ihor Vovk
关于maxPublishers,当flatMap操作符加载了指定的最大发布者数量时,那些发送的源事件将被忽略。在flatMap操作完成后,我可以发送新的源事件并且它们将被成功处理。这是我所观察到的。但是我需要所有的源事件都能够遵守对于flatMap操作符的需求进行处理。 - Ihor Vovk
1
好的,也许你需要添加一个缓冲区? - matt
@matt 你是对的,我漏掉了缓冲区!你想要添加一个答案吗? - Ihor Vovk
1个回答

7
您已经非常美妙地进入了使用.buffer操作符的用例。它的目的是通过累计本来会被丢弃的值来补偿.flatMap的背压。
我将通过一个完全人造的示例进行说明:
class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

所以,我们的发布者每秒发出一个递增的整数,但是我们的flatMap使用了.max(3)并且需要3秒来重新发布一个值。结果是我们开始错过一些值:
1
2
3
5
6
7
9
10
11
...

解决方案是在flatMap前面放置缓冲区。它需要足够大,以容纳任何错过的值,使它们在被请求时能够保留:
        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

结果是所有数字值确实到达了 sink。当然在现实生活中,如果缓冲区不足以弥补发布者的值发射速率和背压 flatMap 的值发射速率之间的差异,我们仍然可能会丢失值。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接