在单独的线程中触发事件

25
我正在开发一个组件,需要处理实时数据并以相当快的方式向听众广播数据(具有大约100纳秒级别的准确度,如果可以的话,甚至低于这个时间)。目前,我正在从我的代码中引发一个事件,订阅者可以订阅该事件。但是,由于在C#中事件处理程序在引发事件的同一线程上运行,因此引发事件的线程将被阻塞,直到所有订阅者完成处理事件。我无法控制订阅者的代码,所以他们可能会在事件处理程序中执行任何耗时的操作,这可能会阻塞正在广播的线程。
我应该怎么做才能将数据广播给其他订阅者,但仍然可以相当快地广播数据?

http://msdn.microsoft.com/en-us/library/ms173178.aspx - Dave
3
大约需要300条指令才能完成100纳秒。如果需要在这样的速率下进行跨线程同步,祝你好运。 - usr
2
@Blam 如果你正在编写库代码,这些代码将被各种不同类型的消费者导入,并添加具有任意代码的事件处理程序... - Servy
非常不切实际的目标。数字100只对人类有意义,机器并不在乎,并用两个手指计数。选择256。 - Hans Passant
看看微软的响应式框架吧,它会为你减少很多痛苦。 - Enigmativity
显示剩余2条评论
5个回答

11

要达到100纳秒的性能目标是非常困难的。我相信要想达成这种性能,需要深刻理解你正在做什么以及为什么要这样做。

然而,异步调用事件订阅者却很容易解决。 如何解决已经在这里得到了回答,而且回答者还是Jon Skeet。

foreach (MyDelegate action in multicast.GetInvocationList())
{
    action.BeginInvoke(...);
}

编辑:我还应该提到,为了向用户提供严格的性能保证,您需要在实时操作系统上运行。


如果你发现另一个重复的问题,应该标记/投票关闭,而不是发布带有链接的答案。 - Servy
@Servy 另一个问题没有涉及时间限制。 - dss539
2
你的回答也不是。 - Servy
11
+1, 就是这样。使用BeginInvoke()达到100ns确实是一个非常困难的目标。仅上下文切换就需要数千个CPU周期 :) - Hans Passant

4

看起来你正在寻找任务。下面是我为我的工作编写的一个扩展方法,可以异步调用事件,以便每个事件处理程序都在自己的线程上。我无法对其速度进行评论,因为这从未是我的要求。


更新

根据评论,我进行了调整,使得只创建一个任务来调用所有订阅者。

/// <summary>
/// Extension method to safely encapsulate asynchronous event calls with checks
/// </summary>
/// <param name="evnt">The event to call</param>
/// <param name="sender">The sender of the event</param>
/// <param name="args">The arguments for the event</param>
/// <param name="object">The state information that is passed to the callback method</param>
/// <remarks>
/// This method safely calls the each event handler attached to the event. This method uses <see cref="System.Threading.Tasks"/> to
/// asynchronously call invoke without any exception handling. As such, if any of the event handlers throw exceptions the application will
/// most likely crash when the task is collected. This is an explicit decision since it is really in the hands of the event handler
/// creators to make sure they handle issues that occur do to their code. There isn't really a way for the event raiser to know
/// what is going on.
/// </remarks>
[System.Diagnostics.DebuggerStepThrough]
public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args )
{
    // Used to make a temporary copy of the event to avoid possibility of
    // a race condition if the last subscriber unsubscribes
    // immediately after the null check and before the event is raised.
    EventHandler handler = evnt;
    if (handler != null)
    {
        // Manually calling all event handlers so that we could capture and aggregate all the
        // exceptions that are thrown by any of the event handlers attached to this event.  
        var invocationList = handler.GetInvocationList();

        Task.Factory.StartNew(() =>
        {
            foreach (EventHandler h in invocationList)
            {
                // Explicitly not catching any exceptions. While there are several possibilities for handling these 
                // exceptions, such as a callback, the correct place to handle the exception is in the event handler.
                h.Invoke(sender, args);
            }
        });
    }
}

+1 分,但这并不能解决他的 100 纳秒限制。 - dss539
1
在这里明确复制委托是没有意义的;当它传递到方法中时已经被复制了。 - Servy
这种特定方法的另一个缺点是,你需要为每种类型的委托编写一个新的方法,但实际上除了不将其提取为方法之外,没有什么好的解决方法。 - Servy
2
我的直觉是他确实想要异步调用每个处理程序,以便一个错误的消费者不会延迟消息到其他消费者。 - dss539
1
关于空值检查事件 - 我发现最好是为每个委托订阅一个空处理程序,以避免所有极其微妙的竞态条件。public event EventHandler MyEvent += ()=>{}; 可以防止许多潜在的危险。 - dss539
显示剩余13条评论

4

您可以在事件处理程序上使用这些简单的扩展方法:

public static void Raise<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) handler(sender, e);
}

public static void Raise(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) handler(sender, e);
}

public static void RaiseOnDifferentThread<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) {
    if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}

public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) {
    return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken());
}

使用方法:

public static Test() {
     myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty);
}

cancellationToken 是必要的,以确保 StartNew() 实际上使用了一个不同的线程,正如这里所解释的那样:链接


2
我不确定这种方法是否能可靠地满足 100ns 的要求,但是有一种替代方案,您可以为最终用户提供一种方法来向您提供 ConcurrentQueue,然后您会填充它,他们可以在单独的线程上监听。
class Program
{
    static void Main(string[] args)
    {
        var multicaster = new QueueMulticaster<int>();

        var listener1 = new Listener(); //Make a couple of listening Q objects. 
        listener1.Listen();
        multicaster.Subscribe(listener1);

        var listener2 = new Listener();
        listener2.Listen();
        multicaster.Subscribe(listener2);

        multicaster.Broadcast(6); //Send a 6 to both concurrent Queues. 
        Console.ReadLine();
    }
}

//The listeners would run on their own thread and poll the Q like crazy. 
class Listener : IListenToStuff<int>
{
    public ConcurrentQueue<int> StuffQueue { get; set; }

    public void Listen()
    {
        StuffQueue = new ConcurrentQueue<int>();
        var t = new Thread(ListenAggressively);
        t.Start();

    }

    void ListenAggressively()
    {
        while (true)
        {
            int val;
            if(StuffQueue.TryDequeue(out val))
                Console.WriteLine(val);
        }
    }
}

//Simple class that allows you to subscribe a Queue to a broadcast event. 
public class QueueMulticaster<T>
{
    readonly List<IListenToStuff<T>> _subscribers = new List<IListenToStuff<T>>();
    public void Subscribe(IListenToStuff<T> subscriber)
    {
        _subscribers.Add(subscriber);
    }
    public void Broadcast(T value)
    {
        foreach (var listenToStuff in _subscribers)
        {
            listenToStuff.StuffQueue.Enqueue(value);
        }
    }
}

public interface IListenToStuff<T>
{
    ConcurrentQueue<T> StuffQueue { get; set; }
}

鉴于您不能阻止其他侦听器的处理,因此意味着需要使用多个线程。在侦听器上拥有专用的监听线程似乎是一种合理的方法尝试,而并发队列似乎是一个不错的传递机制。在此实现中,它只是不断轮询,但您可以使用类似于AutoResetEvent的线程信号来减少CPU负载。


你可以使用 BlockingCollection http://msdn.microsoft.com/en-us/library/dd267312.aspx 来简化代码,而不是使用 AutoResetEvent。 - dss539

0

信号和共享内存非常快。您可以发送单独的信号,告诉应用程序从共享内存位置读取消息。当然,如果您想要低延迟,信号仍然是应用程序必须在高优先级线程上消耗的事件。我会在数据中包含时间标记,以便接收方可以补偿不可避免的延迟。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接