将IObservable<byte[]>拆分为字符,然后再拆分为行

3
Rx 很棒,但有时候很难找到优雅的方法来做某件事情。这个想法非常简单。我接收到带有 byte[] 的事件,这个数组可能包含一行的一部分、多行或一整行。我想找到一种方式,可以得到一个 IObservable 的 Line,像这样:IObservable<String>,其中序列的每个元素都是一行。
经过几个小时的尝试,最接近的解决方案还是相当丑陋的,而且当然行不通,因为扫描会在每个字符上触发 OnNext:
//Intermediate subject use to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work it trigger OnNext on every char
    //Aggregate doesn't work neither as it doesn't return intermediate result
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
            //Data is a byte[]
            .Select(_ => _.EventArgs.Data)
            .Subscribe(array => array.ToObservable()
            //Convert into char
            .ForEach(c => outputStream.OnNext((char)c)));

注意:`_reactiveSubcription` 应该是 `IObservable<String>`。
不考虑字符编码问题,我需要做哪些才能使其正常工作?

你试图在一个优雅的块中完成的事情,在我的真实世界站点中从未奏效。你如何知道byte[]是否全部接收或其结束字节在哪里?即,2K消息是否会在一条线路的两个不同事件中以两个1K块的形式到达。我总是必须将字节弹出到一个理论上无限的队列中,并在其中查找我的NewLine或CrLf,然后将单个行作为NewLines和CrLf字符显示时拉出来。然后,如果MaxLineSize值中没有NewLine或CrLf出现,我就必须放置安全代码来清除已构建的数组。 - Sql Surfer
1个回答

7

这对我有用。

首先,将byte[]转换为字符串,并在\r上拆分字符串(正则表达式拆分保留分隔符)。

现在有一串字符串,其中一些以\r结尾。

然后连接它们以保持顺序。另外,由于strings需要在下一步中使用,因此发布它们。

var strings = bytes.
  Select(arr => (Regex.Split(Encoding.Default.GetString(arr, 0, arr.Length - 1), "(\r)")).
    Where(s=> s.Length != 0).
    ToObservable()).
  Concat().
  Publish().
  RefCount();

创建一个字符串窗口,当一个字符串以 \r 结尾时结束。由于 strings 既用于窗口内容,也用于窗口结束的触发器,因此需要保持其热度。
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));

将每个窗口聚合成一个字符串。

var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));

lines 是一个 IObservable<String>,每个字符串包含一行。

为了测试这个,我使用了以下生成器来产生 IObservable<byte[]>

var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
    Return((byte)('A' + i)).
    Repeat(24).
    Concat(Observable.
        Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接