如何使ObservableCollection线程安全?

27
System.InvalidOperationException: Collection was modified; enumeration operation may not execute.

我正在对一个不在UI线程上的ObservableCollection进行添加/删除操作。

我有一个名为EnqueueReport的方法用于添加到集合中,以及一个名为DequeueReport的方法用于从集合中删除。

步骤如下:

  1. 1.每当请求新报告时,请调用EnqueueReport
  2. 每隔几秒钟调用一次方法,检查报告是否已生成(这里有一个foreach循环,检查ObservableCollection中所有报告的生成状态)
  3. 如果报告已生成,请调用DequeueReport

我在C#库方面了解不多。请问有人能够指导我吗?


1
尝试这个ObservableCollection和线程 - Jakub
1
使用 .net 框架时,您不能将可观察集合绑定到 UI 线程之外进行编辑。有一些模式可以解决这个问题,包括延迟 UI 更新、批量插入或创建一个线程安全的 CollectionView 类实现。 - Aron
6个回答

36

从 .net framework 4.5 开始,您可以使用本地集合同步。

BindingOperations.EnableCollectionSynchronization(YourCollection, YourLockObject);

YourLockObject 是任何对象的实例,例如 new Object();。每个集合使用一个锁对象。

这样可以省去某些特殊类或其他东西的需要。只需启用并享受它 ;)

[编辑] 正如 Mark 和 Ed 在评论中所述(感谢澄清!),这并不意味着在更新时可以免除锁定集合,因为它只同步集合视图绑定,而不能使集合本身神奇地变成线程安全。 [/编辑]

PS:BindingOperations 位于命名空间 System.Windows.Data 中。


4
请注意,这使得其他线程可以对集合进行更改,但如果您从多个线程访问集合,则仍然需要进行同步。也就是说,集合并不会突然变得线程安全,但在WPF中的绑定是线程安全的。 - Mark Gjøl
1
补充@MarkGjøl的评论“从多个线程访问...”:您需要在任何可能被某些线程调用的代码周围使用lock(YourLockObject){...}。也就是说,即使您“只从一个线程调用”,请明确UI线程已经参与其中,因此如果您在其他线程上(或可能在其他线程上),则需要进行锁定。 - ToolmakerSteve

21

在这里发布的Franck解决方案适用于一个线程添加内容的情况,但ObservableCollection本身(以及它基于的List)并不是线程安全的。如果有多个线程同时写入集合中,可能会引入难以跟踪的错误。我编写了一个版本的ObservableCollection,使用ReaderWriteLockSlim来确保真正的线程安全。

不幸的是,由于达到了StackOverflow字符限制,所以在PasteBin上发布了代码。 这个版本可以完美地处理多个读取器/写入器。与常规的ObservableCollection一样,在从其回调的线程(接收到回调的线程)中修改集合是无效的。


1
@Philipp Munin 我把它放在gist上了 https://gist.github.com/anonymous/26d9d070619de58fa8e28ea21fff04fd - Tobias
这个集合的功能很好,但是它应该实现IDisposable接口,因为它有一些可释放的字段,比如ReaderWriterLockSlim和ThreadLocal。在我的情况下,它很快就耗尽了所有可用的内存,因为它保持了几个大型对象图的活动状态。 - wilford
1
可能存在竞争条件;运行了很多小时后,我总是会得到“ItemsControl与其项源不一致”的错误...“累计计数为(自上次重置以来的计数+新增数量-删除数量)”。 - wonko realtime
哦哦,谢谢你指出来。我会检查一下,但现在最好使用微软官方的,因为它经过了更多的测试。 - Robert Fraser
2
如果我对情况和代码的理解是正确的,那么你所提到的竞态条件确实存在:WPF 期望“对集合的更改和通知(通过 INotifyCollectionChanged)是原子的;其他线程不能干扰”(https://learn.microsoft.com/en-us/dotnet/api/system.windows.data.bindingoperations.enablecollectionsynchronization?view=netframework-4.8)例如:工作线程执行 Add 操作,排队事件,UI 线程读取 Count,接收事件并看到不一致性。 - Martin
显示剩余4条评论

14

你可以创建一个简单的线程友好版本的可观察集合,如下所示:

 public class MTObservableCollection<T> : ObservableCollection<T>
    {
        public override event NotifyCollectionChangedEventHandler CollectionChanged;
        protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
        {
            NotifyCollectionChangedEventHandler CollectionChanged = this.CollectionChanged;
            if (CollectionChanged != null)
                foreach (NotifyCollectionChangedEventHandler nh in CollectionChanged.GetInvocationList())
                {
                    DispatcherObject dispObj = nh.Target as DispatcherObject;
                    if (dispObj != null)
                    {
                        Dispatcher dispatcher = dispObj.Dispatcher;
                        if (dispatcher != null && !dispatcher.CheckAccess())
                        {
                            dispatcher.BeginInvoke(
                                (Action)(() => nh.Invoke(this,
                                    new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset))),
                                DispatcherPriority.DataBind);
                            continue;
                        }
                    }
                    nh.Invoke(this, e);
                }
        }
    }

现在进行大规模的查找和替换,将所有ObservableCollection更改为MTObservableCollection,就可以继续了。


26
由于ObservableCollection<T>不是线程安全的,因此这段代码并不具备线程安全性。虽然它在大多数情况下能正常工作,但如果多个线程同时写入数据,就可能会出现重入异常或数据损坏,而这种问题很难追踪和解决。 - Robert Fraser
4
通常情况下,您不会直接将线程写入/添加到非线程化对象中。您可以使用进度事件,随着新项目被处理并准备好收集,更新线程信息。然后主线程可以异步调用将项目插入该列表中。该列表通过分发器进行异步更新。但我不知道您如何使用它,但我经常强烈推荐并持续每秒推送和拉取2,250至2,525个项目,有25个线程运行它。每个推送或拉取速度在90到101个项目之间。它24/7运行,仍然没有问题。 - Franck
3
这绝对不是线程安全的,你所做的只是切换到UI线程/调度程序,而不是使集合线程安全。 - Chad Grant
4
虽然不是线程安全的,但它确实回答了提问者的问题。也许问题应该重新命名为“如何安全地从不同的线程向UI线程的ObservableCollection中添加内容”。无论如何,这就是我搜索“线程安全的ObservableCollection”时想要找到的内容。 - avenmore
1
@shtse8 这个问题不是关于线程安全的集合。它是关于从另一个线程访问集合,也就是跨线程访问。调度程序本身是线程安全的,但这种实现方式使其对线程友好而非线程安全。 - Franck
显示剩余7条评论

12

ObservableConcurrentCollection 的功能有些有限。更好的实现在这里:https://www.codeproject.com/Articles/64936/Multithreaded-ObservableImmutableCollection - Anton Krouglov

1
public class ObservableCollectionThreadSafe<T>
    : ObservableCollection<T>, IDisposable
{
    #region Data
    private Dispatcher _dispatcher;
    private ReaderWriterLockSlim _lock;
    #endregion

    #region Constructor
    public ObservableCollectionThreadSafe()
    {
        _dispatcher = Dispatcher.CurrentDispatcher;
        _lock = new ReaderWriterLockSlim();
    }
    #endregion


    #region Overrides

    /// <summary>
    /// Clear all items
    /// </summary>
    protected override void ClearItems()
    {
        _dispatcher.InvokeIfRequired(() =>
            {
                _lock.EnterWriteLock();
                try
                {
                    base.ClearItems();
                }
                finally
                {
                    _lock.ExitWriteLock();
                }
            }, DispatcherPriority.DataBind);
    }

    /// <summary>
    /// Inserts an item
    /// </summary>
    protected override void InsertItem(int index, T item)
    {
        _dispatcher.InvokeIfRequired(() =>
        {
            if (index > this.Count)
                return;

            _lock.EnterWriteLock();
            try
            {
                base.InsertItem(index, item);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }, DispatcherPriority.DataBind);

    }

    /// <summary>
    /// Moves an item
    /// </summary>
    protected override void MoveItem(int oldIndex, int newIndex)
    {
        _dispatcher.InvokeIfRequired(() =>
        {
            _lock.EnterReadLock();
            int itemCount = this.Count;
            _lock.ExitReadLock();

            if (oldIndex >= itemCount |
                newIndex >= itemCount |
                oldIndex == newIndex)
                return;

            _lock.EnterWriteLock();
            try
            {
                base.MoveItem(oldIndex, newIndex);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }, DispatcherPriority.DataBind);



    }

    /// <summary>
    /// Removes an item
    /// </summary>
    protected override void RemoveItem(int index)
    {

        _dispatcher.InvokeIfRequired(() =>
        {
            if (index >= this.Count)
                return;

            _lock.EnterWriteLock();
            try
            {
                base.RemoveItem(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }, DispatcherPriority.DataBind);
    }

    /// <summary>
    /// Sets an item
    /// </summary>
    protected override void SetItem(int index, T item)
    {
        _dispatcher.InvokeIfRequired(() =>
        {
            _lock.EnterWriteLock();
            try
            {
                base.SetItem(index, item);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }, DispatcherPriority.DataBind);
    }
    #endregion

    #region Public Methods
    /// <summary>
    /// Return as a cloned copy of this Collection
    /// </summary>
    public T[] ToSyncArray()
    {
        _lock.EnterReadLock();
        try
        {
            T[] _sync = new T[this.Count];
            this.CopyTo(_sync, 0);
            return _sync;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls

    protected virtual void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                if (_lock != null)
                    _lock.Dispose();
            }

            // TODO: free unmanaged resources (unmanaged objects) and override a finalizer below.
            // TODO: set large fields to null.

            disposedValue = true;
        }
    }

    // TODO: override a finalize only if Dispose(bool disposing) above has code to free unmanaged resources.
    // ~ObservableCollectionThreadSafe() {
    //   // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
    //   Dispose(false);
    // }

    // This code added to correctly implement the disposable pattern.
    public void Dispose()
    {
        // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
        Dispose(true);
        // TODO: uncomment the following line if the finalize is overridden above.
        // GC.SuppressFinalize(this);
    }
    #endregion

    #endregion
}

/// <summary>
/// WPF Threading extension methods
/// </summary>
public static class WPFControlThreadingExtensions
{
    #region Public Methods
    /// <summary>
    /// A simple WPF threading extension method, to invoke a delegate
    /// on the correct thread if it is not currently on the correct thread
    /// Which can be used with DispatcherObject types
    /// </summary>
    /// <param name="disp">The Dispatcher object on which to do the Invoke</param>
    /// <param name="performAction">The delegate to run</param>
    /// <param name="priority">The DispatcherPriority</param>
    public static void InvokeIfRequired(this Dispatcher disp,
        Action performAction, DispatcherPriority priority)
    {
        if (disp.Thread != Thread.CurrentThread)
        {
            disp.Invoke(priority, performAction);
        }
        else
            performAction();
    }
    #endregion
}

1
这篇文章是为Xamarin.Forms用户编写的,但它也适用于任何需要使ObservableCollections线程安全的人:

https://codetraveler.io/2019/09/11/using-observablecollection-in-a-multi-threaded-xamarin-forms-application/

这是英语文本,意思是:

这是一个非常简短的解决方案。

在集合被初始化之后,加入以下内容:

Xamarin.Forms.BindingBase.EnableCollectionSynchronization(MyCollection, null, ObservableCollectionCallback);

将此方法添加到同一类中:
    void ObservableCollectionCallback(IEnumerable collection, object context, Action accessMethod, bool writeAccess)
    {
        // `lock` ensures that only one thread access the collection at a time
        lock (collection)
        {
            accessMethod?.Invoke();
        }
    }

这篇文章的作者是Brandon Minnick。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接