我有4个线程。其中一个正在从网络中读取一些信息并将其写入变量,每读取一次后都应该发出信号。另外三个线程正在读取此变量,并且应该恰好读取一次。当前的解决方案是写线程在写入后设置事件并等待读线程的事件。读线程等待事件,然后读取并设置它们自己的事件(即表示已读取)。问题是读线程可能会读取多次,导致其中出现重复项。如何让读线程仅读取一次?
我有4个线程。其中一个正在从网络中读取一些信息并将其写入变量,每读取一次后都应该发出信号。另外三个线程正在读取此变量,并且应该恰好读取一次。当前的解决方案是写线程在写入后设置事件并等待读线程的事件。读线程等待事件,然后读取并设置它们自己的事件(即表示已读取)。问题是读线程可能会读取多次,导致其中出现重复项。如何让读线程仅读取一次?
实现此功能的一种方法如下:
将数据作为单链表在线程之间共享。列表中的每个节点可以是标记或具有数据。列表从一个类型为标记的单个节点开始。当读取数据时,会形成一个新列表,其中包含一系列数据节点,后跟一个标记。这个列表被附加到最近添加到列表中的标记。
每个读取器线程都从对原始标记节点的引用和一个AutoResetEvent
开始。每当写入器收到新数据时,它将为每个读取器线程发出AutoResetEvent
信号。然后读取器线程将简单地向前走,直到找到没有下一个节点的标记。
这个方案确保所有读取器只看到数据一次。最大的复杂性在于构建列表,使得它可以以无锁的方式写入和读取。不过,使用Interlocked.CompareExchange
相当简单。
链表类型
class Node<T> {
public bool IsMarker;
public T Data;
public Node<T> Next;
}
示例编写器类型
class Writer<T> {
private List<AutoResetEvent> m_list;
private Node<T> m_lastMarker;
public Writer(List<AutoResetEvent> list, Node<T> marker) {
m_lastMarker = marker;
m_list = list;
}
// Assuming this can't overlap. If this can overload then you will
// need synchronization in this method around the writing of
// m_lastMarker
void OnDataRead(T[] items) {
if (items.Length == 0) {
return;
}
// Build up a linked list of the new data followed by a
// Marker to signify the end of the data.
var head = new Node<T>() { Data = items[0] };
var current = head;
for (int i = 1; i < items.Length; i++) {
current.Next = new Node<T>{ Data = items[i] };
current = current.Next;
}
var marker = new Node<T> { IsMarker = true };
current.Next = marker;
// Append the list to the end of the last marker node the writer
// created
m_lastMarker.Next = head;
m_lastMarker = marker;
// Tell each of the readers that there is new data
foreach (var e in m_list) {
e.Set();
}
}
}
示例读取器类型
class Reader<T> {
private AutoResetEvent m_event;
private Node<T> m_marker;
void Go() {
while(true) {
m_event.WaitOne();
var current = m_marker.Next;
while (current != null) {
if (current.IsMarker) {
// Found a new marker. Always record the marker because it may
// be the last marker in the chain
m_marker = current;
} else {
// Actually process the data
ProcessData(current.Data);
}
current = current.Next;
}
}
}
}
ManualResetEvent
对象和一个CountdownEvent
。以下是如何使用它们。ManualResetEvent DataReadyEvent = new ManualResetEvent();
ManualResetEvent WaitForResultEvent = new ManualResetEvent();
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);
DataReadyEvent
。Acknowledgement.Reset(NumWaitingThreads);
DataReadyEvent.Set(); // signal waiting threads to process
Acknowledgement.WaitOne(); // wait for all threads to signal they got it.
DataReadyEvent.Reset(); // block threads' reading
WaitForResultEvent.Set(); // tell threads they can continue
等待的线程执行以下操作:
DataReadyEvent.WaitOne(); // wait for value to be available
// read the value
Acknowledgement.Set(); // acknowledge receipt
WaitForResultEvent.WaitOne(); // wait for signal to proceed
这个Barrier类非常适合。
你可以使用两个Barriers
来在两种状态之间切换。
下面是一个例子:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
int readerCount = 4;
Barrier barrier1 = new Barrier(readerCount + 1);
Barrier barrier2 = new Barrier(readerCount + 1);
for (int i = 0; i < readerCount; ++i)
{
Task.Factory.StartNew(() => reader(barrier1, barrier2));
}
while (true)
{
barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.
if ((value % 10000) == 0) // Print message every so often.
Console.WriteLine(value);
barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
++value; // Produce the next value.
}
}
private static void reader(Barrier barrier1, Barrier barrier2)
{
int expected = 0;
while (true)
{
barrier1.SignalAndWait(); // Wait for "new data available".
if (value != expected)
{
Console.WriteLine("Expected " + expected + ", got " + value);
}
++expected;
barrier2.SignalAndWait(); // Signal that we've read the data, and wait for all other threads.
}
}
private static volatile int value;
}
}
ConnurrentQueue<T>.TryDequeue()
是一个线程安全的方法,它检查队列是否不为空,如果不为空,则从队列中取出一个项目。由于它同时执行两个操作,程序员不必担心竞争条件。我认为我已经找到了方法。我创建了2个自动重置事件的数组,每个读取器都有2个事件,等待写入事件并设置读取事件,写入器设置所有写入事件并等待所有读取事件。
JaredPar,你的答案很有用,帮助了我。