Rx - 按条件将流分成段(列表)

9
我有一个RX生产者,它创建了一个字符串流,像这样(实际流的简化版本):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
该流是无休止的,但是有序的。因此,在以“ A”开头的字符串用完后,“ B”开始。当B运行完后,C开始...当Z用完后,我们移动到AA1等。A,B等字母的数量未知,但通常每个字母有10-30个实例。
我正在寻找一种将此流分成所有A的块:A1 A2 A3,所有B的块:B1 B2,所有C的块:C1 C2 C3 C4 C5 C6等的方法。每个块可以是可观察的(我将其转换为列表),也可以只是一个列表。
我尝试了使用RxJava的几种不同方法,但所有都失败了。以下是一些不起作用的事情:
- Group by:由于流是无休止的,每个字母的observable不会完成,因此当A用完而B开始时,A的Observable不会完成。因此,observable的数量不断增加。 - Window/Buffer with distinctUntilChanged - 我在原始流上使用“distinctUntilChanged”来输出每个组的第一个项(第一个A、第一个B等)。然后,我使用该流作为输入到窗口或“buffer”操作符中,用作窗口/缓冲区之间的边界。那没用,所有我得到的只是空列表。
有什么正确的RX解决方案吗? 我更喜欢Java解决方案,但也非常欢迎可以轻松转换为Java的其他RX实现的解决方案。
4个回答

6
你可以使用rxjava-extras.toListWhile方法:
Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);

它在幕后执行了@akarnokd所做的事情,并且经过了单元测试。

@dave-moten 我尝试过了,但是我遇到了构建错误。 "不存在类型变量R的实例,所以Transformer <String,List <String >>符合ObservableTransformer <String,R>"。我正在使用JDK 1.8.0_131与0.8.0.7。 - melston
@dave-moten,没事了。我使用的是rxjava2,这导致了问题。 - melston

4
这是我解决这个问题的方法:
Observable<String> source = Observable.from(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

它是这样工作的:

  • 我使用defer,因为我们需要每个订阅者的缓冲区,而不是全局的
  • 如果源是有限的,我将缓冲区与最后一个缓冲区的发射连接起来
  • 我使用concatMap并添加到缓冲区,直到键改变,此时,我发出空的Observables。一旦键改变,我就发出缓冲区的内容并开始一个新的缓冲区。

我喜欢这个解决方案,我想在 .net 中进行翻译。是否有类似于 ConcatMap 的等效方法? - frhack
1
最接近的方法是使用Concat(Select())或者ConcatMany,它们来自于我的.NET库,扩展了Rx.NET。 - akarnokd

1

在查看akarnokdDave的答案后,我通过实现一个自定义的Rx操作符BufferWhile提出了自己的解决方案。它似乎与其他解决方案一样有效(如果我错了,请有人纠正我),但它似乎更加直观:

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{

    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {

            private ArrayList<T> buffer = null;
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

            @Override
            public void onNext(T t) {
                if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) {
                    currentKey = keyGenerator.call(t);
                    submitAndClearBuffer();
                    buffer.add(t);
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
                }
            }

            private void submitAndClearBuffer() {
                if (buffer != null && buffer.size() > 0) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
            }
        };
    }
}

我可以使用 lift 在原始 observable 上应用此操作符,并获得一个发出字符串列表的 observable。

1
一些注释:1)通常我们不会在onError时发送部分缓冲区,但这取决于您。2)该操作符不能正确处理背压,并且遭受所谓的丢弃值效应;对于每个T,您都不会发出一个List<T>,因此如果计数不匹配,则下游无法请求更多。您应该在else子句中调用request(1)来要求补充。 - akarnokd
@akarnokd 谢谢您的建议。我会编辑代码。 - Malt
@akarnokd 我已经处理了这两个备注。现在我会在else语句中缓冲然后请求,我还添加了一条注释,说明提交部分缓冲是可选的。 - Malt

0
假设我们有一个源字符串流source和一个函数key,用于提取每个字符串的关键字,例如以下内容:
IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());

然后我们可以使用带有自定义关闭选择器的Buffer

var query = source.Publish(o => o.Buffer(() =>
{
    var keys = o.Select(key);
    return Observable
        .CombineLatest(
            keys.Take(1),
            keys.Skip(1),
            (a, b) => a != b)
        .Where(x => x);
}));

每个缓冲区的结束都是在缓冲区中的第一项和当前正在考虑添加到缓冲区的项目具有不同键时。

这个不正常:它会向缓冲区添加一个额外的值。 - frhack

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接