多年过去了,现在有了Swift Concurrency,也许有人会觉得这个有用。
似乎原来关于缓冲区填满的问题是个难题。我发现通过从连接到SSE端点的服务请求返回一个AsyncThrowingStream,你可以通过AsyncThrowingStream的bufferingPolicy来控制缓冲区。
苹果文档:
AsyncThrowingStream.init(_:bufferingPolicy:_:)
下面的代码是为了可读性。你当然可以使用泛型来使代码更可重用,但这样可以传达出重点。
假设你有一个端点,它接收一个带有请求体的POST请求,然后以事件流的形式响应。正如@Guillaume提到的,你期望得到的是"text/event-stream"而不是典型的JSON。
请求体中的JSON可能是这样的:
{
"question": "What's the answer?"
}
而且响应可能以文本形式将事件流返回,就像这样:
data: {"answer": "This"}
data: {"answer": "This is"}
data: {"answer": "This is the"}
data: {"answer": "This is the answer."}
你需要理解从服务器返回的事件的格式,以便正确解码。它应该包含一个JSON字符串,你需要解码其中的某些内容,但它可能没有以"data: "开头。在这个例子中,我们将继续假设它是这样的。
所以这是一个示例代码,用于发送这个请求:
final class Service {
enum ServiceError: Error {
case generic
}
private var streamTask: Task<Void, Error>?
private let url = URL(string: "https://example.com/api/events")!
func sendRequest(with question: String) async throws -> AsyncThrowingStream<Response, Error> {
var request = URLRequest(url: url)
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
request.setValue("text/event-stream; charset=utf-8", forHTTPHeaderField: "Accept")
request.setValue("keep-alive", forHTTPHeaderField: "Connection")
request.httpMethod = "POST"
request.httpBody = try JSONEncoder().encode(
Request(question: question)
)
let (asyncBytes, response) = try await URLSession.shared.bytes(for: request)
try Task.checkCancellation()
guard let httpResponse = response as? HTTPURLResponse,
(200...299).contains(httpResponse.statusCode)
else {
throw ServiceError.generic
}
return AsyncThrowingStream(Response.self, bufferingPolicy: .bufferingNewest(3)) { continuation in
continuation.onTermination = { [weak self] _ in
self?.streamTask?.cancel()
self?.streamTask = nil
}
streamTask = Task {
for try await line in asyncBytes.lines {
try Task.checkCancellation()
guard line.hasPrefix("data: "),
let data = line.dropFirst(6).data(using: .utf8)
else {
continue
}
do {
let response = try JSONDecoder().decode(Response.self, from: data)
continuation.yield(with: .success(response))
} catch {
continuation.finish(throwing: error)
}
}
continuation.finish()
}
}
}
}
struct Request: Encodable {
let question: String
}
struct Response: Decodable {
let answer: String
}
我们需要保留一个未结构化的任务,以便以后可以取消它。这在下面会有意义。
构建我们的URLRequest。这里没有什么特别的。
调用这个`bytes(for:delegate:)`方法。这个东西很酷,因为它返回`(URLSession.AsyncBytes, URLResponse)`。`AsyncBytes`部分是关键。
Apple Docs
使用定义好的`bufferingPolicy`构建我们的`AsyncThrowingStream`。在这里,我选择保留缓冲区中最新的3个事件。根据你的需求选择合适的方法或策略。我们也可以使用更简单的`AsyncThrowingStream(unfolding:)`,但在完成这些步骤后,请查看底部的注意事项。
当继续执行被终止(即取消)时,我们应该调用`cancel()`并将`streamTask`设置为`nil`。如果你还在想“什么是streamTask?”那就是第6步。
这是`streamTask`。我们创建并保留对未结构化任务的引用来完成我们的工作。通过将其存储在第1步的`streamTask`属性中,我们可以在第5步中取消它。
迭代我们从URLSession调用中获得的`asyncBytes.lines`。看看我们上面的示例响应中的每个`data: {...}`事件是如何从新行开始的?这将遍历每一行。
检查`line`的内容(记住它只是文本,不是JSON)是否以`data:`开头,然后去掉这些字符并将其转换为`Data`。如果不是,我们得到了一个无法识别的内容。继续等待下一个事件。
将上一步的`Data`解码。如果成功,`yield`解码后的`Response`。如果失败,抛出错误并结束。(你也可以选择不结束,继续尝试解码流中的下一个事件)。
如果上面没有抛出任何错误,一旦我们处理完流中的所有事件,我们将调用`.finish()`。
抓住你了:
你们中的一些人可能会想:“你在一个异步上下文中!为什么要使用带有同步的build
闭包的AsyncThrowingStream
,它只接受一个continuation
来包装一个非结构化的任务?你破坏了自动取消!为什么不只是返回AsyncThrowingStream(unfolding:)
?!?!?”如果你没有想到这个,或者现在在想“等等,什么意思?”,这就是它:
我最初尝试了这个。将下面的所有内容替换为以下内容,你就会得到:
return AsyncThrowingStream {
for try await line in asyncBytes.lines {
try Task.checkCancellation()
guard line.hasPrefix("data: "),
let data = line.dropFirst(6).data(using: .utf8)
else {
continue
}
do {
return try JSONDecoder().decode(Response.self, from: data)
} catch {
throw error
}
}
return nil
}
哎呀,这个好多了。不需要再坚持一些没有结构的任务了。你会自动取消。可以直接返回/抛出异常,而不用搞那些“continuation”的东西。而且代码量也少了。
不过,我发现有时候它会在中途停止(对于较长的响应经常发生)。我可以在
for try await line in asyncBytes.lines {}
的顶部放一个
print(line)
,但它在中途就停止打印了。即使
docs中说“默认情况下,缓冲区限制是Int.max,这意味着它是无限制的。”所以要么这不是真的,要么有一些与缓冲区无关的问题发生了,而使用“continuation”的方法则没有遇到这些问题。
如果你选择使用
AsyncThrowingStream(unfolding:)
的方法,请务必进行充分的测试。
如果有人对为什么只有基于“continuation”的方法有效有任何想法,我会很乐意听听。