我有一个RX生产者,它创建了一个字符串流,像这样(实际流的简化版本):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
该流是无休止的,但是有序的。因此,在以“ A”开头的字符串用完后,“ B”开始。当B运行完后,C开始...当Z用完后,我们移动到AA1等。A,B等字母的数量未知,但通常每个字母有10-30个实例。
我正在寻找一种将此流分成所有A的块:A1 A2 A3,所有B的块:B1 B2,所有C的块:C1 C2 C3 C4 C5 C6等的方法。每个块可以是可观察的(我将其转换为列表),也可以只是一个列表。
我尝试了使用RxJava的几种不同方法,但所有都失败了。以下是一些不起作用的事情:
- Group by:由于流是无休止的,每个字母的observable不会完成,因此当A用完而B开始时,A的Observable不会完成。因此,observable的数量不断增加。 - Window/Buffer with distinctUntilChanged - 我在原始流上使用“distinctUntilChanged”来输出每个组的第一个项(第一个A、第一个B等)。然后,我使用该流作为输入到窗口或“buffer”操作符中,用作窗口/缓冲区之间的边界。那没用,所有我得到的只是空列表。
有什么正确的RX解决方案吗? 我更喜欢Java解决方案,但也非常欢迎可以轻松转换为Java的其他RX实现的解决方案。
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
该流是无休止的,但是有序的。因此,在以“ A”开头的字符串用完后,“ B”开始。当B运行完后,C开始...当Z用完后,我们移动到AA1等。A,B等字母的数量未知,但通常每个字母有10-30个实例。
我正在寻找一种将此流分成所有A的块:A1 A2 A3,所有B的块:B1 B2,所有C的块:C1 C2 C3 C4 C5 C6等的方法。每个块可以是可观察的(我将其转换为列表),也可以只是一个列表。
我尝试了使用RxJava的几种不同方法,但所有都失败了。以下是一些不起作用的事情:
- Group by:由于流是无休止的,每个字母的observable不会完成,因此当A用完而B开始时,A的Observable不会完成。因此,observable的数量不断增加。 - Window/Buffer with distinctUntilChanged - 我在原始流上使用“distinctUntilChanged”来输出每个组的第一个项(第一个A、第一个B等)。然后,我使用该流作为输入到窗口或“buffer”操作符中,用作窗口/缓冲区之间的边界。那没用,所有我得到的只是空列表。
有什么正确的RX解决方案吗? 我更喜欢Java解决方案,但也非常欢迎可以轻松转换为Java的其他RX实现的解决方案。
不存在类型变量R的实例,所以Transformer <String,List <String >>符合ObservableTransformer <String,R>
"。我正在使用JDK 1.8.0_131与0.8.0.7。 - melston