在.NET 4中是否有线程安全的可观察集合?

23

平台:WPF,.NET 4.0,C# 4.0

问题:在MainWindow.xaml中,我有一个ListBox绑定到一个客户集合,该集合当前是ObservableCollection<Customer>。

ObservableCollection<Customer> c = new ObservableCollection<Customer>();

可以通过多个来源(例如FileSystem,WebService等)更新此集合。

为了允许并行加载客户,我创建了一个帮助类

public class CustomerManager(ref ObsevableCollection<Customer> cust)

它在内部为每个客户源生成新任务(来自Parallel扩展库),并将新的Customer实例添加到传递给其构造函数的cust客户集合对象(按引用传递)。

问题是ObservableCollection<T>(或任何集合)不能从除UI线程以外的调用中使用,并且会遇到异常:

"NotSupportedException-此类型的CollectionView不支持对其SourceCollection所做的更改来自于Dispatcher线程之外的线程."

我尝试使用

System.Collections.Concurrent.ConcurrentBag<Customer>

但是它没有实现INotifyCollectionChanged接口。因此,我的WPF UI将不会自动更新。

那么,是否有一种集合类既实现属性/集合更改通知,又允许从其他非UI线程调用?

根据我的Bing / Googling的初步结果,没有提供开箱即用的集合。

编辑:我创建了自己的集合,该集合继承自ConcurrentBag<Customer>并实现了INotifyCollectionChanged接口。但是令我惊讶的是,即使在单独的任务中调用它,WPF UI也会挂起,直到任务完成。 任务不应该并行执行而不阻塞UI线程吗?

提前感谢任何建议。


谢谢。看起来不错。正在尝试! - Vaibhav
嗨Darin,我使用了博客文章中提到的集合类。正如我在问题中提到的那样,我能够通过创建自己的集合类继承和实现所需的类/接口来合并Concurrent和Notification更改,但是UI被阻塞了。 使用Tasks的主要原因是具有UI响应性。 这仍然没有起作用!非常感谢您的建议。 - Vaibhav
链接无法使用,但我意外地发现了缓存版本 - Amanduh
2
这是一篇博客文章的有效链接[archive.org]:http://web.archive.org/web/20101105144104/http://www.deanchalk.me.uk/post/Thread-Safe-Dispatcher-Safe-Observable-Collection-for-WPF.aspx - Eccentropy
@Vaibhav,你的问题还适用吗? - Sevenate
1
对于线程安全的解决方案,请尝试访问http://www.codeproject.com/Articles/64936/Multithreaded-ObservableImmutableCollection - Anthony
4个回答

8
有两种可能的方法。第一种是继承自一个并发集合并添加INotifyCollectionChanged功能,第二种是继承自实现了INotifyCollectionChanged的集合并添加并发支持。我认为将INotifyCollectionChanged支持添加到并发集合中更容易、更安全。我的建议如下。
代码看起来很长,但大多数方法只是像调用内部并发集合一样调用它。少数几个从集合中添加或删除的方法注入了一个调用私有方法的命令,在构造时提供了dispatcher,从而允许类在线程安全的同时确保通知始终在同一线程上引发。
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Windows.Threading;

namespace Collections
{
    /// <summary>
    /// Concurrent collection that emits change notifications on a dispatcher thread
    /// </summary>
    /// <typeparam name="T">The type of objects in the collection</typeparam>
    [Serializable]
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    public class ObservableConcurrentBag<T> : IProducerConsumerCollection<T>,
        IEnumerable<T>, ICollection, IEnumerable
    {
        /// <summary>
        /// The dispatcher on which event notifications will be raised
        /// </summary>
        private readonly Dispatcher dispatcher;

        /// <summary>
        /// The internal concurrent bag used for the 'heavy lifting' of the collection implementation
        /// </summary>
        private readonly ConcurrentBag<T> internalBag;

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that will raise <see cref="INotifyCollectionChanged"/> events
        /// on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>();
        }

        /// <summary>
        /// Initializes a new instance of the ConcurrentBag<T> class that contains elements copied from the specified collection 
        /// that will raise <see cref="INotifyCollectionChanged"/> events on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher, IEnumerable<T> collection)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>(collection);
        }

        /// <summary>
        /// Occurs when the collection changes
        /// </summary>
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event on the <see cref="dispatcher"/>
        /// </summary>
        private void RaiseCollectionChangedEventOnDispatcher(NotifyCollectionChangedEventArgs e)
        {
            this.dispatcher.BeginInvoke(new Action<NotifyCollectionChangedEventArgs>(this.RaiseCollectionChangedEvent), e);
        }

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event
        /// </summary>
        /// <remarks>
        /// This method must only be raised on the dispatcher - use <see cref="RaiseCollectionChangedEventOnDispatcher" />
        /// to do this.
        /// </remarks>
        private void RaiseCollectionChangedEvent(NotifyCollectionChangedEventArgs e)
        {
            this.CollectionChanged(this, e);
        }

        #region Members that pass through to the internal concurrent bag but also raise change notifications

        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            bool result = ((IProducerConsumerCollection<T>)this.internalBag).TryAdd(item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            return result;
        }

        public void Add(T item)
        {
            this.internalBag.Add(item);
            this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryTake(out T item)
        {
            bool result = this.TryTake(out item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item));
            }
            return result;
        }

        #endregion

        #region Members that pass through directly to the internal concurrent bag

        public int Count
        {
            get
            {
                return this.internalBag.Count;
            }
        }

        public bool IsEmpty
        {
            get
            {
                return this.internalBag.IsEmpty;
            }
        }

        bool ICollection.IsSynchronized
        {
            get
            {
                return ((ICollection)this.internalBag).IsSynchronized;
            }
        }

        object ICollection.SyncRoot
        {
            get
            {
                return ((ICollection)this.internalBag).SyncRoot;
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            return ((IEnumerable<T>)this.internalBag).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable)this.internalBag).GetEnumerator();
        }

        public T[] ToArray()
        {
            return this.internalBag.ToArray();
        }

        void IProducerConsumerCollection<T>.CopyTo(T[] array, int index)
        {
            ((IProducerConsumerCollection<T>)this.internalBag).CopyTo(array, index);
        }

        void ICollection.CopyTo(Array array, int index)
        {
            ((ICollection)this.internalBag).CopyTo(array, index);
        }

        #endregion
    }
}

你是否可以在引发事件时利用SynchronizationContext,以便在同一线程上执行。这将允许除集合创建者之外的其他人访问该事件。 - galford13x
2
发现一个错误:在调用“TryTake()”时,它会调用自身。结果是溢出异常。 - Leon Bohmann
为什么我在第74行收到“System.NullReferenceException”错误:“this.CollectionChanged(this, e);”? - Prodromos

3

我花了很长时间查看所有的解决方案,但没有一个真正符合我的需求,直到我最终意识到问题所在:我不想要一个线程安全的列表 - 我只是想要一个非线程安全的列表,可以在任何线程上进行修改,但在UI线程上通知更改。

(不想要线程安全集合的原因是通常需要执行多个操作,例如“如果它不在列表中,则添加”,线程安全列表实际上并没有帮助,因此您需要自己控制锁定)。

解决方案在概念上非常简单,并且对我很有效。只需创建一个实现 IList<T>INotifyCollectionChanged 的新列表类。将您需要的所有调用委托给基础实现(例如 List<T>),然后在需要时在UI线程上调用通知。

public class AlbumList : IList<Album>, INotifyCollectionChanged
{
    private readonly IList<Album> _listImplementation = new List<Album>();

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    private void OnChanged(NotifyCollectionChangedEventArgs e)
    {
        Application.Current?.Dispatcher.Invoke(DispatcherPriority.Render, 
                     new Action(() => CollectionChanged?.Invoke(this, e)));
    }

    public void Add(Album item)
    {
        _listImplementation.Add(item);
        OnChanged(new NotifyCollectionChangedEventArgs(
                      NotifyCollectionChangedAction.Add, item));
    }

    public bool Remove(Album item)
    {
        int index = _listImplementation.IndexOf(item);
        var removed = index >= 0;
        if (removed)
        {
            _listImplementation.RemoveAt(index);
            OnChanged(new NotifyCollectionChangedEventArgs(
                          NotifyCollectionChangedAction.Remove, item, index));
        }
        return removed;
    }
    // ...snip...
}

3
请查看来自 Caliburn.Micro 库的 BindableCollection<T>
/// <summary>
/// A base collection class that supports automatic UI thread marshalling.
/// </summary>
/// <typeparam name="T">The type of elements contained in the collection.</typeparam>
#if !SILVERLIGHT && !WinRT
[Serializable]
#endif
public class BindableCollection<T> : ObservableCollection<T>, IObservableCollection<T> {

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    public BindableCollection() {
        IsNotifying = true;
    }

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    /// <param name = "collection">The collection from which the elements are copied.</param>
    /// <exception cref = "T:System.ArgumentNullException">
    ///   The <paramref name = "collection" /> parameter cannot be null.
    /// </exception>
    public BindableCollection(IEnumerable<T> collection) : base(collection) {
        IsNotifying = true;
    }

#if !SILVERLIGHT && !WinRT
    [field: NonSerialized]
#endif
    bool isNotifying; //serializator try to serialize even autogenerated fields

    /// <summary>
    ///   Enables/Disables property change notification.
    /// </summary>
#if !WinRT
    [Browsable(false)]
#endif
    public bool IsNotifying {
        get { return isNotifying; }
        set { isNotifying = value; }
    }

    /// <summary>
    ///   Notifies subscribers of the property change.
    /// </summary>
    /// <param name = "propertyName">Name of the property.</param>
#if WinRT || NET45
    public virtual void NotifyOfPropertyChange([CallerMemberName]string propertyName = "") {
#else
    public virtual void NotifyOfPropertyChange(string propertyName) {
#endif
        if(IsNotifying)
            Execute.OnUIThread(() => OnPropertyChanged(new PropertyChangedEventArgs(propertyName)));
    }

    /// <summary>
    ///   Raises a change notification indicating that all bindings should be refreshed.
    /// </summary>
    public void Refresh() {
        Execute.OnUIThread(() => {
            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Inserts the item to the specified position.
    /// </summary>
    /// <param name = "index">The index to insert at.</param>
    /// <param name = "item">The item to be inserted.</param>
    protected override sealed void InsertItem(int index, T item) {
        Execute.OnUIThread(() => InsertItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "InsertItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void InsertItemBase(int index, T item) {
        base.InsertItem(index, item);
    }

#if NET || WP8 || WinRT
/// <summary>
/// Moves the item within the collection.
/// </summary>
/// <param name="oldIndex">The old position of the item.</param>
/// <param name="newIndex">The new position of the item.</param>
    protected sealed override void MoveItem(int oldIndex, int newIndex) {
        Execute.OnUIThread(() => MoveItemBase(oldIndex, newIndex));
    }

    /// <summary>
    /// Exposes the base implementation fo the <see cref="MoveItem"/> function.
    /// </summary>
    /// <param name="oldIndex">The old index.</param>
    /// <param name="newIndex">The new index.</param>
    /// <remarks>Used to avoid compiler warning regarding unverificable code.</remarks>
    protected virtual void MoveItemBase(int oldIndex, int newIndex) {
        base.MoveItem(oldIndex, newIndex);
    }
#endif

    /// <summary>
    ///   Sets the item at the specified position.
    /// </summary>
    /// <param name = "index">The index to set the item at.</param>
    /// <param name = "item">The item to set.</param>
    protected override sealed void SetItem(int index, T item) {
        Execute.OnUIThread(() => SetItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "SetItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void SetItemBase(int index, T item) {
        base.SetItem(index, item);
    }

    /// <summary>
    ///   Removes the item at the specified position.
    /// </summary>
    /// <param name = "index">The position used to identify the item to remove.</param>
    protected override sealed void RemoveItem(int index) {
        Execute.OnUIThread(() => RemoveItemBase(index));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "RemoveItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void RemoveItemBase(int index) {
        base.RemoveItem(index);
    }

    /// <summary>
    ///   Clears the items contained by the collection.
    /// </summary>
    protected override sealed void ClearItems() {
        Execute.OnUIThread(ClearItemsBase);
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "ClearItems" /> function.
    /// </summary>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void ClearItemsBase() {
        base.ClearItems();
    }

    /// <summary>
    ///   Raises the <see cref = "E:System.Collections.ObjectModel.ObservableCollection`1.CollectionChanged" /> event with the provided arguments.
    /// </summary>
    /// <param name = "e">Arguments of the event being raised.</param>
    protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e) {
        if (IsNotifying) {
            base.OnCollectionChanged(e);
        }
    }

    /// <summary>
    ///   Raises the PropertyChanged event with the provided arguments.
    /// </summary>
    /// <param name = "e">The event data to report in the event.</param>
    protected override void OnPropertyChanged(PropertyChangedEventArgs e) {
        if (IsNotifying) {
            base.OnPropertyChanged(e);
        }
    }

    /// <summary>
    ///   Adds the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void AddRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            var index = Count;
            foreach(var item in items) {
                InsertItemBase(index, item);
                index++;
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Removes the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void RemoveRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            foreach(var item in items) {
                var index = IndexOf(item);
                if (index >= 0) {
                    RemoveItemBase(index);
                }
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    /// Called when the object is deserialized.
    /// </summary>
    /// <param name="c">The streaming context.</param>
    [OnDeserialized]
    public void OnDeserialized(StreamingContext c) {
        IsNotifying = true;
    }

    /// <summary>
    /// Used to indicate whether or not the IsNotifying property is serialized to Xml.
    /// </summary>
    /// <returns>Whether or not to serialize the IsNotifying property. The default is false.</returns>
    public virtual bool ShouldSerializeIsNotifying() {
        return false;
    }
}

源代码

PS. 请注意,该类使用 Caliburn.Micro 的其他类,因此您可以自行复制/粘贴所有依赖项 - 或者 - 如果您没有使用任何其他应用程序框架,请引用库二进制文件并给它一个机会。


使用其他框架并从另一个包中选择任何类,这没有任何问题,对吧? - heltonbiker
2
BindableCollection 不是线程安全的。它重写了 InsertItemRemoveItem 方法,但公共方法在调用这些虚拟方法之前会在调用线程上使用其他方法,例如 IndexOfCount - Yusuf Tarık Günaydın

0

这里有一个详细的解释和实现链接。它主要是针对.NET 3.5 SP1编写的,但在4.0中仍然可以使用。

这个实现的主要目标是当“真正”的列表存在时间比可绑定视图更长时(例如,如果它绑定在用户可以打开和关闭的窗口中)。如果生命周期相反(例如,您正在从仅在窗口打开时运行的后台工作程序更新列表),则有一些更简单的设计可用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接