Ktor响应流式传输

6
我正在尝试调用 Twitter 的一个端点,该端点会向客户端返回一系列 JSON 结果的不间断流。

https://documenter.getpostman.com/view/9956214/T1LMiT5U#977c147d-0462-4553-adfa-d7a1fe59c3ec

我试图像这样调用端点

        val url = "https://api.twitter.com/2/tweets/search/stream"
        _streamChannel = _client.get<ByteReadChannel>(token, url) //Stops here

        val byteBufferSize = 1024
        val byteBuffer = ByteArray(byteBufferSize)

        _streamChannel?.let {
            while (_streamChannel!!.availableForRead > 0) {
                _streamChannel!!.readAvailable(byteBuffer, 0, byteBufferSize)
                val s = String(byteBuffer)
                parseStreamResponseString(s).forEach {
                    emit(Response.Success(it))
                }
            }
        }

我的client.get代码是这样的

suspend inline fun <reified T> get(authKey: String, url: String): T? {
    val response = _client.get<HttpResponse>(url) {
        header("Authorization", "Bearer $authKey")
    }

    when (response.status.value) {
        in 300..399 -> throw RedirectResponseException(response)
        in 400..499 -> throw ClientRequestException(response)
        in 500..599 -> throw ServerResponseException(response)
    }

    if (response.status.value >= 600) {
        throw ResponseException(response)
    }

    return response.receive<T>()
}

当我发出请求时,它只是停在那里,我认为它在等待完整的响应返回给我之前不会给我任何响应。
编辑
我也尝试使用scoped streaming,但它停在readAvailable这一行。我知道有消息传递,因为当我通过cURL运行请求时,我不断地获取数据。
    _client.get<HttpStatement> {
        header("Authorization", "Bearer $authKey")
        url(urlString)
        contentType(ContentType.Application.Json)
        method = HttpMethod.Get
    }.execute {
        val streamChannel = it.receive<ByteReadChannel>()
        val byteBufferSize = 1024
        val byteBuffer = ByteArray(byteBufferSize)
        streamChannel.readAvailable(byteBuffer, 0, byteBufferSize) // Stops here
        val s = String(byteBuffer)
    }

如何使用Ktor处理不断传输的JSON数据?

1个回答

1
据我所知,Ktor客户端没有以Twitter流API所需的方式公开访问请求的IO缓冲区。
从Twitter文档here中可以看到:
一些HTTP客户端库只有在服务器关闭连接后才返回响应正文。这些客户端无法用于访问Streaming API。您必须使用能够逐步返回响应数据的HTTP客户端。大多数强大的HTTP客户端库都提供此功能。例如,Apache HttpClient将处理此用例。
您正在告诉Ktor获取的是一个ByteReadChannel,因此,一旦请求关闭(这在此Twitter端点上永远不会发生),Ktor客户端将尝试使用您正在使用的任何插件(例如json)将该数据解析为ByteReadChannel。它也无法做到这一点,因为您从Twitter获取的数据不是ByteReadChannel,而是一个新行分隔的JSON对象列表。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接