C#一位作者多个读者仅允许读取一次

4

我有4个线程。其中一个正在从网络中读取一些信息并将其写入变量,每读取一次后都应该发出信号。另外三个线程正在读取此变量,并且应该恰好读取一次。当前的解决方案是写线程在写入后设置事件并等待读线程的事件。读线程等待事件,然后读取并设置它们自己的事件(即表示已读取)。问题是读线程可能会读取多次,导致其中出现重复项。如何让读线程仅读取一次?


我更喜欢从任何可行的解决方案开始,然后再做得更好。 - syned
1
我认为读者应该处理重复写入消息。我不认为你应该让客户端假设更改事件只会触发一次。例如,有些网络场景可能会导致这种行为。 - neontapir
5个回答

2

实现此功能的一种方法如下:

将数据作为单链表在线程之间共享。列表中的每个节点可以是标记或具有数据。列表从一个类型为标记的单个节点开始。当读取数据时,会形成一个新列表,其中包含一系列数据节点,后跟一个标记。这个列表被附加到最近添加到列表中的标记。

每个读取器线程都从对原始标记节点的引用和一个AutoResetEvent开始。每当写入器收到新数据时,它将为每个读取器线程发出AutoResetEvent信号。然后读取器线程将简单地向前走,直到找到没有下一个节点的标记。

这个方案确保所有读取器只看到数据一次。最大的复杂性在于构建列表,使得它可以以无锁的方式写入和读取。不过,使用Interlocked.CompareExchange相当简单。

链表类型

class Node<T> { 
  public bool IsMarker;
  public T Data;
  public Node<T> Next;
}

示例编写器类型

class Writer<T> {
  private List<AutoResetEvent> m_list; 
  private Node<T> m_lastMarker;

  public Writer(List<AutoResetEvent> list, Node<T> marker) { 
    m_lastMarker = marker;
    m_list = list;
  }

  // Assuming this can't overlap.  If this can overload then you will
  // need synchronization in this method around the writing of 
  // m_lastMarker      
  void OnDataRead(T[] items) {
    if (items.Length == 0) {
      return;
    }

    // Build up a linked list of the new data followed by a 
    // Marker to signify the end of the data.  
    var head = new Node<T>() { Data = items[0] };
    var current = head;
    for (int i = 1; i < items.Length; i++) {
      current.Next = new Node<T>{ Data = items[i] };
      current = current.Next;
    }
    var marker = new Node<T> { IsMarker = true };
    current.Next = marker;

    // Append the list to the end of the last marker node the writer
    // created 
    m_lastMarker.Next = head;
    m_lastMarker = marker;

    // Tell each of the readers that there is new data 
    foreach (var e in m_list) { 
      e.Set();
    }
  }
}

示例读取器类型

class Reader<T> { 
  private AutoResetEvent m_event;
  private Node<T> m_marker;

  void Go() {
    while(true) { 
      m_event.WaitOne();
      var current = m_marker.Next;
      while (current != null) { 
        if (current.IsMarker) { 
          // Found a new marker.  Always record the marker because it may 
          // be the last marker in the chain 
          m_marker = current;
        } else { 
          // Actually process the data 
          ProcessData(current.Data);
        }
        current = current.Next;
      }
    }
  }
}

1
我同意评论中的观点,认为你应该编写消费者线程以接受多次获取相同值的可能性。也许最简单的方法是为每个更新添加一个顺序标识符。这样,线程可以将顺序ID与其读取的上一个ID进行比较,从而知道是否重复获取。
它还将知道是否错过了某个值。
但是,如果您真的需要它们保持同步并且仅获取一次值,我建议您使用两个ManualResetEvent对象和一个CountdownEvent。以下是如何使用它们。
ManualResetEvent DataReadyEvent = new ManualResetEvent();
ManualResetEvent WaitForResultEvent = new ManualResetEvent();
CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);

读取线程等待 DataReadyEvent
当另一个线程从网络中读取一个值时,它会执行以下操作:
Acknowledgement.Reset(NumWaitingThreads);
DataReadyEvent.Set();  // signal waiting threads to process
Acknowledgement.WaitOne();  // wait for all threads to signal they got it.
DataReadyEvent.Reset(); // block threads' reading
WaitForResultEvent.Set(); // tell threads they can continue

等待的线程执行以下操作:

DataReadyEvent.WaitOne(); // wait for value to be available
// read the value
Acknowledgement.Set();  // acknowledge receipt
WaitForResultEvent.WaitOne(); // wait for signal to proceed

这个和每个等待线程有两个事件的效果相同,但更加简单。然而,它也有缺点,如果一个线程崩溃,这将会在倒数事件上挂起。但是,如果生产者线程等待所有线程消息,你的方法也会出现这种情况。

1

这个Barrier类非常适合。

你可以使用两个Barriers来在两种状态之间切换。

下面是一个例子:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            int readerCount = 4;

            Barrier barrier1 = new Barrier(readerCount + 1);
            Barrier barrier2 = new Barrier(readerCount + 1);

            for (int i = 0; i < readerCount; ++i)
            {
                Task.Factory.StartNew(() => reader(barrier1, barrier2));
            }

            while (true)
            {
                barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.

                if ((value % 10000) == 0)       // Print message every so often.
                    Console.WriteLine(value);

                barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
                ++value;                  // Produce the next value.
            }
        }

        private static void reader(Barrier barrier1, Barrier barrier2)
        {
            int expected = 0;

            while (true)
            {
                barrier1.SignalAndWait(); // Wait for "new data available".

                if (value != expected)
                {
                    Console.WriteLine("Expected " + expected + ", got " + value);
                }

                ++expected;
                barrier2.SignalAndWait();  // Signal that we've read the data, and wait for all other threads.
            }
        }

        private static volatile int value;
    }
}

0
我建议使用ConcurrentQueue - 它保证每个线程从队列中获取唯一实例。这里是一个很好的解释如何使用它。 ConnurrentQueue<T>.TryDequeue()是一个线程安全的方法,它检查队列是否不为空,如果不为空,则从队列中取出一个项目。由于它同时执行两个操作,程序员不必担心竞争条件。

那么,写入者将新数据添加到队列中,然后读取者读取并删除该元素? - syned
但是作者如何知道它应该向这个队列添加多少元素呢?还是它会自动复制这些元素到整个树中吗? - syned

0

我认为我已经找到了方法。我创建了2个自动重置事件的数组,每个读取器都有2个事件,等待写入事件并设置读取事件,写入器设置所有写入事件并等待所有读取事件。

JaredPar,你的答案很有用,帮助了我。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接