一个可观察对象的同步机制

20
假设我们需要同步访问共享资源的读写操作。多个线程将同时对其进行读写访问(大部分情况下为读取,有时为写入)。假设每次写入都会触发读取操作(该对象是可观察对象)。
针对这个例子,我将想象一个类,如下所示(原谅语法和风格,仅供说明用途):
class Container {
    public ObservableCollection<Operand> Operands;
    public ObservableCollection<Result> Results;
}

我打算使用 ReadWriterLockSlim 来实现这个目的,而且我会将其放置在容器级别上(想象一下对象不是那么简单,一个读写操作可能涉及多个对象):
public ReadWriterLockSlim Lock;

OperandResult的实现对于这个例子没有意义。 现在让我们想象一些代码,观察Operands并生成一个结果放入Results中:

void AddNewOperand(Operand operand) {
    try {
        _container.Lock.EnterWriteLock();
        _container.Operands.Add(operand);
    }
    finally {
        _container.ExitReadLock();
    }
}

我们的假想观察者会执行类似的操作,但是为了使用新元素,它将使用EnterReadLock()锁定以获取操作数,然后使用EnterWriteLock()添加结果(让我省略此代码)。这将由于递归而产生异常,但如果我设置LockRecursionPolicy.SupportsRecursion,那么我只会打开我的代码以进行死锁(来自MSDN):

默认情况下,使用LockRecursionPolicy.NoRecursion标志创建ReaderWriterLockSlim的新实例,不允许递归。建议所有新开发都使用此默认策略,因为递归引入了不必要的复杂性,并使您的代码更容易出现死锁

我重复相关部分以明确: 递归[...]使您的代码更容易出现死锁。 如果我没有理解错,如果我在同一个线程中请求读锁,然后“某人”请求写锁,那么我将会遇到死锁,这是因为LockRecursionPolicy.SupportsRecursion。此外,递归也会以可测量的方式降低性能(如果我使用ReadWriterLockSlim而不是ReadWriterLockMonitor,这不是我想要的)。
问题:
最后我的问题是(请注意,我不是在寻找关于一般同步机制的讨论,我想知道在这种生产者/可观察者/观察者场景下有什么问题):
- 在这种情况下,什么更好?放弃ReadWriterLockSlim,转而使用Monitor(即使在实际代码中读取远多于写入)? - 放弃如此粗略的同步?这甚至可能会带来更好的性能,但它会使代码变得更加复杂(当然不是在这个例子中,而是在实际应用中)。 - 我只需使通知(从观察到的集合)异步吗? - 还有其他我看不到的东西吗?

我知道不存在一个最好的同步机制,所以我们使用的工具必须是适合我们情况的,但我想知道在线程和观察者之间是否有一些最佳实践或者我是否忽略了一些非常重要的事情(假设使用Microsoft Reactive Extensions,但问题是普遍的,不局限于该框架)。

可能的解决方案?

我会尝试使事件(以某种方式)延迟:

第一种解决方案
每次更改都不会触发任何CollectionChanged事件,而是将其保留在队列中。当提供程序(推送数据的对象)完成后,它将手动强制刷新队列(按顺序引发每个事件)。这可以在另一个线程中完成,甚至可以在调用线程中完成(但在锁定外部)。

这可能有效,但它会使一切变得不那么“自动化”(每个更改通知必须由生产者自己手动触发,需要编写更多的代码,到处都有更多的错误)。

第二个解决方案
另一个解决方案可能是向可观察集合提供对我们的lock的引用。如果我在自定义对象中包装ReadWriterLockSlim(有助于将其隐藏在易于使用的IDisposable对象中),我可以添加一个ManualResetEvent来通知所有锁已被释放,这样集合本身就可以在同一线程或另一线程中引发事件。

第三个解决方案
另一个想法可能只是使事件异步化。如果事件处理程序需要锁定,则会停止等待其时间框架。对此,我担心可能使用大量线程(特别是如果来自线程池)。

老实说,我不知道这些解决方案是否适用于实际应用程序(个人而言-从用户的角度来看-我更喜欢第二种方法,但它意味着为每个东西创建自定义集合,并且它使集合了解线程,如果可能,我会避免它)。我不想使代码比必要的复杂。


我只想提到,我认为EnterReadLockAdd组合相当可怕。代码的意图是仅进行读操作,但实际上它也会将数据写入集合。您确定在该特定点不想使用EnterWriteLock吗? - Caramiriel
1
@Caramiriel,你说得对,我修正了例子! - Adriano Repetti
1
如果您将方法稍微粗略一些,例如将Operands和Result设置为只读属性并添加AddOperand和AddResult方法,那么您将能够使锁定变为私有,并更好地控制发生的情况。或者我完全误解了重点? - flup
@flup,你说得很对。我的问题是这会让一切变得更加复杂,并且模型将意识到线程(如果可能的话,我会避免这种情况,因为当它在单线程场景中使用时,性能会受到影响)。此外,模型本身当然比我的示例复杂得多。也许可以在模型上构建一个线程安全层,其中包含像你建议的方法? - Adriano Repetti
在这种情况下,我会选择第三个解决方案-异步触发事件。我不会担心线程数量。这种情况的重要性在很大程度上已经过去了。 - Tar
显示剩余5条评论
3个回答

8
这听起来是一个相当复杂的多线程问题。在这种事件链式模式中使用递归而又避免死锁是非常具有挑战性的。您可能需要考虑完全围绕该问题进行设计。
例如,您可以将操作数的添加设置为与引发事件异步执行:
private readonly BlockingCollection<Operand> _additions
    = new BlockingCollection<Operand>();

public void AddNewOperand(Operand operand)
{
    _additions.Add(operand);
}

然后在后台线程中执行实际的添加操作:

private void ProcessAdditions()
{
    foreach(var operand in _additions.GetConsumingEnumerable())
    {
        _container.Lock.EnterWriteLock();
        _container.Operands.Add(operand);
        _container.Lock.ExitWriteLock();
    }
}

public void Initialize()
{
    var pump = new Thread(ProcessAdditions)
    {
        Name = "Operand Additions Pump"
    };
    pump.Start();
}

这样的分离会牺牲一些一致性 - 在 add 方法后运行的代码实际上不知道 add 已经发生了,也许这对你的代码是一个问题。如果是这样的话,可以重新编写以订阅观察并使用 Task 在添加完成时发出信号:
public Task AddNewOperandAsync(Operand operand)
{
    var tcs = new TaskCompletionSource<byte>();

    // Compose an event handler for the completion of this task
    NotifyCollectionChangedEventHandler onChanged = null;
    onChanged = (sender, e) =>
    {
        // Is this the event for the operand we have added?
        if (e.NewItems.Contains(operand))
        {
            // Complete the task.
            tcs.SetCompleted(0);

            // Remove the event-handler.
            _container.Operands.CollectionChanged -= onChanged;
        }
    }

    // Hook in the handler.
    _container.Operands.CollectionChanged += onChanged;

    // Perform the addition.
    _additions.Add(operand);

    // Return the task to be awaited.
    return tcs.Task;
}

事件处理逻辑是在后台线程上引发添加消息的,因此不可能阻止前台线程。如果您在窗口的消息泵上等待添加,同步上下文会聪明地将延续安排在消息泵线程上。

无论您是否选择使用 Task,此策略意味着您可以安全地从可观察事件中添加更多操作数,而无需重新输入任何锁定。


我应该检查一下使用AddAsync()方法的工作方式,但我喜欢这个解决方案!如果消费者需要确保他们添加的内容可用,他们将等待。 - Adriano Repetti

1

我不确定这是否完全是同一个问题,但当处理相对较小的数据量(2k-3k条记录)时,我使用下面的代码来方便地实现跨线程读写访问绑定到UI的集合。此代码最初在这里找到。

public class BaseObservableCollection<T> : ObservableCollection<T>
{
  // Constructors
  public BaseObservableCollection() : base() { }
  public BaseObservableCollection(IEnumerable<T> items) : base(items) { }
  public BaseObservableCollection(List<T> items) : base(items) { }

  // Evnet
  public override event NotifyCollectionChangedEventHandler CollectionChanged;

  // Event Handler
  protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
  {
    // Be nice - use BlockReentrancy like MSDN said
    using (BlockReentrancy())
    {
      if (CollectionChanged != null)
      {
        // Walk thru invocation list
        foreach (NotifyCollectionChangedEventHandler handler in CollectionChanged.GetInvocationList())
        {
          DispatcherObject dispatcherObject = handler.Target as DispatcherObject;

          // If the subscriber is a DispatcherObject and different thread
          if (dispatcherObject != null && dispatcherObject.CheckAccess() == false)
          {
            // Invoke handler in the target dispatcher's thread
            dispatcherObject.Dispatcher.Invoke(DispatcherPriority.DataBind, handler, this, e);
          }
          else
          {
            // Execute handler as is
            handler(this, e);
          }
        }
      }
    }
  }
}

我还使用了以下代码(继承自上面的代码),以支持在集合内部的项引发PropertyChanged时引发CollectionChanged事件。
public class BaseViewableCollection<T> : BaseObservableCollection<T>
  where T : INotifyPropertyChanged
{
  // Constructors
  public BaseViewableCollection() : base() { }
  public BaseViewableCollection(IEnumerable<T> items) : base(items) { }
  public BaseViewableCollection(List<T> items) : base(items) { }

  // Event Handlers
  private void ItemPropertyChanged(object sender, PropertyChangedEventArgs e)
  {
    var arg = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, sender, sender);
    base.OnCollectionChanged(arg);
  }

  protected override void ClearItems()
  {
    foreach (T item in Items) { if (item != null) { item.PropertyChanged -= ItemPropertyChanged; } }
    base.ClearItems();
  }

  protected override void InsertItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.InsertItem(index, item);
  }

  protected override void RemoveItem(int index)
  {
    if (Items[index] != null) { Items[index].PropertyChanged -= ItemPropertyChanged; }
    base.RemoveItem(index);
  }

  protected override void SetItem(int index, T item)
  {
    if (item != null) { item.PropertyChanged += ItemPropertyChanged; }
    base.SetItem(index, item);
  }
}

0

跨线程集合同步

将ListBox绑定到ObservableCollection时,当数据发生更改时,您可以更新ListBox,因为它实现了INotifyCollectionChanged。ObservableCollection的缺陷是只能由创建它的线程更改数据。

SynchronizedCollection没有多线程问题,但不会更新ListBox,因为它没有实现INotifyCollectionChanged,即使您实现了INotifyCollectionChanged,CollectionChanged(this,e)也只能从创建它的线程调用..所以它无法工作。

结论

-如果您想要一个自动更新单线程列表,请使用ObservableCollection

-如果您想要一个非自动更新但多线程列表,请使用SynchronizedCollection

-如果您两者都想要,请使用Framework 4.5,BindingOperations.EnableCollectionSynchronization和ObservableCollection(),如下所示:

/ / Creates the lock object somewhere
private static object _lock = new object () ;
...
/ / Enable the cross acces to this collection elsewhere
BindingOperations.EnableCollectionSynchronization ( _persons , _lock )

完整的示例 http://10rem.net/blog/2012/01/20/wpf-45-cross-thread-collection-synchronization-redux


谢谢,这对参考很有帮助!问题在于SynchronizedCollection使用监视器作为同步机制。我的问题是避免使用监视器,而是使用更加并发的东西(例如ReadWriterLockSlim),适用于多个并发读取和少量写入的情况(实际上与用户界面不直接相关)。 - Adriano Repetti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接