等待所有线程完成,带有超时限制。

70
我在编写的代码中遇到一个常见的模式,需要等待一组线程完成,同时还需要设置超时时间。超时时间应该是所有线程完成所需的时间,因此仅对每个线程执行 Thread.Join(timeout) 不起作用,因为可能的超时时间是 timeout * numThreads
目前我会像下面这样做:
var threadFinishEvents = new List<EventWaitHandle>();

foreach (DataObject data in dataList)
{
    // Create local variables for the thread delegate
    var threadFinish = new EventWaitHandle(false, EventResetMode.ManualReset);
    threadFinishEvents.Add(threadFinish);

    var localData = (DataObject) data.Clone();
    var thread = new Thread(
        delegate()
        {
            DoThreadStuff(localData);
            threadFinish.Set();
        }
    );
    thread.Start();
}

Mutex.WaitAll(threadFinishEvents.ToArray(), timeout);

然而,似乎应该有一种更简单的惯用语来表达这种情况。


这就是我会做的 - 想不出更简单的方法了。 - Guy
有没有完整的源代码可用的最终解决方案?也许可以提供更复杂的示例,以便在每个线程中通知错误,并在 WaitAll 后显示摘要? - Kiquenet
10个回答

29

我仍然认为使用Join更简单。记录预期完成时间(如Now+timeout),然后在循环中执行:

if(!thread.Join(End-now))
    throw new NotFinishedInTime();

21

在使用.NET 4.0时,我发现System.Threading.Tasks更易于操作。以下是一个可靠的旋转等待循环,直到所有任务完成后才阻塞主线程。还有Task.WaitAll,但在某些情况下并不总是有效。

        for (int i = 0; i < N; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {               
                 DoThreadStuff(localData);
            });
        }
        while (tasks.Any(t => !t.IsCompleted)) { } //spin wait

45
你应该使用 Task.WaitAll(tasks) 来代替那个自旋操作。 - ladenedge
13
@T.Webster - 你说 Task.WaitAll 并不总是适用于你,你是在暗示这个方法有缺陷吗?你能提供一些例子吗?它对我来说总是有效的。另一方面,你建议的旋转操作是非常昂贵的事情,会占用你的 CPU。Task.WaitAll 是更好的解决方案。 - Alex Aza
2
如果您使用此选项,我建议至少在 while 循环体中加入 Thread.Yield() - Wouter Simons
4
抱歉,-1用于SpinWait。只给出“Task.WaitAll在某些情况下无法正常工作”这样的动机并不足以证明做出这种操作是合理的。请提供更多细节或合适的理由。 - erikkallen
1
我曾经遇到过在4.0中WaitAll不总是正常工作的情况。也许问题出在任务本身上。我通常希望任务记录结果,并且经常发现一个任务永远不会写入结果(任务中的最后一件事)。要么全部完成,要么少一个。从未出现过两个、三个或N个缺失。当它发生时,总是只有一个。我尝试了不同的旋转选项来检查结果数量是否与任务数量匹配,但它从未走出那个循环。即使WaitAll被释放,任务也在完成之前消失了。 - Quintium
显示剩余6条评论

11

这不是问题的答案(没有超时),但我做了一个非常简单的扩展方法来等待一个集合中的所有线程:

using System.Collections.Generic;
using System.Threading;
namespace Extensions
{
    public static class ThreadExtension
    {
        public static void WaitAll(this IEnumerable<Thread> threads)
        {
            if(threads!=null)
            {
                foreach(Thread thread in threads)
                { thread.Join(); }
            }
        }
    }
}

那么你只需调用:

List<Thread> threads=new List<Thread>();
//Add your threads to this collection
threads.WaitAll();

10

由于问题被推到了前面,我将继续发布我的解决方案。

using (var finished = new CountdownEvent(1)) 
{ 
  for (DataObject data in dataList) 
  {   
    finished.AddCount();
    var localData = (DataObject)data.Clone(); 
    var thread = new Thread( 
        delegate() 
        {
          try
          {
            DoThreadStuff(localData); 
            threadFinish.Set();
          }
          finally
          {
            finished.Signal();
          }
        } 
    ); 
    thread.Start(); 
  }  
  finished.Signal(); 
  finished.Wait(YOUR_TIMEOUT); 
} 

9

就我个人而言,你可以使用Thread.Join(timeout)方法并从总超时时间中减去等待线程加入所需的时间。

// pseudo-c#:

TimeSpan timeout = timeoutPerThread * threads.Count();

foreach (Thread thread in threads)
{
    DateTime start = DateTime.Now;

    if (!thread.Join(timeout))
        throw new TimeoutException();

    timeout -= (DateTime.Now - start);
}

编辑:代码现在不那么伪了。不明白为什么你会对一个得到了+4的答案进行-2的修改,因为被修改的答案和原答案完全相同,只是更简略。


这是错误的:给出了所有线程的完整超时时间,任何线程(包括正在等待的第一个线程)都可能消耗完整的超时时间。您的代码没有检查Join的结果。 - Martin v. Löwis
3
首先,这是伪代码,因为它不是真正的代码,所以是不完整的。我假设时间等于或小于零会抛出异常之类的情况。其次,如果第一个线程消耗了全部时间,那么整个操作应该超时。根据问题描述,这是正确的。 - Omer van Kloeten

7

如果您可以使用.NET的Parallel Extension,那么您可以使用Task代替原始线程,然后使用Task.WaitAll()等待它们完成。但如果您无法使用此选项,则需要考虑其他方法。


1
我读了Herbert Schildt的《C# 4.0:完全参考》一书。作者使用join给出了一个解决方案:
class MyThread
    {
        public int Count;
        public Thread Thrd;
        public MyThread(string name)
        {
            Count = 0;
            Thrd = new Thread(this.Run);
            Thrd.Name = name;
            Thrd.Start();
        }
        // Entry point of thread.
        void Run()
        {
            Console.WriteLine(Thrd.Name + " starting.");
            do
            {
                Thread.Sleep(500);
                Console.WriteLine("In " + Thrd.Name +
                ", Count is " + Count);
                Count++;
            } while (Count < 10);
            Console.WriteLine(Thrd.Name + " terminating.");
        }
    }
    // Use Join() to wait for threads to end.
    class JoinThreads
    {
        static void Main()
        {
            Console.WriteLine("Main thread starting.");
            // Construct three threads.
            MyThread mt1 = new MyThread("Child #1");
            MyThread mt2 = new MyThread("Child #2");
            MyThread mt3 = new MyThread("Child #3");
            mt1.Thrd.Join();
            Console.WriteLine("Child #1 joined.");
            mt2.Thrd.Join();
            Console.WriteLine("Child #2 joined.");
            mt3.Thrd.Join();
            Console.WriteLine("Child #3 joined.");
            Console.WriteLine("Main thread ending.");
            Console.ReadKey();
        }
    }

0

这里是一个受Martin v. Löwis答案启发的实现:

/// <summary>
/// Blocks the calling thread until all threads terminate, or the specified
/// time elapses. Returns true if all threads terminated in time, or false if
/// at least one thread has not terminated after the specified amount of time
/// elapsed.
/// </summary>
public static bool JoinAll(IEnumerable<Thread> threads, TimeSpan timeout)
{
    ArgumentNullException.ThrowIfNull(threads);
    if (timeout < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeout));

    Stopwatch stopwatch = Stopwatch.StartNew();
    foreach (Thread thread in threads)
    {
        if (!thread.IsAlive) continue;
        TimeSpan remaining = timeout - stopwatch.Elapsed;
        if (remaining < TimeSpan.Zero) return false;
        if (!thread.Join(remaining)) return false;
    }
    return true;
}

为了测量剩余时间,它使用 Stopwatch 而不是 DateTime.NowStopwatch 组件不受系统范围的时钟调整的影响。

用法示例:

bool allTerminated = JoinAll(new[] { thread1, thread2 }, TimeSpan.FromSeconds(10));

timeout 必须是正数或零的 TimeSpan。不支持 Timeout.InfiniteTimeSpan 常量。


0

我试图弄清楚如何做到这一点,但是我在谷歌上找不到任何答案。

我知道这是一个旧的帖子,但这是我的解决方案:

使用以下类:

class ThreadWaiter
    {
        private int _numThreads = 0;
        private int _spinTime;

        public ThreadWaiter(int SpinTime)
        {
            this._spinTime = SpinTime;
        }

        public void AddThreads(int numThreads)
        {
            _numThreads += numThreads;
        }

        public void RemoveThread()
        {
            if (_numThreads > 0)
            {
                _numThreads--;
            }
        }

        public void Wait()
        {
            while (_numThreads != 0)
            {
                System.Threading.Thread.Sleep(_spinTime);
            }
        }
    }
  1. 在执行线程之前调用Addthreads(int numThreads)。
  2. 每个线程完成后调用RemoveThread()。
  3. 在继续之前,在您想要等待所有线程完成的点处使用Wait()

0
可能的解决方案:
var tasks = dataList
    .Select(data => Task.Factory.StartNew(arg => DoThreadStuff(data), TaskContinuationOptions.LongRunning | TaskContinuationOptions.PreferFairness))
    .ToArray();

var timeout = TimeSpan.FromMinutes(1);
Task.WaitAll(tasks, timeout);

假设dataList是项目列表,每个项目需要在单独的线程中处理。

在每个线程中通知错误并在 WaitAll 后显示摘要的更复杂样例还有吗? - Kiquenet

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接