C#事件:如何并行处理事件

4

我有一个事件,希望以并行方式处理。我的想法是将每个回调添加到线程池中,有效地使注册事件的每个方法由线程池处理。

我的试验代码类似于以下内容:

Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[ delegates.Count<Delegate>() ];

for ( int i = 0; i < delegates.Count<Delegate>(); i++ )
{
    IAsyncResult result = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
    results[ i ] = result;
}

for ( int i = 0; i < delegates.Length; i++ )
{
    ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}

我只是抱着好奇心试试,想知道如何做到。我相信有更好的方法。 我不喜欢使用创建包含lambda表达式的WaitCallback的Func。此外,与直接调用委托相比,DynamicInvoke非常慢。我怀疑这种事件处理方式是否比顺序处理更快。

我的问题是:如何使用线程池以并行方式处理事件?

由于我通常使用Mono,所以.NET 4.0或任务并行库都不是选项。

谢谢!

编辑: - 由于Earwickers的答案进行了修正示例。 - 更新了尝试代码。


你应该确保为每个BeginInvoke调用EndInvoke,以避免资源泄漏。请参见http://msdn.microsoft.com/en-us/magazine/cc164036.aspx#S3。 - Lucero
关于使用AsyncWaitHandle属性:http://msdn.microsoft.com/en-us/library/system.runtime.remoting.messaging.asyncresult.asyncwaithandle.aspx "当您调用用于进行异步方法调用的委托的EndInvoke方法时,等待句柄不会自动关闭。如果您释放了对等待句柄的所有引用,则在垃圾回收收回等待句柄时将释放系统资源。为了在使用等待句柄完成后立即释放系统资源,请调用WaitHandle.Close方法。" 哎呀。 - Lucero
4个回答

6
我会采用使用动态方法(LCG)和状态对象的方法,该对象携带参数并跟踪调用(以便您可以等待它们完成)。
代码: 像这样的东西应该可以做到(尽管还没有经过彻底测试,在某些情况下可能会抛出一些不好的异常)。
/// <summary>
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, avw@gmx.ch
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
    private class ParallelInvokeContext<TDelegate> where TDelegate: class {
        private static readonly DynamicMethod invoker;
        private static readonly Type[] parameterTypes;

        static ParallelInvokeContext() {
            if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
                throw new InvalidOperationException("The TDelegate type must be a delegate");
            }
            Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
            Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
            Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
            FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
            FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
            MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
            Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
            if (delegate_invoke.ReturnType != typeof(void)) {
                throw new InvalidOperationException("The TDelegate delegate must not have a return value");
            }
            ParameterInfo[] parameters = delegate_invoke.GetParameters();
            parameterTypes = new Type[parameters.Length];
            invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
                                        typeof(ParallelInvokeContext<TDelegate>), true);
            ILGenerator il = invoker.GetILGenerator();
            LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
            bool skipLoad = false;
            il.BeginExceptionBlock();
            il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
            if (args != null) {
                Debug.Assert(args.LocalIndex == 0);
                il.Emit(OpCodes.Ldarg_0);
                il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                il.Emit(OpCodes.Dup);
                il.Emit(OpCodes.Stloc_0);
                skipLoad = true;
            }
            foreach (ParameterInfo parameter in parameters) {
                if (parameter.ParameterType.IsByRef) {
                    throw new InvalidOperationException("The TDelegate delegate must note have out or ref parameters");
                }
                parameterTypes[parameter.Position] = parameter.ParameterType;
                if (args == null) {
                    il.Emit(OpCodes.Ldarg_0);
                    il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                } else if (skipLoad) {
                    skipLoad = false;
                } else {
                    il.Emit(OpCodes.Ldloc_0);
                }
                il.Emit(OpCodes.Ldc_I4, parameter.Position);
                il.Emit(OpCodes.Ldelem_Ref);
                if (parameter.ParameterType.IsValueType) {
                    il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
                }
            }
            il.Emit(OpCodes.Callvirt, delegate_invoke);
            il.BeginFinallyBlock();
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_enter);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Dup);
            il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldc_I4_1);
            il.Emit(OpCodes.Sub);
            il.Emit(OpCodes.Dup);
            Label noPulse = il.DefineLabel();
            il.Emit(OpCodes.Brtrue, noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_pulse);
            Label exit = il.DefineLabel();
            il.Emit(OpCodes.Br, exit);
            il.MarkLabel(noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.MarkLabel(exit);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_exit);
            il.EndExceptionBlock();
            il.Emit(OpCodes.Ret);
        }

        [Conditional("DEBUG")]
        private static void VerifyArgumentsDebug(object[] args) {
            for (int i = 0; i < parameterTypes.Length; i++) {
                if (args[i] == null) {
                    if (parameterTypes[i].IsValueType) {
                        throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
                    }
                } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
                    throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
                }
            }
        }

        private readonly object[] arguments;
        private readonly WaitCallback invokeCallback;
        private int activeCalls;

        public ParallelInvokeContext(object[] args) {
            if (parameterTypes.Length > 0) {
                if (args == null) {
                    throw new ArgumentNullException("args");
                }
                if (args.Length != parameterTypes.Length) {
                    throw new ArgumentException("The parameter count does not match");
                }
                VerifyArgumentsDebug(args);
                arguments = args;
            } else if ((args != null) && (args.Length > 0)) {
                throw new ArgumentException("This delegate does not expect any parameters");
            }
            invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
        }

        public void QueueInvoke(Delegate @delegate) {
            Debug.Assert(@delegate is TDelegate);
            activeCalls++;
            ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
        }
    }

    private static readonly MethodInfo monitor_enter;
    private static readonly MethodInfo monitor_exit;
    private static readonly MethodInfo monitor_pulse;

    static ParallelInvoke() {
        monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
        Invoke(@delegate, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
        if (@delegate == null) {
            throw new ArgumentNullException("delegate");
        }
        ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
        lock (context) {
            foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
                context.QueueInvoke(invocationDelegate);
            }
            Monitor.Wait(context);
        }
    }
}

使用方法:

ParallelInvoke.Invoke(yourDelegate, arguments);

注:

  • 事件处理程序中的异常未被处理(但 IL 代码包含一个 finally 来减少计数器,以便方法正确结束),这可能会引起麻烦。也可以在 IL 代码中捕获和传递异常。

  • 除了继承之外的隐式转换(例如 int 转 double)不会执行并将引发异常。

  • 使用的同步技术不分配操作系统等待句柄,这通常对性能有好处。Monitor 工作原理的说明可在 Joseph Albahari's page 上找到。

  • 经过一些性能测试,似乎这种方法比使用“本机”BeginInvoke/EndInvoke调用委托的任何方法都更具可扩展性(至少在MS CLR上)。


那将会非常有趣,期待您的样例。 - galaktor
发布了一些代码,但只进行了快速测试。看起来似乎可以工作。 - Lucero
谢谢你的代码!它似乎可以工作,而且很高兴使用起来比阅读容易得多;-) 我进行了一项小型性能测试,将您的代码与直接调用事件以及我在问题中发布的方法进行比较。结果是...ParallelInvoke:86毫秒,直接调用:21毫秒,我的尝试:36毫秒。经过几次运行,比例似乎保持不变。保留我的版本是否有任何明显的问题?如果有人感兴趣,我可以提供我的测试代码。 - galaktor
我的代码需要热身(两个静态构造函数和对动态生成方法的第一次调用),才能表现良好。这是一个完全通用的解决方案,适用于任何委托和任何数量的参数,包括值类型和引用类型(不包括byref类型,因为这并没有实际意义)。我想不出它比任何DynamicInvoke解决方案更慢的真正原因,但这也可能取决于使用的框架(Mono vs. MS CLR)。所以如果你能给我展示一些你的代码,我会有兴趣调查慢速性能的原因。 - Lucero
好的,我自己进行了一些测试。即使是对委托调用列表中BeginInvoke / EndInvoke的直接调用所需时间也比我的代码多两倍(不计入热身调用),而且这还没有对我的代码进行任何进一步的优化,也没有使用DynamicInvoke或其他“代理”方法(你的Func<,>)。我会在另一个答案中发布此代码,以便您可以自行进行比较。 - Lucero
显示剩余2条评论

4
如果已知委托的类型,可以直接调用它们的BeginInvoke并将IAsyncResults存储在数组中以等待和结束调用。请注意,应调用EndInvoke以避免潜在的资源泄漏。该代码依赖于EndInvoke等待调用完成的事实,因此不需要WaitAll(而且请注意,WaitAll存在多个问题,因此我会避免使用它)。
下面是一个代码示例,同时也是不同方法的简单基准:
public static class MainClass {
    private delegate void TestDelegate(string x);

    private static void A(string x) {}

    private static void Invoke(TestDelegate test, string s) {
        Delegate[] delegates = test.GetInvocationList();
        IAsyncResult[] results = new IAsyncResult[delegates.Length];
        for (int i = 0; i < delegates.Length; i++) {
            results[i] = ((TestDelegate)delegates[i]).BeginInvoke("string", null, null);
        }
        for (int i = 0; i < delegates.Length; i++) {
            ((TestDelegate)delegates[i]).EndInvoke(results[i]);
        }
    }

    public static void Main(string[] args) {
        Console.WriteLine("Warm-up call");
        TestDelegate test = A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A; // 10 times in the invocation list
        ParallelInvoke.Invoke(test, "string"); // warm-up
        Stopwatch sw = new Stopwatch();
        GC.Collect();
        GC.WaitForPendingFinalizers();
        Console.WriteLine("Profiling calls");
        sw.Start();
        for (int i = 0; i < 100000; i++) {
            // ParallelInvoke.Invoke(test, "string"); // profiling ParallelInvoke
            Invoke(test, "string"); // profiling native BeginInvoke/EndInvoke
        }
        sw.Stop();
        Console.WriteLine("Done in {0} ms", sw.ElapsedMilliseconds);
        Console.ReadKey(true);
    }
}

在我的非常老的笔记本电脑上,使用BeginInvoke/EndInvoke需要95553毫秒,而使用ParallelInvoke方法只需要9038毫秒(MS .NET 3.5)。因此,与ParallelInvoke解决方案相比,这种方法的扩展性不佳。

1
有关BeginInvoke/EndInvoke性能问题的解释,请参见:https://dev59.com/mHRB5IYBdhLWcg3wuZfo - Lucero
我对我的代码进行了预热和更长时间的测试运行,以便更好地比较。每个运行100,000次,ParallelInvoke花费了约16秒(15948毫秒),而Begin/EndInvoke则运行了约19秒(19149毫秒)。因此,您的解决方案似乎要快得多,这就是为什么我将您的ParallelInvoke代码标记为正确答案的原因。再次感谢。 - galaktor
不用谢。你的测试是在哪个运行时上运行的,是Mono吗?因为在我的情况下,Begin/EndInvoke的性能要差得多。另外,请注意,我对ParallelInvoke进行了小修改,并引入了invokeCallback字段,这使其在调用列表中存在多个事件的情况下表现更佳。因此,如果您使用旧代码运行测试,也许可以进行这些更改以获得更好的性能。 - Lucero
我向事件列表中添加了100,000个委托,并以3种不同的方式调用了该事件:ParallelInvoke、Begin/EndInvoke和标准方式直接调用事件。 - galaktor
我在Windows上的Mono上运行了相同的测试6次,以下是有趣的平均结果...ParallelInvoke:66309.2毫秒,直接调用:162.167毫秒,Begin/EndInvoke:24741.7毫秒。显然,在Mono上实现Begin/EndInvoke背后的代码比在MS.NET上更有效率,尽管ParallelInvoke的结果要差得多。 - galaktor
显示剩余8条评论

1

你的代码片段中似乎进行了两次异步启动。

首先,你调用委托的BeginInvoke方法 - 这会将一个工作项排队,以便线程池执行该委托。

然后,在该委托内部,你使用QueueUserWorkItem来...排队另一个工作项,以便线程池执行真正的委托。

这意味着当你从外部委托获取IAsyncResult(因此是等待句柄)时,它将在第二个工作项排队时发出完成信号,而不是在执行完成时。


你说得对,一开始我没有在那里使用BeginInvoke,只是在尝试中加入了它。我会更正这个例子的! - galaktor

0
你是为了性能而这样做吗?
只有在允许你并行使用多个硬件设备的情况下,它才能起作用,并且会在进程切换开销方面花费你的代价。

这只是我好奇的一个实验。性能测试表明,这里展示的两种并行方法都需要比标准串行方法多几百倍的时间。 - galaktor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接