使用protobuf压缩流式传输IDataReader

4
我们有一个需求,需要大大减少我们后端服务在从SQL拉取和推送数据时使用的带宽。SqlClient使用的TDS流相当臃肿。多年来,人们一直要求在从sql提取数据时提供压缩选项,但微软没有添加此功能。
我想知道是否有任何想法来处理这个问题。以下是我迄今为止尝试过的内容:
  1. 我修改了https://github.com/MindFlavor/TDSBridge以在套接字层面上添加压缩和解压缩。由于有效负载是SSL加密的,因此并没有多大差别。
  2. 接下来,我使用https://github.com/dotarj/protobuf-net-datahttps://github.com/jchristn/WatsonTcp中找到的IDataReader to Protobuf库和TCP框架,试图创建一种客户端服务器代理来通过将IDataReader转换为protobuf,然后压缩此流,并在另一端执行相反操作来将其流式传输到网络。 。
我已经在这里获得了概念验证,并且实际上在网络上比普通TDS流使用原始字节时获得了84%至98%的减少。缺点是WatsonTcp希望您在分配流时传递内容长度。但是在创建整个protobuf流之前没有办法知道这一点。我们有时会一次传输数百GB,所以这是行不通的。
我没有看到protobuf-net-data如何通过grpc进行流式处理,即使它可以这样做,我担心IAsyncEnumerable中记录的粒度可能会减慢大型传输的速度。
毫无疑问,我可以编写一个完全自定义的原始套接字压缩protobuf over TCP流式传输实现,并且客户端的表面积接近SqlCommand,但我知道这很难实现。
有任何省时的想法吗?如果没有,也许我会将其作为开源项目发布。

你是否考虑过在靠近支持返回压缩、高效格式结果的 SQL Server 旁边建立一个 HTTP Web API?使用简单的二进制序列化 System.Data.DataSet 是相当不错的选择。https://learn.microsoft.com/en-us/archive/msdn-magazine/2004/october/cutting-edge-binary-serialization-of-datasets - David Browne - Microsoft
感谢@DavidBrowne-Microsoft。我们面临的挑战是一些数据集达到了数十GB。我们通过从一个服务器的idatareader流式传输到另一个服务器的bulkcopy来解决这个问题。每一侧的数据存储都是压缩的。我不确定如何开始部分填充数据集,压缩并开始传输,并以流格式在另一侧执行相反的操作。我对System.Data.Dataset的理解是需要一次性在内存中管理所有内容? - David Thompson
只是随口一说:我不会担心流式 API 的开销 - 只需将每个批次设置为某个适当的大小,而不是 1;它可以是 10、100、10k - 但无论如何:它都会使这里的任何开销变得无关紧要 - 然后在输出时展开每个批次成模拟行?即编写一个方法,该方法接受 IAsyncEnumerable<RowBatch> 并返回 IDataReader(仅实现异步部分)或 IAsyncEnumerable<IDataRecord>(每个 RowBatch 多个记录)? - Marc Gravell
关于“在IAsyncEnumerable<T>中记录的粒度”,诀窍是:不要将T定义为一条记录;而是将其定义为N个记录的页面,其中N是某个数字(可以是10, 100,甚至1000)-这样您就可以获得增量批处理的优势,而不需要支付任何重要的“每行”开销。 - Marc Gravell
2个回答

0

您可以使用此技术使 SQL Server 将结果格式化为 gzipped csv(在 group by 中调整每行结果 - 大约在 1000 行时 gzip 开销会减少):

with csv as (
    select n = row_number() over (order by (select null)),
        line = convert(nvarchar(max), concat(
            message_id, ',', language_id, ',', severity, ',',
            is_event_logged, ',', '"' + replace([text], '"', '""') + '"'))
    from sys.messages)

select compress(string_agg(line, char(13)) within group (order by n))
from csv group by n / 1000

如果你在 SQL Server 中遇到了实际的出口瓶颈,这应该会有所帮助。将其实现为 TDSBridge 并重写查询,然后将结果转换回客户端期望的格式,这将是一个有趣的实践。


0
这里有一个模式,您可以使用它将大查询分批传输,其中每个批是经过压缩的二进制序列化DataTable。传输和反序列化后,每个DataTable都可以直接被SqlBulkCopy使用。相同的模式可以与其他格式一起使用,但在传递给SqlBulkCopy之前需要使用额外的转换器。
using System.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

namespace BatchingDataReader
{
    class BatchingDataReader : IDataReader
    {
        private int batchSize;
        private IDataReader rdr;
        private int rowsRead;
        private bool atEnd;
        private int batchesRead;

        public BatchingDataReader(IDataReader rdr, int batchSize)
        {
            this.batchSize = batchSize;
            this.rdr = rdr;
        }

        public object this[int i] => rdr[i];

        public object this[string name] => rdr[name];

        public int Depth => rdr.Depth;

        public bool IsClosed => rdr.IsClosed;

        public int RecordsAffected => rdr.RecordsAffected;

        public int FieldCount => rdr.FieldCount;

        public void Close()
        {
            if (!atEnd)
                return;
            rdr.Close();
        }

        public void Dispose()
        {
            if (!atEnd)
                return;

            rdr.Dispose();
        }

        public bool GetBoolean(int i)
        {
            return rdr.GetBoolean(i);
        }

        public byte GetByte(int i)
        {
            return rdr.GetByte(i);
        }

        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            return rdr.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
        }

        public char GetChar(int i)
        {
            return rdr.GetChar(i);
        }

        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            return rdr.GetChars(i, fieldoffset, buffer, bufferoffset, length);
        }

        public IDataReader GetData(int i)
        {
            return rdr.GetData(i);
        }

        public string GetDataTypeName(int i)
        {
            return rdr.GetDataTypeName(i);
        }

        public DateTime GetDateTime(int i)
        {
            return rdr.GetDateTime(i);
        }

        public decimal GetDecimal(int i)
        {
            return rdr.GetDecimal(i);
        }

        public double GetDouble(int i)
        {
            return rdr.GetDouble(i);
        }

        public Type GetFieldType(int i)
        {
            return rdr.GetFieldType(i);
        }

        public float GetFloat(int i)
        {
            return rdr.GetFloat(i);
        }

        public Guid GetGuid(int i)
        {
            return rdr.GetGuid(i);
        }

        public short GetInt16(int i)
        {
            return rdr.GetInt16(i);
        }

        public int GetInt32(int i)
        {
            return rdr.GetInt32(i);
        }

        public long GetInt64(int i)
        {
            return rdr.GetInt64(i);
        }

        public string GetName(int i)
        {
            return rdr.GetName(i);
        }

        public int GetOrdinal(string name)
        {
            return rdr.GetOrdinal(name);
        }

        public DataTable GetSchemaTable()
        {
            return rdr.GetSchemaTable();
        }

        public string GetString(int i)
        {
            return rdr.GetString(i);
        }

        public object GetValue(int i)
        {
            return rdr.GetValue(i);
        }

        public int GetValues(object[] values)
        {
            return rdr.GetValues(values);
        }

        public bool IsDBNull(int i)
        {
            return rdr.IsDBNull(i);
        }

        public bool NextResult()
        {
            if (!atEnd)
            {
                batchesRead += 1;
                rowsRead = 0;
                return true;
            }

            if (IsClosed)
                return false;

            return rdr.NextResult();
        }

        public bool Read()
        {
            if (rowsRead >= batchSize)
                return false;
            rowsRead += 1;

            atEnd = !rdr.Read();
            return !atEnd;

        }

        public static IEnumerable<DataTable> Read(SqlDataReader r, int batchSize)
        {
            var rdr = new BatchingDataReader(r, batchSize);
            do
            {
                var dt = new DataTable();
                dt.TableName = "table";
                dt.Load(rdr);
                yield return dt;
            } while (rdr.NextResult());
        }
    }
    class Program
    {

        static void Main(string[] args)
        {
            var constr = "server=localhost;database=master;integrated security=true";
            var outfile = "c:\\temp\\out.bin";

            if (File.Exists(outfile))
                File.Delete(outfile);

            using (var con = new SqlConnection(constr))
            {
                //322,162,200  TDS raw
                //235,355,311  binary uncompressed out.bin
                // 52,755,181  binary GZ Fastest
                // 43,061,121  binary GZ optimal
                // 65,282,624  XML GZ fastest
                // 41,892,056  binary GZ optimal 100,000 row batches

                con.Open();

                var bin = new BinaryFormatter();

                var cmd = new SqlCommand("select top (1000000) * from sys.messages m, sys.objects o", con);
                using (SqlDataReader rdr = cmd.ExecuteReader())
                using (var destFile = File.OpenWrite(outfile))
                using (var zipStream = new System.IO.Compression.GZipStream(destFile,System.IO.Compression.CompressionLevel.Optimal))
                {
                    foreach (var dt in BatchingDataReader.Read(rdr, 10000))
                    {
                        Console.WriteLine(dt.Rows.Count);

                        dt.RemotingFormat = SerializationFormat.Binary;
                        bin.Serialize(zipStream, dt);
                    }
                }
            }
        }
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接