生成多个线程进行工作,然后等待所有线程完成。

53

我只是想请教一下关于多线程任务的“最佳实践”。 例如,我们有一个C#应用程序,在启动时从我们数据库中的各种“类型”表中读取数据,并将信息存储在一个集合中,我们在整个应用程序中传递该集合。这可以防止我们每次需要此信息时都访问数据库。

目前,该应用程序正在同步地从10个表中读取数据。 我真的很想让应用程序从每个表中以不同的线程读取所有表,并行运行。 应用程序将等待所有线程完成后才继续启动应用程序。

我已经研究了BackGroundWorker,但只是想请教如何实现上述内容的建议。

  1. 这种方法是否合乎逻辑,能加速应用程序的启动时间?
  2. 我们如何最好地处理所有线程,牢记每个线程的工作相互独立,我们只需要在所有线程完成后才能继续。

期待您的回答。


1
看看并行任务库,这将为您真正简化事情。 - TimothyP
你是指任务并行库(TPL)吗?我会调查一下。 - pharoc
1
TPL非常适合这个任务,但需要至少.NET 3.5sp1(如果安装Rx)或.NET 4 RC(内置框架)才能使用...您建议需要一个.NET 2解决方案。 - Reed Copsey
3
只是出于好奇,你尝试过这些解决方案之一并发现了性能提升吗?如果你读取的底层数据来自同一来源,那么将读取并行化可能根本没有任何好处,因为它们最终会相互等待资源访问。所有发生的事情只是额外的上下文切换开销。只有在满足以下两个条件时,多线程才真正有益于性能:a)实际拥有多个核心,b)受制于计算而不是共享资源的访问。 - Dan Bryant
1
如果这个问题有一个被接受的答案,尤其是考虑到为此贡献了时间的不同用户数量,那就太好了。 - demongolem
12个回答

91

我更倾向于使用单个WaitHandle来处理此问题,并使用Interlocked避免在计数器上加锁:

class Program
{
    static void Main(string[] args)
    {
        int numThreads = 10;
        ManualResetEvent resetEvent = new ManualResetEvent(false);
        int toProcess = numThreads;

        // Start workers.
        for (int i = 0; i < numThreads; i++)
        {
            new Thread(delegate()
            {
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                // If we're the last thread, signal
                if (Interlocked.Decrement(ref toProcess) == 0)
                    resetEvent.Set();
            }).Start();
        }

        // Wait for workers.
        resetEvent.WaitOne();
        Console.WriteLine("Finished.");
    }
}

这个方法很好用,并且可以扩展到处理任意数量的线程,而不需要引入锁。


1
你是非常正确的。所有其他解决方案可能会误导等待线程或至少将其放置在就绪队列中。有时这必须避免 - 如果不能避免,至少也应该避免。我将删除它。:P - Andras Vass
@Reed:如果我们真的要挑刺的话,:) 可以说 Interlocked 也会产生“排队”效应。(两个CPU不能同时独占同一地址的缓存进行写操作。它们必须使缓存行失效并通过主内存或缓存间通信传递新内容。如果你从不同的CPU更新相同的内存位置,这会创建一个“排队”效应——写操作必须串行化。) - Andras Vass
这个解决方案不起作用,我认为?至少对我来说是这样。我相信最后一个线程在创建时会发出事件信号。我的问题是我想让主线程等待直到最后一个线程完成,然后再发出信号。但是我很难将特定函数传递给定义线程入口点的函数。因此,如果有人想定义该函数,则可以在委托范围中的“IF语句”之后添加它。 - Shoaib
@ShoaibHaider 只需在调用 Set() 之前放置您的逻辑 - 这会在您调用 Set() 时发出信号。 - Reed Copsey
哦。我尝试在if条件语句之前放置所需的线程函数,但它没有被调用。如果它在if块之后,那么函数就会执行。那可能是问题所在。但如果我像我感觉的那样把它放在条件语句之前,这个函数不会执行吗? - Shoaib
显示剩余6条评论

28

我喜欢 @Reed 的解决方案。在 .NET 4.0 中实现相同效果的另一种方法是使用CountdownEvent

class Program
{
    static void Main(string[] args)
    {
        var numThreads = 10;
        var countdownEvent = new CountdownEvent(numThreads);

        // Start workers.
        for (var i = 0; i < numThreads; i++)
        {
            new Thread(delegate()
            {
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                // Signal the CountdownEvent.
                countdownEvent.Signal();
            }).Start();
        }

        // Wait for workers.
        countdownEvent.Wait();
        Console.WriteLine("Finished.");
    }
}

这很好,唯一的问题是在进入循环之前必须知道将有多少个线程...因此,如果您的循环中有 continue 的逻辑,那么这并不是很有帮助,但如果不需要,这会感觉更加清晰。 - Serj Sagan
在一个玩具示例中没有什么区别,但通常来说,处理 countdownEvent 是一个好主意 -- using (var countdownEvent = new CountdownEvent(numThreads)) { ... } - Nik

9

如果你在STA线程中有超过64个等待句柄,那么你可以像Mark所说的那样创建一个包含所有线程的列表,并在第二个循环中等待它们全部完成。

//check that all threads have completed.
foreach (Thread thread in threadList)
{
     thread.Join();

}  

7
如果您不使用.NET 4.0,则可以使用一个List<ManualResetEvent>,每个线程都有一个,然后Wait它们被Set。要等待多个线程,可以考虑使用WaitAll,但要注意64个等待句柄的限制。如果需要更多,请只需循环并单独等待每个线程。
如果您想要更快的启动体验,可能无需在启动期间等待所有数据都被读取。只需显示GUI和任何缺失的信息即可,可以用某种“更新…”图标或类似物显示为灰色。当信息到达时,只需触发事件以更新GUI。用户可以开始执行许多操作,甚至在读取所有表格的所有数据之前。

ManualResetEvent 是我最喜欢的同步工具! - Polaris878
1
Mark:这可以通过一个ManualResetEvent和Interlocked类完成。请参见下面的我的回复以获取详细信息。 - Reed Copsey

6

如果你想尝试一些冒险,可以使用C# 4.0和任务并行库:

Parallel.ForEach(jobList, curJob => {
  curJob.Process()
});

谢谢,但是对于商业应用程序来说,C# 4.0 似乎太过先进了。可能需要想出一个 C# 2.0 的解决方案。 - pharoc

4

这里有两种等待多个并行操作的模式。诀窍在于你必须把主线程视为其中一个并行操作。否则,在工作线程中完成信号和主线程等待该信号之间会存在微妙的竞争条件。

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
for (int i = 0; i < NUM_WORK_ITEMS; i++)
{
  Interlocked.Increment(ref threadCount); 
  ThreadPool.QueueUserWorkItem(delegate 
  { 
      try 
      { 
           // do work 
      } 
      finally 
      { 
          if (Interlocked.Decrement(ref threadCount) == 0) finished.Set();
      } 
  }); 
}
if (Interlocked.Decrement(ref threadCount) == 0) finished.Set();
finished.WaitOne(); 

作为个人偏好,我喜欢使用CountdownEvent类来进行计数,该类在.NET 4.0中可用。
var finished = new CountdownEvent(1);
for (int i = 0; i < NUM_WORK_ITEMS; i++)
{
  finished.AddCount();
  ThreadPool.QueueUserWorkItem(delegate 
  { 
      try 
      { 
           // do work 
      } 
      finally 
      { 
        finished.Signal();
      } 
  }); 
}
finished.Signal();
finished.Wait(); 

上面的示例使用了线程池(ThreadPool),但您可以将其替换为您喜欢的任何线程机制。

2

只是为了好玩,@Reed用Monitor做的事情。:P

class Program
{
    static void Main(string[] args)
    {
        int numThreads = 10;
        int toProcess = numThreads;
        object syncRoot = new object();

        // Start workers.
        for (int i = 0; i < numThreads; i++)
        {
            new Thread(delegate()
            {
                Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                // If we're the last thread, signal
                if (Interlocked.Decrement(ref toProcess) == 0)
                {
                    lock (syncRoot)
                    {
                        Monitor.Pulse(syncRoot);
                    }
                }
            }).Start();
        }

        // Wait for workers.
        lock (syncRoot)
        {
            if (toProcess > 0)
            {
                Monitor.Wait(syncRoot);
            }
        }

        Console.WriteLine("Finished.");
    }
}

@andras:我加入了另一种使用单个ManualResetEvent的修改版本 - 这更好,因为它完全消除了处理中锁定的必要性,因此可以更好地扩展到更大的工作程序计数。信号量选项类似,尽管我个人认为比下面的答案更复杂。 - Reed Copsey
@Reed:我对信号量的解决方案感到非常不满意。(我甚至曾经从答案中删除过它。)即使是基于监视器的解决方案看起来也要好得多。你的解决方案在所有方面都更好。(而且在性能方面,它是王者....) :) - Andras Vass

2

使用TPL的另一个可能性,假设jobs是要处理的项或运行的子线程的集合:

Task.WaitAll(jobs
    .Select(job => TaskFactory.StartNew(() => /*run job*/))
    .ToArray());

1
假设数据库读取线程在完成后立即返回,您可以从启动线程依次对所有十个线程调用Thread.Join。

0

我喜欢使用更简单的方法:

    private int ThreadsCount = 100; //initialize threads count
    private void button1_Click(object sender, EventArgs e)
    {   
        for (int i = 0; i < ThreadsCount; i++)
        {
            Thread t = new Thread(new ThreadStart(myMethod));
            t.IsBackground = true;
            t.Start(); 
        } 
    }

    private void myMethod()
    {
        //everytime a thread finishes executing decrease the threads count
        ThreadsCount = ThreadsCount - 1;

        if (ThreadsCount < 1)
        {
            //if all threads finished executing do whatever you wanna do here..
            MessageBox.Show("Finished Executing all threads!!!");
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接