如何在C#中等待单个事件,包括超时和取消功能。

22
所以我的要求是让我的函数等待来自另一个类和另一个线程的第一个event Action<T>实例,并在我的线程上处理它,允许等待被超时或CancellationToken中断。
我想创建一个通用的函数,可以重复使用。我设法创建了一些选项(我认为)满足我的需求,但两者似乎比我想象中应该更复杂。
用法
只是为了清楚起见,此函数的示例用法如下,其中serialDevice在单独的线程上产生事件:
var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());

选项1——ManualResetEventSlim

这个选项并不错,但是ManualResetEventSlimDispose处理比看起来要复杂。我在闭包中访问了已修改或已释放的内容,这让ReSharper感到困惑,而且真的很难理解,所以我甚至不确定它是否正确。也许有什么方法可以使其更简洁,那就是我更喜欢的,但我一时还没有想到。以下是代码。

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result => 
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock(o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}

选项2——不使用WaitHandle的轮询

这里使用的WaitForSingleEvent函数更加简洁。我能够使用ConcurrentQueue而无需使用锁。但是,我并不喜欢轮询函数Sleep,并且我没有看到任何避免它的方法。我想传递一个WaitHandle而不是一个Func<bool>来清理Sleep,但是一旦我这样做就需要处理整个Dispose混乱的问题。

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}

问题

我对这两种解决方案都不是特别满意,而且我也不确定它们是否完全正确。这两种方案中有哪一种更好(习惯用语、效率等),或者是否有更简单的方法或内置函数可以满足我在这里需要做的事情?

更新:目前最佳答案

下面是TaskCompletionSource解决方案的修改版。没有长时间的闭包、锁定或任何其他东西。看起来相当简单。这里有任何错误吗?

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}

更新2: 另一个好的解决方案

事实证明,BlockingCollectionConcurrentQueue相似,但还有接受超时和取消标记的方法。这个解决方案中的一个好处是它可以很容易地更新为WaitForNEvents:

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }   
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}

听起来你想要类似于“AutoResetEvent”的东西。你考虑过使用它的可能性吗? - Kendall Frey
@KendallFrey 是的,那似乎会让我陷入与 ManualResetEventSlim 相同的 Dispose 麻烦中,或者你有什么解决方法吗? - lobsterism
Resharper 抱怨的原因是它无法分析流程,因为订阅和取消订阅操作是通过传递参数实现的。如果您通过反射传递事件或以某种方式使流程更可验证,则可能会停止抱怨。无论如何,我个人不认为 Resharper 的警告很重要。 - Kendall Frey
这些都毫无意义。如果您无法控制事件源,就没有办法拦截像DataReceived这样在线程池线程上触发的事件。而且让它只触发一次也没有任何意义,这只意味着调用Read()不会阻塞。它根本不保证您能够获取所有想要读取的内容。 - Hans Passant
@HansPassant,您能澄清一下吗?这两个选项在我测试时都没有出错(到目前为止)。我甚至在问题中没有一个“Read”函数,所以我不知道您的意思。serialDevice.RequestStatusPacket()发送命令给我的设备以响应状态数据包,并且我需要等待响应,在不同的线程上使用超时或取消来处理事件并在自己的线程上处理该事件。不确定为什么这不合理。 - lobsterism
显示剩余2条评论
4个回答

6
您可以使用 TaskCompletionSource 来创建一个可以标记为已完成或已取消的 Task。以下是特定事件的可能实现方式:
public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        target.MyEvent -= handler;
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        target.MyEvent -= handler;
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    target.MyEvent += handler;
    return tcs.Task;
}

在C# 5中,您可以像这样使用它:
private async Task MyMethod()
{
    ...
    await WaitFirstMyEvent(foo, cancellationToken);
    ...
}

如果你想同步等待事件,你也可以使用Wait方法:

private void MyMethod()
{
    ...
    WaitFirstMyEvent(foo, cancellationToken).Wait();
    ...
}

这是一个更通用的版本,但仍然只适用于具有Action签名的事件:
public Task WaitFirstEvent(
    Action<Action> subscribe,
    Action<Action> unsubscribe,
    CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        unsubscribe(handler);
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        unsubscribe(handler);
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    subscribe(handler);
    return tcs.Task;
}

您可以像这样使用它:
await WaitFirstEvent(
        handler => foo.MyEvent += handler,
        handler => foo.MyEvent -= handler,
        cancellationToken);

如果你希望它能与其他事件签名(例如EventHandler)一起使用,你需要创建单独的重载。我认为没有简单的方法可以让它适用于任何签名,特别是因为参数数量并不总是相同的。


我在问题中添加了一个更新——使用您的示例作为起点的可能解决方案。我没有任何关于TaskCompletionSource或者一般的Task的经验。您是否在解决方案中看到任何明显的错误?(这是.Net 4.0和桌面应用程序,因此使用task.Wait来保持线程不是问题。) - lobsterism
@lob,嗯,你的代码不支持取消操作。此外,测试任务状态也没有意义:如果它到达那个点,状态只能是RanToCompletion,否则异常会冒泡出来。 - Thomas Levesque
请注意,我正在使用带有超时和 CancellationToken 的 task.Wait 重载。它似乎能够胜任。如果在调用 Wait 时取消了令牌,则会抛出 OperationCancellationException,如果超时,则 Status 仍为 TaskStatus.WaitingForActivation - lobsterism
@lobsterism,是的,我不知道我怎么错过了CancellationToken的使用...但是你没有捕获异常,所以它会在你检查任务状态之前传播到调用者。 - Thomas Levesque

2
你可以使用 Rx 将事件转换为可观察对象,然后转换为任务,并最终使用你的标记/超时等待该任务。与任何现有解决方案相比,这种方法的优势在于它会在事件线程上调用取消订阅,确保你的处理程序不会被调用两次。 (在第一种解决方案中,你通过 tcs.TrySetResult 而不是 tcs.SetResult 来解决这个问题,但总是能够消除 "TryDoSomething" 并确保 DoSomething 总是正确的更好)。另一个优点是代码的简单性。基本上只有一行代码。因此,你甚至不需要一个独立的函数。你可以将其内联,以便更清楚地了解你的代码到底做了什么,并且你可以进行主题变化,而无需大量的可选参数(例如你的可选初始化器、允许等待 N 个事件、或放弃超时/取消在不需要的情况下)。如果有用的话,你可以在完成时同时具有 bool 返回值和实际结果。
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}

1
为什么不直接使用ManualResetEventSlim.Wait (int millisecondsTimeout, CancellationToken cancellationToken)呢?

0

非常感谢您帮助他人理解......(也许可以展示串行设备代码及其操作处理程序代码)

您还可以添加一些通用类型约束,例如:

 where TEvent : EventArgs

在我的情况下,我还需要在“waiter”事件之外得到结果
所以我改变了签名,像这样
(在通用对象上快速而丑陋...)

 public static bool WaitForSingleEventWithResult<TEvent, TObjRes>(
            this CancellationToken token,
            Func<TEvent, TObjRes> onEvent,
             ...

以这种方式调用它

        var ct = new CancellationToken();
        object result;
        bool eventOccurred = ct.WaitForSingleEventWithResult<MyEventArgs, object>(
            onEvent: statusPacket => result = this.OnStatusPacketReceived(statusPacket),
            subscribe: sub => cp.StatusPacketReceived_Action += sub,
            unsubscribe: unsub => cp.StatusPacketReceived_Action -= unsub,
            msTimeout: 5 * 1000,
            initializer: /*() => serialDevice.RequestStatusPacket()*/null);

无论如何...非常感谢!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接