如何在 RxJava 2 中释放后重新启动发射?

7

我在onPause()中处理我的Observable,我希望在onResume()中重新启动它。我该怎么做?

以下是我的Observable:

    Observable<ObdCommandResult> myObservable = Observable.create(new ObservableOnSubscribe<ObdCommandResult>() {
        @Override
        public void subscribe(ObservableEmitter<ObdCommandResult> e) throws Exception {
            ...
            socket.connect();

            new ObdResetCommand().run(socket.getInputStream(), socket.getOutputStream());
            new EchoOffCommand().run(socket.getInputStream(), socket.getOutputStream());
            new LineFeedOffCommand().run(socket.getInputStream(), socket.getOutputStream());
            new SelectProtocolCommand(ObdProtocols.AUTO).run(socket.getInputStream(), socket.getOutputStream());
            ObdCommandResult obdCommandResult = new ObdCommandResult();
            while (!Thread.currentThread().isInterrupted()) {
                for (int i = 1; i < 5; i++) {
                    try {
                        livedataObdCommandList.get(i-1).getCommand().run(socket.getInputStream(), socket.getOutputStream());
                        obdCommandResult.setId(i);
                        obdCommandResult.setValue(livedataObdCommandList.get(i-1).getCommand().getFormattedResult());
                        e.onNext(obdCommandResult);
                    }catch (UnsupportedCommandException uce) {
                    } catch (InterruptedException ie) {
                }
            }
        }
    });

...以及我的观察者:

    Observer<ObdCommandResult> observer = new Observer<ObdCommandResult>() {
        @Override
        public void onNext(ObdCommandResult value) {
            switch (value.getId())  {
                case 1:
                    currentSpeed.setText(value.getValue());
                    break;
                case 2:
                    revCounter.setText(value.getValue());
                    break;
                case 3:
                    throttlePosition.setText(value.getValue());
                    break;
                case 4:
                    oilTemp.setText(value.getValue());
                    break;
                case 5:
                    fuelLevel.setText(value.getValue());
                    break;
                case 6:
                    engineCoolant.setText(value.getValue());
                    break;
            }
        }

    };

因此,我希望将其处理掉,并在用户返回时重新启动。


请在此处发布您的代码。 - Alex Nik
4
与您第一次订阅可观察对象时所做的相同。没有重新订阅的功能。 - Gabe Sechan
@GabeSechan,您可以将此评论转换为答案,我相信。 - Antek
@GabeSechan,所以我需要再次调用第二段代码吗?(Observer<ObdCommandResult> observer = new Observer<ObdCommandResult>() { @Override public void onNext(ObdCommandResult value) { .. ) - c0dehunter
1个回答

0

你可以重复使用同一个 Observable 并多次订阅它。每次调用 subscribe() 都将产生一个新的订阅。(除非进行了订阅,否则不会执行 Observable.create 的主体。然后,将为每个进行的订阅执行该主体。)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接