任务.WhenAny和SemaphoreSlim类

4
当使用WaitHandle.WaitAnySemaphore时,如下所示:
var s1 = new Semaphore(1, 1);
var s2 = new Semaphore(1, 1);

var handles = new [] { s1, s2 };

var index = WaitHandle.WaitAny(handles);

handles[index].Release();

看起来使用WaitHandle.WaitAny只会获得一个信号量。

对于异步(async/await)代码,是否可能获得类似的行为?


你是指 Task.WaitAny() 吗? - Erik Philips
我认为这并不重要,因为Task.WaitAny似乎只是Task.WhenAny的同步版本。 - drowa
我也打算发布Task.WhenAny() - Erik Philips
你应该点赞这个问题。 - drowa
1
@drowa:这几乎肯定不是你所面临问题的正确解决方案。你真正想要解决的问题是什么? - Stephen Cleary
显示剩余8条评论
3个回答

1
这是一个通用的实现WaitAnyAsync方法的示例,它可以异步地获取任何提供的信号量:
/// <summary>
/// Asynchronously waits to enter any of the semaphores in the specified array.
/// </summary>
public static async Task<SemaphoreSlim> WaitAnyAsync(SemaphoreSlim[] semaphores,
    CancellationToken cancellationToken = default)
{
    // Fast path
    cancellationToken.ThrowIfCancellationRequested();
    var acquired = semaphores.FirstOrDefault(x => x.Wait(0));
    if (acquired != null) return acquired;

    // Slow path
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
        cancellationToken);
    Task<SemaphoreSlim>[] acquireTasks = semaphores
        .Select(async s => { await s.WaitAsync(cts.Token); return s; })
        .ToArray();

    Task<SemaphoreSlim> acquiredTask = await Task.WhenAny(acquireTasks);

    cts.Cancel(); // Cancel all other tasks

    var releaseOtherTasks = acquireTasks
        .Where(task => task != acquiredTask)
        .Select(async task => (await task).Release());

    try { await Task.WhenAll(releaseOtherTasks); }
    catch (OperationCanceledException) { } // Ignore
    catch
    {
        // Consider any other error (possibly SemaphoreFullException or
        // ObjectDisposedException) as a failure, and propagate the exception.
        try { (await acquiredTask).Release(); } catch { }
        throw;
    }

    try { return await acquiredTask; }
    catch (OperationCanceledException)
    {
        // Propagate an exception holding the correct CancellationToken
        cancellationToken.ThrowIfCancellationRequested();
        throw; // Should never happen
    }
}

这种方法在争用越来越高时效率变得越来越低,因此我不建议在热路径中使用它。

请参考以下链接:https://dev59.com/AsL5oIgBc1ULPQZFhrAu#73235275 - Ruben Bartelink

1

我想不到有内置的解决方案。我会这样做:

var s1 = new SemaphoreSlim(1, 1);
var s2 = new SemaphoreSlim(1, 1);

var waits = new [] { s1.WaitAsync(), s2.WaitAsync() };

var firstWait = await Task.WhenAny(waits);

//The wait is still running - perform compensation.
if (firstWait == waits[0])
 waits[1].ContinueWith(_ => s2.Release());
if (firstWait == waits[1])
 waits[0].ContinueWith(_ => s1.Release());

这会获取两个信号量,但立即释放第二个获得的信号量。这应该是等效的。我无法想象获取不需要的信号量会有什么负面影响(当然除了性能方面)。

1
这是一个不错的开始。然而,ContinueWith 返回的任务“泄漏”并不好。 - drowa
我喜欢追踪我创建的任务。例如,在应用程序终止之前确保它们都已完成。 - drowa
1
我明白了,我也喜欢这样做。特别是对于错误跟踪非常有用。解决方案:使WaitAsync接受CancellationToken。发出等待的令牌以中止操作。然后,等待继续任务和等待任务。等待应该几乎立即完成,所有任务都将被正确关闭。(我不会添加那段代码。太多了。) - usr
我也在考虑那个解决方案。 - drowa
我在想这个的同步版本会是什么样子(即仍然使用SemaphoreSlim)。我们可能需要使用SemaphoreSlim.AvailableWaitHandle - drowa
@drowa 一个同步版本可以通过在生成的任务上调用Wait来实现。所有内部操作都是异步进行的。然后再添加等待。如果可以避免使用等待句柄,请永远不要使用它们。它们很笨拙,必须被处理。任务很灵活,甚至在等待时(在最近的.NET版本中)也不需要处理。 - usr

0

@usr的答案进行的变体解决了我稍微更一般的问题(在花了相当长时间试图将AvailableWaitHandleTask结合之后...)

    class SemaphoreSlimExtensions
    
        public static Task AwaitButReleaseAsync(this SemaphoreSlim s) => 
            s.WaitAsync().ContinueWith(_t -> s.Release(), ct,
                TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
                TaskScheduler.Default);
        public static bool TryTake(this SemaphoreSlim s) => 
            s.Wait(0);

在我的使用案例中,await只是用来触发同步逻辑的信号,然后遍历整个集合。在我的情况下,TryTake助手是处理条件获取信号量以及依赖于此的处理的一种自然方式。
var sems = new[] { new SemaphoreSlim(1, 1), new SemaphoreSlim(1, 1) };

await Task.WhenAny(from s in sems select s.AwaitButReleaseAsync());

把它放在这里是因为我认为它干净、清晰且相对高效,但如果有改进的地方,我会很高兴看到。

等待:https://github.com/jet/propulsion/blob/c1c67e56bb329061f687e7b0282d8ec215fd9539/src/Propulsion/Submission.fs#L120-L125 处理:https://github.com/jet/propulsion/blob/c1c67e56bb329061f687e7b0282d8ec215fd9539/src/Propulsion/Submission.fs#L81-L88 - Ruben Bartelink
1
不,TaskContinuationOptions.ExecuteSynchronously 只是一个提示,TaskScheduler.Current 不一定要遵守。为了符合指南,您必须明确指定 scheduler,最可能是 TaskScheduler.Default - Theodor Zoulias
1
谢谢提供调度器的建议,我会进行修复。你说得对,现在距离原始问题有些远了,因为要求是获取一个锁,而我实际上并不需要立即获取任何一个锁。我发帖的原因是我认为这是@usr深刻技巧的应用,但是现在你指出来了,我可以看到这有些牵强。如果更多人反对缺乏直接相关性,我会将其移动到自我回答的问题中。 - Ruben Bartelink
1
你可以查看这篇文章:Task.Run vs Task.Factory.StartNew。其中有一个重载函数也接受取消标记,你可以传入 CancellationToken.None 或者 default。这篇文章主要讲述了 Task.Factory.StartNew,但是 ContinueWith 的用法类似。 - Theodor Zoulias
1
https://dev59.com/AsL5oIgBc1ULPQZFhrAu#73233209 - Ruben Bartelink
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接