如何避免在异步无返回值事件处理程序中发生重入?

13
在一个WPF应用程序中,我有一个类可以通过网络接收消息。每当该类的一个对象接收到完整的消息时,就会引发一个事件。在应用程序的MainWindow中,我订阅了该事件的事件处理程序。保证在应用程序的GUI线程上调用事件处理程序。
每当调用事件处理程序时,需要将消息内容应用于模型。这样做可能非常耗费时间(在当前硬件上大约超过200毫秒)。因此,使用Task.Run将应用程序从主线程中卸载到线程池中。
现在,由于可以连续地接收消息,因此在仍在处理先前更改时可能会调用事件处理程序。确保消息仅一次应用的最简单方法是什么?到目前为止,我想到了以下方法:
using System;
using System.Threading.Tasks;
using System.Windows;

public partial class MainWindow : Window
{
    private Model model = new Model();
    private Task pending = Task.FromResult<bool>(false);

    // Assume e carries a message received over the network.
    private void OnMessageReceived(object sender, EventArgs e)
    {
        this.pending = ApplyToModel(e);
    }

    private async Task ApplyToModel(EventArgs e)
    {
        await this.pending;
        await Task.Run(() => this.model.Apply(e)); // Assume this is an expensive call.
    }
}

这看起来能够正常工作,但它似乎会产生"内存泄漏",因为应用消息的任务总是会首先等待应用前一个消息的任务。如果是这样的话,那么以下更改应该可以避免内存泄漏:

private async Task ApplyToModel(EventArgs e)
{
    if (!this.pending.IsCompleted)
    {
        await this.pending;
    }

    await Task.Run(() => this.model.Apply(e));
}

这是一种避免异步 void 事件处理程序重入的明智方式吗?

编辑:在 OnMessageReceived 中删除不必要的 await this.pending; 语句。

编辑 2:消息必须以接收到的相同顺序应用于模型。


@Servy:你是指在OnMessageReceived中吗?好问题,我想这并不是必要的。 - user49572
@Servy:我同意在OnMessageReceived中不是必需的,但在ApplyToModel中是必需的,对吧? - user49572
我现在明白你正在做什么。 - Servy
2个回答

12

我们需要感谢Stephen Toub,在他的博客系列中演示了一些非常有用的异步锁构造,包括一个async lock块。

这是来自该文章的代码(包括该系列中以前文章中的一些代码):

public class AsyncLock
{
    private readonly AsyncSemaphore m_semaphore;
    private readonly Task<Releaser> m_releaser;

    public AsyncLock()
    {
        m_semaphore = new AsyncSemaphore(1);
        m_releaser = Task.FromResult(new Releaser(this));
    }

    public Task<Releaser> LockAsync()
    {
        var wait = m_semaphore.WaitAsync();
        return wait.IsCompleted ?
            m_releaser :
            wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
                this, CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    public struct Releaser : IDisposable
    {
        private readonly AsyncLock m_toRelease;

        internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; }

        public void Dispose()
        {
            if (m_toRelease != null)
                m_toRelease.m_semaphore.Release();
        }
    }
}

public class AsyncSemaphore
{
    private readonly static Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> m_waiters = new Queue<TaskCompletionSource<bool>>();
    private int m_currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
        m_currentCount = initialCount;
    }
    public Task WaitAsync()
    {
        lock (m_waiters)
        {
            if (m_currentCount > 0)
            {
                --m_currentCount;
                return s_completed;
            }
            else
            {
                var waiter = new TaskCompletionSource<bool>();
                m_waiters.Enqueue(waiter);
                return waiter.Task;
            }
        }
    }
    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (m_waiters)
        {
            if (m_waiters.Count > 0)
                toRelease = m_waiters.Dequeue();
            else
                ++m_currentCount;
        }
        if (toRelease != null)
            toRelease.SetResult(true);
    }
}

现在将其应用到您的情况中:

private readonly AsyncLock m_lock = new AsyncLock();

private async void OnMessageReceived(object sender, EventArgs e)
{
    using(var releaser = await m_lock.LockAsync()) 
    {
        await Task.Run(() => this.model.Apply(e));
    }
}

4
其他选择包括.NET 4.5中的SemaphoreSlim类型和我的AsyncEx库中的AsyncLock类型。 - Stephen Cleary
@StephenCleary Semaphore 解决方案不是会进行阻塞等待而不是异步等待吗? - Servy
@Servy 如果你使用它的 WaitAsync() 方法,就不会这样。 - svick
2
@StephenCleary 顺便说一下,你的 SemaphoreSlim 链接是错误的。它链接到 .Net 4.0 版本,该版本没有 WaitAsync() 方法。这里是 .Net 4.5 版本的链接。 - svick
@Servy:假设我接收到3条非常接近的消息,以至于在第一条消息仍在处理时就收到了最后一条消息。使用您的解决方案,是否保证第二条消息在第三条消息之前被处理? - user49572
@AndreasHuber 是的。锁是使用信号量实现的,您可以看到在信号量等待中它最终会存储一个任务完成源队列。新的等待将添加到队列的末尾,在每次释放时,队列前面的项目被取出以完成。 - Servy

1

如果一个事件处理程序使用了async await,我们无法在Task之外使用锁,因为每个事件调用的调用线程都是相同的,所以锁将始终让它通过。

var object m_LockObject = new Object();

private async void OnMessageReceived(object sender, EventArgs e)
{
    // Does not work
    Monitor.Enter(m_LockObject);

    await Task.Run(() => this.model.Apply(e));

    Monitor.Exit(m_LockObject);
}

但是我们可以在Task内部进行锁定,因为Task.Run始终会生成一个新的Task,该Task不会在同一线程上并行运行。

var object m_LockObject = new Object();

private async void OnMessageReceived(object sender, EventArgs e)
{
    await Task.Run(() => 
    {
        // Does work
        lock(m_LockObject)
        {
            this.model.Apply(e);
        }
    });
}

所以,当事件调用OnMessageReceived时,它立即返回,并且只有在一个接一个地进入model.Apply。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接