Swift Combine的Rx Observable.create替代方案

14

我有一些使用RxSwift编写的代码,并尝试将其转换为使用苹果的Combine框架。

非常常见的一种模式是对于一次性可观察对象(通常是网络请求),使用Observable.create。类似这样:

func loadWidgets() -> Observable<[Widget]> {
  return Observable.create { observer in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      observer.onNext(widgets)
      observer.onComplete()
    }, error: { error in
      // publish error on failure
      observer.onError()
    })
    // allow cancellation
    return Disposable {
      loadTask.cancel()
    }
  }
}

我正在尝试将这个映射到Combine中,但我还没有能够完全弄清楚。我能够得到的最接近的结果是像下面这样使用Future:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
  return Future<[Widget], Error> { resolve in
    // start the request when someone subscribes
    let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
      // publish result on success
      resolve(.success(widgets))
    }, error: { error in
      // publish error on failure
      resolve(.failure(error))
    })
    // allow cancellation ???
  }
}

正如你所看到的,它完成了大部分工作,但没有取消的能力。 其次, future 不允许多个结果。

有没有类似于 Rx 的 Observable.create 模式,可以取消并可选地返回多个结果的方式?

5个回答

12

我认为我找到了一种使用PassthroughSubjectCombine中模拟Observable.create的方法。这是我制作的辅助工具:

struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

这是如何在使用中看起来的:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
          observer.onNext(widgets)
          observer.onComplete()
        }, error: { error in
          observer.onError(error)
        })
        return Disposable {
          loadTask.cancel()
        }
    }
}

感谢您的回答和启发。请查看我下面的纯Combine实现。 - Brody Robertson
1
这是一个很好的开始,但是每个订阅应该有自己的主题,这样关闭时可以分别调用。将返回值包装在Deferred中可以解决这个bug。此外,这个解决方案在下游发出需求之前就发出了值。它应该等到那时才发出。 - undefined
这是一个不错的开始,但是每个订阅应该有自己的主题,这样可以分别调用关闭函数。将返回值包装在Deferred中可以解决这个问题。此外,这个解决方案在下游发出需求之前就会发出值。它应该等到那时再执行。 - Daniel T.

2
据我所了解,Xcode 11 beta 3中已经取消了使用闭包初始化AnyPublisher的支持。对于Rx的Observable.create,现在Future是相应的解决方案,如果您只需要传播单个值。在其他情况下,我会返回PassthroughSubject以此方式传播多个值,但它不允许您在观察开始时启动任务,我认为这与Observable.create相比远非理想。

就取消而言,它没有类似于DisposableisDisposed属性,因此无法直接检查其状态并停止执行您自己的任务。我现在唯一能想到的方法是观察cancel事件,但这肯定不如一个Disposable舒适。 此外,我认为cancel可能实际上会根据此处的文档(https://developer.apple.com/documentation/combine/cancellable),停止类似于基于URLSession的网络请求等任务。


1
在闭包之外添加一个isCancelled操作,并在future的闭包中进行检查。 isCancelled可以使用handleEvent()运算符切换。
    var isCancelled = false
    func loadWidgets() -> AnyPublisher<[Widget], Error> {
    return HandleEvents<Future<Any, Error>> { resolve in
        // start the request when someone subscribes
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
            // publish result on success
            resolve(.success(widgets))
        }, error: { error in
            // publish error on failure
            resolve(.failure(error))   
        }
        if isCancelled {
            loadTask.cancel()
        }  
        ).handleEvents(receiveCancel: {
        isCancelled = true
        })
    }
}

在应用程序中的某个地方,您可以执行此操作来取消事件

loadWidgets().cancel()

同时请查看此文章


1
这里有一个解决方案,可以修复已接受的解决方案中存在的问题。
extension AnyPublisher {
    struct Subscriber {
        fileprivate let send: (Output) -> Void
        fileprivate let complete: (Subscribers.Completion<Failure>) -> Void

        func send(_ value: Output) { self.send(value) }
        func send(completion: Subscribers.Completion<Failure>) { self.complete(completion) }
    }

    init(queue: DispatchQueue = .main, _ closure: @escaping (Subscriber) -> AnyCancellable) {
        self = Deferred {
            let subject = PassthroughSubject<Output, Failure>()
            var cancellable: AnyCancellable?

            return subject
                .handleEvents(
                    receiveCancel: { cancellable?.cancel() },
                    receiveRequest: { demand in
                        precondition(demand == .unlimited, "AnyPublisher.init only works with unlimited demand")
                        queue.async {
                            cancellable = closure(Subscriber(send: subject.send(_:), complete: subject.send(completion:)))
                        }
                    }
                )
        }
        .eraseToAnyPublisher()
    }
}

0
感谢 ccwasden 的启发。这个实现使用纯 Combine 实现了 Observable.create 语义,没有任何多余的实体。

AnyPublisher.create() Swift 5.6 扩展

public extension AnyPublisher {
    
    static func create<Output, Failure>(_ subscribe: @escaping (AnySubscriber<Output, Failure>) -> AnyCancellable) -> AnyPublisher<Output, Failure> {
        
        let passthroughSubject = PassthroughSubject<Output, Failure>()
        var cancellable: AnyCancellable?
        
        return passthroughSubject
            .handleEvents(receiveSubscription: { subscription in

                let subscriber = AnySubscriber<Output, Failure> { subscription in

                } receiveValue: { input in
                    passthroughSubject.send(input)
                    return .unlimited
                } receiveCompletion: { completion in
                    passthroughSubject.send(completion: completion)
                }
                
                cancellable = subscribe(subscriber)
                
            }, receiveCompletion: { completion in
                
            }, receiveCancel: {
                cancellable?.cancel()
            })
            .eraseToAnyPublisher()
        
    }
    
}

使用方法

func doSomething() -> AnyPublisher<Int, Error> {
    
    return AnyPublisher<Int, Error>.create { subscriber in
        
        // Imperative implementation of doing something can call subscriber as follows
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
        // subscriber.receive(completion: .failure(myError))
        
        return AnyCancellable {
            // Imperative cancellation implementation
        }
    }
    
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接