执行大规模并行查询的通用类。有反馈吗?

10

我不明白为什么,在Windows Azure Table Storage的客户端库中似乎没有执行多个查询的并行机制。我创建了一个模板类,可以用来节省大量时间,您可以随意使用它。但是,如果您能够仔细研究并提供反馈以改进这个类,我将不胜感激。

public class AsyncDataQuery<T> where T: new()
{
    public AsyncDataQuery(bool preserve_order)
    {
        m_preserve_order = preserve_order;
        this.Queries = new List<CloudTableQuery<T>>(1000);
    }

    public void AddQuery(IQueryable<T> query)
    {
        var data_query = (DataServiceQuery<T>)query;
        var uri = data_query.RequestUri; // required

        this.Queries.Add(new CloudTableQuery<T>(data_query));
    }

    /// <summary>
    /// Blocking but still optimized.
    /// </summary>
    public List<T> Execute()
    {
        this.BeginAsync();
        return this.EndAsync();
    }

    public void BeginAsync()
    {
        if (m_preserve_order == true)
        {
            this.Items = new List<T>(Queries.Count);
            for (var i = 0; i < Queries.Count; i++)
            {
                this.Items.Add(new T());
            }
        }
        else
        {
            this.Items = new List<T>(Queries.Count * 2);
        }

        m_wait = new ManualResetEvent(false);

        for (var i = 0; i < Queries.Count; i++)
        {
            var query = Queries[i];
            query.BeginExecuteSegmented(callback, i);
        }
    }

    public List<T> EndAsync()
    {
        m_wait.WaitOne();
        m_wait.Dispose();

        return this.Items;
    }

    private List<T> Items { get; set; }
    private List<CloudTableQuery<T>> Queries { get; set; }

    private bool m_preserve_order;
    private ManualResetEvent m_wait;
    private int m_completed = 0;
    private object m_lock = new object();

    private void callback(IAsyncResult ar)
    {
        int i = (int)ar.AsyncState;
        CloudTableQuery<T> query = Queries[i];
        var response = query.EndExecuteSegmented(ar);
        if (m_preserve_order == true)
        { // preserve ordering only supports one result per query
            lock (m_lock)
            {
                this.Items[i] = response.Results.Single();
            }
        }
        else
        { // add any number of items
            lock (m_lock)
            {
                this.Items.AddRange(response.Results);
            }
        }
        if (response.HasMoreResults == true)
        { // more data to pull
            query.BeginExecuteSegmented(response.ContinuationToken, callback, i);
            return;
        }
        m_completed = Interlocked.Increment(ref m_completed);
        if (m_completed == Queries.Count)
        {
            m_wait.Set();
        }
    }
}

顺便提一下,这个程序正在生产环境中运行(在newscandy.com上),目前看来我的使用效果还不错。但是似乎还有很大的改进空间。 - Aaron
2
从快速查看来看:不要公开集合的设置器,也不要公开具体的集合实现。 - R. Martinho Fernandes
好建议。我同意实现方面也需要更多的考虑。无论是为了性能还是功能上的改进,都可以做些什么呢? - Aaron
回调函数会在多个线程中被调用吗?我觉得它看起来不是线程安全的,特别是 Items.AddRange 的调用。当有多个结果时,您的代码应该抛出异常以保持顺序。您可以使用 .Single 替代 .First - CodesInChaos
我认为处理取消和超时是一个好主意。这里有一个灵感来源:http://blogs.msdn.com/b/jimoneil/archive/2010/10/05/azure-home-part-7-asynchronous-table-storage-pagination.aspx - makerofthings7
1
这个问题似乎应该放在codereview.stackexchange.com上,而不是在这里吧? - Timwi
2个回答

5

看来我来晚了,我想补充两点:

  1. ManualResetEvent是IDisposable的。所以你需要确保它在某个地方被处理掉。
  2. 错误处理 - 如果其中一个查询失败了,它可能会导致整个过程失败。你应该重试失败的请求。或者你可以返回你得到的值,并指示哪些查询失败了,这样调用者就可以重试这些查询。
  3. 客户端超时 - 没有。如果服务器端为您超时,这不是问题,但如果那个失败了(例如,网络问题),客户端将永远挂起。

此外,我认为这实际上是比任务并行库更好的方法。我之前尝试过每个查询一个任务的方法。代码实际上更加笨拙,并且往往会产生大量活动线程。我仍然没有对你的代码进行广泛测试,但乍一看似乎效果更好。

更新

我做了一些工作,对上面的代码进行了更多或更少的重写。我的重写删除了所有锁定,支持挂起事务的客户端超时(很少发生,但确实会影响你的一天),以及一些异常处理逻辑。在Bitbucket上有一个完整的解决方案和测试,其中最相关的代码位于一个文件中,但它确实需要一些其他项目中的帮助程序。


抱歉回复晚了,不确定为什么错过了它。非常好的反馈。我在 ManualResetEvent 上添加了手动 Dispose 调用,但最终在 GC 所调用的析构函数被调用时被销毁了。手动调用 Dispose 只有在(非常)高密度下才是一个问题。错误处理是另一个好建议,但现在没有时间调试它(因为我没有在使用它)。欢迎回馈,我很乐意更新模板! - Aaron

4

好的建议。我不确定在这一点上它能给我带来多少好处,但如果我从头开始,我可能会尝试那条路线。对于那些不太熟悉多线程开发的人,我肯定会推荐使用任务并行库。 - Aaron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接