Is there a Threadsafe Observable collection in .NE

Posted 2019-01-11 11:07

Question:

Platform: WPF, .NET 4.0, C# 4.0

Problem: In Mainwindow.xaml I have a ListBox bound to a Customer collection, which is currently an ObservableCollection<Customer>.

ObservableCollection<Customer> c = new ObservableCollection<Customer>();

This collection can be updated via multiple sources, like FileSystem, WebService etc.

To allow parallel loading of Customers I have created a helper class

public class CustomerManager(ref ObservableCollection<Customer> cust)

that internally spawns a new Task (from Parallel extensions library) for each customer Source and adds a new Customer instance to the customer collection object (passed by ref to its ctor).

The problem is that ObservableCollection<T> (or any collection, for that matter) cannot be modified from threads other than the UI thread, and an exception is thrown:

"NotSupportedException – This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread."

I tried using the

System.Collections.Concurrent.ConcurrentBag<Customer>

collection, but it does not implement the INotifyCollectionChanged interface. Hence my WPF UI won't get updated automatically.

So, is there a collection class that implements both property/collection change notifications and also allows calls from other non-UI threads?

From my initial Bing/Google searching, none is provided out of the box.

Edit: I created my own collection that inherits from ConcurrentBag<Customer> and also implements the INotifyCollectionChanged interface. But to my surprise, even when I invoke it in separate tasks, the WPF UI hangs until the task is completed. Aren't the tasks supposed to be executed in parallel and not block the UI thread?

Thanks for any suggestions, in advance.

Answer 1:

There are two possible approaches. The first would be to inherit from a concurrent collection and add INotifyCollectionChanged functionality, and the second would be to inherit from a collection that implements INotifyCollectionChanged and add concurrency support. I think it is far easier and safer to add INotifyCollectionChanged support to a concurrent collection. My suggestion is below.

It looks long but most of the methods just call the internal concurrent collection as if the caller were using it directly. The handful of methods that add or remove from the collection inject a call to a private method that raises the notification event on the dispatcher provided at construction, thus allowing the class to be thread safe but ensuring the notifications are raised on the same thread all the time.

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Runtime.InteropServices;
using System.Security.Permissions;
using System.Windows.Threading;

namespace Collections
{
    /// <summary>
    /// Concurrent collection that emits change notifications on a dispatcher thread
    /// </summary>
    /// <typeparam name="T">The type of objects in the collection</typeparam>
    [Serializable]
    [ComVisible(false)]
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
    public class ObservableConcurrentBag<T> : IProducerConsumerCollection<T>,
        IEnumerable<T>, ICollection, IEnumerable, INotifyCollectionChanged
    {
        /// <summary>
        /// The dispatcher on which event notifications will be raised
        /// </summary>
        private readonly Dispatcher dispatcher;

        /// <summary>
        /// The internal concurrent bag used for the 'heavy lifting' of the collection implementation
        /// </summary>
        private readonly ConcurrentBag<T> internalBag;

        /// <summary>
        /// Initializes a new instance of the <see cref="ObservableConcurrentBag{T}"/> class that will raise <see cref="INotifyCollectionChanged"/> events
        /// on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>();
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="ObservableConcurrentBag{T}"/> class that contains elements copied from the specified collection
        /// that will raise <see cref="INotifyCollectionChanged"/> events on the specified dispatcher
        /// </summary>
        public ObservableConcurrentBag(Dispatcher dispatcher, IEnumerable<T> collection)
        {
            this.dispatcher = dispatcher;
            this.internalBag = new ConcurrentBag<T>(collection);
        }

        /// <summary>
        /// Occurs when the collection changes
        /// </summary>
        public event NotifyCollectionChangedEventHandler CollectionChanged;

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event on the <see cref="dispatcher"/>
        /// </summary>
        private void RaiseCollectionChangedEventOnDispatcher(NotifyCollectionChangedEventArgs e)
        {
            this.dispatcher.BeginInvoke(new Action<NotifyCollectionChangedEventArgs>(this.RaiseCollectionChangedEvent), e);
        }

        /// <summary>
        /// Raises the <see cref="CollectionChanged"/> event
        /// </summary>
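        /// <remarks>
        /// This method must only be called on the dispatcher - use <see cref="RaiseCollectionChangedEventOnDispatcher" />
        /// to do this.
        /// </remarks>
        private void RaiseCollectionChangedEvent(NotifyCollectionChangedEventArgs e)
        {
            // Copy the delegate first: the event is null when nothing has subscribed.
            NotifyCollectionChangedEventHandler handler = this.CollectionChanged;
            if (handler != null)
            {
                handler(this, e);
            }
        }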

        #region Members that pass through to the internal concurrent bag but also raise change notifications

        bool IProducerConsumerCollection<T>.TryAdd(T item)
        {
            bool result = ((IProducerConsumerCollection<T>)this.internalBag).TryAdd(item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            return result;
        }

        public void Add(T item)
        {
            this.internalBag.Add(item);
            this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
        }

        public bool TryTake(out T item)
        {
            // Delegate to the internal bag; calling this.TryTake here would recurse forever.
            bool result = this.internalBag.TryTake(out item);
            if (result)
            {
                this.RaiseCollectionChangedEventOnDispatcher(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item));
            }
            return result;
        }

        #endregion

        #region Members that pass through directly to the internal concurrent bag

        public int Count
        {
            get
            {
                return this.internalBag.Count;
            }
        }

        public bool IsEmpty
        {
            get
            {
                return this.internalBag.IsEmpty;
            }
        }

        bool ICollection.IsSynchronized
        {
            get
            {
                return ((ICollection)this.internalBag).IsSynchronized;
            }
        }

        object ICollection.SyncRoot
        {
            get
            {
                return ((ICollection)this.internalBag).SyncRoot;
            }
        }

        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            return ((IEnumerable<T>)this.internalBag).GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable)this.internalBag).GetEnumerator();
        }

        public T[] ToArray()
        {
            return this.internalBag.ToArray();
        }

        void IProducerConsumerCollection<T>.CopyTo(T[] array, int index)
        {
            ((IProducerConsumerCollection<T>)this.internalBag).CopyTo(array, index);
        }

        void ICollection.CopyTo(Array array, int index)
        {
            ((ICollection)this.internalBag).CopyTo(array, index);
        }

        #endregion
    }
}
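
The dispatch step can also be tried outside WPF. The following is only a minimal sketch of the same idea, not the class above: it assumes a SynchronizationContext stands in for the Dispatcher, and the names QueueingContext and NotifyingBag are hypothetical. Items are stored immediately on the calling thread, while the notification is posted to the captured context and only runs when that context processes its queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Specialized;
using System.Threading;

// Hypothetical stand-in for a UI message loop: queues posted callbacks until Pump() is called.
public sealed class QueueingContext : SynchronizationContext
{
    private readonly ConcurrentQueue<Action> _pending = new ConcurrentQueue<Action>();

    public override void Post(SendOrPostCallback d, object state)
    {
        _pending.Enqueue(() => d(state));
    }

    // Runs every queued callback on the calling thread.
    public void Pump()
    {
        Action work;
        while (_pending.TryDequeue(out work))
        {
            work();
        }
    }
}

// Simplified bag: Add is safe from any thread, notifications are posted to one context.
public sealed class NotifyingBag<T> : INotifyCollectionChanged
{
    private readonly ConcurrentBag<T> _items = new ConcurrentBag<T>();
    private readonly SynchronizationContext _context;

    public NotifyingBag(SynchronizationContext context)
    {
        if (context == null) throw new ArgumentNullException("context");
        _context = context;
    }

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    public int Count { get { return _items.Count; } }

    public void Add(T item)
    {
        _items.Add(item);
        var args = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item);
        _context.Post(_ =>
        {
            // Copy the delegate so a concurrent unsubscribe cannot null it mid-call.
            NotifyCollectionChangedEventHandler handler = CollectionChanged;
            if (handler != null) handler(this, args);
        }, null);
    }
}
```

In a real WPF application the context would be the one captured on the UI thread (SynchronizationContext.Current), so the queued notifications would be drained by the UI message loop rather than by an explicit Pump() call.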


Answer 2:

Please take a look at BindableCollection<T> from the Caliburn.Micro library:

/// <summary>
/// A base collection class that supports automatic UI thread marshalling.
/// </summary>
/// <typeparam name="T">The type of elements contained in the collection.</typeparam>
#if !SILVERLIGHT && !WinRT
[Serializable]
#endif
public class BindableCollection<T> : ObservableCollection<T>, IObservableCollection<T> {

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    public BindableCollection() {
        IsNotifying = true;
    }

    /// <summary>
    ///   Initializes a new instance of the <see cref = "Caliburn.Micro.BindableCollection{T}" /> class.
    /// </summary>
    /// <param name = "collection">The collection from which the elements are copied.</param>
    /// <exception cref = "T:System.ArgumentNullException">
    ///   The <paramref name = "collection" /> parameter cannot be null.
    /// </exception>
    public BindableCollection(IEnumerable<T> collection) : base(collection) {
        IsNotifying = true;
    }

#if !SILVERLIGHT && !WinRT
    [field: NonSerialized]
#endif
    bool isNotifying; // the serializer tries to serialize even auto-generated backing fields

    /// <summary>
    ///   Enables/Disables property change notification.
    /// </summary>
#if !WinRT
    [Browsable(false)]
#endif
    public bool IsNotifying {
        get { return isNotifying; }
        set { isNotifying = value; }
    }

    /// <summary>
    ///   Notifies subscribers of the property change.
    /// </summary>
    /// <param name = "propertyName">Name of the property.</param>
#if WinRT || NET45
    public virtual void NotifyOfPropertyChange([CallerMemberName]string propertyName = "") {
#else
    public virtual void NotifyOfPropertyChange(string propertyName) {
#endif
        if(IsNotifying)
            Execute.OnUIThread(() => OnPropertyChanged(new PropertyChangedEventArgs(propertyName)));
    }

    /// <summary>
    ///   Raises a change notification indicating that all bindings should be refreshed.
    /// </summary>
    public void Refresh() {
        Execute.OnUIThread(() => {
            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Inserts the item to the specified position.
    /// </summary>
    /// <param name = "index">The index to insert at.</param>
    /// <param name = "item">The item to be inserted.</param>
    protected override sealed void InsertItem(int index, T item) {
        Execute.OnUIThread(() => InsertItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "InsertItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void InsertItemBase(int index, T item) {
        base.InsertItem(index, item);
    }

#if NET || WP8 || WinRT
    /// <summary>
    /// Moves the item within the collection.
    /// </summary>
    /// <param name="oldIndex">The old position of the item.</param>
    /// <param name="newIndex">The new position of the item.</param>
    protected sealed override void MoveItem(int oldIndex, int newIndex) {
        Execute.OnUIThread(() => MoveItemBase(oldIndex, newIndex));
    }

    /// <summary>
    /// Exposes the base implementation of the <see cref="MoveItem"/> function.
    /// </summary>
    /// <param name="oldIndex">The old index.</param>
    /// <param name="newIndex">The new index.</param>
    /// <remarks>Used to avoid compiler warning regarding unverifiable code.</remarks>
    protected virtual void MoveItemBase(int oldIndex, int newIndex) {
        base.MoveItem(oldIndex, newIndex);
    }
#endif

    /// <summary>
    ///   Sets the item at the specified position.
    /// </summary>
    /// <param name = "index">The index to set the item at.</param>
    /// <param name = "item">The item to set.</param>
    protected override sealed void SetItem(int index, T item) {
        Execute.OnUIThread(() => SetItemBase(index, item));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "SetItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <param name = "item">The item.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void SetItemBase(int index, T item) {
        base.SetItem(index, item);
    }

    /// <summary>
    ///   Removes the item at the specified position.
    /// </summary>
    /// <param name = "index">The position used to identify the item to remove.</param>
    protected override sealed void RemoveItem(int index) {
        Execute.OnUIThread(() => RemoveItemBase(index));
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "RemoveItem" /> function.
    /// </summary>
    /// <param name = "index">The index.</param>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void RemoveItemBase(int index) {
        base.RemoveItem(index);
    }

    /// <summary>
    ///   Clears the items contained by the collection.
    /// </summary>
    protected override sealed void ClearItems() {
        Execute.OnUIThread(ClearItemsBase);
    }

    /// <summary>
    ///   Exposes the base implementation of the <see cref = "ClearItems" /> function.
    /// </summary>
    /// <remarks>
    ///   Used to avoid compiler warning regarding unverifiable code.
    /// </remarks>
    protected virtual void ClearItemsBase() {
        base.ClearItems();
    }

    /// <summary>
    ///   Raises the <see cref = "E:System.Collections.ObjectModel.ObservableCollection`1.CollectionChanged" /> event with the provided arguments.
    /// </summary>
    /// <param name = "e">Arguments of the event being raised.</param>
    protected override void OnCollectionChanged(NotifyCollectionChangedEventArgs e) {
        if (IsNotifying) {
            base.OnCollectionChanged(e);
        }
    }

    /// <summary>
    ///   Raises the PropertyChanged event with the provided arguments.
    /// </summary>
    /// <param name = "e">The event data to report in the event.</param>
    protected override void OnPropertyChanged(PropertyChangedEventArgs e) {
        if (IsNotifying) {
            base.OnPropertyChanged(e);
        }
    }

    /// <summary>
    ///   Adds the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void AddRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            var index = Count;
            foreach(var item in items) {
                InsertItemBase(index, item);
                index++;
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    ///   Removes the range.
    /// </summary>
    /// <param name = "items">The items.</param>
    public virtual void RemoveRange(IEnumerable<T> items) {
        Execute.OnUIThread(() => {
            var previousNotificationSetting = IsNotifying;
            IsNotifying = false;
            foreach(var item in items) {
                var index = IndexOf(item);
                if (index >= 0) {
                    RemoveItemBase(index);
                }
            }
            IsNotifying = previousNotificationSetting;

            OnPropertyChanged(new PropertyChangedEventArgs("Count"));
            OnPropertyChanged(new PropertyChangedEventArgs("Item[]"));
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset));
        });
    }

    /// <summary>
    /// Called when the object is deserialized.
    /// </summary>
    /// <param name="c">The streaming context.</param>
    [OnDeserialized]
    public void OnDeserialized(StreamingContext c) {
        IsNotifying = true;
    }

    /// <summary>
    /// Used to indicate whether or not the IsNotifying property is serialized to Xml.
    /// </summary>
    /// <returns>Whether or not to serialize the IsNotifying property. The default is false.</returns>
    public virtual bool ShouldSerializeIsNotifying() {
        return false;
    }
}

Source

P.S. Keep in mind that this class uses other classes from Caliburn.Micro, so you can either copy all of its dependencies yourself or, if you are not using any other application framework, just reference the library binary and give it a try.



Answer 3:

I spent ages looking at all the solutions and none really fit what I needed, until I finally realized the problem: I didn't want a threadsafe list - I just wanted a non-threadsafe list that could be modified on any thread, but that notified changes on the UI thread.

(The reason for not wanting a threadsafe collection is the usual one - often you need to perform multiple operations, like "if it's not in the list, then add it" which threadsafe lists don't actually help with, so you want to control the locking yourself).
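
To make the "if it's not in the list, then add it" case concrete, here is a minimal sketch in which the caller owns the lock. The class name UniqueNames and its members are hypothetical. A collection whose individual calls are thread-safe would still let two threads both pass the Contains check before either one adds; holding a single lock around both steps closes that gap.

```csharp
using System;
using System.Collections.Generic;

public sealed class UniqueNames
{
    private readonly object _gate = new object();
    private readonly List<string> _names = new List<string>();

    // Check and add happen under one lock, so they behave as a single atomic step.
    public bool AddIfMissing(string name)
    {
        lock (_gate)
        {
            if (_names.Contains(name))
            {
                return false;
            }
            _names.Add(name);
            return true;
        }
    }

    public int Count
    {
        get { lock (_gate) { return _names.Count; } }
    }
}
```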

The solution turned out to be quite simple in concept and has worked well for me. Just create a new list class that implements IList<T> and INotifyCollectionChanged. Delegate all calls you need to an underlying implementation (e.g. a List<T>) and then call notifications on the UI thread where needed.

public class AlbumList : IList<Album>, INotifyCollectionChanged
{
    private readonly IList<Album> _listImplementation = new List<Album>();

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    private void OnChanged(NotifyCollectionChangedEventArgs e)
    {
        Application.Current?.Dispatcher.Invoke(DispatcherPriority.Render, 
                     new Action(() => CollectionChanged?.Invoke(this, e)));
    }

    public void Add(Album item)
    {
        _listImplementation.Add(item);
        OnChanged(new NotifyCollectionChangedEventArgs(
                      NotifyCollectionChangedAction.Add, item));
    }

    public bool Remove(Album item)
    {
        int index = _listImplementation.IndexOf(item);
        var removed = index >= 0;
        if (removed)
        {
            _listImplementation.RemoveAt(index);
            OnChanged(new NotifyCollectionChangedEventArgs(
                          NotifyCollectionChangedAction.Remove, item, index));
        }
        return removed;
    }
    // ...snip...
}


Answer 4:

There's a detailed explanation and an implementation here. It was written mainly for .NET 3.5 SP1 but it will still work in 4.0.

The primary target of this implementation is the case where the "real" list exists longer than the bindable view of it (e.g. if it is bound in a window that the user can open and close). If the lifetimes are the other way around (e.g. you're updating the list from a background worker that runs only while the window is open), then there are some simpler designs available.
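
One possible simpler design for that second case (not necessarily what the linked article has in mind) is to keep a plain ObservableCollection<T> and let only UI-thread code touch it. The sketch below is an assumption-laden illustration: BackgroundLoader and LoadInto are hypothetical names. The load runs on a thread-pool thread, and a continuation scheduled on the supplied TaskScheduler appends the results.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundLoader
{
    // Runs 'load' on the thread pool, then adds the results to 'target'
    // from a continuation scheduled on 'uiScheduler'.
    public static Task LoadInto<T>(
        ObservableCollection<T> target,
        Func<IEnumerable<T>> load,
        TaskScheduler uiScheduler)
    {
        return Task.Factory
            .StartNew(
                () => load().ToList(),          // materialize off the UI thread
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default)
            .ContinueWith(t =>
            {
                // Reading t.Result rethrows (wrapped) if the load failed.
                foreach (T item in t.Result)
                {
                    target.Add(item);
                }
            }, uiScheduler);
    }
}
```

Called from the UI thread, the scheduler argument would typically be TaskScheduler.FromCurrentSynchronizationContext(), with one LoadInto call per source. Because every Add then runs on the UI thread, the collection itself needs no locking.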