使用RxAndroidBle(rxJava)向设备发送命令列表

3

我正在尝试通过rxJava向设备发送命令列表。以下是我的代码:

public void startWriteCommucation(final ArrayList<byte[]> b) {
    if (isConnected()){
            connectionObservable
                    .flatMap(new Func1<RxBleConnection, Observable<Observable<byte[]>>>() {
                        @Override
                        public Observable<Observable<byte[]>> call(final RxBleConnection rxBleConnection) {
                            final List<Observable<byte[]>> list = new ArrayList<>();
                            for (byte[] bytes: b){
                                Log.e("Observer", Arrays.toString(bytes));
                                list.add(rxBleConnection
                                        .writeCharacteristic(BleDevice.characteristicWrite, bytes));
                            }
                            return Observable.from(list);
                        }
                    })
                    .concatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                        @Override
                        public Observable<byte[]> call(Observable<byte[]> observable) {
                            return observable;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<byte[]>() {
                        @Override
                        public void call(byte[] bytes) {
                            view.setTextStatus("Write success");
                            Log.e("Subscriber", Arrays.toString(bytes));
                        }
                    });
        }
}

它可以工作,然后我点击一次按钮。例如,我的点击方法:

 public void onClick(){
        ArrayList<byte[]> listCmd = new ArrayList<>();
        listCmd.add(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        listCmd.add(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
        startWriteCommucation(listCmd);
}

我在LogCat中看到了myLogs:

E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

但是,当我快速双击按钮时就会出现问题。然后第一次带有observable的点击仍在工作,我再次单击来再次调用startWriteCommunication方法。之后我的日志看起来是这样的:

 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

 E/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
 E/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

主要问题是它们没有按顺序,我的设备不能正常工作。你能帮我找到问题吗?

1个回答

2
问题是RxAndroidBle库存在一个bug(导致响应与请求不匹配),并且在两个有状态的通信流之间共享连接(需要进行两次写操作而没有任何中间通信)。
这个bug:应该写入BluetoothGattCharacteristic的值(byte[])被设置得太早了。如果有两个相同特性的并行写入 - 其中一个由于竞争条件可能会覆盖另一个已经设置的byte[]。我已经修复了库中的问题,现在正在进行代码审查,应该很快将其应用于SNAPSHOT版本。
通过这些更改,输出将如下所示:
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Observer: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Observer: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
D/Subscriber: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

可能的解决方案

如果您不想在用户快速点击按钮两次时触发两次流程 - 您只需创建一个可共享的流程:

Observable<byte[]> theSharedFlow = rxBleConnection
  .writeCharacteristic(uuid, data1)
  .flatMap(writtenBytes -> rxBleConnection.writeCharacteristic(uuid, data2))
  .share()

当订阅多次时,只会执行一次直到完成。在上述片段中,第二个writeCharacteristic()将在第一个发出写入字节后被订阅(并排队等待通信)。
如果应用程序旨在在共享连接的同时按任意时间顺序发送任意命令集,则由应用程序负责确保前一组已完成。
我希望我已经回答了您的问题。如果您可以提供有关用例的更多信息,我将尝试改进我的答案。
最好的问候
编辑:

备选方案:

为了保留顺序,所有Observable都需要以它们应该到达的顺序进行订阅。 Observable的契约是:Observable(如果它是冷的)不会执行,直到订阅。而且,当使用flatMap()时,第二个Observable在第一个发出后订阅。
为了使两个写操作按顺序传输,它们必须按相同的顺序订阅,因此流程可能如下所示:
connectionObservable
            .flatMap(rxBleConnection -> {
                Observable<byte[]> mergedObservable = null;
                for (byte[] bytes : b) {
                    Log.d("Observer", Arrays.toString(bytes));
                    final Observable<byte[]> writeObservable = rxBleConnection
                            .writeCharacteristic(uuid, bytes);

                    if (mergedObservable == null) {
                        mergedObservable = writeObservable;
                    } else {
                        // merging two Observables to be subscribed at the same time when subscribed
                        mergedObservable = mergedObservable.mergeWith(writeObservable);
                    }
                }
                return mergedObservable;
            })
            // removed .concatMap()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    bytes -> Log.d("Subscriber", Arrays.toString(bytes)),
                    throwable -> Log.e("Subscriber", "error", throwable)
            );

RxJava有显然更多的方法来实现相同的行为,但这不是这个问题的一部分。

谢谢,我明天会尝试。 - KolinLoures
但我认为订阅者的输出将与观察者的顺序相同。 - KolinLoures
我已经编辑了我的回复。另一种解决方案可以修复这个问题,但在我看来,这只是一个不正确流程的表现(即具有状态的两个并行执行的流程)。 - Dariusz Seweryn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接