C# Events: How to process an event in a parallel manner

Posted 2019-02-07 09:29

I have an event which I would like to have processed in a parallel manner. My idea is to add each callback to the ThreadPool, so that every method registered for the event is effectively run by the ThreadPool.

My try-out code looks something like the following:

Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[ delegates.Length ];

for ( int i = 0; i < delegates.Length; i++ )
{
    // Start each handler asynchronously; the typed cast avoids DynamicInvoke.
    IAsyncResult result = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
    results[ i ] = result;
}

for ( int i = 0; i < delegates.Length; i++ )
{
    ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}

This is just for playing around since I am curious how to do it. I am sure there is a better way to do it. I don't like having a Func which creates a WaitCallback that holds a lambda. Also, DynamicInvoke is pretty slow compared to calling a delegate directly. I doubt this way of processing the event is any faster than just doing it sequentially.

My question is: How can I process an event in a parallel manner, preferably by using ThreadPool?

Since I usually work with Mono, neither .NET 4.0 nor the Task Parallel Library is an option.

Thank you!

EDITS:
- Corrected example thanks to Earwicker's answer.
- Updated try-out code.

4 answers
啃猪蹄的小仙女
Reply #2 · 2019-02-07 09:57

Are you doing this for performance?

That will only help if it lets multiple pieces of hardware (for example, several CPU cores) work in parallel, and it will cost you context-switch overhead.

萌系小妹纸
Reply #3 · 2019-02-07 10:01

I'd go for an approach using DynamicMethod (LCG) and a state object which carries the arguments and keeps track of the calls (so that you can wait for them to complete).

Code: Something like this should do (not thoroughly tested yet, so it may throw some nasty exceptions in some situations):

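using System;
using System.Diagnostics;
using System.Reflection;
using System.Reflection.Emit;
using System.Threading;

/// <summary>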
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, avw@gmx.ch
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
    private class ParallelInvokeContext<TDelegate> where TDelegate: class {
        private static readonly DynamicMethod invoker;
        private static readonly Type[] parameterTypes;

        static ParallelInvokeContext() {
            if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
                throw new InvalidOperationException("The TDelegate type must be a delegate");
            }
            Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
            Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
            Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
            FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
            FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
            MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
            Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
            if (delegate_invoke.ReturnType != typeof(void)) {
                throw new InvalidOperationException("The TDelegate delegate must not have a return value");
            }
            ParameterInfo[] parameters = delegate_invoke.GetParameters();
            parameterTypes = new Type[parameters.Length];
            invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
                                        typeof(ParallelInvokeContext<TDelegate>), true);
            ILGenerator il = invoker.GetILGenerator();
            LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
            bool skipLoad = false;
            il.BeginExceptionBlock();
            il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
            if (args != null) {
                Debug.Assert(args.LocalIndex == 0);
                il.Emit(OpCodes.Ldarg_0);
                il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                il.Emit(OpCodes.Dup);
                il.Emit(OpCodes.Stloc_0);
                skipLoad = true;
            }
            foreach (ParameterInfo parameter in parameters) {
                if (parameter.ParameterType.IsByRef) {
                    throw new InvalidOperationException("The TDelegate delegate must not have out or ref parameters");
                }
                parameterTypes[parameter.Position] = parameter.ParameterType;
                if (args == null) {
                    il.Emit(OpCodes.Ldarg_0);
                    il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                } else if (skipLoad) {
                    skipLoad = false;
                } else {
                    il.Emit(OpCodes.Ldloc_0);
                }
                il.Emit(OpCodes.Ldc_I4, parameter.Position);
                il.Emit(OpCodes.Ldelem_Ref);
                if (parameter.ParameterType.IsValueType) {
                    il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
                }
            }
            il.Emit(OpCodes.Callvirt, delegate_invoke);
            il.BeginFinallyBlock();
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_enter);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Dup);
            il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldc_I4_1);
            il.Emit(OpCodes.Sub);
            il.Emit(OpCodes.Dup);
            Label noPulse = il.DefineLabel();
            il.Emit(OpCodes.Brtrue, noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_pulse);
            Label exit = il.DefineLabel();
            il.Emit(OpCodes.Br, exit);
            il.MarkLabel(noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.MarkLabel(exit);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_exit);
            il.EndExceptionBlock();
            il.Emit(OpCodes.Ret);
        }

        [Conditional("DEBUG")]
        private static void VerifyArgumentsDebug(object[] args) {
            for (int i = 0; i < parameterTypes.Length; i++) {
                if (args[i] == null) {
                    if (parameterTypes[i].IsValueType) {
                        throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
                    }
                } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
                    throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
                }
            }
        }

        private readonly object[] arguments;
        private readonly WaitCallback invokeCallback;
        private int activeCalls;

        public ParallelInvokeContext(object[] args) {
            if (parameterTypes.Length > 0) {
                if (args == null) {
                    throw new ArgumentNullException("args");
                }
                if (args.Length != parameterTypes.Length) {
                    throw new ArgumentException("The parameter count does not match");
                }
                VerifyArgumentsDebug(args);
                arguments = args;
            } else if ((args != null) && (args.Length > 0)) {
                throw new ArgumentException("This delegate does not expect any parameters");
            }
            invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
        }

        public void QueueInvoke(Delegate @delegate) {
            Debug.Assert(@delegate is TDelegate);
            activeCalls++;
            ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
        }
    }

    private static readonly MethodInfo monitor_enter;
    private static readonly MethodInfo monitor_exit;
    private static readonly MethodInfo monitor_pulse;

    static ParallelInvoke() {
        monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
        Invoke(@delegate, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
        if (@delegate == null) {
            throw new ArgumentNullException("delegate");
        }
        ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
        lock (context) {
            foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
                context.QueueInvoke(invocationDelegate);
            }
            Monitor.Wait(context);
        }
    }
}

Usage:

ParallelInvoke.Invoke(yourDelegate, arguments);

Notes:

  • Exceptions in the event handlers are not handled, and this could cause trouble (the IL code does have a finally block to decrement the counter, so the method should still end correctly). It would be possible to catch and transfer the exceptions in the IL code as well.

  • Implicit conversions other than inheritance (such as int to double) are not performed and will throw an exception.

  • The synchronization technique used does not allocate OS wait handles, which is usually good for performance. A description of the Monitor workings can be found on Joseph Albahari's page.

  • After some performance testing, it seems that this approach scales much better than any approach using the "native" BeginInvoke/EndInvoke calls on delegates (at least in the MS CLR).
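
To make the first and third notes more concrete, here is a much simpler sketch of the same counter plus Monitor.Wait/Pulse idea, without any IL generation. It is not the ParallelInvoke code above: the names SimpleParallelInvoke and InvokeAll are made up for this illustration, and it assumes the event uses Action<T> (so the delegate type is known at compile time). Instead of leaving handler exceptions unhandled, it collects them and hands them back to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class SimpleParallelInvoke
{
    // Hypothetical helper (not part of ParallelInvoke above): runs every handler
    // of a multicast Action<T> on the thread pool and blocks until all are done.
    // Returns the exceptions thrown by handlers (empty list if none failed).
    public static IList<Exception> InvokeAll<T>(Action<T> handlers, T arg)
    {
        if (handlers == null) throw new ArgumentNullException("handlers");

        Delegate[] targets = handlers.GetInvocationList();
        List<Exception> errors = new List<Exception>();
        object gate = new object();
        int pending = targets.Length;

        foreach (Delegate target in targets)
        {
            Action<T> handler = (Action<T>)target; // typed call, no DynamicInvoke
            ThreadPool.QueueUserWorkItem(delegate (object state)
            {
                Exception error = null;
                try { handler(arg); }
                catch (Exception ex) { error = ex; }

                lock (gate)
                {
                    if (error != null) errors.Add(error);
                    // The last handler to finish wakes up the waiting caller.
                    if (--pending == 0) Monitor.Pulse(gate);
                }
            });
        }

        lock (gate)
        {
            // The loop covers the case where all handlers finish before we get here.
            while (pending > 0) Monitor.Wait(gate);
        }
        return errors;
    }
}
```

Called as SimpleParallelInvoke.InvokeAll(myHandlers, "string"), it returns only after every handler has run, and the caller can inspect the returned list to decide whether to rethrow. I would expect the anonymous-method closures to make it slower than the DynamicMethod version, but I have not measured that.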

虎瘦雄心在
Reply #4 · 2019-02-07 10:11

If the type of the delegate is known, you can call BeginInvoke on each delegate directly and store the IAsyncResults in an array in order to wait for and end the calls. Note that you should call EndInvoke in order to avoid potential resource leaks. The code relies on the fact that EndInvoke waits until the call is finished, so that no WaitAll is required (and, mind you, WaitAll has several issues, so I'd avoid using it).

Here is a code sample, which is at the same time a simplistic benchmark for the different approaches:

using System;
using System.Diagnostics;

public static class MainClass {
    private delegate void TestDelegate(string x);

    private static void A(string x) {}

    private static void Invoke(TestDelegate test, string s) {
        Delegate[] delegates = test.GetInvocationList();
        IAsyncResult[] results = new IAsyncResult[delegates.Length];
        for (int i = 0; i < delegates.Length; i++) {
            results[i] = ((TestDelegate)delegates[i]).BeginInvoke(s, null, null); // pass the caller's argument instead of a hard-coded literal
        }
        for (int i = 0; i < delegates.Length; i++) {
            ((TestDelegate)delegates[i]).EndInvoke(results[i]);
        }
    }

    public static void Main(string[] args) {
        Console.WriteLine("Warm-up call");
        TestDelegate test = A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A; // 10 times in the invocation list
        ParallelInvoke.Invoke(test, "string"); // warm-up
        Stopwatch sw = new Stopwatch();
        GC.Collect();
        GC.WaitForPendingFinalizers();
        Console.WriteLine("Profiling calls");
        sw.Start();
        for (int i = 0; i < 100000; i++) {
            // ParallelInvoke.Invoke(test, "string"); // profiling ParallelInvoke
            Invoke(test, "string"); // profiling native BeginInvoke/EndInvoke
        }
        sw.Stop();
        Console.WriteLine("Done in {0} ms", sw.ElapsedMilliseconds);
        Console.ReadKey(true);
    }
}

On my very old laptop this takes 95553 ms with BeginInvoke/EndInvoke vs. 9038 ms with my ParallelInvoke approach (MS .NET 3.5). So the BeginInvoke/EndInvoke approach does not scale well compared to the ParallelInvoke solution.

疯言疯语
Reply #5 · 2019-02-07 10:16

You appear to be doing the asynchronous launching twice in your code snippet.

First you call BeginInvoke on a delegate - this queues a work item so that the thread pool will execute the delegate.

Then inside that delegate, you use QueueUserWorkItem to... queue another work item so the thread pool will execute the real delegate.

This means that when you get back an IAsyncResult (and hence a wait handle) from the outer delegate, it will signal completion when the second work item has been queued, not when it has finished executing.
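
A small sketch of that effect. To keep it self-contained it uses two nested ThreadPool.QueueUserWorkItem calls, with the outer one standing in for the delegate started via BeginInvoke (that substitution is an assumption of this sample, as are the names DoubleQueueDemo and InnerFinishedWhenOuterCompleted). The inner work item is held back by an event so the result does not depend on timing.

```csharp
using System;
using System.Threading;

public static class DoubleQueueDemo
{
    // Returns true if the inner work item had already finished at the moment
    // the outer work item reported completion.
    public static bool InnerFinishedWhenOuterCompleted()
    {
        ManualResetEvent outerDone = new ManualResetEvent(false);
        ManualResetEvent innerMayRun = new ManualResetEvent(false);
        ManualResetEvent innerDone = new ManualResetEvent(false);

        // Outer work item: stands in for the delegate started with BeginInvoke.
        ThreadPool.QueueUserWorkItem(delegate (object outerState)
        {
            // Inner work item: stands in for the real event handler.
            ThreadPool.QueueUserWorkItem(delegate (object innerState)
            {
                innerMayRun.WaitOne(); // simulates a handler that takes a while
                innerDone.Set();
            });
            outerDone.Set(); // the outer item is "complete" once the inner one is queued
        });

        outerDone.WaitOne();                              // what waiting on the outer call gives you
        bool innerFinished = innerDone.WaitOne(0, false); // non-blocking poll of the real handler
        innerMayRun.Set();                                // now let the handler finish
        innerDone.WaitOne();                              // the wait that was actually intended
        return innerFinished;
    }
}
```

Here the method returns false: the outer item has completed while the handler has not even been allowed to run. Queueing the handler only once, and waiting on that single work item, avoids the problem.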
