可观察对象节流

3

我正在尝试使用响应式扩展实现事件节流。

我的系统可能会为特定用户或其他实体类型高频率地引发事件。

我需要延迟一段特定的时间,然后在超时后使用最后一个值引发事件。

我的做法是这样的:

 private Subject<int> userBalanceObservable = new Subject<int>();
 userBalanceObservable.Sample(TimeSpan.FromSeconds(sampleSeconds))
            .Subscribe(sample => OnRaiseBalanceEvent(sample));

当事件发生时
userBalanceObservable.OnNext(userId);

编辑

这种方法的问题在于事件是基于传递给OnNext的最后一个值引发的,而我实际上需要为每个传递给OnNext的值都设置延迟。

例如,OnNext(1),OnNext(2),OnNext(3)。我需要对1、2、3分别进行延迟调用,但我只得到了最后一个值,即3。


1
下次请尝试发布一个最小完整可验证示例(http://stackoverflow.com/help/mcve),这样我们就可以确切地知道您想要实现什么。最好附带单元测试。 - Lee Campbell
你真的需要展示给我们所有的代码 - 特别是 OnRaiseBalanceEvent 中发生了什么以及 userBalanceObservable 如何获取其值。只是一个小提示,如果你正在使用 Subject,那么你可能做错了一些事情。 - Enigmativity
2个回答

1
每次达到采样间隔时,Sample都会发布最后一个值。如果这是您需要的行为,那就没问题了。请查看http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample以获取更多信息和减慢发射速率的其他方法。
关于您问题的新信息:
如果您想在达到某个超时时间后发出所有值,则可以将这些值分组,直到达到超时时间(注意:如果事件发射频率过高而不断添加事件而没有达到超时,则可能会耗尽内存)。
您可以创建一个缓冲区,直到Debounce超时到达才填充缓冲区,请参见SO上的此答案以获取指针:How to implement buffering with timeout in RX

实际上,它似乎并没有按照我的预期正常工作。我需要的是为每个特定项延迟一个示例,而不是一般的任何项。例如,OnNext(1),OnNext(2),OnNext(3),我希望对1、2、3进行延迟调用,而不是只获取最后一个值,即3。反应式扩展是否支持这样的功能? - NullReference
尝试使用 Delay。请参阅http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Delay。 - Peter Bons
@NullReference - 这不是你在问题中所要求的。你能否更新一下问题? - Enigmativity

1

仅仅使用缓冲不行吗?唯一的“问题”是OnRaiseBalanceEvent必须使用列表而不是一个值,但计算机科学中的所有问题都可以通过另一层间接性来解决 ;)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Reactive.Subjects;
using System.Reactive.Linq;

namespace ConsoleApplication1
{
  class Program
  {
    static void Main(string[] args)
    {

      Subject<int> userBalanceObservable = new Subject<int>();
    userBalanceObservable.Buffer(TimeSpan.FromSeconds(2)) //get List of items
                     .Subscribe(sampleList => ProcessSamples(sampleList));

      int cont = 0;

      while (!Console.KeyAvailable)
        {
        userBalanceObservable.OnNext(cont);
        cont++;
        userBalanceObservable.OnNext(cont);
        cont++;
        Thread.Sleep(1000);
      }

    }

    private static void ProcessSamples(IList<int> sampleList)
    {
      Console.WriteLine("[{0}]", string.Join(", ", sampleList.ToArray()));
    }

  }
}

嗯,我认为缓冲区可以胜任这项工作。我可以使用 .Where(x=>x.Count >0) 来消除空序列并使用 Distinct() 来获取值的一个实例。 - NullReference

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接