如何在不丢弃值的情况下减慢RX中的Observable速度?

3
我的情境: 我有一个计算需要每秒运行一次。在运行后,等待约200毫秒以便其他操作赶上来。如果计算超过一秒仍然在运行,它应该再次启动,但程序应该等待直到它完成并在完成后200毫秒后开始下一个计算。
我现在的做法:
_refreshFinished = new Subject<bool>();
_autoRefresher = Observable.Interval(TimeSpan.FromMilliseconds(1000))
   .Zip(_refreshFinished, (x,y) => x)
   .Subscribe(x => AutoRefresh(stuff));

这段代码的问题在于,我看不到在计算完成后如何设置延迟。Delay方法只能延迟可观察集合的第一个元素。通常这种行为是正确的,因为如果你想要缓冲所有元素,你将不得不缓冲无限数量的元素。但是,由于将对Autorefesh的调用延迟200ms也会延迟_refreshFinished的输出200ms,所以不会有缓冲开销。基本上,我想要一个Observable,每隔MaxTime(some_call,1000ms)就会触发一次,然后延迟200ms甚至更好,可以是一些动态值。在这一点上,我实际上并不关心正在运行的值,尽管这可能会在未来发生改变。
我对任何建议都持开放态度。
5个回答

3

Observable.Generate()有多个重载方法,可以动态调整下一个项目创建的时间。

例如:

IScheduler schd = Scheduler.TaskPool;
var timeout = TimeSpan.FromSeconds(1);
var shortDelay = TimeSpan.FromMilliseconds(200);
var longerDelay = TimeSpan.FromMilliseconds(500);
Observable.Generate(schd.Now, 
                    time => true, 
                    time => schd.Now, 
                    time => new object(), // your code here
                    time => schd.Now.Subtract(time) > timeout  ? shortDelay : longerDelay ,
                    schd);

你在倒数第二行有一个错别字。应该是timeout而不是timout。但我真的很喜欢你的解决方案,比我下面发布的要简单得多。 - LDomagala

1

有一种方法可以做到这一点。虽然等待时间必须在每个值上动态计算,但它可以工作并且相当通用,因此并不是最容易的事情。

当您使用此代码时,您只需在YOURCODE中插入应调用的代码,其他所有内容都会自动处理。您的代码基本上将在Max(yourCodeTime + extraDelay,usualCallTime + extraDelay)之后被调用。这意味着yourCode不会同时被调用两次,并且应用程序始终具有额外的延迟时间来执行其他操作。 如果有更简单/其他的方法来实现这一点,我很乐意听取建议。

double usualCallTime = 1000;
double extraDealy = 100;
var subject = new Subject<double>();
var subscription =
    sub.TimeInterval()
        .Select(x =>
            {
                var processingTime = x.Interval.TotalMilliseconds - x.Value;
                double timeToWait = 
                     Math.Max(0, usualCallTime - processingTime) + extraDelay;
                return Observable.Timer(TimeSpan.FromMilliseconds(timeToWait))
                    .Select(ignore => timeToWait);
            })
        .Switch()
        .Subscribe(x => {YOURCODE();sub.OnNext(x)});
sub.OnNext(0);

private static void YOURCODE()
{
    // do stuff here
    action.Invoke();
}

1

可以详细说明一下为什么这应该成为新的异步框架的工作吗? - Anderson Imes
由于他基本上是在等待某些计算完成,所以最好的方法似乎是等待那个计算完成。此外,“等待其他东西赶上来”对我来说听起来像是等待多个异步作业完成,这正是异步框架的用途。如果作业在给定时间内没有完成,只需添加一些超时并相应地处理它们即可。 - Andreas Krebs

0
假设您有一个现有的“IObservable”,那么以下内容将起作用。
var delay = TimeSpan.FromSeconds(1.0);
var actual = source.Scan(
    new ConcurrentQueue<object>(),
    (q, i) =>
        {
            q.Enqueue(i);
            return q;
        }).CombineLatest(
            Observable.Interval(delay),
            (q, t) =>
                {
                    object item;
                    if (q.TryDequeue(out item))
                    {
                        return item;
                    }

                    return null;
                }).Where(v => v != null);

'actual' 是你的结果可观察对象。但请记住,如果它还没有变成热可观察对象,上面的代码已经将其转换为热可观察对象。因此,你不会收到 'OnCompleted' 的调用。


0

如果我正确理解您的问题,您有一个长时间运行的计算函数,例如:

static String compute()
{
    int t = 300 + new Random().Next(1000);
    Console.Write("[{0}...", t);
    Thread.Sleep(t);
    Console.Write("]");
    return Guid.NewGuid().ToString();
}

你想每秒至少调用一次此函数,但不重叠调用,并且在调用之间具有最小200毫秒的恢复时间。下面的代码适用于这种情况。

我最初采用了更加功能性的方法(使用Scan()Timestamp()),更符合Rx的风格--因为我正在寻找一个好的Rx练习--但最终,这种非聚合方法更简单。

static void Main()
{
    TimeSpan period = TimeSpan.FromMilliseconds(1000);
    TimeSpan recovery = TimeSpan.FromMilliseconds(200);

    Observable
        .Repeat(Unit.Default)
        .Select(_ =>
        {
            var s = DateTimeOffset.Now;
            var x = compute();
            var delay = period - (DateTimeOffset.Now - s);
            if (delay < recovery)
                delay = recovery;

            Console.Write("+{0} ", (int)delay.TotalMilliseconds);

            return Observable.Return(x).Delay(delay).First();
        })
        .Subscribe(Console.WriteLine);
}

这是输出结果:

[1144...]+200 a7cb5d3d-34b9-4d44-95c9-3e363f518e52
[1183...]+200 359ad966-3be7-4027-8b95-1051e3fb20c2
[831...]+200 f433b4dc-d075-49fe-9c84-b790274982d9
[766...]+219 310c9521-7bee-4acc-bbca-81c706a4632a
[505...]+485 0715abfc-db9b-42e2-9ec7-880d7ff58126
[1244...]+200 30a3002a-924a-4a64-9669-095152906d85
[1284...]+200 e5b1cd79-da73-477c-bca0-0870f4b5c640
[354...]+641 a43c9df5-53e8-4b58-a0df-7561cf4b0483
[1094...]+200 8f25019c-77a0-4507-b05e-c9ab8b34bcc3
[993...]+200 840281bd-c8fd-4627-9324-372636f8dea3

[编辑:此示例使用 Rx 2.0(RC)2.0.20612.0]


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接