将现有的Android Java WebSocket实现转换为响应式WebSocket。

3

我正在Android中使用OKHttpClient和WebSockets。我想将它转换为响应式编程。如何实现呢?目前,我正在为WebSocket连接执行此操作。

    // OkHttp Client
    OkHttpClient httpClient = new OkHttpClient.Builder().build();

    // WebSocket Object
    Request request = new Request.Builder().url(url).build();
    mWebSocket = httpClient.newWebSocket(request, new WebSocketListener() {
        // Override methods - OnOpen, OnMessage, OnClosing, OnFailure
    }

    // Then Calling this to make websocket request
    mWebSocket.send(message);

我发现了这个库可以使用响应式WebSocket: https://github.com/jacek-marchwicki/JavaWebsocketClient

但是,它没有像“WebSocketListener”那样的回调监听器,以便我可以处理消息。

如有任何帮助,请在此方向上提供,谢谢。


有关如何解决这个问题的任何更新吗?我也遇到了同样的问题。https://dev59.com/n8Dqa4cB1Zd3GeqPnubd - DIRTY DAVE
@DIRTYDAVE,我使用了这个库,它很好用。https://github.com/dhhAndroid/RxWebSocket - Sajid Zeb
2个回答

0

有一个由Tinder启发的Retrofit WebSocket客户端https://github.com/Tinder/Scarlet

除了发送和接收常规消息外,您还可以创建反应式流以接收Websocket.Event并过滤传入事件,例如onOpen、onMessage、onFailed等。

以下是如何实现的示例。请转到上面提供的链接获取详细示例。

//Service declaration similar to retrofit
interface MyWebsocketService{
    @Receive
    Flowable<WebSocket.Event> observeWebSocketEvent();
}

Scarlet scarletInstance = new Scarlet.Builder()
    .webSocketFactory(OkHttpClientUtils.newWebSocketFactory(okhttpClient,"websocket-server-url")
    .addStreamAdapterFactory(new RxJava2StreamAdapterFactory())
    .build();

MyWebsocketService myWebsocketService = scarletInstance.create<MyWebsocketService>();

myWebsocketService.observeWebsocketEvent()
                  .subscribe(event -> {
                      if(event instanceof Websocket.Event.OnConnectionOpened){
                         //do something here
                      }else if(event instanceof Websocket.Event.OnConnectionClosed){
                        //do something here
                      }
                  });

0

我根本没有使用OkHttp,但简短的回答是在你的OnMessage方法中放置PublishSubject。Subject将向您的订阅者提供此消息,您还可以在订阅之前创建一个链。

// OkHttp Client
OkHttpClient httpClient = new OkHttpClient.Builder().build();

// Creating a Subject
// You can use it as Observer or Subscriber
PublishSubject<String> subject = PublishSubject.create();

// WebSocket Object
Request request = new Request.Builder().url(url).build();
mWebSocket = httpClient.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onMessage(String text, ...) {
        subject.onNext(text);
    }
}

// I hope that is an queue option
mWebSocket.send(message);

// Do what you want with your message
Disposable subscription = subject.flatMap(...)
                                 .observeOn(AndroidSchedulers.mainThread())
                                 .subscribe(...);

总的来说,这是一个复杂的问题,因为你需要处理连接错误、数据压力、配置更改,并且要考虑生命周期,但它也适用于非响应式方式(例如回调地狱),因此PublishSubject是一个起点。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接