如何创建一个包装流(stream)的流,可以转换一个流 (transform a stream)?

4
var incomingStream = ...
var outgoingStream = ...

await incomingStream.CopyToAsync(outgoingStream);

以上代码非常简单,它将传入的流复制到出站流中。这两个流都是通过互联网进行分块传输。

现在,假设我想使用像 Func<Stream, Stream, Task> 这样的东西来转换流,我该如何在不读取所有数据的情况下实现呢?

当然,我可以直接这样做:

var ms = new MemoryStream();
incomingStream.CopyTo(ms);

--- do transform of streams and seek
ms.CopyTo(outgoingStream)

但是那将会把整个东西读入内存,是否有任何内置内容可以让我从传入流中读取并写入新流,而不是缓冲所有数据,而是仅保留一个小的内部流以用于缓冲数据,并且在再次从传入流中获取数据之前,它不会读取。

我想做的是:

    protected async Task XmlToJsonStream(Stream instream, Stream outStream)
    {
        XmlReaderSettings readerSettings = new XmlReaderSettings();
        readerSettings.IgnoreWhitespace = false;
        var reader = XmlReader.Create(instream, readerSettings);
        var jsonWriter = new JsonTextWriter(new StreamWriter(outStream));
        jsonWriter.WriteStartObject();

        while (await reader.ReadAsync())
        {
            jsonWriter.writeReader(reader);
        }
        jsonWriter.WriteEndObject();
        jsonWriter.Flush();
    }
    protected async Task XmlFilterStream(Stream instream, Stream outStream)
    {
        XmlReaderSettings readerSettings = new XmlReaderSettings();
        readerSettings.IgnoreWhitespace = false;
        var reader = XmlReader.Create(instream, readerSettings);
        var writer = XmlWriter.Create(outStream, new XmlWriterSettings { Async = true, CloseOutput = false })

        while (reader.Read())
        {
            writer.writeReader(reader);
        }


    }

但我不知道如何连接它。

var incomingStream = ...
var outgoingStream = ...
var temp=...  
XmlFilterStream(incomingStream,temp);
XmlToJsonStream(temp,outgoingstream);

因为如果我使用MemoryStream作为临时存储,那么在结束时它不会将所有内容都存储在流中。我正在寻找一个在读取完数据后可以丢弃数据的流。
以上所有内容仅是示例代码,当然缺少一些处理和查找,但我希望我成功地说明了我的意图。根据设置能够在复制流、进行XML过滤和可选转换为JSON之间进行插拔。
2个回答

2

流是字节序列,所以流转换将类似于Func<ArraySegment<byte>, ArraySegment<byte>>。然后可以以流方式应用它:

async Task TransformAsync(this Stream source, Func<ArraySegment<byte>, ArraySegment<byte>> transform, Stream destination, int bufferSize = 1024)
{
  var buffer = new byte[bufferSize];
  while (true)
  {
    var bytesRead = await source.ReadAsync(buffer, 0, bufferSize);
    if (bytesRead == 0)
      return;
    var bytesToWrite = transform(new ArraySegment(buffer, 0, bytesRead));
    if (bytesToWrite.Count != 0)
      await destination.WriteAsync(bytesToWrite.Buffer, bytesToWrite.Offset, bytesToWrite.Count);
  }
}

这比较复杂,但大致思路是这样的。需要一些逻辑来确保WriteAsync写入所有字节;通常还需要一个“flush”方法,除了调用transform方法之外,当源流完成时,该方法也会被调用,因此转换算法有最后机会返回其最终数据以写入输出流。
如果您想要其他类型的流,例如XML或JSON类型,则最好使用Reactive Extensions

我看到这种方式更加简洁,但是使用流(如我的示例)可以使用XmlReader/Writer和JsonReader/Writers进行转换。我会再考虑一下设计。 - Poul K. Sørensen
1
@PoulK.Sørensen 你有没有找到解决方法?我也处于类似的情况下。试图使用 SqlClient 流式传输,它只需要一个“流”作为参数,但我想在进入数据库之前压缩原始流。因此,我希望将原始流(即 FileStream)与 GZipStream 包装起来,以便每次 SqlClient 流式传输启动 ReadAsync 时,我的包装器流都会首先从底层流读取数据,压缩数据,然后返回压缩后的字节到 SqlClient 流式传输中。 - Terry

0

我不确定我完全理解你的问题,但我认为你正在问如何在没有将输入流完全加载到内存中的情况下对其进行操作。

在这种情况下,您不希望像这样做:

var ms = new MemoryStream();
incomingStream.CopyTo(ms);

这个操作确实会将整个输入流incomingStream加载到内存中,即ms

从我所看到的情况来看,你的XmlFilterStream方法似乎是多余的,也就是说,XmlToJsonStream已经包含了XmlFilterStream的所有功能。

为什么不直接使用以下代码:

protected async Task XmlToJsonStream(Stream instream, Stream outStream)
{
    XmlReaderSettings readerSettings = new XmlReaderSettings();
    readerSettings.IgnoreWhitespace = false;
    var reader = XmlReader.Create(instream, readerSettings);
    var jsonWriter = new JsonTextWriter(new StreamWriter(outStream));
    jsonWriter.WriteStartObject();

    while (await reader.ReadAsync())
    {
        jsonWriter.writeReader(reader);
    }
    jsonWriter.WriteEndObject();
    jsonWriter.Flush();
}

然后像这样调用:

var incomingStream = ...
var outgoingStream = ...
XmlToJsonStream(incomingStream ,outgoingstream);

如果你遗漏了一些重要的细节在XmlFilterStream中,那么我建议你将它们整合到一个XmlToJsonStream函数中,但在没有看到这些细节之前,我无法确定答案。

抱歉之前漏掉了太多内容。XmlFilterStream 不仅可以读写,还可以读取 XML,如果包含(Include)读取器,则会将其写入(writer(reader))。但我明白你的意思。 - Poul K. Sørensen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接