多线程:限制并发线程数

4
我需要开发一个使用多线程的应用程序。基本上,我有一个包含大约200k行的DataTable。从每一行中,我需要取出一个字段,与一个网页进行比较,然后从DataTable中删除它。
问题是,提供这些页面的服务器对并发请求有限制。因此,最多只能同时请求3个页面。
我想通过使用线程池来实现这一点,我甚至成功构建了一个简单的应用程序(锁定了DataTable),但我无法限制并发线程(即使使用SetMaxThreads),似乎它只是忽略了限制。
是否有人有现成的类似功能的解决方案?我很想看看。
我尝试使用信号量,但遇到了问题:
        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.

        lock (records)
        {
            records.Remove((string)id);
        }

        _sem.Release();
    }

这个运行得很好,唯一的问题是,
Console.WriteLine(records.count);

返回不同的结果。 尽管我知道这是因为并非所有线程都已完成(而且在删除所有记录之前,我调用了records.count),但我找不到如何等待所有线程完成的方法。

4个回答

2

如果要等待多个线程完成,您可以使用多个EventWaitHandle,然后调用WaitHandle.WaitAll来阻止主线程,直到所有事件被触发:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();

for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);

    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];

    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });
}

WaitHandle.WaitAll(finishEvents.ToArray());

由于大多数这些线程在大部分时间内都会被挂起,因此在这种情况下最好使用ThreadPool,这样您就可以将new Thread替换为:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

当您完成事件操作后,请不要忘记将其Dispose:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[编辑]

为了将所有内容放在一个地方,以下是您的示例代码应该看起来像什么:

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

static void Main()
{
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });

        t.Start();
    }

    WaitHandle.WaitAll(finishEvents.ToArray());

    Console.WriteLine(_records.Count);
    Console.ReadLine();
}

static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();

    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");

    lock (_records)
    {
        _records.Remove((string)id);
    }

    _sem.Release();
}

(请注意,我在这台机器上使用了 Semaphore 而不是 SemaphoreSlim,因为我没有 .NET 4,而且我想在更新答案之前测试代码。)

这样做会让我回到第一个问题,即如何限制并发工作线程的数量。 - Rafael Herscovici
@Rephaelпјҡдёәд»Җд№ҲпјҹдҪ еә”иҜҘд»Қ然дҪҝз”ЁдҝЎеҸ·йҮҸжқҘжҺ§еҲ¶еңЁThreadJobж–№жі•дёӯ并еҸ‘иҜ·жұӮзҡ„ж•°йҮҸпјҢиҝҷдёӘзӨәдҫӢеҸӘжҳҜзЎ®дҝқжүҖжңүзәҝзЁӢйғҪе·Іе®ҢжҲҗгҖӮ - vgru
我认为不会有问题 - ThreadJob() 对于这个解决方案没有进行更改,因此它们仍然会触发信号量,防止超过3个线程访问记录数据库。 - Ben
抱歉,我觉得我完全搞错了。 我不明白我应该在哪里实现那段代码。 这些线程让我失去了注意力。 - Rafael Herscovici
我非常感激你抽出时间写这个。 谢谢! - Rafael Herscovici
显示剩余2条评论

1

基本上,我有你在那个页面描述的相同问题,我会进一步研究。 - Rafael Herscovici
你有关于你如何完成那个项目的详细文章吗? - Rafael Herscovici
很遗憾,这篇文章只专注于信号量的使用。您有什么特定的扩展领域会觉得有帮助吗?如果上下文太狭窄,我可以写更多内容。 :-) - Colin Mackay
其实,我还有另一篇博客文章讨论了那个特定的项目,但我不认为它直接涉及到你的问题,尽管它可能会提供一种不同的思考方式来解决你整体问题。仅供好奇,这是链接:http://colinmackay.co.uk/blog/2011/04/01/tasks-that-create-more-work/ - Colin Mackay

0

首先,应该将Console.WriteLine(id + " is leaving");稍微晚一点,在锁定之后并且在释放信号量之前吗?

至于等待所有线程完成的实际操作,Groo的答案看起来更好,长期来看更健壮,但作为这段特定代码的更快/更简单的解决方案,我认为您也可以顺序调用.Join()在您想要等待的所有线程上。

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

然后在启动线程时,将当前的new Thread行替换为:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

然后就在 Console.WriteLine 之前:

foreach( Thread t in ThreadList )
{
    t.Join();
}

如果任何一个线程没有终止,这将会锁定,而且如果你想知道哪些线程没有完成,这种方法是行不通的。


0

如果你使用的是 .net 3.5,可以使用 Semaphore

或者

如果你使用的是 .net 4.0,可以使用 SemaphoreSlim


我已经尝试过信号量,它确实可以工作,但它不能提供我从线程中所需的所有功能。 - Rafael Herscovici
当前环境中缺少哪些功能? - Colin Mackay

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接