可等待的AutoResetEvent

52

AutoResetEvent的异步(可等待)等效物是什么?

如果在经典线程同步中,我们会使用类似这样的代码:

    AutoResetEvent signal = new AutoResetEvent(false);

    void Thread1Proc()
    {
        //do some stuff
        //..
        //..

        signal.WaitOne(); //wait for an outer thread to signal we are good to continue

        //do some more stuff
        //..
        //..
    }

    void Thread2Proc()
    {
        //do some stuff
        //..
        //..

        signal.Set(); //signal the other thread it's good to go

        //do some more stuff
        //..
        //..
    }

我希望在新的异步处理方式中,能够出现像这样的东西:

SomeAsyncAutoResetEvent asyncSignal = new SomeAsyncAutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await asyncSignal.WaitOne(); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}

async void Task2Proc()
{
    //do some stuff
    //..
    //..

    asyncSignal.Set(); //signal the other thread it's good to go

    //do some more stuff
    //..
    //..
}

我已经看到其他自定义解决方案,但是我所掌握的,在某个时间点仍然需要锁定线程。我不想为了使用新的await语法而这样做。我正在寻找一种真正的可等待信号机制,它不会锁定任何线程。

是否有我在任务并行库中遗漏的东西?

编辑:只是为了明确:SomeAsyncAutoResetEvent是我例子中完全虚构的类名。


对于一次性使用,可以使用 TaskCompletionSource,等待任务忽略其结果。 - Dark Falcon
可能是吗?请提供更具体的上下文或信息,以便我更好地理解并回答您的问题。 - Matthew Watson
@MatthewWatson 我看到它使用了锁,这会阻塞线程池中的线程。我希望有一些不涉及阻塞线程的解决方案。 - Mihai Caracostea
1
锁并不一定意味着线程被阻塞。 - Dark Falcon
@DarkFalcon 是的。在这种情况下,它甚至可能不会阻塞任何线程。 - Mihai Caracostea
@MatthewWatson 这个扩展方法使用了ThreadPool.RegisterWaitForSingleObject,它“注册一个等待WaitHandle的委托”。从这段话中我理解到,线程池中的一个线程将会在该waithandle上阻塞。我的理解是否正确? - Mihai Caracostea
11个回答

34

5
为什么 new SemaphoreSlim(1) 不起作用,WaitOne() 变成了 WaitAsync(),而 Set() 则变成了 Release() - Scott Chamberlain
2
AREs 和 Semaphores 非常相似(尽管通常用法不同)。语义上的差异在于当原语已被设置时,是否发出信号。 - Stephen Cleary
await Task.Run(() => loginWaiter.WaitOne(TimeSpan.FromSeconds(75))); 有什么问题吗? - Ashley Jackson
1
@AshleyJackson:这种方法确实使用了另一个线程。有些同步原语不允许这样做(例如MutexMonitor),但由于这是一个AutoResetEvent,所以应该可以工作。 - Stephen Cleary
7
我认为那些被命名为"Stephen"的人天生适合异步做任何事情。 - M.kazem Akhgary
8
Stephen Toub的帖子似乎已被移动到这里 - Klepto

23

如果 Stephen Toub 的博客下线了,这里是他的 AsyncAutoResetEvent 的源代码。

public class AsyncAutoResetEvent
{
    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waits = new Queue<TaskCompletionSource<bool>>();
    private bool m_signaled;

    public Task WaitAsync()
    {
        lock (m_waits)
        {
            if (m_signaled)
            {
                m_signaled = false;
                return s_completed;
            }
            else
            {
                var tcs = new TaskCompletionSource<bool>();
                m_waits.Enqueue(tcs);
                return tcs.Task;
            }
        }
    }

    public void Set()
    {
        TaskCompletionSource<bool> toRelease = null;

        lock (m_waits)
        {
            if (m_waits.Count > 0)
                toRelease = m_waits.Dequeue();
            else if (!m_signaled)
                m_signaled = true;
        }

        toRelease?.SetResult(true);
    }
}

为什么在可等待的代码中可以使用常规锁?这里不能将相同的任务作为不同的线程继续并绕过锁吗? - user1713059
3
请注意,WaitAsync 方法实际上不是一个异步方法。这意味着它在处理过程中不会中途交出控制权。相反,它从 TaskCompletionSource 获取一个 Task ,在释放锁之前将其返回。 - Drew Noakes
啊,所以即使我执行 "await WaitAsync()",整个方法也肯定由同一个线程执行,因为它实际上不是异步的 - 是这样吗? "Async" 方法后缀让我误入歧途,但从我看到的内容来看,它也用在没有 "async" 关键字的方法中。 - user1713059
1
它仍然是一个异步方法,因为它返回一个任务,该任务在方法返回时可能尚未完成。但是该方法不是“async”,这意味着该方法在等待其他任务完成时不会在其主体内部产生任何中断。对于返回“Task”(或“Task<T>”)的方法来说,在其名称后面加上“Async”后缀是一种惯例。 - Drew Noakes
1
关于您最初的评论,锁定在将Task返回给调用者之前被释放,因此调用者无法绕过锁定。 - Drew Noakes

18

我认为在 MSDN 上有一个很好的例子:https://msdn.microsoft.com/zh-cn/library/hh873178%28v=vs.110%29.aspx#WHToTap

public static Task WaitOneAsync(this WaitHandle waitHandle)
{
    if (waitHandle == null) 
        throw new ArgumentNullException("waitHandle");

    var tcs = new TaskCompletionSource<bool>();
    var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle, 
        delegate { tcs.TrySetResult(true); }, null, -1, true);
    var t = tcs.Task;
    t.ContinueWith( (antecedent) => rwh.Unregister(null));
    return t;
}

绝对是最佳答案。 - Felix K.
请注意,这仅适用于ManualResetEvent而不是AutoResetEvent。在AutoResetEvent上,您需要在委托内部等待该对象;否则,下次有人调用WaitOne时,事件仍然会被标记。 - Joshua

9

这里是我设计的一个版本,它允许您指定超时时间。它源自于Stephen Toub的解决方案。我们目前在生产工作负载中使用它。

public class AsyncAutoResetEvent
{
    readonly LinkedList<TaskCompletionSource<bool>> waiters = 
        new LinkedList<TaskCompletionSource<bool>>();

    bool isSignaled;

    public AsyncAutoResetEvent(bool signaled)
    {
        this.isSignaled = signaled;
    }

    public Task<bool> WaitAsync(TimeSpan timeout)
    {
        return this.WaitAsync(timeout, CancellationToken.None);
    }

    public async Task<bool> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        TaskCompletionSource<bool> tcs;

        lock (this.waiters)
        {
            if (this.isSignaled)
            {
                this.isSignaled = false;
                return true;
            }
            else if (timeout == TimeSpan.Zero)
            {
                return this.isSignaled;
            }
            else
            {
                tcs = new TaskCompletionSource<bool>();
                this.waiters.AddLast(tcs);
            }
        }

        Task winner = await Task.WhenAny(tcs.Task, Task.Delay(timeout, cancellationToken));
        if (winner == tcs.Task)
        {
            // The task was signaled.
            return true;
        }
        else
        {
            // We timed-out; remove our reference to the task.
            // This is an O(n) operation since waiters is a LinkedList<T>.
            lock (this.waiters)
            {
                bool removed = this.waiters.Remove(tcs);
                Debug.Assert(removed);
                return false;
            }
        }
    }

    public void Set()
    {
        lock (this.waiters)
        {
            if (this.waiters.Count > 0)
            {
                // Signal the first task in the waiters list. This must be done on a new
                // thread to avoid stack-dives and situations where we try to complete the
                // same result multiple times.
                TaskCompletionSource<bool> tcs = this.waiters.First.Value;
                Task.Run(() => tcs.SetResult(true));
                this.waiters.RemoveFirst();
            }
            else if (!this.isSignaled)
            {
                // No tasks are pending
                this.isSignaled = true;
            }
        }
    }

    public override string ToString()
    {
        return $"Signaled: {this.isSignaled.ToString()}, Waiters: {this.waiters.Count.ToString()}";
    }
}

1
我认为这个.waiters应该在Remove(tcs)操作路径中被锁定? - HelloSam
@HelloSam 我认为你是对的!已修复。感谢你指出这个问题。 - Chris Gillum
我没有太多时间来调试这个问题,但是提前告诫你:我在使用它时遇到了死锁。当一个新的线程调用event.Set()时,它会卡在toRelease.SetResult(true); - Andy
1
@Andy 感谢您的评论。自我最初发布此内容以来,我进行了额外的修复,我怀疑这可以解决您的死锁问题(在我的情况下,它是StackOverflowException)。修复方法是将 SetResult(true) 调用包装在 Task.Run(...) 中。 - Chris Gillum
我是否错了,或者它不是自动重置,在 if (winner == tcs.Task) 后返回 true? - Hugh Jeffner
没关系,我不明白它是如何工作的。结果发现,如果有等待的任务,它就不会打扰设置“isSignaled”。 - Hugh Jeffner

4

我也在寻找一个AsyncAutoResetEvent类,现在在Microsoft.VisualStudio.Threading命名空间中似乎有一个可用的。

// Summary:
//     An asynchronous implementation of an AutoResetEvent.
[DebuggerDisplay("Signaled: {signaled}")]
public class AsyncAutoResetEvent

1
这是它: https://github.com/microsoft/vs-threading/blob/main/src/Microsoft.VisualStudio.Threading/AsyncAutoResetEvent.cs - Salar

2

2
这个实现可以更简单,使用SemaphoreSlim即可。我还没有在生产环境中测试过,但应该可以正常工作。
public class AsyncAutoResetEvent : IDisposable
{
    private readonly SemaphoreSlim _waiters;

    public AsyncAutoResetEvent(bool initialState)
        => _waiters = new SemaphoreSlim(initialState ? 1 : 0, 1);

    public Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancellationToken = default)
        => _waiters.WaitAsync(timeout, cancellationToken);

    public Task WaitOneAsync(CancellationToken cancellationToken = default)
        => _waiters.WaitAsync(cancellationToken);

    public void Set()
    {
        lock (_waiters)
        {
            if (_waiters.CurrentCount == 0)
                _waiters.Release();
        }
    }

    public override string ToString()
        => $"Signaled: {_waiters.CurrentCount != 0}"; 

    private bool _disposed;
    protected virtual void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
            {
                _waiters.Dispose();
            }

            _disposed = true;
        }
    }

    public void Dispose()
    {
        // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }
}

我已经将这个添加到Flettu Lib中(这是一个专门用于此类任务的库):https://github.com/mysteryjeans/Flettu/blob/master/src/Flettu/Lock/AsyncAutoResetEvent.cs - Faraz M. Khan

2

这样做也可以,但可能会削弱使用 asyncawait 的目的。

AutoResetEvent asyncSignal = new AutoResetEvent();

async void Task1Proc()
{
    //do some stuff
    //..
    //..

    await Task.Run(() => asyncSignal.WaitOne()); //wait for an outer thread to signal we are good to continue

    //do some more stuff
    //..
    //..
}

为什么这被认为是不好的? - Yarek T
我记得几个月前写这篇答案时还知道原因,但现在不记得了。虽然在这里有一个以上的上下文切换(通过WaitOne()和await关键字),但我认为这并不是什么坏事。 - Hyunjik Bae
1
没问题。我最近一直在深入研究C#中的任务(Tasks)。据我所知,这种方法不好,因为它会浪费一个线程,先创建一个线程,然后立即通过等待使其被阻塞。我看到有一些解决方案通过某种方式使用计时器来避免这种情况,但它们似乎都非常复杂。无论如何,这里是一个赞。 - Yarek T

0
我使用Oleg Gordeev提供的MSDN示例进行扩展,并添加了一个可选的超时时间(毫秒):
public static Task WaitOneAsync(this WaitHandle waitHandle, double timeout = 0)
        {
            if (waitHandle == null) throw new ArgumentNullException("waitHandle");

            var tcs = new TaskCompletionSource<bool>();

            if (timeout > 0) 
            {
                var timer = new System.Timers.Timer(timeout) 
                { Enabled = true, AutoReset = false };

                ElapsedEventHandler del = default;
                del = delegate (object x, System.Timers.ElapsedEventArgs y)
                {
                    tcs.TrySetResult(true);
                    timer.Elapsed -= del; 
                    timer.Dispose();
                };

                timer.Elapsed += del;
            }
        
            var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
                      delegate { tcs.TrySetResult(true); },
                      null, -1, true);

            var t = tcs.Task;
            t.ContinueWith((antecedent) => rwh.Unregister(null));

            return t;
        }

-1

这是我使用SemaphoreSlim实现的完整代码,使用了所有的SemaphoreSlim.WaitAsync重载方法:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Represents an event that, when signaled, resets automatically after releasing a single waiting task.
/// </summary>
public sealed class AutoResetEventAsync : IDisposable {

    /// <summary>
    /// Waits asynchronously until a signal is received.
    /// </summary>
    /// <returns>Task completed when the event is signaled.</returns>
    public async ValueTask WaitAsync() {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync();
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(millisecondsTimeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="millisecondsTimeout">The number of milliseconds to wait, <see cref="System.Threading.Timeout.Infinite"/>
    /// (-1) to wait indefinitely, or zero to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(millisecondsTimeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the token is cancelled.
    /// </summary>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled or the token is cancelled.</returns>
    public async ValueTask WaitAsync(CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Waits asynchronously until a signal is received or the time runs out.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <returns>Task completed when the event is signaled or the time runs out.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        await s.WaitAsync(timeout);
        lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
    }

    /// <summary>
    /// Waits asynchronously until a signal is received, the time runs out or the token is cancelled.
    /// </summary>
    /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds to wait,
    /// a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely, or a System.TimeSpan
    /// that represents 0 milliseconds to return immediately.</param>
    /// <param name="cancellationToken">The <see cref="System.Threading.CancellationToken"/> to observe.</param>
    /// <returns>Task completed when the event is signaled, the time runs out or the token is cancelled.</returns>
    public async ValueTask WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) {
        if (CheckSignaled()) return;
        SemaphoreSlim s;
        lock (Q) Q.Enqueue(s = new(0, 1));
        try {
            await s.WaitAsync(timeout, cancellationToken);
        }
        finally {
            lock (Q) if (Q.Count > 0 && Q.Peek() == s) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Sets the state of the event to signaled, allowing one or more waiting tasks to proceed.
    /// </summary>
    public void Set() {
        SemaphoreSlim? toRelease = null;
        lock (Q) {
            if (Q.Count > 0) toRelease = Q.Dequeue();
            else if (!IsSignaled) IsSignaled = true;
        }
        toRelease?.Release();
    }

    /// <summary>
    /// Sets the state of the event to non nonsignaled, making the waiting tasks to wait.
    /// </summary>
    public void Reset() => IsSignaled = false;

    /// <summary>
    /// Disposes any semaphores left in the queue.
    /// </summary>
    public void Dispose() {
        lock (Q) {
            while (Q.Count > 0) Q.Dequeue().Dispose();
        }
    }

    /// <summary>
    /// Checks the <see cref="IsSignaled"/> state and resets it when it's signaled.
    /// </summary>
    /// <returns>True if the event was in signaled state.</returns>
    private bool CheckSignaled() {
        lock (Q) {
            if (IsSignaled) {
                IsSignaled = false;
                return true;
            }
            return false;
        }
    }

    private readonly Queue<SemaphoreSlim> Q = new();
    private volatile bool IsSignaled;

}

我使用了SemaphoreSlim,因为它提供了超时和取消令牌支持,而且这些功能是“免费”的。如果我修改了原始的.NET源代码,使得SemaphoreSlim的行为类似于AutoResetEvent,那么它甚至可能会更好,但是算了吧。如果你发现任何错误,请告诉我。


1
我在谈论这一行代码:if (IsSignaled) { IsSignaled = false; return; }。它没有受到锁的保护。IsSignaled甚至不是一个volatile字段。至于if (Q.Contains(s)),如果你确定s只能在队列头部,那么if (Q.Peak() == s)会更快,并且更能表达代码的意图。顺便问一下,如果cancellationToken被取消并且WaitAsync抛出异常会发生什么? - Theodor Zoulias
你发现了一些有趣的边界情况。我会尝试修复它们并编辑我的例子... 待会回来。 - Harry
已修复。我不得不添加 if (Q.Count > 0),因为当 Q 为空时,Peek() 会抛出异常,并且在大多数情况下,在调用 Set() 时它是空的。 - Harry
Harry,既然你意识到实现自定义异步AutoResetEvent很具有挑战性,为什么要试图自己编写呢?不如直接使用Stephen Cleary的AsyncAutoResetEvent - Theodor Zoulias
1
谢谢你的见解,我会使用你的扩展来使用更好的版本。无论如何,这是一次非常值得的学习经历。你是最有价值的球员。 - Harry
显示剩余8条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接