TPL数据流块,延迟消息向下一个块的转发。

4

我需要一个数据流块,它可以根据消息(LogEntry)中的时间戳延迟将消息转发到下一个块。

这是我想出来的,但感觉不太对。有什么改进建议吗?

  private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }

如果您的代码可以正常工作,那么将其提交到http://codereview.stackexchange.com/可能会更加合适。 - Daniel Kelley
1个回答

10

您可以简单地使用一个TransformBlock,使用Task.Delay异步等待延迟:

IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(TimeSpan delay)
{
    return new TransformBlock<TItem, TItem>(async item =>
    {
        await Task.Delay(delay);
        return item;
    });
}

使用方法:

var block = DelayedForwardBlock<LogEntry>(TimeSpan.FromMinutes(5));

1
非常感谢。有时候事情可以这么简单 :-) - rudimenter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接