使用Retrofit消费服务器发送事件

7
我正在尝试消费一个 REST API [1],该 API 发送服务器推送事件到客户端。我目前正在使用来自 Square 的 Retrofit 进行消费,但我不确定如何操作。有没有之前使用 Retrofit 工作经验的人可以帮忙?如果不是 Retrofit,请建议其他可以实现这一功能的 Java 库。

[1] https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-events

4个回答

4

我知道这是一个老问题。但我没有找到一个完整的例子,现在试着通过我的代码提供一个。我们只使用retrofitcoroutines

1.在retrofit API接口中需要添加代码。请注意我们使用了@Streaming和返回类型为Call<ResponseBody>

@POST("/v1/calc/group-prices")
@Streaming
fun calculateGroupPrices(@Body listOptions: List<GroupCalculatorOptions>): Call<ResponseBody>

2. 在您的仓库类中需要添加以下代码。注意我们使用了和读取。要理解已经到达带有有效载荷的消息,它必须以"data:"开头。

fun loadGroupDeliveryRateInfos(listOptions: List<GroupCalculatorOptions>) = flow {
        coroutineScope {
            val response = restApi.calculateGroupPrices(listOptions).execute()
            if (response.isSuccessful) {
                val input = response.body()?.byteStream()?.bufferedReader() ?: throw Exception()
                try {
                    while (isActive) {
                        val line = input.readLine() ?: continue
                        if (line.startsWith("data:")) {
                            try {
                                val groupDeliveryRateInfo = gson.fromJson(
                                    line.substring(5).trim(),
                                    GroupDeliveryRateInfo::class.java
                                )
                                emit(groupDeliveryRateInfo)
                            } catch (e: Exception) {
                                e.printStackTrace()
                            }
                        }
                    }
                } catch (e: IOException) {
                    throw Exception(e)
                } finally {
                    input.close()
                }
            } else {
                throw HttpException(response)
            }
        }
    }

3. 最后一步是将我们的数据收集到ViewModel中。 我们只需要从repository调用该方法即可。

 repository.loadGroupDeliveryRateInfos(it.values.toList())
                .collect { info ->
                    handleGroupDeliveryRateInfo(info)
                }

这就是全部内容,不需要额外的库。

2
尝试使用这个库:oksee

OkSse是一个扩展库,用于创建Server-Sent Event (SSE)客户端的OkHttp库。

据我所知,Retrofit不支持SSE。从我的研究来看,目前这是最佳选择。

https://github.com/heremaps/oksse

注:保留html标签。

项目似乎有点死气沉沉。现在有了:https://github.com/launchdarkly/okhttp-eventsource - Matthew

1
没有必要混淆Retrofit和SSE。使用Retrofit获取输入流,然后找到(或编写)一个输入流解析器来分块SSE事件。
在Retrofit中,我有这个:
public interface NotificationAPI {
    @GET("notifications/sse")
    Call<InputStream> getNotificationsStream(@retrofit2.http.Query("Last-Event-ID") String lastEventId);
}

我为InputStream编写了一个快速的转换器工厂:

public class InputStreamConverterFactory extends Converter.Factory {

    private static class InputStreamConverter implements Converter<ResponseBody, InputStream> {
        @Nullable
        @Override
        public InputStream convert(ResponseBody value) throws IOException {
            return value.byteStream();
        }
    }

    @Override
    public @Nullable
    Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
        if (type.equals(InputStream.class)) {
            return new InputStreamConverter();
        }
        return null;
    }
}

My client code looks like this:

var cinputStream = api.getNotificationsStream(null);
var inputStream = cinputStream.execute().body();
try(var sseStream = new MySSEStreamParser(inputStream)) {
   //handle the stream here...
}

有一个OkHttp SSE解析器,你可能可以使用。然而:

  • OkHttp SSE代码带有线程。很可能你想要自己带入线程模型。
  • 实际的OkHttp SSE解析器是一个内部包。这并不使它成为一个很好的提升候选。

你好!抱歉,我也遇到了retrofit的同样问题。我找到了你的解决方案,但是我不明白它是如何工作的?我找不到SSEStreamParser类。也许你在git上有例子?那就太好了!谢谢! - Дмитрий Александрович
我们自己编写了一个解析器,规范相当简单,请参见:https://html.spec.whatwg.org/multipage/server-sent-events.html - 你基本上只需要使用BufferedReader封装并调用 readLine()方法,语法大多是这样的:data: <message>。话虽如此,我想肯定已经有人编写了可以重用的解析器。如果您找到了,请告诉我! - Matthew

0

这里是一个稍微更新的方法,我刚刚根据this博客得到了工作。它使用Retrofit的@Streaming与Kotlin流和缓冲读取器。SseHeartbeatData只是我的sse端点返回的自定义json负载。

import com.google.gson.Gson
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import okhttp3.ResponseBody
import retrofit2.http.GET
import retrofit2.http.Streaming

interface NetworkService {
  @Streaming
  @GET("/network/sse/heartbeat")
  suspend fun getHeartbeat() : ResponseBody
}

data class SseHeartbeatData(val count: Int, val timestamp: Long, val requestId: String)

class SseRepository(private val networkService: NetworkService) {

fun getHeartbeat() = flow {
    coroutineScope {
        val gson = Gson()
        val inputReader = networkService.getHeartbeat().byteStream().bufferedReader()

        var event: String? = null
        var heartbeatData: SseHeartbeatData? = null

        while (isActive) {
            val line = inputReader.readLine()

            when {
                line.startsWith("event:") -> {
                    event = line.removePrefix("event:").trim()
                }
                line.startsWith("data:") -> {
                    val jsonString = line.removePrefix("data:").trim()

                    if (event == "heartbeat") {
                        heartbeatData = gson.fromJson(jsonString, SseHeartbeatData::class.java)
                    }

                    event = null
                }
                line.isEmpty() -> {
                    heartbeatData?.let {
                        emit(it)
                    }
                    heartbeatData = null
                }
            }
        }
        inputReader.close()
    }
}

}

然后您可以在视图模型中收集它并根据需要处理异常。

_sseJob = viewModelScope.launch(Dispatchers.IO) {
        sseRepository.getHeartbeat().catch {
            Log.w("MainViewModel", it)
            _sseJob?.cancel()
            _uiState.value = SseNotActive
        }
            .collect {
            _uiState.value = SseActive(count = it.count)
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接