为Alamofire请求创建类似RxSwift的Observable.Create的Combine发布者

3

我使用以下代码生成一个冷的RxSwift Observable

func doRequest<T :Mappable>(request:URLRequestConvertible) -> Observable<T> {
        let observable = Observable<T>.create { [weak self] observer in
        guard let self = self else { return Disposables.create() }
        self.session.request(request).validate().responseObject { (response: AFDataResponse<T>) in
            switch response.result {
                case .success(let obj):
                    observer.onNext(obj)
                    observer.onCompleted()
                case .failure(let error):
                    let theError = error as Error
                    observer.onError(theError)
            }
        }
         return Disposables.create()
    }
    return observable
}

其中Mappable是基于ObjectMapper的类型,self.session是Alamofire的Session对象。

我找不到苹果的Combine框架中与Observable.create {...}相当的东西。我只发现了使用苹果的URLSession类创建发布者的URLSession.shared.dataTaskPublisher(for:)

如何将上述observable转换为Alamofire Combine的发布者?

编辑:根据rob提供的解决方案,我最终得到了以下代码:

 private let apiQueue = DispatchQueue(label: "API", qos: .default, attributes: .concurrent)

  func doRequest<T>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> where T : Mappable {

       Deferred { [weak self] () -> Future<T, AFError> in

          guard let self = self else {
              return Future<T, AFError> { promise in  
promise(.failure(.explicitlyCancelled))  }
        }

          return Future { promise in
            self.session
            .request(request)
            .validate()
            .responseObject { (response: AFDataResponse<T>) in
                promise(response.result)
            }
        }
    }
    .handleEvents(receiveCompletion: { completion in
        if case .failure (let error) = completion {
                //handle the error
        }
    })
    .receive(on: self.apiQueue)
    .eraseToAnyPublisher()
}

编辑2: 我需要删除私有队列,因为它不再需要了。Alamofire会自己进行解析和解码,所以请移除队列及其使用情况(.receive(on: self.apiQueue))。


请注意,Alamofire已经在后台队列中执行响应处理,并在主队列上调用完成处理程序(仅完成“Future”),因此您可能不需要调用receive(on:) - Jon Shier
1个回答

8
您可以使用 Future 来将 responseObject 的回调连接到 Combine 的 Publisher。我没有 Alamofire 进行测试,但我认为以下代码应该能够工作:
func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Future { promise in
        self.session
            .request(request)
            .validate()
            .responseObject { (response: AFDataResponse<T>) in
            promise(response.result)
        }
    }.eraseToAnyPublisher()
}

请注意,这比 RxSwift 版本要简单一些,因为 promise 直接使用了 Result,所以我们不必切换到 response.resultFuture 类似于“微温”的发布者。它像热可观察对象一样立即执行其体并且只执行一次,因此它会立即启动 Alamofire 请求。又像冷可观察对象一样,因为每个订阅者最终都会收到一个值或错误(假设您最终调用了 promise)。Future 只执行其体一次,但缓存您传递给 promiseResult
您可以通过将 Future 包装在 Deferred 中来创建真正的冷发布者。
func doRequest<T: Mappable>(request: URLRequestConvertible) -> AnyPublisher<T, AFError> {
    return Deferred {
        Future { promise in
            self.session
                .request(request)
                .validate()
                .responseObject { (response: AFDataResponse<T>) in
                    promise(response.result) }
        }
    }.eraseToAnyPublisher()
}

Deferred 每次订阅时都会调用其主体创建一个新的内部 Publisher。所以每次订阅时,您都将创建一个新的 Future,它将立即开始一个新的 Alamofire 请求。如果您想使用 retry 运算符,如此问题中所述,则此方法很有用。


我已在我的问题中添加了一个编辑。 - JAHelia
你需要更多的帮助吗?我在你的编辑中没有看到任何请求寻求更多的帮助。 - rob mayoff
从 Ray Wenderlich 的 Combine 书籍第 248 页摘录:“...为了在后台线程上解析 JSON 并保持应用程序的响应性,让我们创建一个新的自定义调度队列....”,然后作者创建了我在问题中列出的队列。在书中的用法:URLSession.shared .dataTaskPublisher(for: url) .receive(on: apiQueue) .map(\.data) .decode(type: MyModel.self, decoder: decoder) .catch { _ in Empty<MyModel, Error>() } .eraseToAnyPublisher() - JAHelia
我相信你示例中的解析是由Alamofire在将结果交给Combine之前完成的。因此,如果它没有默认执行解码操作,你需要使用Alamofire API在后台队列上进行解码。 - rob mayoff
没错,Alamofire默认确实进行解析和解码,所以我想私有后台队列是不需要的。 - JAHelia
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接