如何使我的方法等待所有线程完成?

3

我有一个方法,它会启动线程来执行一些任务。有两个线程异步运行一段时间,当它们的回调函数被调用时,回调会再次启动另一个线程,直到所有工作都完成。如何让我的方法等待所有这些线程完成并被启动?


可能是重复的问题:C#:等待所有线程完成 - Brian Gideon
5个回答

6
如果您使用的是 .Net 4.0,您可以使用 CountdownEvent
const int threads = 10;
using( CountdownEvent evt = new CountdownEvent(threads) )
{
    for( int x = 0; x < threads; ++x )
    {
        ThreadPool.QueueUserWorkItem((state) =>
            {
                // Do work here
                ((CountdownEvent)state).Signal();
            }, evt);
    }

    evt.Wait();
}
Console.WriteLine("Everyone finished!");

这样做的好处是在无法使用Thread.Join的情况下工作(例如,如果您正在使用线程池),并且比使用等待句柄更具扩展性(因为WaitHandle.WaitAll最多只能有64个句柄,并且您也不需要分配太多对象)。
请注意,如果您使用的是.Net 4,还可以使用任务并行库来简化此类操作。
更新:
由于您说这不是.Net 4.0,这里是一个简单版本的CountdownEvent,可用于.Net 3.5。我最初编写它是因为我需要一个可以在Mono中使用的CountdownEvent,而当时Mono还不支持.Net 4。它没有真正的灵活性,但确实可以满足您的需求。
/// <summary>
/// Represents a synchronization primitive that is signaled when its count reaches zero.
/// </summary>
/// <remarks>
/// <para>
///   This class is similar to but less versatile than .Net 4's built-in CountdownEvent.
/// </para>
/// </remarks>
public sealed class CountdownEvent : IDisposable
{
    private readonly ManualResetEvent _reachedZeroEvent = new ManualResetEvent(false);
    private volatile int _count;
    private volatile bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="CountdownEvent"/> class.
    /// </summary>
    /// <param name="initialCount">The initial count.</param>
    public CountdownEvent(int initialCount)
    {
        _count = initialCount;
    }

    // Disable volatile not treated as volatile warning.
#pragma warning disable 420

    /// <summary>
    /// Signals the event by decrementing the count by one.
    /// </summary>
    /// <returns><see langword="true" /> if the count reached zero and the event was signalled; otherwise, <see langword="false"/>.</returns>
    public bool Signal()
    {
        CheckDisposed();

        // This is not meant to prevent _count from dropping below zero (that can still happen due to race conditions),
        // it's just a simple way to prevent the function from doing unnecessary work if the count has already reached zero.
        if( _count <= 0 )
            return true;

        if( Interlocked.Decrement(ref _count) <= 0 )
        {
            _reachedZeroEvent.Set();
            return true;
        }
        return false;
    }

#pragma warning restore 420

    /// <summary>
    /// Blocks the calling thread until the <see cref="CountdownEvent"/> is set.
    /// </summary>
    public void Wait()
    {
        CheckDisposed();
        _reachedZeroEvent.WaitOne();
    }

    /// <summary>
    /// Blocks the calling thread until the <see cref="CountdownEvent"/> is set, using a <see cref="TimeSpan"/> to measure the timeout.
    /// </summary>
    /// <param name="timeout">The timeout to wait, or a <see cref="TimeSpan"/> representing -1 milliseconds to wait indefinitely.</param>
    /// <returns><see langword="true"/> if the <see cref="CountdownEvent"/> was set; otherwise, <see langword="false"/>.</returns>
    public bool Wait(TimeSpan timeout)
    {
        CheckDisposed();
        return _reachedZeroEvent.WaitOne(timeout, false);
    }

    /// <summary>
    /// Blocks the calling thread until the <see cref="CountdownEvent"/> is set, using a 32-bit signed integer to measure the timeout.
    /// </summary>
    /// <param name="millisecondsTimeout">The timeout to wait, or <see cref="Timeout.Infinite"/> (-1) to wait indefinitely.</param>
    /// <returns><see langword="true"/> if the <see cref="CountdownEvent"/> was set; otherwise, <see langword="false"/>.</returns>
    public bool Wait(int millisecondsTimeout)
    {
        CheckDisposed();
        return _reachedZeroEvent.WaitOne(millisecondsTimeout, false);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if( !_disposed )
        {
            if( disposing )
                ((IDisposable)_reachedZeroEvent).Dispose();
            _disposed = true;
        }
    }

    private void CheckDisposed()
    {
        if( _disposed )
            throw new ObjectDisposedException(typeof(CountdownEvent).FullName);
    }
}

仅仅作为补充,基于Joe Albahari的Monitor类实现的CountdownEvent - http://www.albahari.com/threading/part4.aspx#_Writing_a_CountdownEvent - resnyanskiy

5

对所有线程简单调用Join。所以如果你只有两个线程变量:

thread1.Join();
thread2.Join();

如果你有一个集合:

foreach (Thread thread in threads)
{
    thread.Join();
}

线程完成的顺序并不重要;只有在所有线程都完成后,代码才会继续执行。

然而,如果你一直在创建新的线程,这可能没有太大帮助...你可能需要一个集合(例如队列),该集合仅在锁定时访问,并让每个线程生成活动将新线程添加到队列中...然后小心地迭代,直到队列为空:

while (true)
{
    Thread nextThread;
    lock (collectionLock)
    {
        if (queue.Count == 0)
        {
            break;
        }
        nextThread = queue.Dequeue();
    }
    nextThread.Join();
}

如果你使用的是.NET 4,最好尝试使用任务并行库,它可以使很多这方面的工作更容易 :)


我正在使用线程池,如果我只是将用户工作项排队,那么如何执行Thread.join?另外,我不能使用TPL,因为我在3.5上,无法获取4 :/ - Joey Gfd
@Joey:好的,如果你正在使用线程池,那会让事情变得更加困难。你的问题说你正在“发射线程”,这并不完全相同... 写问题时要小心谨慎 :) - Jon Skeet
我可以很容易地将它改为使用线程,我想我会这样做。 - Joey Gfd
@Joey Gfd:我更新了我的答案,加入了一个自定义的CountdownEvent实现,这是我一段时间以前为个人项目编写的,你可以在.Net 3.5中使用它。你可以轻松地将它与ThreadPool一起使用。 - Sven

1

使用WaitHandles,每个线程都应该有一个WaitHandle,例如ManualResetEvent,完成后调用事件上的Set()方法。

主方法应该使用WaitHandle.WaitAll,传入每个线程的句柄。

        IList<WaitHandle> waitHandles = new List<WaitHandle>();
        var newThread = new Thread(new ParameterizedThreadStart((handle) =>
        {
            // thread stuff goes here

            ((ManualResetEvent)handle).Set();
        }));
        var manualResetEvent = new ManualResetEvent(false);
        waitHandles.Add(manualResetEvent);
        newThread.Start(manualResetEvent);

        // create other threads similarly

        // wait for all threads to complete - specify a timeout to prevent a deadlock if a thread fails to set the event
        WaitHandle.WaitAll(waitHandles.ToArray());

1
WaitAll也有64个句柄限制,因此它的可扩展性不是很好。 - Brian Gideon

1

在启动任何线程之前,使用 Interlocked.Increment 将计数器初值设为零。在线程退出/循环返回时,使用 Interlocked.Decrement 减少计数器的值。如果任何线程将计数器减少到零,则使用 Set() 设置 AutoResetEvent。在 AutoResetEvent 上使用 WaitOne()。

敬礼, 马丁


注意到其他帖子后,我应该可能要补充一下 - 我的解决方案可以在启动多少个线程时都能正常工作 - 有时这在编译时是未知的。此外,使用Join()的解决方案应该可以正常工作,前提是线程实际上退出 - 如果它们循环并等待某些事情以后做更多的工作,则Join()将无法工作。另一个要点 - WaitHandle.WaitAll只能处理(!)64个句柄 - 这是WaitForMultipleObjects() API在内部使用的限制(现在可能是挑剔了 - 我应该闭嘴 :))。 - Martin James

0
在最简单的情况下,您可以使用Join函数。
    Threading.Thread myThread1 = new Thread(new ThreadStart(Worker1));
    Threading.Thread myThread2 = new Thread(new ThreadStart(Worker2));
    myThread1.Start();
    myThread2.Start();
    myThread1.Join();
    myThread2.Join();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接