定时器 + 生产者消费者

3

我是新手,正在学习线程的不同概念。

现在我正在使用计时器线程进行生产者/消费者模式。问题是,在让计时器线程滴答一定时间并处置所有创建的生产者和消费者线程之前,我不知道如何检查所有生产者和消费者线程是否完成其进程。

想请您帮助和指导我如何为此方法创建一个解决方案。

这是我的示例代码:

public class WorkerThread
{
    public BlockingQueue<Item> collection = new BlockingQueue<Item>(100);

    private Timer TimerThread { get; set; }

    public void ThreadTimer()
    {
        this.TimerThread = new Timer(new TimerCallback(StartMonitor), null, 500, Timeout.Infinite);
    }

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();

        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }

        //TODO: Start all consumer threads...

        //TODO: Let Thread wait until all worker threads are done
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }



    public void RunProducers(BlockingQueue<Item> collection)
    {
        List<Item> lsItems = CreateListOfItems();

        foreach(var item in lsItems)
        {
            collection.Add(item);
        }

    }

    public void RunConsumers(BlockingQueue<Item> collection) 
    {
        while(true)
        {
            Item item = collection.Take();
            Console.WriteLine("Processed[{0}] : {1}", item.ID, item.Name);
            //Thread.Sleep(100);
        }
    }

    public List<Item> CreateListOfItems()
    {
        List<Item> lsItems = new List<Item>();
        for (int i = 0; i <= 9999; i++)
        {
            lsItems.Add(new Item() { ID = i, Name = "Item[" + i + "]" });
        }
        return lsItems;
    }

}

块集合实现(由于我们的环境在.Net 3.5中,因此我们无法使用更高版本的库)。

public class BlockingQueue<T> 
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int MaxSize;
    public bool closing;

    public BlockingQueue(int maxSize) {
        this.MaxSize = maxSize;
    }

    public void Add(T item) 
    {
        lock(queue)
        {
            while(queue.Count >= this.MaxSize)
            {
                Monitor.Wait(queue);
            }

            queue.Enqueue(item);
            if(queue.Count == 1)
            {
                Monitor.PulseAll(queue);
            }

        }
    }

    public T Take() 
    {
        lock(queue)
        {
            while(queue.Count == 0)
            {
                Monitor.Wait(queue);
            }

            T item = queue.Dequeue();
            if(queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }

    public void Close() 
    {
        lock (queue)
        {
            closing = true;
            Monitor.PulseAll(queue);
        }
    }

    public bool TryDequeue(out T value)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing)
                {
                    value = default(T);
                    return false;
                }
                Monitor.Wait(queue);
            }
            value = queue.Dequeue();
            if (queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return true;
        }
    }
}

2
你必须摆脱计时器,否则后果不堪设想。当前状态下,它会不断创建永远不会结束的消费者线程,你必须等待相当长的时间才能看到它炸毁你的程序。在代码片段中,它只是为了解决有问题的生产者代码而需要的临时措施。明智的做法是,在任何真正生成项目的实际代码中,只要有项目可用就将其推入队列中。如果需要使用计时器来模拟,则可以这样做。 - Hans Passant
我刚刚在本地环境中更新了我的代码,同时进行了基准测试,确实创建了很多消费者线程。现在我尝试只让计时器线程在每个滴答声中创建生产者线程来排队项目。 - Jonathan Daniel
1个回答

0
您可以检查所有工作线程的 IsAlive 属性。代码看起来可能不太清晰,但它是有效的:
public void StartMonitor(object state)
{
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();

        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }

       //TODO: Let Thread wait until all worker threads are done
        List<Thread> to_check = new List<Thread>(producers);
        to_check.AddRange(consumers);

        while(true)
        {
            Thread.Sleep(50);
            List<Thread> is_alive = new List<Thread>();
            foreach(Thread t in to_check)
                 if(t.IsAlive)
                     is_alive.Add(t);
            if(is_alive.Count == 0)
                break;
            to_check = is_alive;
        }
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

}

或者,也许有稍微更好的方式:

    private int[] _counter = new int[1];
    private int Counter
    {
        get 
        {
            lock (_counter) { return _counter[0]; }
        }
        set 
        {
            lock (_counter) { _counter[0] = value; }
        }

    }

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        Counter = 0;
        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => { Counter++; RunProducers(this.collection); Counter--; }));
        }

        //TODO: Start all producer threads...

        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => { Counter++; RunConsumers(this.collection); Counter--; }));
        }

        //TODO: Let Thread wait until all worker threads are done
        List<Thread> to_check = new List<Thread>(producers);
        to_check.AddRange(consumers);

        while (Counter > 0)
            Thread.Sleep(50);

        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }

为了避免使用Sleep(),您可以使用Barrier类:

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        int producer_cnt = 1,
            consumer_cnt = 2;

        Barrier b = new Barrier(producer_cnt + consumer_cnt + 1);
        try
        {
            for (int i = 0; i < 1; i++)
            {
                producers.Add(new Thread(() => { try { RunProducers(this.collection); } finally { b.SignalAndWait(); } }));
            }

            //TODO: Start all producer threads...

            for (int i = 0; i < 2; i++)
            {
                consumers.Add(new Thread(() => { try { RunConsumers(this.collection); } finally { b.SignalAndWait(); } }));
            }

            //TODO: Let Thread wait until all worker threads are done
            List<Thread> to_check = new List<Thread>(producers);
            to_check.AddRange(consumers);
        }
        finally
        {
            b.SignalAndWait();
        }
        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);

    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接