Single.Create()...blockingGet()在使用Android/Firebase Realtime Database时会导致RxJava 2.1.3挂起

4
使用以下模式从Firebase实时数据库同步获取数据时:
String s = Single.create(new SingleOnSubscribe<String>() {
    @Override
    public void subscribe(SingleEmitter<String> e) throws Exception {
    FirebaseDatabase.getInstance().getReference("path").orderByChild("child").equalTo("xyz").addListenerForSingleValueEvent(new ValueEventListener() {
        @Override
        public void onDataChange(DataSnapshot dataSnapshot) {
        e.onSuccess("Got it");
        }
        @Override
        public void onCancelled(DatabaseError databaseError) {
        e.onError(databaseError.toException());
        }
    });
}
}).blockingGet();

如果我在 Single 内部使用相同的 Firebase "内部",它会挂起并创建 ANR 错误。但是,如果我在 Single 外部使用相同的 Firebase "内部",它就可以正常触发。同时,没有 Firebase 代码的 Single 也可以触发,因此似乎两者之间存在一些不兼容性。

有什么想法吗?

2个回答

1
Firebase在UI线程上提供事件,使用blockingGet等待结果会导致死锁。我认为您应该重新考虑应用程序逻辑,并使用subscribe(SingleObserver)进行非阻塞订阅。

这不起作用,因为 blockingGet 仍在执行并仍在阻止主线程,从而防止 firebase 事件回调的执行。 - akarnokd
@akarnokd 我同意 - 鉴于无法将执行器传递给 addListenerForSingleValueEvent,在 UI 线程上使用 blockingGet 将始终导致死锁。要么摆脱阻塞,要么从非 UI 线程调用问题中提出的所有代码。 - m.ostroverkhov
确实如此。根据 RxJava 问题列表上的讨论,可以看出该问题的源头是,楼主似乎因某种原因在阻塞主线程。 - akarnokd

1
由于您正在创建自己的Single,因此应在subscribeWith中使用DisposableSingleObserver。其次,您不应该那样调用blockingGet()。原因是默认情况下,您创建的Single或任何observable/Processor/Flowable将被订阅(在主线程上运行其操作)并观察主线程。BlockingGet()会导致主线程暂停。这就像在主线程上执行Thread.sleep()一样。这总是会以灾难告终。
对于您来说,最好的选择是重新思考要放入代码中的逻辑。由于Firebase操作本质上是异步的,因此您应该将代码适应异步模式。
无论如何,您可以尝试以下代码以实现看起来可能正在尝试的内容。请注意,我在此处编写了以下代码,因此可能存在语法错误。
   Single.create(new SingleOnSubscribe<String>() {
        // your firebase code
    @Override
        public void subscribe(SingleEmitter<String> e) throws Exception {

  FirebaseDatabase.getInstance().getReference("path").orderByChild("child").equalTo("xyz").addListenerForSingleValueEvent(new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
            e.onSuccess("My String");
            }
            @Override
            public void onCancelled(DatabaseError databaseError) {
            e.onError(databaseError.toException());
            }
        });
       }
    })
    .subscribeOn(Schedular.io())
    .observeOn(AndroidThread.mainThread()) // if you aren't doing intensive/long running tasks on the data you got from firebase
    .subscribeWith(new DisposableSingleObserver<String>() { 
         public void onSuccess(String myString) {
               mMyString = myString;
         }
         public void onError(Throwable t) {
            Timber.e("error in fetching data from firebase: %s", t);
         }
     });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接