有没有并发无锁阻塞队列的实现?

4
我知道阻塞队列和无锁队列,这些实现的一个很好的例子是Scott et al.提供的,但是有没有无锁阻塞队列的实现?在无锁阻塞队列中,出队不需要锁定,但如果队列中没有任何项目,则会阻塞消费者。是否有这样的实现?我希望它们是C#实现,但任何实现都可以使用。
更新:
我认为我在D14.1行上最终遇到了竞态条件:
initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

你的PDF链接有问题,你能修复一下吗? - Will Hartung
@Will,抱歉,我的链接中的pdf缺少了一个f,现在应该没问题了(感谢您指出)。 - Kiril
注意:.NET 4带来了大量并发和无锁实现。 - user34537
@David,锁定会导致大量函数调用,并使您的应用程序在锁定的部分/方法方面按顺序运行。移除锁定可以使您的应用程序真正多线程化,并减少需要执行的代码量,但这需要熟练的编程技巧。 - Kiril
@David 我不是独立工作的,所以虽然我不介意转向非托管代码,但我不确定这是否与公司的其他部门或我的老板相协调。转向非托管代码的成本太高,无法证明其结果的合理性。此外,即使整个公司决定采用非托管代码,我仍希望使用最快速的解决方案:无锁阻塞队列。 - Kiril
显示剩余18条评论
2个回答

1

.NET 并行扩展:(内置,适用于 .NET 4.0+):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx


StackOverflow 上的实现:

.net 中的无锁构造



在评论中澄清的响应:

如果空时的阻塞不是繁忙等待(等待信号),那么似乎需要使用计数信号量进行等待。

另一种方法是使用常规队列,结合原子比较和交换或自旋锁来防止同时访问,
然后,如果消费者线程尝试进入时队列为空,就锁定二进制信号量,
如果提供程序线程尝试在队列为空时进入,则解锁二进制信号量以唤醒所有睡眠的消费者(并将它们返回到自旋锁,以便多个线程只能在队列中有足够的项目时进入)。

例如,// 伪代码

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}

@Danny,感谢提供的链接,但我特别在寻找无锁阻塞队列,根据我所读的资料,这似乎是一件相当困难的事情。我想知道是否有人真正成功地实现了它。 - Kiril
如果队列繁忙,是否会阻塞客户端线程?如果是这样,那么在队列周围使用自旋锁(无互斥锁/忙等待)应该可以解决问题。 - Danny Varod
@Danny 如果队列为空,则应阻塞,但当队列中有项目时不应该有锁。Scott论文展示了两种实现:无锁队列和阻塞队列...它们都是用Java实现的。我还没有看到过既无锁又阻塞的队列(即仅在空时阻塞)。 - Kiril
@lukas ConcurrentBag非常好用,但是它使用锁定机制:https://dev59.com/Cm445IYBdhLWcg3wl7bW - Kiril
@Danny,请看我的编辑,我添加了伪代码,展示了我“设想”的解决方案。 - Kiril

1

编辑:

更简单: 我建议您的队列不需要头和尾。只需要一个头部即可。如果头部= NULL,则列表为空。将项目添加到头部。从头部删除项目。更简单,CAS操作更少。

帮助者: 我在评论中建议您需要考虑一个辅助方案来处理竞争情况。在我理解的“无锁”版本中,如果罕见的竞争条件不会引起问题,那么这是可以接受的。我喜欢额外的性能,而不是让空闲线程睡眠几毫秒。

辅助想法。当消费者获取工作时,它可以检查是否有线程处于昏迷状态。当生产者添加工作时,它可以寻找处于昏迷状态的线程。

跟踪睡眠者。使用一个睡眠者的链接列表。当线程决定没有工作时,它将自己标记为!awake并将自己CAS到睡眠者列表的头部。当收到唤醒信号时,线程将自己标记为awake。然后新唤醒的线程清理睡眠者列表。要清理并发单链表,必须小心。您只能CAS到头部。因此,当睡眠者列表的头部被标记为awake时,您可以将头部CAS掉。如果头部没有醒来,请继续扫描列表并“懒惰取消链接”(我编造了这个术语)其余醒来的项目。懒惰取消链接很简单...只需将前一个项目的下一个指针设置在醒来的项目上方即可。即使扫描到!awake的项目,同时进行扫描仍然会到达列表的末尾。随后的扫描看到更短的列表。最后,任何时候添加工作或取出工作,都要扫描睡眠者列表以获取!awake项目。如果消费者注意到在获取一些工作后仍有剩余的工作(.next work!= NULL),则消费者可以扫描睡眠者列表并向第一个!awake的线程发出信号。生产者添加工作后,可以扫描睡眠者列表并执行相同操作。

如果您有广播场景并且无法信号单个线程,则只需保持睡眠线程的计数。 只要该计数仍然> 0,消费者注意到剩余工作和消费者添加工作将广播信号以唤醒。

在我们的环境中,每个SMT都有1个线程,因此沉睡列表永远不可能那么大(除非我拿到那些新的128个并发线程机器之一!)我们在事务早期生成工作项。 在第一秒钟,我们可能会生成10,000个工作项,并且这种生产迅速减少。 线程在这些工作项上工作了几秒钟。 因此,我们很少有一个线程处于空闲池中。

您仍然可以使用锁 如果您只有1个线程并且很少生成工作...那么这对您没有用。 在这种情况下,互斥体的性能不是问题,您应该只使用它们。 在这种情况下,在睡眠队列上使用锁定。 将无锁视为“关键时刻不使用锁定”。

先前的帖子: 您是说: 有一个工作队列。 有许多消费者线程。 如果有任何工作,消费者需要拉出工作并执行它 消费者线程需要睡眠,直到有工作为止。

如果您是,我们只使用原子操作来做到这一点:

工作队列是一个链接列表。还有一个正在休眠的线程的链接列表。

要添加工作:将列表的头部CAS到新工作中。添加工作时,我们检查是否有任何线程在睡眠列表上。如果有,在添加工作之前,我们将一个睡眠者从睡眠列表中CAS出来,将其工作设置为新工作,然后发出唤醒信号使睡眠者醒来。然后,我们将工作添加到工作队列中。

要消耗工作:将列表的头部CAS到head->next。如果工作列表的头部为空,我们将线程CAS到睡眠者列表中。

一旦线程拥有工作项,线程必须将工作项的状态CAS到WORK_INPROGRESS或其他状态。如果失败了,这意味着该工作正在被另一个线程执行,因此消费者线程需要返回搜索工作。如果线程被唤醒并具有工作项,则仍然必须CAS该状态。

如果添加了工作,睡眠的消费者总是会被唤醒并分配工作。pthread_kill()总是在sigwait()处唤醒线程,因为即使线程在信号之后到达sigwait,也会接收到信号。这解决了一个线程将自己放在睡眠列表上但在睡眠之前被发出信号的问题。所有发生的事情就是线程尝试拥有它的->work(如果有的话)。未能拥有工作或没有工作会将线程发送回consume-start。如果线程无法CAS到睡眠列表,则意味着另一个线程击败了它,或者生产者拉出了一个睡眠者。为了安全起见,我们让线程表现得像刚刚被唤醒一样。

我们这样做不会出现竞争条件,并且具有多个生产者和消费者。我们还能够扩展此功能,以允许线程在单个工作项上休眠。


@johnnycrash 不,我的意思是消费者从队列中取出一个项目,直到队列中没有任何东西。当消费者调用dequeue并且队列中没有任何内容时,消费者将在dequeue调用上被阻塞(这是标准的阻塞队列,我已经有了)。请参阅引用的论文,了解两个队列的描述...基本上,阻塞队列在dequeue上锁定,无锁队列不会锁定(正如您所说,它使用CAS),但是无锁队列返回如果队列中没有任何内容,我希望无锁队列阻塞而不是返回。 - Kiril
@Lirik,我们的GetWork()函数在将线程添加到休眠列表并调用sigwait()时会阻塞。 sigwait()使线程进入睡眠状态。稍后出现工作时,我们调用pthread_kill()来唤醒线程。我不知道.Net是否有一种像Linux那样通过信号使线程进入休眠状态并唤醒它的方法。但是,长话短说...当队列为空时,我们会阻塞,而且我们没有在任何地方使用锁。 - johnnycrash
@johnnycrash 我现在关注你了!好的,我想我知道如何完成这个任务:不是拥有一个睡眠线程列表,我可以只是让出队阻塞在ManualResetEvent上,直到队列中有东西(当入队时,ManualResetEvent将被设置,并在队列为空时重置)。我今晚稍后会尝试写一些代码...我想我现在明白了。 - Kiril
@Lirik,很酷!我相信它能够工作的关键是,如果你将工作添加到队列中,你总是会唤醒一个睡眠者。如果没有睡眠者...不要唤醒任何东西。注意在将自己放在睡眠者列表上后完成睡眠的竞争。如果生产者在该线程完成其睡眠之前发出信号,则可能会出现所有线程都处于睡眠状态的情况。我们没有看到这种情况发生,因为sigwait缓存信号。现在你让我想到了sigwait()中可能有一个互斥锁,呃... - johnnycrash
是的,这就是等待信号可能已经被发送给你的可怕竞争。 - johnnycrash
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接