多线程单读单写FIFO队列

8
我需要一个队列来从一个线程(A)传递消息到另一个线程(B),但是我一直无法找到真正符合我的要求的队列,因为它们通常允许添加项目失败,而在我的情况下,这几乎是致命的,因为消息需要被处理,而线程不能停止并等待空闲空间。
  • 只有线程A添加项目,只有线程B读取它们
  • 线程A绝不能阻塞,但线程B不关注性能,所以它可以阻塞
  • 添加项目必须始终成功,因此队列不能有上限大小(除非系统内存耗尽)
  • 如果队列为空,线程B应该等待直到有项目可供处理

你使用哪个线程库?pthreads? - Cem Kalyoncu
boost::thread和一些平台特定的代码在这里和那里。 - Fire Lancer
你的目标可能会导致内存耗尽,因为你不允许写入线程阻塞或丢弃项目。因此,如果队列达到关键大小限制,你必须决定是丢弃项目还是阻塞写入线程。否则,你间接地丢弃项目,因为程序失败了 :-) - mmmmmmmm
队列最多可能同时有<100个项目,我预计线程B会大部分时间等待,因为队列为空。但是,线程B可能会被某些东西阻塞相当长的时间(这就是线程B正在做工作而不是A的整个原因),从而允许项目数量迅速增加。如果实际上内存用尽,中止是最好的选择,因为我认为真正继续的唯一方法是放弃非必要数据,而我想线程A的音频处理属于该类别。 - Fire Lancer
4个回答

7

以下是如何在C++中编写无锁队列的方法:

http://www.ddj.com/hpc-high-performance-computing/210604448

但是当你说“线程A不能阻塞”时,你确定这是要求吗?Windows不是实时操作系统(在正常使用情况下Linux也不是)。如果你想让线程A能够使用所有可用的系统内存,则需要分配内存(或等待其他人分配)。操作系统本身无法提供比读写器都获取进程锁(即非共享互斥量)以操作列表时更好的时间保证。而添加消息的最坏情况将不得不去操作系统获取内存。
简而言之,那些你不喜欢的队列有固定容量的原因是为了避免在所谓的低延迟线程中分配内存。
因此,无锁代码通常会少阻塞一些,但由于内存分配不是保证的,使用互斥锁的性能也不应该太差,除非你有真正庞大的事件流要处理(比如,你正在编写网络驱动程序,而消息是传入的以太网数据包)。
因此,在伪代码中,我尝试的第一件事将是:
Writer:
    allocate message and fill it in
    acquire lock
        append node to intrusive list
        signal condition variable
    release lock

Reader:
    for(;;)
        acquire lock
            for(;;)
                if there's a node
                    remove it
                    break
                else
                   wait on condition variable
                endif
            endfor
        release lock
        process message
        free message
    endfor

只有在这种情况下,如果这样做会导致写入线程出现不可接受的延迟,我才会采用无锁代码(除非我已经有一个适当的队列)。

在较低的层次上,可以使用单链表来进行写入和读取过程。这种方法可以实现无锁,写入过程将NULL指针更改为非NULL,而读取过程将非NULL更改为NULL。一个小的私有堆可以为列表项提供良好的平摊性能。写入者使用malloc分配内存,读取者使用free释放内存。如果读取者进入睡眠状态,可以提供第三个进程C,该进程可以推测性地扩大私有堆,从而隐藏了分配过程对进程A的阻塞特性。 - Allan Stokes
你的示例会出现死锁。当读者在等待条件变量时,它持有锁,这会阻止写者获取锁并发出信号。你需要在等待条件变量之前释放锁,并在立即重新获取。 - Bobby Powers
1
@Bobby:你错了。在等待条件变量时,会在等待期间释放关联的锁,并在从等待中返回之前重新获取它。这就是“条件变量”的一部分——如果你使用的API没有为你完成这个操作,那么它就不是一个条件变量,而更像是一个信号量。而且API能够做到这一点非常重要,因为这样你的代码就可以依赖于释放锁并开始等待条件的原子性——也就是说,在你的线程成为等待者之前,没有其他线程可以在锁下执行任何操作。 - Steve Jessop

1

Visual Studio 2010 正在添加两个新库,非常好地支持了这种情况,即 异步代理库 和并行模式库。

代理库支持异步消息传递,并包含用于向“目标”发送消息和从“源”接收消息的消息块。

unbounded_buffer 是一个模板类,提供了我认为您正在寻找的功能:

#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace ::Concurrency;
using namespace ::std;

int main()
{
   //to hold our messages, the buffer is unbounded...
   unbounded_buffer<int> buf1;
   task_group tasks;

   //thread 1 sends messages to the unbounded_buffer
   //without blocking
   tasks.run([&buf1](){
      for(int i = 0 ; i < 10000; ++i)
         send(&buf1,i)
     //signal exit 
     send(&buf1,-1);
   });

   //thread 2 receives messages and blocks if there are none

   tasks.run([&buf1](){
      int result;
      while(result = receive(&buf1)!=-1)
      {
           cout << "I got a " << result << endl;
      }
   });

   //wait for the threads to end
   tasks.wait();
}

2
那真的在Linux类别下运行吗? - rama-jka toti
就你的接收循环而言,FWIW,你将始终输出“我得到了1”,因为在=之前评估了!=。 - Jere.Jones

1
  • 为什么不使用带有互斥锁的STL <list>或<deque>?STL的线程安全性不够吗?

  • 为什么不创建一个包含指针的(单/双)链表节点类,并使要添加/删除的项从中继承?这样就不需要额外分配了。您只需在threadA::add()threadB::remove()中调整一些指针,就可以完成操作。(虽然您会想在互斥锁下执行此操作,但除非您做错了什么,否则对threadA的阻塞效果将是可以忽略不计的...)

  • 如果您正在使用pthread,请查看sem_post()sem_wait()。其思想是,通过sem_wait(),threadB可以无限期地阻塞,直到threadA将某些内容放入队列中。然后,threadA调用sem_post()。这会唤醒threadB来完成它的工作。之后,threadB可以回到睡眠状态。这是一种处理异步信号的有效方法,支持在threadB::remove()完成之前进行多个threadA::add()的操作。


0

您可能需要考虑您的需求 - 是否真的是A不能丢弃任何队列项?还是说您不希望B从队列中连续取出两个不连续的元素,因为这会错误地表示一系列事件?

例如,如果这是某种数据记录系统,您(可以理解地)不希望记录中有间隙 - 但是在没有无限内存的情况下,在某些角落案例中,您可能会超过队列容量。

在这种情况下,一个解决方案是拥有某种特殊元素,可以放入队列中,表示A发现必须删除项目的情况。基本上,您保留一个额外的元素,大部分时间为空。每次A要向队列添加元素时,如果这个额外的元素不为空,则将其放入队列中。如果A发现队列中没有空间,则配置此额外元素以表示“嘿,队列已满”。

这种方式下,A 永远不会被阻塞,当系统非常繁忙时,您可以丢弃元素,但是您不会失去元素被丢弃的视线,因为一旦队列空间变得可用,此标记就会插入以指示数据丢失发生的位置。然后,当进程 B 发现它已经从队列中取出了这个超限标记元素时,它将执行其需要执行的任何操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接