URLSession与服务器推送事件(Server-Sent Events)有时会返回kCFErrorDomainCFNetwork 303错误。

7
在我们的应用中,我们使用Server-Sent Events请求信息流。 为此,我们使用小型库IKEventSource, 其底层使用Foundation.URLSession
这些信息以小的JSON包发送,例如: {"type":"update","data":{"id":"1234","name":"someName","someOtherField":33,"size":"20","someAddress":"Awesome Street 111"}} 现在对我们来说这个功能非常好,但是我们注意到有时会出现以下错误: Error Domain=kCFErrorDomainCFNetwork Code=303 "(null)" UserInfo={NSErrorPeerAddressKey=<CFData 0x610000a84510 [0x103ccbe40]>{length = 16, capacity = 16, bytes = 0x100201bbb9131fbd0000000000000000}, _kCFStreamErrorCodeKey=-2201, _kCFStreamErrorDomainKey=4} 我理解这是一个kCFErrorHTTPParseFailure,它尝试解析的字符串似乎只是JSON包的一部分,如: {"type":"update","data":{"id":"1234","name":"so 我们目前的理解是,URLSession正在缓冲数据,有时它会填满并且最后一部分将被切断。 我们可以使用curl http://our.service.com在终端中重现此问题,并使用curl -N http://our.service.com查看工作示例。
是否有人知道如何将此选项添加到URLSessionURLSessionConfigurationURLSessionTask中。 或者是否有其他解释或解决方案(可能是服务器端)。 对于这个错误,我们的用户有时会错过数据更新,我们认为这是一些无法解释的反馈的原因。
顺便说一下,我们在相应的Android应用程序中也遇到了同样的问题。

有解决方案了吗? - ergunkocak
1
我们仍然面临这个问题,计划更改整个API以解决它。 - Peter Schumacher
你的服务器使用了http/2.0吗? - Yulong Xiao
你解决了这个错误吗? - Zouhair Sassi
我们整体改变了API。我们没有找到解决方案。 - Peter Schumacher
2个回答

1

我遇到了相同的错误,没有使用任何库,只是用自己的实现方式使用了URLSession

通过添加HTTP头部Accept: text/event-stream,我解决了这个问题。

    var request = URLRequest(url: url)
    request.setValue("text/event-stream", forHTTPHeaderField: "Accept")

0
多年过去了,现在有了Swift Concurrency,也许有人会觉得这个有用。
似乎原来关于缓冲区填满的问题是个难题。我发现通过从连接到SSE端点的服务请求返回一个AsyncThrowingStream,你可以通过AsyncThrowingStream的bufferingPolicy来控制缓冲区。
苹果文档:AsyncThrowingStream.init(_:bufferingPolicy:_:) 下面的代码是为了可读性。你当然可以使用泛型来使代码更可重用,但这样可以传达出重点。
假设你有一个端点,它接收一个带有请求体的POST请求,然后以事件流的形式响应。正如@Guillaume提到的,你期望得到的是"text/event-stream"而不是典型的JSON。
请求体中的JSON可能是这样的:
{
  "question": "What's the answer?"
}

而且响应可能以文本形式将事件流返回,就像这样:
data: {"answer": "This"}
data: {"answer": "This is"}
data: {"answer": "This is the"}
data: {"answer": "This is the answer."}

你需要理解从服务器返回的事件的格式,以便正确解码。它应该包含一个JSON字符串,你需要解码其中的某些内容,但它可能没有以"data: "开头。在这个例子中,我们将继续假设它是这样的。
所以这是一个示例代码,用于发送这个请求:
final class Service {
    enum ServiceError: Error {
        case generic
    }

    // 1.
    private var streamTask: Task<Void, Error>?
    private let url = URL(string: "https://example.com/api/events")!
    
    func sendRequest(with question: String) async throws -> AsyncThrowingStream<Response, Error> {
        // 2.
        var request = URLRequest(url: url)
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")
        request.setValue("text/event-stream; charset=utf-8", forHTTPHeaderField: "Accept")
        request.setValue("keep-alive", forHTTPHeaderField: "Connection")
        request.httpMethod = "POST"
        request.httpBody = try JSONEncoder().encode(
            Request(question: question)
        )
        
        // 3.
        let (asyncBytes, response) = try await URLSession.shared.bytes(for: request)
        try Task.checkCancellation()
        
        guard let httpResponse = response as? HTTPURLResponse,
              (200...299).contains(httpResponse.statusCode)
        else {
            throw ServiceError.generic
        }
        
        // 4.
        return AsyncThrowingStream(Response.self, bufferingPolicy: .bufferingNewest(3)) { continuation in
            // 5.
            continuation.onTermination = { [weak self] _ in
                self?.streamTask?.cancel()
                self?.streamTask = nil
            }
            
            // 6.
            streamTask = Task {
                // 7.
                for try await line in asyncBytes.lines {
                    try Task.checkCancellation()
                    
                    // 8.
                    guard line.hasPrefix("data: "),
                          let data = line.dropFirst(6).data(using: .utf8)
                    else {
                        // Seems like we got a partial event or one that doesn't start with
                        // `data: `. No big deal. Just continue to the next event in the stream.
                        continue
                    }
                    
                    // 9.
                    do {
                        let response = try JSONDecoder().decode(Response.self, from: data)
                        continuation.yield(with: .success(response))
                    } catch {
                        continuation.finish(throwing: error)
                    }
                }
                // 10.
                continuation.finish()
            }
        }
    }
}

struct Request: Encodable {
    let question: String
}

struct Response: Decodable {
    let answer: String
}

我们需要保留一个未结构化的任务,以便以后可以取消它。这在下面会有意义。
构建我们的URLRequest。这里没有什么特别的。
调用这个`bytes(for:delegate:)`方法。这个东西很酷,因为它返回`(URLSession.AsyncBytes, URLResponse)`。`AsyncBytes`部分是关键。Apple Docs 使用定义好的`bufferingPolicy`构建我们的`AsyncThrowingStream`。在这里,我选择保留缓冲区中最新的3个事件。根据你的需求选择合适的方法或策略。我们也可以使用更简单的`AsyncThrowingStream(unfolding:)`,但在完成这些步骤后,请查看底部的注意事项。
当继续执行被终止(即取消)时,我们应该调用`cancel()`并将`streamTask`设置为`nil`。如果你还在想“什么是streamTask?”那就是第6步。
这是`streamTask`。我们创建并保留对未结构化任务的引用来完成我们的工作。通过将其存储在第1步的`streamTask`属性中,我们可以在第5步中取消它。
迭代我们从URLSession调用中获得的`asyncBytes.lines`。看看我们上面的示例响应中的每个`data: {...}`事件是如何从新行开始的?这将遍历每一行。
检查`line`的内容(记住它只是文本,不是JSON)是否以`data:`开头,然后去掉这些字符并将其转换为`Data`。如果不是,我们得到了一个无法识别的内容。继续等待下一个事件。
将上一步的`Data`解码。如果成功,`yield`解码后的`Response`。如果失败,抛出错误并结束。(你也可以选择不结束,继续尝试解码流中的下一个事件)。
如果上面没有抛出任何错误,一旦我们处理完流中的所有事件,我们将调用`.finish()`。

抓住你了:

你们中的一些人可能会想:“你在一个异步上下文中!为什么要使用带有同步的build闭包的AsyncThrowingStream,它只接受一个continuation来包装一个非结构化的任务?你破坏了自动取消!为什么不只是返回AsyncThrowingStream(unfolding:)?!?!?”如果你没有想到这个,或者现在在想“等等,什么意思?”,这就是它:

我最初尝试了这个。将下面的所有内容替换为以下内容,你就会得到:

return AsyncThrowingStream {
    for try await line in asyncBytes.lines {
        try Task.checkCancellation()
        
        guard line.hasPrefix("data: "),
              let data = line.dropFirst(6).data(using: .utf8)
        else {
            // Seems like we got a partial event or one that doesn't start with
            // `data: `. No big deal. Just continue to the next event in the stream.
            continue
        }
        
        do {
            return try JSONDecoder().decode(Response.self, from: data)
        } catch {
            throw error
        }
    }
    return nil
}

哎呀,这个好多了。不需要再坚持一些没有结构的任务了。你会自动取消。可以直接返回/抛出异常,而不用搞那些“continuation”的东西。而且代码量也少了。
不过,我发现有时候它会在中途停止(对于较长的响应经常发生)。我可以在for try await line in asyncBytes.lines {}的顶部放一个print(line),但它在中途就停止打印了。即使docs中说“默认情况下,缓冲区限制是Int.max,这意味着它是无限制的。”所以要么这不是真的,要么有一些与缓冲区无关的问题发生了,而使用“continuation”的方法则没有遇到这些问题。
如果你选择使用AsyncThrowingStream(unfolding:)的方法,请务必进行充分的测试。
如果有人对为什么只有基于“continuation”的方法有效有任何想法,我会很乐意听听。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接