痛苦缓慢的Azure表格插入和删除批量操作

46
我在使用Azure表格存储时遇到了巨大的性能瓶颈。我的愿望是将表格用作一种缓存,因此长时间的处理可能会产生数百到几千行数据。然后可以通过分区和行键快速查询数据。
查询速度非常快(仅使用分区和行键时非常快,当还要搜索特定匹配的属性时稍慢,但仍然可接受)。
然而,插入和删除行都非常缓慢。
澄清:
我想澄清的是,即使插入单个批次的100个项目也需要几秒钟的时间。这不仅是在处理成千上万行的总吞吐量时的问题。即使只插入100个项目,也会影响我。
以下是我用于批量插入表格的代码示例:
static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        while ( rowOffset < entities.Count )
        {
            Stopwatch sw = Stopwatch.StartNew();

            var batch = new TableBatchOperation();

            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            foreach ( var row in rows )
                batch.Insert( row );

            // submit
            await table.ExecuteBatchAsync( batch );

            rowOffset += rows.Count;

            Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "g" ) );
        }
    }

我正在使用批量操作,以下是一个调试输出的示例:

Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : b08a07da-fceb-4bec-af34-3beaa340239b: StringToSign = POST..multipart/mixed; boundary=batch_6d86d34c-5e0e-4c0c-8135-f9788ae41748.Tue, 30 Jul 2013 18:48:38 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : b08a07da-fceb-4bec-af34-3beaa340239b: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch insert 100 rows: 0:00:00.9351871

如您所见,这个例子插入100行几乎需要1秒钟。在我的开发机器上(3.4 GHz四核),平均时间似乎约为0.8秒。

这看起来很荒谬。

以下是批量删除操作的示例:

Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Starting asynchronous request to http://127.0.0.1:10002/devstoreaccount1.
Microsoft.WindowsAzure.Storage Verbose: 4 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: StringToSign = POST..multipart/mixed; boundary=batch_7e3d229f-f8ac-4aa0-8ce9-ed00cb0ba321.Tue, 30 Jul 2013 18:47:41 GMT./devstoreaccount1/devstoreaccount1/$batch.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Preparing to write request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Writing request data.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Waiting for response.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response received. Status code = 202, Request ID = , Content-MD5 = , ETag = .
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Response headers were processed successfully, proceeding with the rest of the operation.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Processing response body.
Microsoft.WindowsAzure.Storage Information: 3 : 4c271cb5-7463-44b1-b2e5-848b8fb10a93: Operation completed successfully.
iisexpress.exe Information: 0 : Elapsed time to batch delete 100 rows: 0:00:00.6524402

持续时间超过0.5秒。

我在Azure上运行了这个程序(小型实例),记录下插入28000行的时间为20分钟。

我目前正在使用Storage Client Library 2.1 RC版本:MSDN Blog

我一定做错了什么。有什么想法吗?

更新

我尝试了并行处理,总体速度得到了提高(使用了8个逻辑处理器),但是在我的开发机上每秒插入150行数据。

整体效果没有明显改善,甚至在Azure上部署时可能更差。

我已经按照此建议增加了线程池,并增加了WebRole的最大HTTP连接数。

我仍然感觉缺少某些基本的东西,导致插入/删除操作受到限制,只能达到每秒150次的速度。

更新2

通过分析部署在Azure上的小型实例的一些诊断日志(使用2.1 RC Storage Client中内置的新日志记录功能),我获得了更多信息。

批量插入的第一个Storage Client日志在635109046781264034时刻记录:

caf06fca-1857-4875-9923-98979d850df3: Starting synchronous request to https://?.table.core.windows.net/.; TraceSource 'Microsoft.WindowsAzure.Storage' event

接着大约3秒钟后,我在 635109046810104314 个刻度处看到了这个日志:

caf06fca-1857-4875-9923-98979d850df3: Preparing to write request data.; TraceSource 'Microsoft.WindowsAzure.Storage' event

接下来还有几条日志,总共耗时0.15秒,最后一条是在635109046811645418个tick处结束的,这条日志记录了插入操作的情况:

caf06fca-1857-4875-9923-98979d850df3: Operation completed successfully.; TraceSource 'Microsoft.WindowsAzure.Storage' event

我不确定如何解释这个,但是在我查看的批处理插入日志中非常一致。
更新3:
以下是用于并行批量插入的代码。在此代码中,仅供测试,我确保将每个100批次插入到唯一的分区中。
static async Task BatchInsert( CloudTable table, List<ITableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
                {
                    Stopwatch sw = Stopwatch.StartNew();

                    var batch = new TableBatchOperation();

                    foreach ( var row in rows )
                    {
                        row.PartitionKey = row.PartitionKey + partition;
                        batch.InsertOrReplace( row );
                    }

                    // submit
                    table.ExecuteBatch( batch );

                    Trace.TraceInformation( "Elapsed time to batch insert " + rows.Count + " rows: " + sw.Elapsed.ToString( "F2" ) );
                } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

如上所述,这确实有助于提高插入数千行的总体时间,但是每个100行的批处理仍然需要几秒钟。
更新4:
因此,我创建了一个全新的Azure云服务项目,使用VS2012.2,Web角色作为单页模板(其中包含TODO示例)。
这是直接从盒子里拿出来的,没有新的NuGet包或任何东西。它默认使用Storage客户端库v2和EDM及相关库v5.2。
我只需修改HomeController代码如下(使用一些随机数据模拟我想要存储在真实应用程序中的列):
public ActionResult Index( string returnUrl )
    {
        ViewBag.ReturnUrl = returnUrl;

        Task.Factory.StartNew( () =>
            {
                TableTest();
            } );

        return View();
    }

    static Random random = new Random();
    static double RandomDouble( double maxValue )
    {
        // the Random class is not thread safe!
        lock ( random ) return random.NextDouble() * maxValue;
    }

    void TableTest()
    {
        // Retrieve storage account from connection-string
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
            CloudConfigurationManager.GetSetting( "CloudStorageConnectionString" ) );

        // create the table client
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

        // retrieve the table
        CloudTable table = tableClient.GetTableReference( "test" );

        // create it if it doesn't already exist
        if ( table.CreateIfNotExists() )
        {
            // the container is new and was just created
            Trace.TraceInformation( "Created table named " + "test" );
        }


        Stopwatch sw = Stopwatch.StartNew();

        // create a bunch of objects
        int count = 28000;
        List<DynamicTableEntity> entities = new List<DynamicTableEntity>( count );

        for ( int i = 0; i < count; i++ )
        {
            var row = new DynamicTableEntity()
            {
                PartitionKey = "filename.txt",
                RowKey = string.Format( "$item{0:D10}", i ),
            };

            row.Properties.Add( "Name", EntityProperty.GeneratePropertyForString( i.ToString() ) );
            row.Properties.Add( "Data", EntityProperty.GeneratePropertyForString( string.Format( "data{0}", i ) ) );
            row.Properties.Add( "Value1", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value2", EntityProperty.GeneratePropertyForDouble( RandomDouble( 10000 ) ) );
            row.Properties.Add( "Value3", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );
            row.Properties.Add( "Value4", EntityProperty.GeneratePropertyForDouble( RandomDouble( 90 ) ) );
            row.Properties.Add( "Value5", EntityProperty.GeneratePropertyForDouble( RandomDouble( 180 ) ) );
            row.Properties.Add( "Value6", EntityProperty.GeneratePropertyForDouble( RandomDouble( 1000 ) ) );

            entities.Add( row );
        }

        Trace.TraceInformation( "Elapsed time to create record rows: " + sw.Elapsed.ToString() );

        sw = Stopwatch.StartNew();

        Trace.TraceInformation( "Inserting rows" );

        // batch our inserts (100 max)
        BatchInsert( table, entities ).Wait();

        Trace.TraceInformation( "Successfully inserted " + entities.Count + " rows into table " + table.Name );
        Trace.TraceInformation( "Elapsed time: " + sw.Elapsed.ToString() );

        Trace.TraceInformation( "Done" );
    }


            static async Task BatchInsert( CloudTable table, List<DynamicTableEntity> entities )
    {
        int rowOffset = 0;

        var tasks = new List<Task>();

        while ( rowOffset < entities.Count )
        {
            // next batch
            var rows = entities.Skip( rowOffset ).Take( 100 ).ToList();

            rowOffset += rows.Count;

            string partition = "$" + rowOffset.ToString();

            var task = Task.Factory.StartNew( () =>
            {
                var batch = new TableBatchOperation();

                foreach ( var row in rows )
                {
                    row.PartitionKey = row.PartitionKey + partition;
                    batch.InsertOrReplace( row );
                }

                // submit
                table.ExecuteBatch( batch );

                Trace.TraceInformation( "Inserted batch for partition " + partition );
            } );

            tasks.Add( task );
        }

        await Task.WhenAll( tasks );
    }

这是我得到的输出:

iisexpress.exe Information: 0 : Elapsed time to create record rows: 00:00:00.0719448
iisexpress.exe Information: 0 : Inserting rows
iisexpress.exe Information: 0 : Inserted batch for partition $100
...
iisexpress.exe Information: 0 : Successfully inserted 28000 rows into table test
iisexpress.exe Information: 0 : Elapsed time: 00:01:07.1398928

这比我的其他应用程序快一点,达到了超过460 ROPS的水平。但这仍然无法接受。在这个测试中,我的CPU(8个逻辑处理器)几乎达到了最大值,磁盘访问则几乎空闲。

我不知道问题出在哪里。

更新5

反复调整和微调已经取得了一些改进,但是批量InsertOrReplace操作的速度最多只能达到500-700 ROPS左右(每批100个)。此测试在Azure云上使用一个或两个小实例进行。

根据下面的评论,我接受本地测试至多会很慢的事实。

以下是几个示例。每个示例都有自己的PartitionKey:

Successfully inserted 904 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:01.3401031; TraceSource 'w3wp.exe' event

Successfully inserted 4130 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:07.3522871; TraceSource 'w3wp.exe' event

Successfully inserted 28020 rows into table org1; TraceSource 'w3wp.exe' event
Elapsed time: 00:00:51.9319217; TraceSource 'w3wp.exe' event

也许是我的MSDN Azure账户有一些性能限制?我不确定。
目前我认为我已经完成了这个问题。也许对于我的目的来说它足够快,或者我会选择不同的路径。
结论:
以下所有答案都很好!
对于我的具体问题,我已经能够在一个小的Azure实例上看到高达2k ROPS的速度,更典型的情况是1k左右。由于我需要控制成本(因此需要控制实例大小),这定义了我将能够使用表格的范围。
感谢大家的帮助。

你能添加这个吗?在调用之前和之后发布结果?int iMinThreadWorkers,iMinThreadPorts,iMaxThreadWorkers,iMaxThreadPorts,iAvialbleThreadWorkers,iAvailableThreadPorts;ThreadPool.GetMinThreads(out iMinThreadWorkers, out iMinThreadPorts); ThreadPool.GetMaxThreads(out iMaxThreadWorkers, out iMaxThreadPorts); ThreadPool.GetAvailableThreads(out iAvailableThreadPorts, out iAvialbleThreadWorkers); - JTtheGeek
进展了!你的代码已经在我们的测试系统中运行了,但是它比我给你的那个慢了大约3000倍...... 我正在研究这个问题。 - JTtheGeek
感谢 @JTtheGeek 的持续帮助,真的非常感激!以下是数值:iMinThreadWorkers 8、iMinThreadPorts 8、iMaxThreadWorkers 32767、iMaxThreadPorts 1000、iAvialbleThreadWorkers 1000、iAvailableThreadPorts 32766。 - Keith Murray
请检查您的日志 - C:\ Users \ [User] \ AppData \ Local \ DevelopmentStorage - JTtheGeek
看起来内部存储的速度大约为每秒700个操作。我编写了一个非常原始的REST客户端,在我的机器上获得了大约每秒140个更新,在Azure内部获得了大约每秒700个更新,没有批量操作。也就是说,我为每个插入执行单独的HTTP请求(我知道这会花费更多的钱,但在我的情况下差异微不足道)。唯一重要的事情是将ServicePointManager.DefaultConnectionLimit增加到10并在10个线程中运行插入。我尝试了不同的数字,但10和100的区别不大。 - Ivan Krivyakov
4个回答

15

基本概念 - 使用并行处理来加速。

步骤1 - 给您的线程池足够的线程来完成此操作 - ThreadPool.SetMinThreads(1024, 256);

步骤2 - 使用分区。我使用GUID作为ID,使用最后两个字符将其分成256个唯一的分区(实际上,在我的情况下,我将它们分组到N个子集中,即48个分区)

步骤3 - 使用任务进行插入,我使用对象池进行表引用。

public List<T> InsertOrUpdate(List<T> items)
        {
            var subLists = SplitIntoPartitionedSublists(items);

            var tasks = new List<Task>();

            foreach (var subList in subLists)
            {
                List<T> list = subList;
                var task = Task.Factory.StartNew(() =>
                    {
                        var batchOp = new TableBatchOperation();
                        var tableRef = GetTableRef();

                        foreach (var item in list)
                        {
                            batchOp.Add(TableOperation.InsertOrReplace(item));
                        }

                        tableRef.ExecuteBatch(batchOp);
                        ReleaseTableRef(tableRef);
                    });
                tasks.Add(task);
            }

            Task.WaitAll(tasks.ToArray());

            return items;
        }

private IEnumerable<List<T>> SplitIntoPartitionedSublists(IEnumerable<T> items)
        {
            var itemsByPartion = new Dictionary<string, List<T>>();

            //split items into partitions
            foreach (var item in items)
            {
                var partition = GetPartition(item);
                if (itemsByPartion.ContainsKey(partition) == false)
                {
                    itemsByPartion[partition] = new List<T>();
                }
                item.PartitionKey = partition;
                item.ETag = "*";
                itemsByPartion[partition].Add(item);
            }

            //split into subsets
            var subLists = new List<List<T>>();
            foreach (var partition in itemsByPartion.Keys)
            {
                var partitionItems = itemsByPartion[partition];
                for (int i = 0; i < partitionItems.Count; i += MaxBatch)
                {
                    subLists.Add(partitionItems.Skip(i).Take(MaxBatch).ToList());
                }
            }

            return subLists;
        }

        private void BuildPartitionIndentifiers(int partitonCount)
        {
            var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
            var keys = new List<string>();

            for (int i = 0; i < chars.Count; i++)
            {
                var keyA = chars[i];
                for (int j = 0; j < chars.Count; j++)
                {
                    var keyB = chars[j];
                    keys.Add(string.Concat(keyA, keyB));
                }
            }


            var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
            var keySets = new List<List<string>>();

            if (partitonCount > keys.Count)
            {
                partitonCount = keys.Count;
            }

            //Build the key sets
            var index = 0;
            while (index < keys.Count)
            {
                var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
                keySets.Add(keysSet);
                index += keySetMaxSize;
            }

            //build the lookups and datatable for each key set
            _partitions = new List<string>();
            for (int i = 0; i < keySets.Count; i++)
            {
                var partitionName = String.Concat("subSet_", i);
                foreach (var key in keySets[i])
                {
                    _partitionByKey[key] = partitionName;
                }
                _partitions.Add(partitionName);
            }

        }

        private string GetPartition(T item)
        {
            var partKey = item.Id.ToString().Substring(34,2);
            return _partitionByKey[partKey];
        }

        private string GetPartition(Guid id)
        {
            var partKey = id.ToString().Substring(34, 2);
            return _partitionByKey[partKey];
        }

        private CloudTable GetTableRef()
        {
            CloudTable tableRef = null;
            //try to pop a table ref out of the stack
            var foundTableRefInStack = _tableRefs.TryPop(out tableRef);
            if (foundTableRefInStack == false)
            {
                //no table ref available must create a new one
                var client = _account.CreateCloudTableClient();
                client.RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(1), 4);
                tableRef = client.GetTableReference(_sTableName);
            }

            //ensure table is created
            if (_bTableCreated != true)
            {
                tableRef.CreateIfNotExists();
                _bTableCreated = true;
            }

            return tableRef;
        }

结果 - 最大存储帐户为19-22kops

如果您对完整源代码感兴趣,请联系我。

需要更多?使用多个存储帐户!

这是经过数月的试验和错误、测试以及反复思考得出的结论。我真心希望它能够帮到您。


谢谢你的回答。我已经采用了这个更简单的变体,仍然像你建议的那样使用并行任务,并且每个分区最多有100个项目。以这种方式插入28k行在我的开发机器上花费了超过3分钟,我的CPU几乎达到了最大值。这大约是155rops,非常糟糕,基本上与没有并行化的速率相同。在Azure上进行的相同测试在小实例上表现得更差(正如我所预期的那样),CPU达到最大值时插入速度很慢。 - Keith Murray
所以你需要将它们分批处理,每批包含100个项目,每批只能包含特定的分区集,并在任务中使用。听起来你混淆了项目并且没有正确地进行分批处理。如果你能给我发送一个小的测试项目展示你的问题,我会帮你查看一下。 - JTtheGeek
这正是我正在做的,插入100个批次,每个批次都有一个唯一的PartitionKey,并且全部并行进行。我应该澄清,即使在本地和Azure中插入一个批次也需要几秒钟的时间。 - Keith Murray
1
不要使用Parallel.for,因为这会导致性能下降。你可以通过设置MaxDegreeOfParallelism来决定触发多少个并行任务。 - Amit
我需要进行更多的测试来确定Amit的建议是否解决了问题。在我的测试中,直接管理任务比Parallel.for提供了极大的性能优势。 - JTtheGeek
显示剩余5条评论

13

好的,第三次回答就是成功?

http://blogs.msdn.com/b/windowsazurestorage/archive/2010/11/06/how-to-get-most-out-of-windows-azure-tables.aspx

有几点需要注意 - 存储仿真器 - 来自一个深入挖掘的朋友。

"所有内容都会命中单个数据库中的单个表(更多分区不会影响任何东西)。每个表插入操作至少需要3个sql操作。每批都在事务内部。根据事务隔离级别,这些批处理将有限制的并行执行能力。

由于 SQL Server 的行为,串行批处理应该比单独插入更快。(单独插入本质上是小事务,每个事务都会刷新到磁盘,而真正的事务会作为一组刷新到磁盘)。"

也就是说,在仿真器上使用多个分区不会影响性能,但是在实际的 Azure 存储中会影响性能。

此外,请启用日志记录并检查一下日志 - c: \ users \用户名\ appdata \ local \ developmentstorage

批处理大小为100似乎提供了最佳的实际性能,关闭 Nagle,关闭期望100,增加连接限制。

还要确保您不会意外插入重复项,这将导致错误并大大降低速度。

在真正的存储中进行测试。 有一个相当不错的库可以为您处理大多数内容 - http://www.nuget.org/packages/WindowsAzure.StorageExtensions/,只需确保在添加和其他操作上实际调用 ToList,因为它不会真正执行直到枚举。此外,该库使用 DynamicTableEntity,因此序列化会受到小的性能影响,但它确实允许您使用纯 POCO 对象而没有 TableEntity 的东西。

~ JT


最后添加以下内容 - <system.net> <connectionManagement> <add address="*" maxconnection="12"/> </connectionManagement> </system.net> - JTtheGeek
感谢您一直以来对此事的支持。我已经取得了一些进展,但在我的小型Azure实例中进行插入操作时,仍无法获得超过500个ROPS。也许我的MSDN Azure订阅有上限?一切似乎都正常工作,只是速度不如我预期。 - Keith Murray
我将这个标记为答案,因为它特别解决了我在本地电脑上看到的极慢的性能问题。尽管@JTtheGeek的所有评论和答案都很有帮助。 - Keith Murray
链接不存在。 - aniruddha

7

经历了很多痛苦和实验后,最终通过 Azure Table 存储得到了单表分区的最优吞吐量(每秒 2,000 多个批量写操作),并且在存储帐户中获得了更好的吞吐量(每秒 3,500 多个批量写操作)。我尝试了各种不同的方法,但编程设置 .NET 连接限制(我尝试了配置示例,但对我没有用)解决了问题(基于 Microsoft 提供的 白皮书),如下所示:

ServicePoint tableServicePoint = ServicePointManager
    .FindServicePoint(_StorageAccount.TableEndpoint);

//This is a notorious issue that has affected many developers. By default, the value 
//for the number of .NET HTTP connections is 2.
//This implies that only 2 concurrent connections can be maintained. This manifests itself
//as "underlying connection was closed..." when the number of concurrent requests is
//greater than 2.

tableServicePoint.ConnectionLimit = 1000;

如果其他人也获得了每个存储账户 20K+ 批量写操作的经验,请分享您的经验。


1
我们有没有人达到每秒2000个批处理操作并愿意分享更新或代码的? - drobertson
1
我使用ConnectionLimit参数获得了2.5k op/s的速度。如果没有设置该参数,我只能做到大约100 op/s。 - Horia Toma
也许这个页面能帮助别人: http://blogs.msmvps.com/nunogodinho/2013/11/20/windows-azure-storage-performance-best-practices/ - Horia Toma
更新的白皮书链接:https://web.archive.org/web/20160201003635/http://download.microsoft.com/download/3/B/1/3B170FF4-2354-4B2D-B4DC-8FED5F838F6A/Windows%20Azure%20Table%20-%20May%202009.docx - JTtheGeek

5

为了更有趣,这里有一个新的答案 - 隔离的独立测试在生产中实现了惊人的写入性能,并且避免了IO阻塞和连接管理,表现得更好。我非常想看看这对你的作用如何,因为我们获得了荒谬的写入速度(> 7kps)。

网页配置

 <system.net>
    <connectionManagement>
      <add address="*" maxconnection="48"/>
    </connectionManagement>
  </system.net>

为了测试,我使用的参数是基于数据量的,比如25000个数据项,24个分区,批量大小为100似乎总是最好的选择,并参考计数为20。这是使用TPL数据流(http://www.nuget.org/packages/Microsoft.Tpl.Dataflow/)来提供缓存块的一种不错的可等待线程安全的表引用拉取。

public class DyanmicBulkInsertTestPooledRefsAndAsynch : WebTest, IDynamicWebTest
{
    private int _itemCount;
    private int _partitionCount;
    private int _batchSize;
    private List<TestTableEntity> _items;
    private GuidIdPartitionSplitter<TestTableEntity> _partitionSplitter;
    private string _tableName;
    private CloudStorageAccount _account;
    private CloudTableClient _tableClient;
    private Dictionary<string, List<TestTableEntity>> _itemsByParition;
    private int _maxRefCount;
    private BufferBlock<CloudTable> _tableRefs;


    public DyanmicBulkInsertTestPooledRefsAndAsynch()
    {
        Properties = new List<ItemProp>();    
        Properties.Add(new ItemProp("ItemCount", typeof(int)));
        Properties.Add(new ItemProp("PartitionCount", typeof(int)));
        Properties.Add(new ItemProp("BatchSize", typeof(int)));
        Properties.Add(new ItemProp("MaxRefs", typeof(int)));


    }

    public List<ItemProp> Properties { get; set; }

    public void SetProps(Dictionary<string, object> propValuesByPropName)
    {
        _itemCount = (int)propValuesByPropName["ItemCount"];
        _partitionCount = (int)propValuesByPropName["PartitionCount"];
        _batchSize = (int)propValuesByPropName["BatchSize"];
        _maxRefCount = (int)propValuesByPropName["MaxRefs"];
    }

    protected override void SetupTest()
    {
        base.SetupTest();

        ThreadPool.SetMinThreads(1024, 256);
        ServicePointManager.DefaultConnectionLimit = 256;
        ServicePointManager.UseNagleAlgorithm = false;
        ServicePointManager.Expect100Continue = false;


        _account = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("DataConnectionString"));
        _tableClient = _account.CreateCloudTableClient();
        _tableName = "testtable" + new Random().Next(100000);

        //create the refs
        _tableRefs = new BufferBlock<CloudTable>();
        for (int i = 0; i < _maxRefCount; i++)
        {
            _tableRefs.Post(_tableClient.GetTableReference(_tableName));
        }

        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;

        tableRef.CreateIfNotExists();
        ReleaseRef(tableRef);

        _items = TestUtils.GenerateTableItems(_itemCount);
        _partitionSplitter = new GuidIdPartitionSplitter<TestTableEntity>();
        _partitionSplitter.BuildPartitions(_partitionCount);

        _items.ForEach(o =>
            {
                o.ETag = "*";
                o.Timestamp = DateTime.Now;
                o.PartitionKey = _partitionSplitter.GetPartition(o);
            });

        _itemsByParition = _partitionSplitter.SplitIntoPartitionedSublists(_items);
    }

    private async Task<CloudTable> GetTableRef()
    {
        return await _tableRefs.ReceiveAsync();            
    }

    private void ReleaseRef(CloudTable tableRef)
    {
        _tableRefs.Post(tableRef);
    }

    protected override void ExecuteTest()
    {
        Task.WaitAll(_itemsByParition.Keys.Select(parition => Task.Factory.StartNew(() => InsertParitionItems(_itemsByParition[parition]))).ToArray());
    }

    private void InsertParitionItems(List<TestTableEntity> items)
    {

        var tasks = new List<Task>();

        for (int i = 0; i < items.Count; i += _batchSize)
        {
            int i1 = i;

            var task = Task.Factory.StartNew(async () =>
            {
                var batchItems = items.Skip(i1).Take(_batchSize).ToList();

                if (batchItems.Select(o => o.PartitionKey).Distinct().Count() > 1)
                {
                    throw new Exception("Multiple partitions batch");
                }

                var batchOp = new TableBatchOperation();
                batchItems.ForEach(batchOp.InsertOrReplace);   

                var tableRef = GetTableRef.Result();
                tableRef.ExecuteBatch(batchOp);
                ReleaseRef(tableRef);
            });

            tasks.Add(task);

        }

        Task.WaitAll(tasks.ToArray());


    }

    protected override void CleanupTest()
    {
        var tableRefTask = GetTableRef();
        tableRefTask.Wait();
        var tableRef = tableRefTask.Result;
        tableRef.DeleteIfExists();
        ReleaseRef(tableRef);
    }

我们目前正在开发一个版本,可以处理多个存储账户,希望能够获得更快的速度。此外,对于大型数据集,我们在8核虚拟机上运行,但是由于新的非阻塞IO,它也可以在有限的虚拟机上运行得很好。祝你好运!

 public class SimpleGuidIdPartitionSplitter<T> where T : IUniqueId
{
    private ConcurrentDictionary<string, string> _partitionByKey = new ConcurrentDictionary<string, string>();
    private List<string> _partitions;
    private bool _bPartitionsBuilt;

    public SimpleGuidIdPartitionSplitter()
    {

    }

    public void BuildPartitions(int iPartCount)
    {
        BuildPartitionIndentifiers(iPartCount);
    }

    public string GetPartition(T item)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = item.Id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    public string GetPartition(Guid id)
    {
        if (_bPartitionsBuilt == false)
        {
            throw new Exception("Partitions Not Built");
        }

        var partKey = id.ToString().Substring(34, 2);
        return _partitionByKey[partKey];
    }

    #region Helpers
    private void BuildPartitionIndentifiers(int partitonCount)
    {
        var chars = new char[] { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' }.ToList();
        var keys = new List<string>();

        for (int i = 0; i < chars.Count; i++)
        {
            var keyA = chars[i];
            for (int j = 0; j < chars.Count; j++)
            {
                var keyB = chars[j];
                keys.Add(string.Concat(keyA, keyB));
            }
        }


        var keySetMaxSize = Math.Max(1, (int)Math.Floor((double)keys.Count / ((double)partitonCount)));
        var keySets = new List<List<string>>();

        if (partitonCount > keys.Count)
        {
            partitonCount = keys.Count;
        }

        //Build the key sets
        var index = 0;
        while (index < keys.Count)
        {
            var keysSet = keys.Skip(index).Take(keySetMaxSize).ToList();
            keySets.Add(keysSet);
            index += keySetMaxSize;
        }

        //build the lookups and datatable for each key set
        _partitions = new List<string>();
        for (int i = 0; i < keySets.Count; i++)
        {
            var partitionName = String.Concat("subSet_", i);
            foreach (var key in keySets[i])
            {
                _partitionByKey[key] = partitionName;
            }
            _partitions.Add(partitionName);
        }

        _bPartitionsBuilt = true;
    }
    #endregion
}



internal static List<TestTableEntity> GenerateTableItems(int count)
        {
            var items = new List<TestTableEntity>();
            var random = new Random();

            for (int i = 0; i < count; i++)
            {
                var itemId = Guid.NewGuid();

                items.Add(new TestTableEntity()
                {
                    Id = itemId,
                    TestGuid = Guid.NewGuid(),
                    RowKey = itemId.ToString(),
                    TestBool = true,
                    TestDateTime = DateTime.Now,
                    TestDouble = random.Next() * 1000000,
                    TestInt = random.Next(10000),
                    TestString = Guid.NewGuid().ToString(),
                });
            }

            var dupRowKeys = items.GroupBy(o => o.RowKey).Where(o => o.Count() > 1).Select(o => o.Key).ToList();
            if (dupRowKeys.Count > 0)
            {
                throw  new Exception("Dupicate Row Keys");
            }

            return items;
        }

还有一件事——您的计时和我们的框架如何被影响,指向了这个问题:http://blogs.msdn.com/b/windowsazurestorage/archive/2013/08/08/net-clients-encountering-port-exhaustion-after-installing-kb2750149-or-kb2805227.aspx


再次感谢!为了进行快速测试,我阅读了那篇博客并更新了存储库2.0.6.1,但在我的测试中没有明显的影响。接下来我会尝试你的代码。 - Keith Murray
经过更多的测试,我相信我已经达到了小型Azure实例的极限。当批量插入或替换100个项目时,每个批次都进入自己的分区,这些实例似乎最大只能达到2k ROPS。无论如何调整maxConnections、线程数、nagling或其他建议的任何内容都不能推动更高的性能。也许所有这些都在更大的多核Azure实例上发挥作用。 - Keith Murray
在你的情况下,可能与小实例可用带宽有关...在正常情况下会发生什么?我通常会在大型实例上运行这些东西。 - JTtheGeek
1
所以,我们在不同的实例大小上运行了测试,结果差别很大。在中型实例上,我们每秒约获得1200次写入,而在超大型实例上,我们每秒约获得7200次写入。我们正在考虑构建一个分布式读/写控制器,可能使用dcache作为中间人。 - JTtheGeek
1
感谢确认和提供的所有回答。这非常有益!现在我知道表格的位置以及如果我们需要那种性能,需要什么。 - Keith Murray
我正在对Azure Table进行一些大规模插入操作。这是期权交易数据,分区效果很好。到目前为止,我已经达到了790 ROPS的最大值,但似乎无法超越这个限制。我希望您能分享任何更新或工具,以实现更好的结果。顺便说一句,感谢您在这里的努力。它们非常有帮助。 - drobertson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接