- 只有线程A添加项目,只有线程B读取它们
- 线程A绝不能阻塞,但线程B不关注性能,所以它可以阻塞
- 添加项目必须始终成功,因此队列不能有上限大小(除非系统内存耗尽)
- 如果队列为空,线程B应该等待直到有项目可供处理
以下是如何在C++中编写无锁队列的方法:
http://www.ddj.com/hpc-high-performance-computing/210604448
但是当你说“线程A不能阻塞”时,你确定这是要求吗?Windows不是实时操作系统(在正常使用情况下Linux也不是)。如果你想让线程A能够使用所有可用的系统内存,则需要分配内存(或等待其他人分配)。操作系统本身无法提供比读写器都获取进程锁(即非共享互斥量)以操作列表时更好的时间保证。而添加消息的最坏情况将不得不去操作系统获取内存。Writer:
allocate message and fill it in
acquire lock
append node to intrusive list
signal condition variable
release lock
Reader:
for(;;)
acquire lock
for(;;)
if there's a node
remove it
break
else
wait on condition variable
endif
endfor
release lock
process message
free message
endfor
Visual Studio 2010 正在添加两个新库,非常好地支持了这种情况,即 异步代理库 和并行模式库。
代理库支持异步消息传递,并包含用于向“目标”发送消息和从“源”接收消息的消息块。
unbounded_buffer 是一个模板类,提供了我认为您正在寻找的功能:
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace ::Concurrency;
using namespace ::std;
int main()
{
//to hold our messages, the buffer is unbounded...
unbounded_buffer<int> buf1;
task_group tasks;
//thread 1 sends messages to the unbounded_buffer
//without blocking
tasks.run([&buf1](){
for(int i = 0 ; i < 10000; ++i)
send(&buf1,i)
//signal exit
send(&buf1,-1);
});
//thread 2 receives messages and blocks if there are none
tasks.run([&buf1](){
int result;
while(result = receive(&buf1)!=-1)
{
cout << "I got a " << result << endl;
}
});
//wait for the threads to end
tasks.wait();
}
为什么不使用带有互斥锁的STL <list
>或<deque
>?STL的线程安全性不够吗?
为什么不创建一个包含指针的(单/双)链表节点类,并使要添加/删除的项从中继承?这样就不需要额外分配了。您只需在threadA::add()
和threadB::remove()
中调整一些指针,就可以完成操作。(虽然您会想在互斥锁下执行此操作,但除非您做错了什么,否则对threadA的阻塞效果将是可以忽略不计的...)
如果您正在使用pthread,请查看sem_post()
和sem_wait()
。其思想是,通过sem_wait()
,threadB可以无限期地阻塞,直到threadA将某些内容放入队列中。然后,threadA调用sem_post()
。这会唤醒threadB来完成它的工作。之后,threadB可以回到睡眠状态。这是一种处理异步信号的有效方法,支持在threadB::remove()
完成之前进行多个threadA::add()
的操作。
您可能需要考虑您的需求 - 是否真的是A不能丢弃任何队列项?还是说您不希望B从队列中连续取出两个不连续的元素,因为这会错误地表示一系列事件?
例如,如果这是某种数据记录系统,您(可以理解地)不希望记录中有间隙 - 但是在没有无限内存的情况下,在某些角落案例中,您可能会超过队列容量。
在这种情况下,一个解决方案是拥有某种特殊元素,可以放入队列中,表示A发现必须删除项目的情况。基本上,您保留一个额外的元素,大部分时间为空。每次A要向队列添加元素时,如果这个额外的元素不为空,则将其放入队列中。如果A发现队列中没有空间,则配置此额外元素以表示“嘿,队列已满”。
这种方式下,A 永远不会被阻塞,当系统非常繁忙时,您可以丢弃元素,但是您不会失去元素被丢弃的视线,因为一旦队列空间变得可用,此标记就会插入以指示数据丢失发生的位置。然后,当进程 B 发现它已经从队列中取出了这个超限标记元素时,它将执行其需要执行的任何操作。