我正在尝试消费一个 REST API [1],该 API 发送服务器推送事件到客户端。我目前正在使用来自 Square 的 Retrofit 进行消费,但我不确定如何操作。有没有之前使用 Retrofit 工作经验的人可以帮忙?如果不是 Retrofit,请建议其他可以实现这一功能的 Java 库。
[1] https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-events
[1] https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-events
我知道这是一个老问题。但我没有找到一个完整的例子,现在试着通过我的代码提供一个。我们只使用retrofit
和coroutines
1.在retrofit API接口
中需要添加代码。请注意我们使用了@Streaming
和返回类型为Call<ResponseBody>
@POST("/v1/calc/group-prices")
@Streaming
fun calculateGroupPrices(@Body listOptions: List<GroupCalculatorOptions>): Call<ResponseBody>
2. 在您的仓库
类中需要添加以下代码。注意我们使用了流
和读取流
。要理解已经到达带有有效载荷的消息,它必须以"data:"
开头。
fun loadGroupDeliveryRateInfos(listOptions: List<GroupCalculatorOptions>) = flow {
coroutineScope {
val response = restApi.calculateGroupPrices(listOptions).execute()
if (response.isSuccessful) {
val input = response.body()?.byteStream()?.bufferedReader() ?: throw Exception()
try {
while (isActive) {
val line = input.readLine() ?: continue
if (line.startsWith("data:")) {
try {
val groupDeliveryRateInfo = gson.fromJson(
line.substring(5).trim(),
GroupDeliveryRateInfo::class.java
)
emit(groupDeliveryRateInfo)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
} catch (e: IOException) {
throw Exception(e)
} finally {
input.close()
}
} else {
throw HttpException(response)
}
}
}
3. 最后一步是将我们的数据收集到ViewModel
中。 我们只需要从repository
调用该方法即可。
repository.loadGroupDeliveryRateInfos(it.values.toList())
.collect { info ->
handleGroupDeliveryRateInfo(info)
}
据我所知,Retrofit不支持SSE。从我的研究来看,目前这是最佳选择。 注:保留html标签。OkSse是一个扩展库,用于创建Server-Sent Event (SSE)客户端的OkHttp库。
public interface NotificationAPI {
@GET("notifications/sse")
Call<InputStream> getNotificationsStream(@retrofit2.http.Query("Last-Event-ID") String lastEventId);
}
我为InputStream
编写了一个快速的转换器工厂:
public class InputStreamConverterFactory extends Converter.Factory {
private static class InputStreamConverter implements Converter<ResponseBody, InputStream> {
@Nullable
@Override
public InputStream convert(ResponseBody value) throws IOException {
return value.byteStream();
}
}
@Override
public @Nullable
Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
if (type.equals(InputStream.class)) {
return new InputStreamConverter();
}
return null;
}
}
My client code looks like this:
var cinputStream = api.getNotificationsStream(null);
var inputStream = cinputStream.execute().body();
try(var sseStream = new MySSEStreamParser(inputStream)) {
//handle the stream here...
}
有一个OkHttp SSE解析器,你可能可以使用。然而:
BufferedReader
封装并调用 readLine()
方法,语法大多是这样的:data: <message>
。话虽如此,我想肯定已经有人编写了可以重用的解析器。如果您找到了,请告诉我! - Matthew这里是一个稍微更新的方法,我刚刚根据this博客得到了工作。它使用Retrofit的@Streaming
与Kotlin流和缓冲读取器。SseHeartbeatData只是我的sse端点返回的自定义json负载。
import com.google.gson.Gson
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import okhttp3.ResponseBody
import retrofit2.http.GET
import retrofit2.http.Streaming
interface NetworkService {
@Streaming
@GET("/network/sse/heartbeat")
suspend fun getHeartbeat() : ResponseBody
}
data class SseHeartbeatData(val count: Int, val timestamp: Long, val requestId: String)
class SseRepository(private val networkService: NetworkService) {
fun getHeartbeat() = flow {
coroutineScope {
val gson = Gson()
val inputReader = networkService.getHeartbeat().byteStream().bufferedReader()
var event: String? = null
var heartbeatData: SseHeartbeatData? = null
while (isActive) {
val line = inputReader.readLine()
when {
line.startsWith("event:") -> {
event = line.removePrefix("event:").trim()
}
line.startsWith("data:") -> {
val jsonString = line.removePrefix("data:").trim()
if (event == "heartbeat") {
heartbeatData = gson.fromJson(jsonString, SseHeartbeatData::class.java)
}
event = null
}
line.isEmpty() -> {
heartbeatData?.let {
emit(it)
}
heartbeatData = null
}
}
}
inputReader.close()
}
}
}
然后您可以在视图模型中收集它并根据需要处理异常。
_sseJob = viewModelScope.launch(Dispatchers.IO) {
sseRepository.getHeartbeat().catch {
Log.w("MainViewModel", it)
_sseJob?.cancel()
_uiState.value = SseNotActive
}
.collect {
_uiState.value = SseActive(count = it.count)
}
}