不复制字符串将其作为流读取

10
我有一些数据存储在字符串中。我有一个以流为输入的函数。我想将我的数据提供给该函数而不必将整个字符串复制到流中。实际上,我正在寻找一种可以包装字符串并从中读取的流类。
目前我在网上看到的唯一建议是使用StringReader,但它不是流,或者创建一个内存流并写入它,这意味着要复制数据。我可以编写自己的流对象,但麻烦的部分是处理编码,因为流处理的是字节。有没有一种方法可以在不编写新流类的情况下完成这个任务?
我正在BizTalk中实现管道组件。BizTalk完全使用流来处理所有内容,因此您总是将东西传递给BizTalk。BizTalk会始终以小块读取流,因此如果我可以按照BizTalk所需的方式从流中读取数据,则将整个字符串复制到流中就没有意义(特别是如果字符串很大)。

你是否意识到Stream只能复制数据?(例如,复制到提供给Read的数组中)。 - Peter Ritchie
1
你将不得不在某个时候解码字符串,以获取所需编码的实际字节。无论你是否愿意,你都必须要复制一份。 - Jeff Mercado
@Peter Ritchie - 或许我没有恰当地措辞问题,但通常它会通过read方法以小块的形式发生。使用内存流会一次性复制整个字符串。 - Jeremy
如果你正在寻找减少可能被发送/接收的数据复制的方法,你可能想看看 ArraySegment http://msdn.microsoft.com/en-us/library/1hsbd92d(v=vs.110).aspx - Peter Ritchie
1
如果您在管道组件中,有没有一种方法可以不首先创建字符串? - Johns-305
4个回答

8

这里有一个适当的StringReaderStream,但存在以下缺点:

  • Read的缓冲区大小必须至少为maxBytesPerChar。可以通过保持内部一个字符buff = new byte[maxBytesPerChar]来实现对小缓冲区的Read。但对于大多数用途来说并不是必要的。
  • 没有Seek,虽然可以进行搜索,但通常会非常棘手。(某些搜索情况,如搜索到开头、搜索到结尾,很容易实现。)
/// <summary>
/// Convert string to byte stream.
/// <para>
/// Slower than <see cref="Encoding.GetBytes()"/>, but saves memory for a large string.
/// </para>
/// </summary>
public class StringReaderStream : Stream
{
    private string input;
    private readonly Encoding encoding;
    private int maxBytesPerChar;
    private int inputLength;
    private int inputPosition;
    private readonly long length;
    private long position;

    public StringReaderStream(string input)
        : this(input, Encoding.UTF8)
    { }

    public StringReaderStream(string input, Encoding encoding)
    {
        this.encoding = encoding ?? throw new ArgumentNullException(nameof(encoding));
        this.input = input;
        inputLength = input == null ? 0 : input.Length;
        if (!string.IsNullOrEmpty(input))
            length = encoding.GetByteCount(input);
            maxBytesPerChar = encoding == Encoding.ASCII ? 1 : encoding.GetMaxByteCount(1);
    }

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    public override long Length => length;

    public override long Position
    {
        get => position;
        set => throw new NotImplementedException();
    }

    public override void Flush()
    {
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (inputPosition >= inputLength)
            return 0;
        if (count < maxBytesPerChar)
            throw new ArgumentException("count has to be greater or equal to max encoding byte count per char");
        int charCount = Math.Min(inputLength - inputPosition, count / maxBytesPerChar);
        int byteCount = encoding.GetBytes(input, inputPosition, charCount, buffer, offset);
        inputPosition += charCount;
        position += byteCount;
        return byteCount;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotImplementedException();
    }
}

4
虽然最初标记了 ,但是在 .NET 5 中引入了 Encoding.CreateTranscodingStream,这可以很容易地完成:

创建一个流,用于在内部编码和外部编码之间进行数据转换,类似于 Convert(Encoding, Encoding, Byte[])

诀窍在于定义一个底层的 UnicodeStream 直接访问 string 的字节,然后将其包装在转码流中以使用所需的编码呈现流内容。
以下类和扩展方法可以完成此任务:
public static partial class TextExtensions
{
    public static Encoding PlatformCompatibleUnicode => BitConverter.IsLittleEndian ? Encoding.Unicode : Encoding.BigEndianUnicode;
    static bool IsPlatformCompatibleUnicode(this Encoding encoding) => BitConverter.IsLittleEndian ? encoding.CodePage == 1200 : encoding.CodePage == 1201;
    
    public static Stream AsStream(this string @string, Encoding encoding = default) => 
        (@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory().AsStream(encoding);
    public static Stream AsStream(this ReadOnlyMemory<char> charBuffer, Encoding encoding = default) =>
        ((encoding ??= Encoding.UTF8).IsPlatformCompatibleUnicode())
            ? new UnicodeStream(charBuffer)
            : Encoding.CreateTranscodingStream(new UnicodeStream(charBuffer), PlatformCompatibleUnicode, encoding, false);
}

sealed class UnicodeStream : Stream
{
    const int BytesPerChar = 2;

    // By sealing UnicodeStream we avoid a lot of the complexity of MemoryStream.
    ReadOnlyMemory<char> charMemory;
    int position = 0;
    Task<int> _cachedResultTask; // For async reads, avoid allocating a Task.FromResult<int>(nRead) every time we read.

    public UnicodeStream(string @string) : this((@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory()) { }
    public UnicodeStream(ReadOnlyMemory<char> charMemory) => this.charMemory = charMemory;

    public override int Read(Span<byte> buffer)
    {
        EnsureOpen();
        var charPosition = position / BytesPerChar;
        // MemoryMarshal.AsBytes will throw on strings longer than int.MaxValue / 2, so only slice what we need. 
        var byteSlice = MemoryMarshal.AsBytes(charMemory.Slice(charPosition, Math.Min(charMemory.Length - charPosition, 1 + buffer.Length / BytesPerChar)).Span);
        var slicePosition = position % BytesPerChar;
        var nRead = Math.Min(buffer.Length, byteSlice.Length - slicePosition);
        byteSlice.Slice(slicePosition, nRead).CopyTo(buffer);
        position += nRead;
        return nRead;
    }

    public override int Read(byte[] buffer, int offset, int count) 
    {
        ValidateBufferArgs(buffer, offset, count);
        return Read(buffer.AsSpan(offset, count));
    }

    public override int ReadByte()
    {
        // Could be optimized.
        Span<byte> span = stackalloc byte[1];
        return Read(span) == 0 ? -1 : span[0];
    }

    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        EnsureOpen();
        if (cancellationToken.IsCancellationRequested) 
            return ValueTask.FromCanceled<int>(cancellationToken);
        try
        {
            return new ValueTask<int>(Read(buffer.Span));
        }
        catch (Exception exception)
        {
            return ValueTask.FromException<int>(exception);
        }   
    }
    
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ValidateBufferArgs(buffer, offset, count);
        var valueTask = ReadAsync(buffer.AsMemory(offset, count));
        if (!valueTask.IsCompletedSuccessfully)
            return valueTask.AsTask();
        var lastResultTask = _cachedResultTask;
        return (lastResultTask != null && lastResultTask.Result == valueTask.Result) ? lastResultTask : (_cachedResultTask = Task.FromResult<int>(valueTask.Result));
    }

    void EnsureOpen()
    {
        if (position == -1)
            throw new ObjectDisposedException(GetType().Name);
    }
    
    // https://learn.microsoft.com/en-us/dotnet/api/system.io.stream.flush?view=net-5.0
    // In a class derived from Stream that doesn't support writing, Flush is typically implemented as an empty method to ensure full compatibility with other Stream types since it's valid to flush a read-only stream.
    public override void Flush() { }
    public override Task FlushAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : Task.CompletedTask;
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) =>  throw new NotSupportedException();
    
    protected override void Dispose(bool disposing)
    {
        try 
        {
            if (disposing) 
            {
                _cachedResultTask = null;
                charMemory = default;
                position = -1;
            }
        }
        finally 
        {
            base.Dispose(disposing);
        }
    }   
    
    static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0 || count < 0)
            throw new ArgumentOutOfRangeException();
        if (count > buffer.Length - offset)
            throw new ArgumentException();
    }
}   

注释:

  • 您可以从一个string、一个char []数组或它们的一部分流式处理,方法是将它们转换为ReadOnlyMemory<char>缓冲区。这个转换只是简单地包装了底层的字符串或数组内存,而不会分配任何内存。

  • 使用Encoding.GetBytes()来编码字符串块的解决方案是有问题的,因为它们不能处理分散在块之间的代理对。为了正确处理代理对,必须首先调用Encoding.GetEncoder()来保存一个Encoder。然后,可以使用Encoder.GetBytes(ReadOnlySpan<Char>, Span<Byte>, flush: false)来进行编码并在调用之间记住状态。

    (Microsoft的TranscodingStream做得很好。)

  • 使用Encoding.Unicode可以获得最佳性能,因为(在几乎所有的.NET平台上)这种编码与String类型本身的编码相同。

    当提供与平台兼容的Unicode编码时,不需要使用TranscodingStream,返回的Stream直接从字符数据缓冲区中读取。

  • 待完成:

    • 在大端平台上进行测试(这是罕见的)。
    • 测试长度超过int.MaxValue / 2的字符串。

演示fiddle,包括一些基本测试 这里


1
你可以避免维护整个内容的副本,但你必须使用一种编码方式,使每个字符都产生相同数量的字节。这样,你就可以通过Encoding.GetBytes(str, strIndex, byteCount, byte[], byteIndex)提供数据块,以便在请求时直接读入读取缓冲区。
每个Stream.Read()操作始终会有一个复制操作,因为它允许调用者提供目标缓冲区。

1
如果要转换的数据仅以连续块的形式可用(例如从流中读取的数据),或者如果数据量太大需要分成较小的块,则应用程序应使用派生类的GetDecoder方法或GetEncoder方法提供的解码器或编码器。 - Ben Voigt

-1

Stream 只能 复制 数据。此外,它处理的是 byte 而不是 char,因此您必须通过解码过程来复制数据。但是,如果您想将字符串视为 ASCII 字节流,您可以 创建一个实现 Stream 的类来完成它。例如:

public class ReadOnlyStreamStringWrapper : Stream
{
    private readonly string theString;

    public ReadOnlyStreamStringWrapper(string theString)
    {
        this.theString = theString;
    }

    public override void Flush()
    {
        throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        switch (origin)
        {
            case SeekOrigin.Begin:
                if(offset < 0 || offset >= theString.Length)
                    throw new InvalidOperationException();

                Position = offset;
                break;
            case SeekOrigin.Current:
                if ((Position + offset) < 0)
                    throw new InvalidOperationException();
                if ((Position + offset) >= theString.Length)
                    throw new InvalidOperationException();

                Position += offset;
                break;
            case SeekOrigin.End:
                if ((theString.Length + offset) < 0)
                    throw new InvalidOperationException();
                if ((theString.Length + offset) >= theString.Length)
                    throw new InvalidOperationException();
                Position = theString.Length + offset;
                break;
        }

        return Position;
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return Encoding.ASCII.GetBytes(theString, (int)Position, count, buffer, offset);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    public override long Length
    {
        get { return theString.Length; }
    }

    public override long Position { get; set; }
}

但是,为了避免“复制”数据,这需要很多工作...


@L.B 正如我在回答中详细说明的那样,如果你想将一个字符串视为 UTF-8 字节流;这意味着它应该是 byte 而不是 char,因此是 byte 2。 - Peter Ritchie
例如,这段代码会抛出异常:var stream = new ReadOnlyStreamStringWrapper("Ça"); var buf = new byte[1]; var read = stream.Read(buf, 0, 1); - L.B
好的,已经更改为ASCII码。 - Peter Ritchie
“许多工作”...我猜这取决于字符串的大小或同时进行的操作数量。我认为,您可以使用 GetByteCount 函数来计算字符串的字节实际长度,从而使其独立于编码类型。 - Jeremy
如果流不可寻址,这会更容易。 - Jeremy
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接