ChannelReader.ReadAllAsync(CancellationToken)在迭代过程中并未实际取消。

4

我一直在开发一个将耗时任务排队到通道中的功能,我使用以下方式来迭代通道: await foreach(var item in channel.Reader.ReadAllAsync(cancellationToken)) {...}

我原本以为,当通过cancellationToken请求取消时,ReadAllAsync会在随后的第一次迭代中抛出异常。

但实际上,情况并非如此。该循环会一直执行,直到处理完所有的项目,然后才会抛出OperationCanceledException异常。

这看起来有点奇怪。从ChannelReadergithub repo中可以看到,取消令牌标记了[EnumeratorCancellation]属性,因此应该将其传递给围绕yield return item;生成的状态机 (如果我错了,请纠正我)。

我的问题是,这是ReadAllAsync(CancellationToken)的(有些)正常行为,还是我遗漏了什么?

这里有一个简单的测试代码,演示了该问题(在dotnetfiddle上试一试):
var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 10; i++) channel.Writer.TryWrite(i);
int itemsRead = 0;
var cts = new CancellationTokenSource();
try
{
    await foreach (var i in channel.Reader.ReadAllAsync(cts.Token))
    {
        Console.WriteLine($"Read item: {i}. Requested cancellation: " +
            $"{cts.Token.IsCancellationRequested}");

        if (++itemsRead > 4 && !cts.IsCancellationRequested)
        {
            Console.WriteLine("Cancelling...");
            cts.Cancel();
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"Operation cancelled. Items read: {itemsRead}");
}

以下是上述内容的输出。请注意,在应该在中途取消后,项目获取仍在继续:
Read item: 1. Requested cancellation: False
Read item: 2. Requested cancellation: False
Read item: 3. Requested cancellation: False
Read item: 4. Requested cancellation: False
Read item: 5. Requested cancellation: False
Cancelling...
Read item: 6. Requested cancellation: True
Read item: 7. Requested cancellation: True
Read item: 8. Requested cancellation: True
Read item: 9. Requested cancellation: True
Read item: 10. Requested cancellation: True
Operation cancelled. Items read: 10

2
我想这取决于你认为你正在取消什么。你可以将其视为取消实际的异步部分,但我同意文档中确实说“用于取消枚举的取消标记”,这意味着它取消了实际的枚举。我找不到任何关于该标记应该取消什么的讨论,因此在dotnet/runtime中提出问题可能值得一试。 - canton7
1
想象一下,在通道中有成千上万个耗时的项目 - 耗时的不是这些项目,而是对它们的处理。因此,处理器(await foreach循环体)应该检查取消令牌。 - Stephen Cleary
2
@StephenCleary 是的,这就是我所说的,处理可能会耗费时间。而且,最终,检查令牌并抛出异常是我要做的。不过,对我来说,这仍然似乎是一种变通方法。 - Peter Todorov
1
@jdweng,如果取消被触发,您是否期望ReadAllAsync在下一次调用时抛出异常,从而打破循环? - Peter Todorov
1
我也认为当提供的令牌被取消时,迭代器循环会中断。也许这值得向corefx团队报告一个错误? - Alek
显示剩余5条评论
2个回答

4
这种行为是经过设计的。我会从相关的 GitHub 问题(GitHub issue)复制并粘贴Stephen Toub的回答

I would like to ask if this behavior is by design.

It is. There's already data available for reading immediately, so there's effectively nothing to cancel. The implementation of the iterator is just sitting in this tight loop:

while (TryRead(out T? item)) 
{ 
   yield return item; 
} 

as long as data is available immediately. As soon as there isn't, it'll escape out to the outer loop, which will check for cancellation.

That said, it could be changed. I don't have a strong opinion on whether preferring cancellation is preferable; I expect it would depend on the use case.


1
感谢您提出这个问题。除了性能考虑之外,我仍然认为这种行为是有问题的,因为可能存在高频数据写入通道的情况,而这些数据以一种使通道永远不会为空的方式被消耗(以便它可以对取消做出反应)。因此,使用ReadAllAsync的读取任务应该在每次读取时检查取消标记作为解决方法。我已经发出了一个请求,在文档中指定这一点在这里 - Peter Todorov
1
@PeterTodorov 是的,如果我们认为当前行为是理所当然的话,我们就不能依靠ReadAllAsync及时取消await foreach循环。他们必须在循环中注入一个cancellationToken.ThrowIfCancellationRequested();作为最后一行,以便及时尊重取消请求。另一方面,如果在循环内部已经调用了可取消的API,例如await _httpClient.GetStringAsync(url, cancellationToken),那么在每次迭代中在ReadAllAsync中进行额外的检查看起来会很傻(冗余)。 - Theodor Zoulias
具有讽刺意味的是,我之所以知道这一点,是因为在我的循环中没有其他取消方式。至于可能的修复方法,我没有深入研究代码,但我认为注册一个取消回调也可能会有益处。例如,引发一些标志,指示TryRead返回false,从而打破while循环并抛出异常。是的,这仍然需要额外的检查,除非TryRead已经进行了一些检查(我相信它应该)。 - Peter Todorov

2
对于那些不得不研究同样问题的人来说,这只是一个更新而不是实际答案:现在在有关ChannelReader.ReadAllAsync()的文档中已经指定了这种行为,可以在这里找到。
由于提出了这个问题,第二个描述取消标记作用的句子(“如果数据立即准备好读取,则即使在请求取消后,该数据也可能被生成。”)已经被添加。
感谢所有参与者!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接