PCL弱事件管理器从反应式扩展中处理事件需要3-7分钟进行处理并释放。

4
我正在使用Reactive库在PCL中实现WeakEventManager。它为订阅者保留弱引用,每次事件触发时获取订阅者的委托并触发该委托,但如果无法从弱引用中获取对象,则会释放与该委托的链接。问题是,在短时间内,弱引用返回null(但订阅者仍然存在),然后执行链接的释放。所以我的问题是为什么会发生这种情况,如何解决?代码中请注意:(请查看注释)
    private static IDisposable InternalSubscribeWeakly<TEventPattern, TEvent>(this IObservable<TEventPattern> observable, TEvent Weak_onNext, Action<TEvent, TEventPattern> onNext)
where TEvent : class
    {
        if (onNext.Target != null)
            throw new ArgumentException("onNext must refer to a static method, or else the subscription will still hold a strong reference to target");

        // Is the delegate alive?
        var Weak_onNextReferance = new WeakReference(Weak_onNext);

        //This is a link for that event, so if you want to unsubscribe from event you have to dispose this object
        IDisposable subscription = null;
        subscription = observable.Subscribe(item =>
        {
            //So the library keeps weak reference for this object and each time event fired it tries to get that object
            var current_onNext = Weak_onNextReferance.Target as TEvent;
            if (current_onNext != null)
            {
                //If the object was found, it uses the delegate that subscriber provided and fires the event
                onNext(current_onNext, item);
            }
            else
            {
                //If the object is not found it disposes the link
                //NOTE: For some reasons after a short amount of time it can't get a reference from the WeakReference, however the subscriber is still alive
                subscription.Dispose();
            }
        });
        return subscription;
    }

我是如何使用这个管理器进行订阅的:

private void NoLeakWindow_Loaded(object sender, RoutedEventArgs e)
{
    Loaded -= NoLeakWindow_Loaded;

    this.ObserveOn<Window, ElapsedEventHandler, ElapsedEventArgs>(h => (o, s) => h(o, s),
        r => MainWindow.EPublisher.EventTimer.Elapsed += r,
        r => MainWindow.EPublisher.EventTimer.Elapsed -= r)
        .SubscribeWeakly(EventTimer_Elapsed);

    this.ObserveOn<Window, ElapsedEventHandler, ElapsedEventArgs>(
        h => (o, s) => h(o, s),
        r => MainWindow.EPublisher.EventTimer.Elapsed += r,
        r => MainWindow.EPublisher.EventTimer.Elapsed -= r)
        .SubscribeWeakly(EventTimer_Elapsed2);
}

private void EventTimer_Elapsed(EventPattern<ElapsedEventArgs> e)
{
    MessageBox.Show("EventTimer_Elapsed By Timer");
}

private void EventTimer_Elapsed2(EventPattern<ElapsedEventArgs> e)
{
    MessageBox.Show("EventTimer2_Elapsed2 By Timer2");
}

我的事件发布器:

public class EventPublisher
{
    public Timer EventTimer = new Timer(3000);
    public Timer EventTimer2 = new Timer(2700);

    public event EventHandler<EventArgs> TimeElapsed;

    public EventPublisher()
    {
        EventTimer.Start();
        EventTimer2.Start();
    }
}

最后,这是WeakEventManager类的完整代码:

/// <summary>
    /// Static Class that holds the extension methods to handle events using weak references.
    /// This way we do not need to worry about unregistered the event handler.
    /// </summary>
    public static class WeakEventManager
    {
        /// <summary>
        /// Creates Observable for subscribing to it's event
        /// </summary>
        /// <typeparam name="T">The type of the T.</typeparam>
        /// <typeparam name="TDelegate">The type of the T delegate.</typeparam>
        /// <typeparam name="TArgs">The type of the T args.</typeparam>
        /// <param name="subscriber">The subscriber</param>
        /// <param name="converter">The converter.</param>
        /// <param name="add">The add</param>
        /// <param name="remove">The remove</param>
        /// <returns>IObservable</returns>
        public static IObservable<EventPattern<TArgs>> ObserveOn<T, TDelegate, TArgs>(this T subscriber, Func<EventHandler<TArgs>, TDelegate> converter, Action<TDelegate> add, Action<TDelegate> remove)
            where T : class
        {
            return Observable.FromEventPattern<TDelegate, TArgs>(
                converter,
                add,
                remove);
        }
        /// <summary>
        /// Subscribe's action to event
        /// </summary>
        /// <typeparam name="T">The type of the T.</typeparam>
        /// <param name="observable">The observable</param>
        /// <param name="onNext">The action</param>
        /// <returns></returns>
        public static IDisposable SubscribeWeakly<T>(this IObservable<T> observable, Action<T> onNext) where T : class
        {
            IDisposable Result = null;
            WeakSubscriberHelper<T> SubscriptionHelper = new WeakSubscriberHelper<T>(observable, ref Result, onNext);
            return Result;
        }

        private class WeakSubscriberHelper<T> where T : class
        {
            public WeakSubscriberHelper(IObservable<T> observable, ref IDisposable Result, Action<T> eventAction)
            {
                Result = observable.InternalSubscribeWeakly(eventAction, WeakSubscriberHelper<T>.StaticEventHandler);
            }

            public static void StaticEventHandler(Action<T> subscriber, T item)
            {
                subscriber(item);
            }
        }

        private static IDisposable InternalSubscribeWeakly<TEventPattern, TEvent>(this IObservable<TEventPattern> observable, TEvent Weak_onNext, Action<TEvent, TEventPattern> onNext)
where TEvent : class
        {
            if (onNext.Target != null)
                throw new ArgumentException("onNext must refer to a static method, or else the subscription will still hold a strong reference to target");

            // Is the delegate alive?
            var Weak_onNextReferance = new WeakReference(Weak_onNext);

            //This is a link for that event, so if you want to unsubscribe from event you have to dispose this object
            IDisposable subscription = null;
            subscription = observable.Subscribe(item =>
            {
                //So the library keeps weak reference for this object and each time event fired it tries to get that object
                var current_onNext = Weak_onNextReferance.Target as TEvent;
                if (current_onNext != null)
                {
                    //If the object was found, it uses the delegate that subscriber provided and fires the event
                    onNext(current_onNext, item);
                }
                else
                {
                    //If the object is not found it disposes the link
                    //NOTE: For some reasons after a short amount of time it can't get a reference from the WeakReference, however the subscriber is still alive
                    subscription.Dispose();
                }
            });
            return subscription;
        }

        public static IDisposable SubscribeWeakly<T, TWeakClass>(this IObservable<T> observable, TWeakClass WeakClass, Action<T> onNext) where T : class where TWeakClass : class
        {
            IDisposable Result = null;
            WeakClassSubscriberHelper<T> SubscriptionHelper = new WeakClassSubscriberHelper<T>(observable, WeakClass, ref Result, onNext);
            return Result;
        }

        private class WeakClassSubscriberHelper<T> where T : class
        {
            public WeakClassSubscriberHelper(IObservable<T> observable, object WeakClass, ref IDisposable Result, Action<T> eventAction)
            {
                Result = observable.InternalSubscribeWeaklyToClass(eventAction, WeakClass, WeakClassSubscriberHelper<T>.StaticEventHandler);
            }

            public static void StaticEventHandler(Action<T> subscriber, T item)
            {
                subscriber(item);
            }
        }

        private static IDisposable InternalSubscribeWeaklyToClass<TEventPattern, TEvent, TClass>(this IObservable<TEventPattern> observable, TEvent Weak_onNext, TClass WeakClass, Action<TEvent, TEventPattern> onNext)
    where TEvent : class where TClass : class
        {
            if (onNext.Target != null)
                throw new ArgumentException("onNext must refer to a static method, or else the subscription will still hold a strong reference to target");

            // The class instance could live in a differnt 
            // place than the eventhandler. If either one is null,
            // terminate the subscribtion.
            var WeakClassReference = new WeakReference(WeakClass);
            var Weak_onNextReferance = new WeakReference(Weak_onNext);

            IDisposable subscription = null;
            subscription = observable.Subscribe(item =>
            {
                var currentWeakClass = WeakClassReference.Target as TClass;
                var current_onNext = Weak_onNextReferance.Target as TEvent;
                if (currentWeakClass != null && current_onNext != null)
                {
                    onNext(current_onNext, item);
                }
                else
                {
                    subscription.Dispose();
                }
            });
            return subscription;
        }
    }

1
当对象被垃圾回收时,短弱引用的目标将变为null。我认为没有任何东西可以阻止TEvent委托参数的GC。 - supertopi
此外,将 Observable.FromEventPattern 包装到自己的运算符 Observable.ObserveOn 中非常令人困惑,因为在 Observable 静态类中已经存在一个同名的运算符(执行不同的操作)。 - supertopi
你能详细说明一下解决方案应该如何工作吗?在我看来,你正在重写Rx中已经存在的某些逻辑。使用“弱引用”模式与使用普通的Rx事件处理相比,最终目标是什么? - supertopi
所以我不会重写任何东西,这段WeakEventManager的代码是在codeproject上找到的。所以Rx UI有WeakEventManager - 但是你必须为你想订阅的每个事件编写一个子类,并重写两个方法:StartListening和StopListening。所以我不想这样做,希望能实现一些更通用的东西。上述代码应该解决了这个问题,但是引用被垃圾回收了。 - sponger94
另外,上面的代码没有提供一个很好的方法来移除处理程序,它为您提供了IDisposable - 但如果您正在尝试为集合中的每个项目添加处理程序,并且在某些情况下您必须取消订阅其中一些处理程序,则这是不方便的 - 这意味着您必须跟踪订阅者中的所有IDisposable项。将其封装在eventManager中会非常好,因此Rx UI的实现使用weaktable进行良好的跟踪 - 这就是我试图合并这些实现的原因。 - sponger94
1个回答

1
我想出了自己的解决方案。基本上,我将这两个实现合并为一个。因此,我的解决方案非常简单,我认为有很多方法可以改进它。
所以,这是解决方案:
/// <summary>
/// PclWeakEventManager base class
/// </summary>
/// <typeparam name="TEventSource">The type of the event source.</typeparam>
/// <typeparam name="TEventHandler">The type of the event handler.</typeparam>
/// <typeparam name="TEventArgs">The type of the event arguments.</typeparam>
public class PclWeakEventManager<TEventSource, TEventHandler, TEventArgs>
{
    static readonly object StaticSource = new object();

    /// <summary>
    /// Mapping between the target of the delegate (for example a Button) and the handler (EventHandler).
    /// Windows Phone needs this, otherwise the event handler gets garbage collected.
    /// </summary>
    ConditionalWeakTable<object, List<Delegate>> targetToEventHandler = new ConditionalWeakTable<object, List<Delegate>>();

    /// <summary>
    /// Mapping from the source of the event to the list of handlers. This is a CWT to ensure it does not leak the source of the event.
    /// </summary>
    ConditionalWeakTable<object, WeakHandlerList> sourceToWeakHandlers = new ConditionalWeakTable<object, WeakHandlerList>();

    /// <summary>
    /// Singleton instance
    /// </summary>
    static Lazy<PclWeakEventManager<TEventSource, TEventHandler, TEventArgs>> current =
        new Lazy<PclWeakEventManager<TEventSource, TEventHandler, TEventArgs>>(() => new PclWeakEventManager<TEventSource, TEventHandler, TEventArgs>());

    /// <summary>
    /// Get the singleton instance
    /// </summary>
    static PclWeakEventManager<TEventSource, TEventHandler, TEventArgs> Current
    {
        get { return current.Value; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="PclWeakEventManager{TEventSource, TEventHandler, TEventArgs}"/> class.
    /// Protected to disallow instances of this class and force a subclass.
    /// </summary>
    protected PclWeakEventManager()
    {
    }

    #region Public static methods

    /// <summary>
    /// Adds a weak reference to the handler and associates it with the source.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="handler">The handler.</param>
    public static void AddHandler(TEventSource source, TEventHandler handler, Func<EventHandler<TEventArgs>, TEventHandler> converter, Action<TEventHandler> add, Action<TEventHandler> remove)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (handler == null) throw new ArgumentNullException("handler");

        if (!typeof(TEventHandler).GetTypeInfo().IsSubclassOf(typeof(Delegate)))
        {
            throw new ArgumentException("Handler must be Delegate type");
        }
        var observable = Observable.FromEventPattern(converter, add, remove);
        Current.PrivateAddHandler(source, observable, handler);
    }

    /// <summary>
    /// Removes the association between the source and the handler.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="handler">The handler.</param>
    public static void RemoveHandler(TEventSource source, TEventHandler handler)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (handler == null) throw new ArgumentNullException("handler");

        if (!typeof(TEventHandler).GetTypeInfo().IsSubclassOf(typeof(Delegate)))
        {
            throw new ArgumentException("handler must be Delegate type");
        }

        Current.PrivateRemoveHandler(source, handler);
    }

    #endregion

    #region Event delivering

    /// <summary>
    /// Delivers the event to the handlers registered for the source. 
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="args">The <see cref="TEventArgs"/> instance containing the event data.</param>
    public static void DeliverEvent(TEventSource sender, TEventArgs args)
    {
        Current.PrivateDeliverEvent(sender, args);
    }

    /// <summary>
    /// Override this method to attach to an event.
    /// </summary>
    /// <param name="source">The source.</param>
    protected virtual void StartListening(TEventSource source, IObservable<EventPattern<TEventArgs>> observable, TEventHandler handler)
    {
        //The handler - proxy should be static, otherwise it will create a strong reference
        InternalSubscribeWeakly(observable, source, handler, DeliverEvent);
    }

    /// <summary>
    /// Override this method to detach from an event.
    /// </summary>
    /// <param name="source">The source.</param>
    protected virtual void StopListening(object source)
    {
        //This method is for future usage
    }

    /// <summary>
    /// Fire the event handler
    /// </summary>
    /// <param name="sender">Event publisher</param>
    /// <param name="args">Event arguments</param>
    void PrivateDeliverEvent(object sender, TEventArgs args)
    {
        object source = sender != null ? sender : StaticSource;
        var weakHandlers = default(WeakHandlerList);

        bool hasStaleEntries = false;

        if (this.sourceToWeakHandlers.TryGetValue(source, out weakHandlers))
        {
            using (weakHandlers.DeliverActive())
            {
                hasStaleEntries = weakHandlers.DeliverEvent(source, args);
            }
        }

        if (hasStaleEntries)
        {
            this.Purge(source);
        }
    }

    #endregion

    #region Add weak handler methods

    /// <summary>
    /// Adds the event handler to WeakTables
    /// </summary>
    /// <param name="source">The event publisher source</param>
    /// <param name="observable">Observable object</param>
    /// <param name="handler">The event handler. This is used to create a weak reference</param>
    void PrivateAddHandler(TEventSource source, IObservable<EventPattern<TEventArgs>> observable, TEventHandler handler)
    {
        this.AddWeakHandler(source, observable, handler);
        this.AddTargetHandler(handler);
    }

    /// <summary>
    /// Add a weak handler
    /// </summary>
    /// <param name="source">The event publisher source</param>
    /// <param name="observable">Observable object</param>
    /// <param name="handler">The event handler. This is used to create a weak reference</param>
    void AddWeakHandler(TEventSource source, IObservable<EventPattern<TEventArgs>> observable, TEventHandler handler)
    {
        WeakHandlerList weakHandlers;

        //If for the event source table wasn't created, then it creates a new
        if (this.sourceToWeakHandlers.TryGetValue(source, out weakHandlers))
        {
            // clone list if we are currently delivering an event
            if (weakHandlers.IsDeliverActive)
            {
                weakHandlers = weakHandlers.Clone();
                this.sourceToWeakHandlers.Remove(source);
                this.sourceToWeakHandlers.Add(source, weakHandlers);
            }
            weakHandlers.AddWeakHandler(source, handler);
        }
        else
        {
            weakHandlers = new WeakHandlerList();
            weakHandlers.AddWeakHandler(source, handler);

            this.sourceToWeakHandlers.Add(source, weakHandlers);
            this.StartListening(source, observable, handler);
        }

        this.Purge(source);
    }

    /// <summary>
    /// Subscribe to the event
    /// </summary>
    /// <param name="observable">Observable object</param>
    /// <param name="source">The event publisher source</param>
    /// <param name="handler">The event handler. This is used to create a weak reference</param>
    /// <param name="onNext">Event handler delegate</param>
    private static void InternalSubscribeWeakly(IObservable<EventPattern<TEventArgs>> observable, TEventSource source, TEventHandler handler, Action<TEventSource, TEventArgs> onNext)
    {
        if (onNext.Target != null)
            throw new ArgumentException("onNext must refer to a static method, or else the subscription will still hold a strong reference to target");

        // Is the delegate alive?
        var Weak_onNextReferance = new WeakReference(handler);

        //This is a link for that event, so if you want to unsubscribe from event you have to dispose this object
        IDisposable subscription = null;
        subscription = observable.Subscribe(item =>
        {
            //Purge handler if the subscriber is not alive
            Current.Purge(source);
            //So the library keeps weak reference for this object and each time event fired it tries to get that object
            var current_onNext = Weak_onNextReferance.Target;
            if (current_onNext != null)
            {
                //If the object was found, it uses the delegate that subscriber provided and fires the event
                onNext((TEventSource)item.Sender, item.EventArgs);
            }
            else
            {
                //If the object is not found it disposes the link
                subscription.Dispose();
                Current.sourceToWeakHandlers.Remove(source);
            }
        });
    }

    /// <summary>
    /// Adds the event handler to the weak event handlers list
    /// </summary>
    /// <param name="handler">The event handler. This is used to create a weak reference</param>
    void AddTargetHandler(TEventHandler handler)
    {
        var @delegate = handler as Delegate;
        object key = @delegate.Target ?? StaticSource;
        List<Delegate> delegates;

        if (this.targetToEventHandler.TryGetValue(key, out delegates))
        {
            delegates.Add(@delegate);
        }
        else
        {
            delegates = new List<Delegate>();
            delegates.Add(@delegate);

            this.targetToEventHandler.Add(key, delegates);
        }
    }

    #endregion

    #region Remove weak handler methods

    /// <summary>
    /// Remove the event handler
    /// </summary>
    /// <param name="source">Event source object</param>
    /// <param name="handler">The event handler</param>
    void PrivateRemoveHandler(TEventSource source, TEventHandler handler)
    {
        this.RemoveWeakHandler(source, handler);
        this.RemoveTargetHandler(handler);
    }

    /// <summary>
    /// Remove the event handler
    /// </summary>
    /// <param name="source">Event source object</param>
    /// <param name="handler">The event handler</param>
    void RemoveWeakHandler(TEventSource source, TEventHandler handler)
    {
        var weakHandlers = default(WeakHandlerList);

        if (this.sourceToWeakHandlers.TryGetValue(source, out weakHandlers))
        {
            // clone list if we are currently delivering an event
            if (weakHandlers.IsDeliverActive)
            {
                weakHandlers = weakHandlers.Clone();
                this.sourceToWeakHandlers.Remove(source);
                this.sourceToWeakHandlers.Add(source, weakHandlers);
            }

            if (weakHandlers.RemoveWeakHandler(source, handler) && weakHandlers.Count == 0)
            {
                this.sourceToWeakHandlers.Remove(source);
                this.StopListening(source);
            }
        }
    }

    /// <summary>
    /// Remove the handler from weaktable
    /// </summary>
    /// <param name="handler">The event handler</param>
    void RemoveTargetHandler(TEventHandler handler)
    {
        var @delegate = handler as Delegate;
        object key = @delegate.Target ?? StaticSource;

        var delegates = default(List<Delegate>);
        if (this.targetToEventHandler.TryGetValue(key, out delegates))
        {
            delegates.Remove(@delegate);

            if (delegates.Count == 0)
            {
                this.targetToEventHandler.Remove(key);
            }
        }
    }

    /// <summary>
    /// Remove dead handlers
    /// </summary>
    /// <param name="source">Source object</param>
    void Purge(object source)
    {
        var weakHandlers = default(WeakHandlerList);

        if (this.sourceToWeakHandlers.TryGetValue(source, out weakHandlers))
        {
            if (weakHandlers.IsDeliverActive)
            {
                weakHandlers = weakHandlers.Clone();
                this.sourceToWeakHandlers.Remove(source);
                this.sourceToWeakHandlers.Add(source, weakHandlers);
            }
            else
            {
                weakHandlers.Purge();
            }
        }
    }

    #endregion

    #region WeakHandler table helper classes

    /// <summary>
    /// Weak handler helper class
    /// </summary>
    internal class WeakHandler
    {
        WeakReference source;
        WeakReference originalHandler;

        public bool IsActive
        {
            get { return this.source != null && this.source.IsAlive && this.originalHandler != null && this.originalHandler.IsAlive; }
        }

        public TEventHandler Handler
        {
            get
            {
                if (this.originalHandler == null)
                {
                    return default(TEventHandler);
                }
                else
                {
                    return (TEventHandler)this.originalHandler.Target;
                }
            }
        }

        public WeakHandler(object source, TEventHandler originalHandler)
        {
            this.source = new WeakReference(source);
            this.originalHandler = new WeakReference(originalHandler);
        }

        /// <summary>
        /// Checks if provided handler is the same
        /// </summary>
        /// <param name="source"></param>
        /// <param name="handler"></param>
        /// <returns>True if source.Target is equals to source, otherwise false</returns>
        public bool Matches(object source, TEventHandler handler)
        {
            return this.source != null &&
                object.ReferenceEquals(this.source.Target, source) &&
                this.originalHandler != null;
        }
    }

    /// <summary>
    /// Weak event handler manager
    /// </summary>
    internal class WeakHandlerList
    {
        int deliveries = 0;
        List<WeakHandler> handlers;

        public WeakHandlerList()
        {
            handlers = new List<WeakHandler>();
        }

        /// <summary>
        /// Adds new weak event handler to the list
        /// </summary>
        /// <param name="source">The event source</param>
        /// <param name="handler">The event handler</param>
        public void AddWeakHandler(TEventSource source, TEventHandler handler)
        {
            WeakHandler handlerSink = new WeakHandler(source, handler);
            handlers.Add(handlerSink);
        }

        /// <summary>
        /// Remove weak handler from the list
        /// </summary>
        /// <param name="source">The event source</param>
        /// <param name="handler">The event handler</param>
        /// <returns>True if the handler was removed, otherwise false</returns>
        public bool RemoveWeakHandler(TEventSource source, TEventHandler handler)
        {
            foreach (var weakHandler in handlers)
            {
                if (weakHandler.Matches(source, handler))
                {
                    return handlers.Remove(weakHandler);
                }
            }

            return false;
        }

        /// <summary>
        /// Clones the list
        /// </summary>
        /// <returns></returns>
        public WeakHandlerList Clone()
        {
            WeakHandlerList newList = new WeakHandlerList();
            newList.handlers.AddRange(this.handlers.Where(h => h.IsActive));

            return newList;
        }

        /// <summary>
        /// Items count
        /// </summary>
        public int Count
        {
            get { return this.handlers.Count; }
        }

        /// <summary>
        /// True if any of the events are still in delivering process
        /// </summary>
        public bool IsDeliverActive
        {
            get { return this.deliveries > 0; }
        }


        public IDisposable DeliverActive()
        {
            Interlocked.Increment(ref this.deliveries);

            return Disposable.Create(() => Interlocked.Decrement(ref this.deliveries));
        }

        /// <summary>
        /// Fire the handler
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="args"></param>
        /// <returns></returns>
        public virtual bool DeliverEvent(object sender, TEventArgs args)
        {
            bool hasStaleEntries = false;

            foreach (var handler in handlers)
            {
                if (handler.IsActive)
                {
                    var @delegate = handler.Handler as Delegate;
                    @delegate.DynamicInvoke(sender, args);
                }
                else
                {
                    hasStaleEntries = true;
                }
            }

            return hasStaleEntries;
        }

        /// <summary>
        /// Removes dead handlers
        /// </summary>
        public void Purge()
        {
            for (int i = handlers.Count - 1; i >= 0; i--)
            {
                if (!handlers[i].IsActive)
                {
                    handlers.RemoveAt(i);
                }
            }
        }
    }

    #endregion
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接