RxJava中对缓冲区的错误理解

4
我正在尝试熟悉RxJava。这是我想要实现的用例:
我的屏幕上有一个按钮,我想收集点击次数。因此,如果用户点击按钮,则会注册单击并生成日志。现在,如果用户连续点击两次该按钮,则会注册两个点击事件,将其收集起来,并输出2而不是1。
基本上,我正在尝试在一段时间内累积点击次数,然后输出最终结果。我猜测 "buffer" 是我需要使用的方法。我在Android中快速编写了一个示例(代码如下),但是 buffer 方法似乎并不像只是收集所有事件输入并输出一个集合那么简单。
public class DemoFragment
    extends Fragment {

    private int _tapCount = 0;
    private Observable<List<Integer>> _bufferedObservable;
    private Observer<List<Integer>> _observer;

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        _setupLogger();

        _bufferedObservable = _getBufferedObservable();
        _observer = _getObserver();
    }


    // the library butterknife allows this
    @OnClick(R.id.btn_start_operation)
    public void onButtonTapped() {
        _log("GOT A TAP");
        _bufferedObservable.subscribeOn(Schedulers.io())
                         .observeOn(AndroidSchedulers.mainThread())
                         .subscribe(_observer);
    }

    private Observable<List<Integer>> _getBufferedObservable() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {


            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);   // send one tap here
            }

        }).buffer(2, TimeUnit.SECONDS); // collect all taps in the last 2s
    }

    private Observer<List<Integer>> _getObserver() {
        return new Observer<List<Integer>>() {


            @Override
            public void onCompleted() {
                _log(String.format("%d taps", _tapCount));
                _tapCount = 0; // reset tap count
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(List<Integer> integers) {
                if (integers.size() > 0) {
                    for (int i : integers) {
                        _tapCount += i;
                    }
                    onCompleted();
                } else {
                    _log("No taps received");
                }
            }
        };
    }

    // ... other method that help wiring up the example (irrelevant to RxJava)
}

有人能帮助我理解我这里的误解吗?

问题1:我期望_getObserver()onNext向我发送一个包含累计点击次数的列表。因此,如果按钮被点击了5次,则我期望一个列表,其中有5个项目,每个项目的值为“1”。但是现有代码中,我总是得到一个空列表。

问题2:如果List<Integer> integers大小不为空,我会在控制台记录日志,说“未收到敲击”。似乎Observable永远不会停止。就像一个计时器一样,即使没有注册按钮点击,它也会不断地继续下去。是否有方法在最后10秒内没有注册事件时停止Observable?

问题3:发射的数量似乎呈几乎指数增长。就像它从所有以前的时间收集空按钮敲击一样。


哪一个?首先,“Buffer”不是运算符。 - user207421
RxJava测试表明缓冲区是一个操作符。 - KG -
1
不是这样的。似乎有一个叫做“操作符缓冲区”的东西,但这并不意味着“缓冲区是我应该使用的操作符”。在这种情况下,buffer()是一个方法 - user207421
3个回答

8

以下是我的代码示例(假设您的按钮id为R.id.rx_button):

private Subscription mSubscription;

@Override
protected void onResume() {
    super.onResume();
    mSubscription = Observable.create(new OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            findViewById(R.id.rx_button).setOnClickListener(new OnClickListener() {
                @Override
                public void onClick(View v) {
                    subscriber.onNext(1);
                }
            });
        }
    }).buffer(2, TimeUnit.SECONDS)
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    Log.i("TAG", String.valueOf(integers.size()));
                }
            });
}

@Override
protected void onPause() {
    super.onPause();
    mSubscription.unsubscribe();
}

简单来说,只需在call方法上实现OnClickListener接口,这样你就可以在其中使用subscriber对象。

如果使用lambda表达式(可以看看Retrolambda项目),onResume会更好看:

@Override
protected void onResume() {
    super.onResume();
    mSubscription = Observable.create((Subscriber<? super Integer> subscriber) ->
            findViewById(R.id.rx_button).setOnClickListener(view ->
                    subscriber.onNext(1))).buffer(2, TimeUnit.SECONDS)
            .subscribe(integers -> Log.i("TAG", String.valueOf(integers.size())));
}

谢谢@tomrozb,我终于尝试了一下。我也用另一种方式(使用PublishSubject)使其工作,但你对于这个特定情况的解决方案绝对更加优雅。 - KG -

1
我认为最好使用很棒的 rx-binding 库来解决这个问题。我认为这个解决方案更加干净,特别是使用 lambda 表达式。我使用了 filter 操作符,因此我只在 onNext() 中获取用户在给定的 2 秒内按下超过 5 次的结果。
我基于一个例子 (RxJava-Android-Samples) 来实现,该例子使用了一个旧的 rx-binding 依赖项。
@Override
public void onResume() {
    super.onResume();
    mClickSubscription = getBufferedSubscription();
}

@Override 
public void onPause() {
    super.onPause();
    mClickSubscription.unsubscribe();
} 

private Subscription getBufferedSubscription() {
    return RxView.clicks(rx_button)
            .map(aVoid -> 1)
            .buffer(2, TimeUnit.SECONDS)
            .filter(integers -> integers.size() > 5)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<Integer>>() {

                @Override
                public void onCompleted() {
                    // fyi: you'll never reach here
                }

                @Override
                public void onError(Throwable e) {
                    SLog.i("Dang error! check your logs");
                }

                @Override
                public void onNext(List<Integer> integers) {
                    SLog.i(String.format("%d taps", integers.size()));

                }
            });
}

0
尝试下面的代码:
public class BufferExampleActivity extends AppCompatActivity {

    private static final String TAG = BufferExampleActivity.class.getSimpleName();
    Button btn;
    TextView textView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example);
        btn = (Button) findViewById(R.id.btn);
        textView = (TextView) findViewById(R.id.textView);

        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                doSomeWork();
            }
        });
    }

    /*
     * simple example using buffer operator - bundles all emitted values into a list
     */
    private void doSomeWork() {

        Observable<List<String>> buffered = getObservable().buffer(3, 1);

        // 3 means,  it takes max of three from its start index and create list
        // 1 means, it jumps one step every time
        // so the it gives the following list
        // 1 - one, two, three
        // 2 - two, three, four
        // 3 - three, four, five
        // 4 - four, five
        // 5 - five

        buffered.subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("one", "two", "three", "four", "five");
    }

    private Observer<List<String>> getObserver() {
        return new Observer<List<String>>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<String> stringList) {
                textView.append(" onNext size : " + stringList.size());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : size :" + stringList.size());
                for (String value : stringList) {
                    textView.append(" value : " + value);
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " : value :" + value);
                }

            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接