C#事件去抖动

57

我正在监听硬件事件消息,但需要去除抖动以避免过多的查询。

这是一个硬件事件,发送机器状态,我必须将其存储在数据库中进行统计目的,有时它的状态会经常变化(闪烁?)。在这种情况下,我只想存储“稳定”的状态,并希望通过简单地等待1-2秒钟来实现它,然后再将状态存储到数据库中。

这是我的代码:

private MachineClass connect()
{
    try
    {
        MachineClass rpc = new MachineClass();
        rpc.RxVARxH += eventRxVARxH;
        return rpc;
    }
    catch (Exception e1)
    {
        log.Error(e1.Message);
        return null;
    }
}

private void eventRxVARxH(MachineClass Machine)
{
    log.Debug("Event fired");
}

我将这种行为称为"防抖动"(debounce):等待几次才真正完成其工作:如果在防抖时间内再次触发相同事件,则必须取消第一个请求并开始等待防抖时间以完成第二个事件。

最好的选择是什么?仅一个一次性计时器吗?

要解释"防抖"函数,请参见此JavaScript实现的关键事件: http://benalman.com/code/projects/jquery-throttle-debounce/examples/debounce/


使用秒表来测量经过的时间。 - Ed S.
我必须驳回第一个请求 - 这听起来非常有问题。有没有特定的原因,为什么不能允许第一个请求继续进行,而忽略后续的请求? - Damien_The_Unbeliever
这是一个硬件事件,它发送机器状态,我必须将其存储在数据库中以进行统计目的。有时候它的状态会经常变化(闪烁?),在这种情况下,我想只存储“稳定”的状态,并且我希望通过等待1-2秒后再将状态存储到数据库中来实现它。也许我可以用一个计时器来做到这一点?我等待1-2秒钟来触发查询,在状态变化太快的情况下重置它。 - Tobia
1
@Damien_The_Unbeliever 这不是一个奇怪的请求,也不是微不足道的。它不能仅通过简单的计时器来处理。然而,响应式扩展确实涵盖了这种情况。 - Panagiotis Kanavos
1
一个非常类似的问题是等待FileSystemWatcher停止报告更改,例如在复制大文件时。您会收到很多更改事件,但实际上希望在最后一个事件之后等待一段时间,然后再尝试访问修改后的文件。 - Panagiotis Kanavos
尝试保留最后一个而不是第一个的问题在于,你几乎总是会遇到线程(或线程和计时器)之间的竞争。 - Damien_The_Unbeliever
23个回答

60

我曾经使用过这种方法来成功地消除了一些事件的抖动:

public static Action<T> Debounce<T>(this Action<T> func, int milliseconds = 300)
{
    var last = 0;
    return arg =>
    {
        var current = Interlocked.Increment(ref last);
        Task.Delay(milliseconds).ContinueWith(task =>
        {
            if (current == last) func(arg);
            task.Dispose();
        });
    };
}

使用方法

Action<int> a = (arg) =>
{
    // This was successfully debounced...
    Console.WriteLine(arg);
};
var debouncedWrapper = a.Debounce<int>();

while (true)
{
    var rndVal = rnd.Next(400);
    Thread.Sleep(rndVal);
    debouncedWrapper(rndVal);
}

它可能不像RX中的那样强大,但易于理解和使用。

跟进 2020-02-03

使用取消令牌修改@collie的解决方案,如下所示:

public static Action<T> Debounce<T>(this Action<T> func, int milliseconds = 300)
{
    CancellationTokenSource? cancelTokenSource = null;

    return arg =>
    {
        cancelTokenSource?.Cancel();
        cancelTokenSource = new CancellationTokenSource();

        Task.Delay(milliseconds, cancelTokenSource.Token)
            .ContinueWith(t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    func(arg);
                }
            }, TaskScheduler.Default);
    };
}

注:

  • 调用Cancel就足以处理CTS。
  • 成功完成的CTS在下一次调用之前不会被取消/处理。
  • 正如@collie所指出的,任务被处理,因此不需要对任务调用Dispose

我以前没有使用过取消令牌,可能使用不正确。


1
你是如何使用它的? - Léon Pelletier
6
很流畅,我花了一些时间才注意到你如何取消已经“运行”的动作 :-)。然而,这种方法存在一个问题,你无法对去抖动器进行反馈/控制,因此无法控制所有操作何时结束。当你处置主要对象时,你可能没有意识到去抖动操作将在处置后执行,这是令人头疼的。 - astrowalker
2
请参考 https://dev59.com/2l4b5IYBdhLWcg3w-10I#59296962,该版本使用取消令牌来清理未使用的任务。 - Collie
8
IsCompletedSuccessfully 只适用于 .NET Core。你可以使用 !t.IsCanceled 替代,以使代码在 .NET Framework 中也能正常工作。 - PEK
@PEK 在 .NET Standard 2.1 和所有的 .NET Core 版本中都可用,直到 .NET 7.0 版本。 - Max

55
这不是一个简单的请求,需要从头开始编写代码,因为有几个细节需要注意。类似的情况是监视 FileSystemWatcher 并在大量复制后等待事情平静下来,然后再尝试打开修改过的文件。
.NET 4.5中的响应式扩展是为处理这些场景而创建的。您可以使用Throttle, Buffer, WindowSample等方法轻松提供此类功能。将事件发布到Subject,应用其中一个窗口函数,例如仅在X秒或Y事件没有活动时获取通知,然后订阅该通知。
Subject<MyEventData> _mySubject=new Subject<MyEventData>();
....
var eventSequenc=mySubject.Throttle(TimeSpan.FromSeconds(1))
                          .Subscribe(events=>MySubscriptionMethod(events));

Throttle会返回滑动窗口中的最后一个事件,前提是在窗口内没有其他事件。任何事件都会重置窗口。

您可以在这里找到有关时间移位功能的非常好的概述。

当您的代码接收到事件时,只需要使用OnNext将其发布到Subject中:

_mySubject.OnNext(MyEventData);

如果您的硬件事件表现为典型的.NET事件,您可以使用Observable.FromEventPattern绕过Subject和手动发布,如此处所示:
var mySequence = Observable.FromEventPattern<MyEventData>(
    h => _myDevice.MyEvent += h,
    h => _myDevice.MyEvent -= h);  
_mySequence.Throttle(TimeSpan.FromSeconds(1))
           .Subscribe(events=>MySubscriptionMethod(events));

您也可以从任务中创建可观察对象,使用LINQ操作符组合事件序列来请求例如:使用Zip获取不同硬件事件的配对,使用另一个事件源来绑定Throttle / Buffer等,添加延迟等等。
Reactive Extensions可作为NuGet包提供,因此将它们添加到您的项目非常容易。
Stephen Cleary的书“Concurrency in C# Cookbook”是关于Reactive Extensions等内容的非常好的资源,介绍了您如何使用它以及它如何与.NET中的其他并发API(例如Tasks、Events等)配合使用。 Introduction to Rx是一系列优秀的文章(这就是我从中复制示例的地方),其中包含多个示例。 更新 使用您的特定示例,您可以执行以下操作:
IObservable<MachineClass> _myObservable;

private MachineClass connect()
{

    MachineClass rpc = new MachineClass();
   _myObservable=Observable
                 .FromEventPattern<MachineClass>(
                            h=> rpc.RxVARxH += h,
                            h=> rpc.RxVARxH -= h)
                 .Throttle(TimeSpan.FromSeconds(1));
   _myObservable.Subscribe(machine=>eventRxVARxH(machine));
    return rpc;
}

当然,这可以大大改进 - 观察和订阅都需要在某个时候被处理。此代码假定您只控制一个设备。如果您有多个设备,您可以在类内部创建可观察对象,以便每个 MachineClass 都公开并处理其自己的可观察对象。


1
这似乎是答案!谢谢...但是实现起来并不那么容易,我也不明白如何将你的例子应用到我的代码中。 - Tobia
谢谢,我明白之前错过的内容了! - Tobia
我更新了NuGet链接,因为旧的Rx-Main包已被取消列表并被取代。但是一些文档链接也需要更新。 - hlovdal

12

我遇到了一些问题。我尝试了这里的每个答案,但由于我使用的是Xamarin通用应用程序,似乎缺少这些答案中所需的某些内容,而且我不想添加任何其他包或库。我的解决方案完全符合我的预期,并且我没有遇到任何问题。希望能帮助到某些人。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace OrderScanner.Models
{
    class Debouncer
    {
        private List<CancellationTokenSource> StepperCancelTokens = new List<CancellationTokenSource>();
        private int MillisecondsToWait;
        private readonly object _lockThis = new object(); // Use a locking object to prevent the debouncer to trigger again while the func is still running

        public Debouncer(int millisecondsToWait = 300)
        {
            this.MillisecondsToWait = millisecondsToWait;
        }

        public void Debouce(Action func)
        {
            CancelAllStepperTokens(); // Cancel all api requests;
            var newTokenSrc = new CancellationTokenSource();
            lock (_lockThis)
            {
                StepperCancelTokens.Add(newTokenSrc);
            }
            Task.Delay(MillisecondsToWait, newTokenSrc.Token).ContinueWith(task => // Create new request
            {
                if (!newTokenSrc.IsCancellationRequested) // if it hasn't been cancelled
                {
                    CancelAllStepperTokens(); // Cancel any that remain (there shouldn't be any)
                    StepperCancelTokens = new List<CancellationTokenSource>(); // set to new list
                    lock (_lockThis)
                    {
                        func(); // run
                    }
                }
            }, TaskScheduler.FromCurrentSynchronizationContext());
        }

        private void CancelAllStepperTokens()
        {
            foreach (var token in StepperCancelTokens)
            {
                if (!token.IsCancellationRequested)
                {
                    token.Cancel();
                }
            }
        }
    }
}

它被称为...

这样。
private Debouncer StepperDeboucer = new Debouncer(1000); // one second

StepperDeboucer.Debouce(() => { WhateverMethod(args) });

我不建议在机器要每秒发送数百个请求的情况下使用它,但对于用户输入,它的表现非常出色。我正在一个 Android / IOS 应用中的步进器上使用它,该应用调用一个 API 来完成步骤。

我不明白这个代码的意图是什么。如果Debounce被调用的频率超过了MillisecondsToWait毫秒,那么代码似乎永远不会执行。我有什么遗漏吗? - jjnguy
你试过了吗?在我的实现中完美运行。只需将去抖时间设置为2000左右,然后进行调试以查看其工作原理。 - Nieminen
2
哦,当然你没有得到输出,因为这是一个去抖动(debounce)而不是节流(throttle)。去抖动会等待一定时间内输入事件停止后再运行函数。如果你想要一个节流(每个一定时间内运行多少次),那么这不是你想要的解决方案。 - Nieminen
1
好的,现在更清楚了。谢谢你的澄清。我完全误解了代码的目标。 - jjnguy
1
如果您正在使用此方法与从UI线程传递的事件进行交互(通常是这样),则值得将TaskScheduler.FromCurrentSynchronizationContext()添加到.ContinueWith调用的第二个参数中。否则,您将不得不再次调用以将线程返回到UI,这会破坏性能。参见:https://dev59.com/fm855IYBdhLWcg3wc0Dy - Arlo
显示剩余2条评论

12

最近我在维护一款应用程序,该程序针对较旧版本的.NET框架(v3.5)进行优化。

我无法使用Reactive Extensions和Task Parallel Library,但我需要一种良好、干净、一致的方式来去抖动事件。这就是我想出来的方法:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace MyApplication
{
    public class Debouncer : IDisposable
    {
        readonly TimeSpan _ts;
        readonly Action _action;
        readonly HashSet<ManualResetEvent> _resets = new HashSet<ManualResetEvent>();
        readonly object _mutex = new object();

        public Debouncer(TimeSpan timespan, Action action)
        {
            _ts = timespan;
            _action = action;
        }

        public void Invoke()
        {
            var thisReset = new ManualResetEvent(false);

            lock (_mutex)
            {
                while (_resets.Count > 0)
                {
                    var otherReset = _resets.First();
                    _resets.Remove(otherReset);
                    otherReset.Set();
                }

                _resets.Add(thisReset);
            }

            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    if (!thisReset.WaitOne(_ts))
                    {
                        _action();
                    }
                }
                finally
                {
                    lock (_mutex)
                    {
                        using (thisReset)
                            _resets.Remove(thisReset);
                    }
                }
            });
        }

        public void Dispose()
        {
            lock (_mutex)
            {
                while (_resets.Count > 0)
                {
                    var reset = _resets.First();
                    _resets.Remove(reset);
                    reset.Set();
                }
            }
        }
    }
}

以下是在具有搜索文本框的Windows表单中使用它的示例:

public partial class Example : Form 
{
    private readonly Debouncer _searchDebouncer;

    public Example()
    {
        InitializeComponent();
        _searchDebouncer = new Debouncer(TimeSpan.FromSeconds(.75), Search);
        txtSearchText.TextChanged += txtSearchText_TextChanged;
    }

    private void txtSearchText_TextChanged(object sender, EventArgs e)
    {
        _searchDebouncer.Invoke();
    }

    private void Search()
    {
        if (InvokeRequired)
        {
            Invoke((Action)Search);
            return;
        }

        if (!string.IsNullOrEmpty(txtSearchText.Text))
        {
            // Search here
        }
    }
}

7

RX可能是最简单的选择,特别是如果您已经在应用程序中使用它。但如果没有,添加它可能有点过度。

对于基于UI的应用程序(如WPF),我使用以下类来使用DispatcherTimer:

public class DebounceDispatcher
{
    private DispatcherTimer timer;
    private DateTime timerStarted { get; set; } = DateTime.UtcNow.AddYears(-1);

    public void Debounce(int interval, Action<object> action,
        object param = null,
        DispatcherPriority priority = DispatcherPriority.ApplicationIdle,
        Dispatcher disp = null)
    {
        // kill pending timer and pending ticks
        timer?.Stop();
        timer = null;

        if (disp == null)
            disp = Dispatcher.CurrentDispatcher;

        // timer is recreated for each event and effectively
        // resets the timeout. Action only fires after timeout has fully
        // elapsed without other events firing in between
        timer = new DispatcherTimer(TimeSpan.FromMilliseconds(interval), priority, (s, e) =>
        {
            if (timer == null)
                return;

            timer?.Stop();
            timer = null;
            action.Invoke(param);
        }, disp);

        timer.Start();
    }
}

使用方法:

private DebounceDispatcher debounceTimer = new DebounceDispatcher();

private void TextSearchText_KeyUp(object sender, KeyEventArgs e)
{
    debounceTimer.Debounce(500, parm =>
    {
        Model.AppModel.Window.ShowStatus("Searching topics...");
        Model.TopicsFilter = TextSearchText.Text;
        Model.AppModel.Window.ShowStatus();
    });
}

键盘空闲200毫秒后才处理按键事件 - 之前的待处理事件将被丢弃。

还有一个Throttle方法,总是在给定的时间间隔之后触发事件:

    public void Throttle(int interval, Action<object> action,
        object param = null,
        DispatcherPriority priority = DispatcherPriority.ApplicationIdle,
        Dispatcher disp = null)
    {
        // kill pending timer and pending ticks
        timer?.Stop();
        timer = null;

        if (disp == null)
            disp = Dispatcher.CurrentDispatcher;

        var curTime = DateTime.UtcNow;

        // if timeout is not up yet - adjust timeout to fire 
        // with potentially new Action parameters           
        if (curTime.Subtract(timerStarted).TotalMilliseconds < interval)
            interval = (int) curTime.Subtract(timerStarted).TotalMilliseconds;

        timer = new DispatcherTimer(TimeSpan.FromMilliseconds(interval), priority, (s, e) =>
        {
            if (timer == null)
                return;

            timer?.Stop();
            timer = null;
            action.Invoke(param);
        }, disp);

        timer.Start();
        timerStarted = curTime;            
    }

7
我需要类似这样的东西,但是是一个Web应用程序,所以我不能将Action存储在变量中,它会在HTTP请求之间丢失。
根据其他答案和@Collie的想法,我创建了一个类,它根据一个唯一的字符串键来进行限流。
public static class Debouncer
{
    static ConcurrentDictionary<string, CancellationTokenSource> _tokens = new ConcurrentDictionary<string, CancellationTokenSource>();
    public static void Debounce(string uniqueKey, Action action, int seconds)
    {
        var token = _tokens.AddOrUpdate(uniqueKey,
            (key) => //key not found - create new
            {
                return new CancellationTokenSource();
            },
            (key, existingToken) => //key found - cancel task and recreate
            {
                existingToken.Cancel(); //cancel previous
                return new CancellationTokenSource();
            }
        );

        //schedule execution after pause
        Task.Delay(seconds * 1000, token.Token).ContinueWith(task =>
        {
            if (!task.IsCanceled)
            {
                action(); //run
                if (_tokens.TryRemove(uniqueKey, out var cts)) cts.Dispose(); //cleanup
            }
        }, token.Token);
    }
}

使用方法:

//throttle for 5 secs if it's already been called with this KEY
Debouncer.Debounce("Some-Unique-ID", () => SendEmails(), 5);

作为一个额外的好处,由于它基于字符串键,你可以使用内联的lambda函数。
Debouncer.Debounce("Some-Unique-ID", () => 
{
    //do some work here
}, 5);

6
这个小工具受到Mike Ward恶毒而巧妙的扩展尝试的启发。然而,这个工具会非常好地清理自己。
public static Action Debounce(this Action action, int milliseconds = 300)
{
    CancellationTokenSource lastCToken = null;

    return () =>
    {
        //Cancel/dispose previous
        lastCToken?.Cancel();
        try { 
            lastCToken?.Dispose(); 
        } catch {}          

        var tokenSrc = lastCToken = new CancellationTokenSource();

        Task.Delay(milliseconds).ContinueWith(task => { action(); }, tokenSrc.Token);
    };
}

注意:在这种情况下不需要处理任务。请参见此处的证据。 用法
Action DebounceToConsole;
int count = 0;

void Main()
{
    //Assign
    DebounceToConsole = ((Action)ToConsole).Debounce(50);

    var random = new Random();
    for (int i = 0; i < 50; i++)
    {
        DebounceToConsole();
        Thread.Sleep(random.Next(100));
    }
}

public void ToConsole()
{
    Console.WriteLine($"I ran for the {++count} time.");
}

好主意。取消/处理/新建/分配部分需要加上保护栏吗?此外,关于任务处理的参考资料很不错。我总是担心会出现僵尸任务。现在我可以稍微放心一些了。谢谢。 - Mike Ward
嗯。仔细查看CancellationTokenSource文档,它似乎是线程安全的,除了Dispose()方法“只有在CancellationTokenSource对象的所有其他操作完成后才能使用”。我相信在此用例中,在Cancel()之后Dispose()是安全的。然而,他们建议在这种情况下使用Dispose()应该包装在Try-Catch中。我会添加这个。 - Collie

5

Panagiotis的回答肯定是正确的,但我想给出一个更简单的例子,因为我花了一些时间来梳理如何使它工作。我的情况是用户在搜索框中输入内容,随着用户的输入,我们希望进行API调用以返回搜索建议,因此我们希望防抖动API调用,以免每次输入字符时都会进行一次调用。

我使用的是Xamarin.Android,但这对任何C#场景都适用...

private Subject<string> typingSubject = new Subject<string> ();
private IDisposable typingEventSequence;

private void Init () {
            var searchText = layoutView.FindViewById<EditText> (Resource.Id.search_text);
            searchText.TextChanged += SearchTextChanged;
            typingEventSequence = typingSubject.Throttle (TimeSpan.FromSeconds (1))
                .Subscribe (query => suggestionsAdapter.Get (query));
}

private void SearchTextChanged (object sender, TextChangedEventArgs e) {
            var searchText = layoutView.FindViewById<EditText> (Resource.Id.search_text);
            typingSubject.OnNext (searchText.Text.Trim ());
        }

public override void OnDestroy () {
            if (typingEventSequence != null)
                typingEventSequence.Dispose ();
            base.OnDestroy ();
        }

当您首次初始化屏幕/类时,创建一个事件来监听用户的输入(SearchTextChanged),然后还设置了一个与“typingSubject”相关的节流订阅。
接下来,在SearchTextChanged事件中,可以调用typingSubject.OnNext并传入搜索框的文本。在去抖动期间(1秒后),它将调用已订阅的事件(在我们的情况下是suggestionsAdapter.Get)。
最后,在关闭屏幕时,请确保处理该订阅!

4

创建了这个类来解决异步调用问题:

public class Debouncer
{
    private CancellationTokenSource _cancelTokenSource = null;

    public async Task Debounce(Func<Task> method, int milliseconds = 300)
    {
        _cancelTokenSource?.Cancel();
        _cancelTokenSource?.Dispose();

        _cancelTokenSource = new CancellationTokenSource();

        await Task.Delay(milliseconds, _cancelTokenSource.Token);

        await method();
    }
}

使用示例:

private Debouncer _debouncer = new Debouncer();
....
await _debouncer.Debounce(YourAwaitableMethod);

3

我需要一个Debounce方法用于Blazor,一直回到这个页面,所以我想分享我的解决方案,以帮助其他人。

public class DebounceHelper
{
    private CancellationTokenSource debounceToken = null;

    public async Task DebounceAsync(Func<CancellationToken, Task> func, int milliseconds = 1000)
    {
        try
        {
            // Cancel previous task
            if (debounceToken != null) { debounceToken.Cancel(); }

            // Assign new token
            debounceToken = new CancellationTokenSource();

            // Debounce delay
            await Task.Delay(milliseconds, debounceToken.Token);

            // Throw if canceled
            debounceToken.Token.ThrowIfCancellationRequested();

            // Run function
            await func(debounceToken.Token);
        }
        catch (TaskCanceledException) { }
    }
}

调用搜索函数的示例

<input type="text" @oninput=@(async (eventArgs) => await OnSearchInput(eventArgs)) />

@code {
    private readonly DebounceHelper debouncer = new DebounceHelper();

    private async Task OnSearchInput(ChangeEventArgs eventArgs)
    {
        await debouncer.DebounceAsync(async (cancellationToken) =>
        {
            // Search Code Here         
        });
    }
}


如果取消了,你在try块中抛出异常有充分的理由吗?通常使用if(token.IsCancellationRequested) {return;}更好,以避免异常作为控制流反模式。 - TheAtomicOption
@TheAtomicOption 不一定。我之前没有听说过这是反模式。只是为了确认,您认为 debounceToken.Token.ThrowIfCancellationRequested(); 应该替换为 if (debounceToken.Token.IsCancellationRequested) { return; } 吗? - Douglas Riddle
是的,你理解得对。但是在我自己测试时,当请求取消时,Task.Delay(milliseconds, debounceToken.Token) 会抛出异常,所以你还需要避免传递令牌并让延迟任务正常完成。我想如果它设计成要抛出异常,那么就是设计成要抛出异常。 - TheAtomicOption

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接