RxJava事件总线

5

我曾使用过第一版的RxJavaRxAndroid,并创建了以下class作为EventBus:

public class RxBus {
private static RxBus instance;
private PublishSubject<Object> subject = PublishSubject.create();

public static RxBus instanceOf() {
    if (instance == null) {
        instance = new RxBus();
    }
    return instance;
}

public void setMessage(Object object) {
    subject.onNext(object);
}

public Observable<Object> getEvents() {
    return subject;
}
}

在任何类中使用instanceOf获取实例,我使用setMessage方法来发出消息,并使用以下代码来获取已发出的消息:

  bus.getEvents().subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            if (o instanceof String) {
                //TODO
            }
        }
    });

Action1来自于rx.functions包。尝试迁移到使用RxJava 2时,我无法导入它。

请告诉我,使用RxJava 2作为EventBus的最简便方法是什么?

2个回答

6
在RxJava2中,Action1已被重命名为Consumer。剩下的动作接口根据Java 8函数类型进行命名。无参数的Action0io.reactivex.functions.Action替换用于操作符,而Scheduler方法则使用java.lang.RunnableAction1已更名为ConsumerAction2称为BiConsumerActionN已被Consumer<Object[]>类型声明所替代。请参阅2.0中的不同之处

0

这是在RxJava2中实现事件总线的好方法(代码来自于此gist

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

public class RxBus {
    private static volatile RxBus sRxBus = null;
    private PublishSubject<Object> mPublishSubject = PublishSubject.create();

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (sRxBus == null) {
            synchronized (RxBus.class) {
                if (sRxBus == null) {
                    sRxBus = new RxBus();
                }
            }
        }
        return sRxBus;
    }

    public <T> Observable<T> subscribe(Class<T> cls) {
        return mPublishSubject
                .filter(o -> o.getClass().equals(cls))
                .map(o -> (T) o);
    }

    public void post(Object obj) {
        mPublishSubject.onNext(obj);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接