Rx 很棒,但有时候很难找到优雅的方法来做某件事情。这个想法非常简单。我接收到带有 byte[] 的事件,这个数组可能包含一行的一部分、多行或一整行。我想找到一种方式,可以得到一个 IObservable 的 Line,像这样:
经过几个小时的尝试,最接近的解决方案还是相当丑陋的,而且当然行不通,因为扫描会在每个字符上触发 OnNext:
注意:`_reactiveSubcription` 应该是 `IObservable<String>`。
不考虑字符编码问题,我需要做哪些才能使其正常工作?
IObservable<String>
,其中序列的每个元素都是一行。经过几个小时的尝试,最接近的解决方案还是相当丑陋的,而且当然行不通,因为扫描会在每个字符上触发 OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
//Scan doesn't work it trigger OnNext on every char
//Aggregate doesn't work neither as it doesn't return intermediate result
.Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
.Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
//Data is a byte[]
.Select(_ => _.EventArgs.Data)
.Subscribe(array => array.ToObservable()
//Convert into char
.ForEach(c => outputStream.OnNext((char)c)));
注意:`_reactiveSubcription` 应该是 `IObservable<String>`。
不考虑字符编码问题,我需要做哪些才能使其正常工作?