如何在EF Core 3.1中异步使用GroupBy?

21
当我在LINQ查询中使用GroupBy作为EFCore的一部分时,会出现错误System.InvalidOperationException:不支持客户端GroupBy。
这是因为EF Core 3.1尽可能地在服务器端评估查询,而不是在客户端评估它们,并且该调用无法转换为SQL。
因此,以下语句无法正常工作,并产生上述错误:
var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToListAsync();

现在显然的解决方案是在调用GroupBy()之前使用.AsEnumerable()或.ToList(),因为这明确告诉EF Core你想在客户端进行分组。有关此问题的讨论可以在GitHub上Microsoft文档中找到。

var blogs = context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .AsEnumerable()
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToList();

然而,这不是异步的。我该如何使其异步?

如果我将AsEnumerable()更改为AsAsyncEnumerable(),则会出现错误。如果我尝试将AsEnumerable()更改为ToListAsync(),则GroupBy()命令将失败。

我考虑将其包装在Task.FromResult中,但这实际上是否是异步的?或者数据库查询仍然是同步的,只有后续的分组是异步的?

var blogs = await Task.FromResult(context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .AsEnumerable()
    .GroupBy(t => t.BlobNumber)
    .Select(b => b)
    .ToList());

如果这不起作用,还有其他方法吗?

即使没有那个函数,问题似乎仍然存在。(上面的示例只是我从其他地方复制和粘贴拼凑出来的。我的实际代码并不使用那个函数。) - Gary
无论如何,我认为如果使用AsEnumerable可以如此轻松地解决问题,那么肯定还有另一种简单的异步方式来解决它。我只是想不出来是什么。 - Gary
@IvanStoev 没有理由,OP并没有真正地对任何东西进行分组。该查询正在加载所有详细行并将它们分批处理,它不会聚合任何内容。实际上,那个 Select(b=>b) 不应该 取消分组 吗? - Panagiotis Kanavos
@PanagiotisKanavos Select(b => b) 并没有实际作用,可以跳过(对问题来说并不重要)。问题在于在 GroupBy 前面放什么内容,以及如何让代码在客户端编译和执行。 - Ivan Stoev
1
@PanagiotisKanavos,在示例查询中具体的Select对于所提出的问题并不重要,该问题是:“如何在EF Core 3.1中以异步方式使用GroupBy?” - Ivan Stoev
显示剩余5条评论
2个回答

15
我认为你唯一的方法就是像这样去做。
var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .ToListAsync();

var groupedBlogs = blogs.GroupBy(t => t.BlobNumber).Select(b => b).ToList();

因为GroupBy将在客户端上进行评估


1
这就是我最终选择的方案。它很简单,而且有效。 - Gary

8

这个查询并不是在SQL/EF Core的意义上尝试对数据进行分组。没有涉及到任何聚合操作。

它会加载所有详细行然后将它们批量分成不同的桶(bucket)在客户端。 EF Core 在这方面没有任何参与,这是一个完全由客户端执行的操作。相当于:

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"))
    .ToListAsync();

var blogsByNum = blogs.ToLookup(t => t.BlobNumber);

加速分组

批处理/分组/查找操作是纯粹的CPU密集型操作,因此加速它的唯一方法就是并行化,即使用所有CPU对数据进行分组,例如:

var blogsByNum = blogs.AsParallel()
                      .ToLookup(t => t.BlobNumber);
ToLookup的作用与GroupBy().ToList()差不多 - 它根据键将行分组到桶中。 在加载时分组 另一种方法是异步地加载结果并在到达时将它们放入桶中。为此,我们需要使用AsAsyncEnumerable()ToListAsync()会一次返回所有结果,因此不能使用它。
这种方法与ToLookup的工作方式非常相似。

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"));

var blogsByNum=new Dictionary<string,List<Blog>>();

await foreach(var blog in blogs.AsAsyncEnumerable())
{
    if(blogsByNum.TryGetValue(blog.BlobNumber,out var blogList))
    {
        blogList.Add(blog);
    }
    else
    {
        blogsByNum[blog.BlobNumber=new List<Blog>(100){blog};
    }
}

AsAsyncEnumerable()方法调用执行查询操作。因为结果是异步返回的,所以在迭代时我们可以将它们添加到桶中。

capacity参数用于列表构造函数,避免了列表内部缓冲区的重新分配。

使用System.LINQ.Async

如果我们有IAsyncEnumerable<>本身的LINQ操作,事情会变得更加简单。这个扩展命名空间提供了这样的功能。它由ReactiveX团队开发。它可以通过NuGet获得,当前的主要版本是4.0。

有了它,我们只需编写:

var blogs = await context.Blogs
    .Where(blog => blog.Url.Contains("dotnet"));

var blogsByNum=await blogs.AsAsyncEnumerable()   individual rows asynchronously
                          .ToLookupAsync(blog=>blog.BlobNumber);

或者
var blogsByNum=await blogs.AsAsyncEnumerable()   
                          .GroupBy(blog=>blog.BlobNumber)
                          .Select(b=>b)
                          .ToListAsync();

1
为什么重新开启只是发布重复答案(AsAsyncEnumerable() + System.LINQ.Async 包)?在这种情况下,答案的第一部分没有意义。问题是(以及整个 GitHub 辩论)如何强制 EF Core 查询中的异步客户端评估模式。对于同步 LINQ,它很简单 AsEnumerable(),其余部分通过 Enumerable LINQ 进行处理。 - Ivan Stoev
感谢提供的信息。我曾考虑使用System.LINQ.Async,但最终选择了Denis的建议,因为它需要更少的依赖项,并且对于我的简单用例效果很好。不过,我会记住这个包以备将来使用。 - Gary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接