如何实现一个线程队列/等待?

4
我有一个定时器,每15分钟调用一个函数,该函数计算我的DGV中的行数,并为每一行启动一个线程(另一个函数的线程),该线程解析网页,这可能需要1秒到10秒的时间才能完成。
虽然它在1-6行的情况下运行良好,但再多就会导致请求超时。
我希望在创建另一个线程之前等待新创建的线程完成处理,而不锁定主UI。
                for (int x = 0; x <= dataGridFollow.Rows.Count - 1; x++)
                {
                    string getID = dataGridFollow.Rows[x].Cells["ID"].Value.ToString();
                    int ID = int.Parse(getID);
                    Thread t = new Thread(new ParameterizedThreadStart(UpdateLo));
                    t.Start(ID);
                    // <- Wait for thread to finish here before getting back in the for loop
                }

在过去的24小时里,我已经进行了很多谷歌搜索,阅读了很多关于这个特定问题及其实现的内容(Thread.Join、ThreadPools、Queuing,甚至是SmartThreadPool)。

很可能我已经在某处读到了正确的答案,但我对C#的线程工具不够熟悉,无法解密它们。

感谢您的时间。


2
如果是这样,为什么要使用新线程而不是调用一个方法? - Mor Shemesh
你的for循环,如上所述,是否在UI线程上运行? - CodingGorilla
如果你启动一个线程,然后立即阻塞等待它完成,那么启动线程就没有任何意义。你可以直接调用 UpdateLo() 来避免麻烦。 - Constantin
感谢宝贵的评论,康斯坦丁说的有道理,但for循环在UI线程上,因此调用该方法会阻止UI... 我希望我不是这么糟糕。 - Leo
好的,我重新思考了我的过程,并采用了方法+BackgroundWorker,它运行得很好!感谢大家的答案,我真的很喜欢这个论坛及其社区。 - Leo
7个回答

1
你想要的是启动几个工作线程来完成某些任务。
当一个线程完成时,你可以启动一个新的线程。
我相信使用线程池或其他方法可能有更好的方式...但我很无聊,所以就想出了这个。
using System;
using System.Collections.Generic;
using System.Linq;
using System.ComponentModel;
using System.Threading;

namespace WorkerTest
{
    class Program
    {
        static void Main(string[] args)
        {
            WorkerGroup workerGroup = new WorkerGroup();

            Console.WriteLine("Starting...");

            for (int i = 0; i < 100; i++)
            {
                var work = new Action(() => 
                { 
                    Thread.Sleep(1000); //somework
                });

                workerGroup.AddWork(work);
            }

            while (workerGroup.WorkCount > 0)
            {
                Console.WriteLine(workerGroup.WorkCount);
                Thread.Sleep(1000);
            }

            Console.WriteLine("Fin");

            Console.ReadLine();
        }
    }


    public class WorkerGroup
    {
        private List<Worker> workers;

        private Queue<Action> workToDo;

        private object Lock = new object();

        public int WorkCount { get { return workToDo.Count; } }

        public WorkerGroup()
        {
            workers = new List<Worker>();
            workers.Add(new Worker());
            workers.Add(new Worker());

            foreach (var w in workers)
            {
                w.WorkCompleted += (OnWorkCompleted);
            }

            workToDo = new Queue<Action>();
        }

        private void OnWorkCompleted(object sender, EventArgs e)
        {
            FindWork();
        }

        public void AddWork(Action work)
        {
            workToDo.Enqueue(work);
            FindWork();
        }

        private void FindWork()
        {
            lock (Lock)
            {
                if (workToDo.Count > 0)
                {
                    var availableWorker = workers.FirstOrDefault(x => !x.IsBusy);
                    if (availableWorker != null)
                    {
                        var work = workToDo.Dequeue();
                        availableWorker.StartWork(work);
                    }
                }
            }
        }
    }

    public class Worker
    {
        private BackgroundWorker worker;

        private Action work;

        public bool IsBusy { get { return worker.IsBusy; } }

        public event EventHandler WorkCompleted;

        public Worker()
        {
            worker = new BackgroundWorker();
            worker.DoWork += new DoWorkEventHandler(OnWorkerDoWork);
            worker.RunWorkerCompleted += new RunWorkerCompletedEventHandler(OnWorkerRunWorkerCompleted);
        }

        private void OnWorkerRunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
        {
            if (WorkCompleted != null)
            {
                WorkCompleted(this, EventArgs.Empty);
            }
        }

        public void StartWork(Action work)
        {
            if (!IsBusy)
            {
                this.work = work;
                worker.RunWorkerAsync();
            }
            else
            {
                throw new InvalidOperationException("Worker is busy");
            }
        }

        private void OnWorkerDoWork(object sender, DoWorkEventArgs e)
        {
            work.Invoke();
            work = null;
        }
    }
}

这只是一个起点。

您可以从操作列表开始,然后在该组操作完成时有一个已完成事件。

然后至少可以使用 ManualResetEvent 等待完成事件... 或者任何您想要的逻辑。


1
为了避免UI冻结,框架专门提供了一个类来处理这些问题:请查看BackgroundWorker类(在单独的线程上执行操作),以下是一些信息:http://msdn.microsoft.com/en-us/library/system.componentmodel.backgroundworker.aspx http://msdn.microsoft.com/en-us/magazine/cc300429.aspx 顺便说一下,如果我理解正确,您不想并行化任何操作,所以只需等待解析页面的方法完成即可。基本上,对于您网格中的每一行(foreach循环),您都会获取ID并调用该方法。如果要并行处理,只需重复使用相同的foreach循环并将其并行化即可。

http://msdn.microsoft.com/en-us/library/dd460720.aspx


0

对于一个简单的后台线程运行器,它将一次从队列中运行一个线程,您可以像这样做:

    private List<Thread> mThreads = new List<Thread>();

    public static void Main()
    {       
        Thread t = new Thread(ThreadMonitor);
        t.IsBackground = true;
        t.Start();
    }

    private static void ThreadMonitor()
    {
        while (true)
        {
            foreach (Thread t in mThreads.ToArray())
            {
                // Runs one thread in the queue and waits for it to finish
                t.Start();
                mThreads.Remove(t);
                t.Join();
            }

            Thread.Sleep(2000); // Wait before checking for new threads
        }
    }

    // Called from the UI or elsewhere to create any number of new threads to run
    public static void DoStuff()
    {
        Thread t = new Thread(DoCorestuff);
        t.IsBackground = true;
        mActiveThreads.Add(t);
    }

    public static void DoStuffCore()
    {
        // Your code here
    }

0
考虑使用异步CTP。这是Microsoft最近发布的一个异步模式,可供下载。这将极大地简化异步编程。链接是http://msdn.microsoft.com/en-us/vstudio/async.aspx。(先阅读白皮书)
你的代码应该看起来像下面这样。(我还没有验证我的语法,抱歉)。
private async Task DoTheWork()
{
    for(int x = 0; x <= dataGridFollow.Rows.Count - 1; x++) 
    { 
        string getID = dataGridFollow.Rows[x].Cells["ID"].Value.ToString(); 
        int ID = int.Parse(getID); 
        task t = new Task(new Action<object>(UpdateLo), ID); 
        t.Start();
        await t;
    }
} 

这个方法返回一个任务,可以定期检查是否完成。这遵循“点火和忘记”的模式,意味着您只需调用它,并且假定您不关心何时完成(只要在15分钟内完成即可)。

编辑
我更正了上面的语法,您需要将UpdateLo更改为接受对象而不是整数。


0
如果我理解正确,您目前正在通过UI线程循环遍历ID列表,并启动一个新线程来处理每个ID。那么您看到的阻塞问题可能是由于创建唯一线程需要太多资源。因此,在不了解更多情况的情况下,我会建议重新设计过程如下:
//Somewhere in the UI Thread
Thread worker = new Thread(new ParameterizedThreadStart(UpdateLoWorker));
worker.Start(dataGridFollow.Rows);

//worker thread
private void UpdateLoWorker(DataRowCollection rows)
{
   foreach(DataRow r in rows){
      string getID = r.Cells["ID"].Value.ToString();
      int ID = int.Parse(getID);
      UpdateLo(ID);
   }
}

在这里,您将拥有一个单一的非阻塞工作线程,按顺序处理每个ID。


0

直接调用方法或使用while循环(带有sleep调用)来检查线程的状态。

还有异步事件,但它们会调用另一个方法,而您希望从同一点继续。


0

我不知道为什么请求会超时。这听起来像是一个不同的问题。然而,我可以就你目前的方法提出一些建议。

  • 避免在具有不确定边界的循环中创建线程。创建线程需要大量开销。如果操作数量事先未知,则使用ThreadPoolTask Parallel Library
  • 通过使用Thread.Join阻塞UI线程,你无法获得所需的行为。这会导致UI变得无响应,并且它将有效地序列化操作并取消你希望通过线程获得的任何优势。

如果你真的想限制并发操作的数量,那么更好的解决方案是为启动操作创建一个单独的专用线程。该线程将无限循环等待队列中出现项目,当它们出现时,它将将其出队并使用该信息异步地启动操作(再次使用ThreadPool或TPL)。出队线程可以包含限制并发操作数量的逻辑。搜索有关生产者-消费者模式的信息,以更好地了解如何实现此功能。

虽然学习曲线有点陡峭,但谁说多线程是容易的呢?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接