如何在TPL中实现持续运行的数据流块?

6
我已经设置了使用BufferBlock和ActionBlock的生产者/消费者数据流块,并在控制台应用程序中正常工作;将所有项目添加到BurfferBlock中并将其连接到其他操作项后,它可以很好地工作。
现在我想在服务内使用它,其中这个数据流块管道将始终保持运行状态,当外部事件中有消息可用时,它会进入bufferblock并开始处理。我该如何实现这一点?
到目前为止,我已经完成了以下工作:
public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      {
          CancellationToken = CancellationToken
      });

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}

1
为什么你不直接去做呢?你尝试过什么,又是怎样失败的呢? - svick
@svick 我已经添加了我目前所实现的部分 - user2757350
那么,问题出在哪里?那段代码是否按照你的预期工作?是什么阻止你将任何事件发布到该管道中? - svick
我可以发布消息;但我怎样才能在没有定时器的情况下实现这一点呢?我不想调用Pipeline.Complete(),否则我将不得不重新初始化Pipeline;这是我不想要的(因为我试图保持该管道始终处于开放状态)。 - user2757350
2个回答

2

您的批处理大小由batchblock构造函数中的变量'BoundingCapacity'定义。当以下情况之一发生时,批处理将被发送:

  • 已接收到等于批处理大小的发布次数(在构造函数中指定)
  • 批处理块被标记为完成
  • 调用triggerbatch方法

看起来您希望在达到批处理大小或超时时发送批处理。如果是这样,并且批处理大小不是关键因素,我建议您只需向定时器添加一个重复间隔,并使批处理块下游的对象忽略空白发布。

实际上,您可能想要的,并且最符合数据流编程哲学的是,在开始发布一系列项目时创建新的批处理块,然后在完成或超时时将其完成。如果尚未存在,则新发布将创建新的批处理块。

尝试在仅基于第一次触发时在batchblock周围实现超时计时器的问题是,您将需要计算和验证对缓冲区块的发布,或者您将需要监视来自缓冲区块的发布。这两种情况都会产生很多丑陋的代码和/或违反块封装。


1
感谢您的建议;我想避免每次创建批处理块。正如您所看到的,我的程序基本上使用缓冲区将聊天消息转换为块。缓冲区与BoundingCapacity配合得很好;在我的情况下,我已将其设置为100。但是我不想等到所有100条消息都到达。我希望对BufferBlock有双重控制;例如,如果有100条消息或5秒钟(两者都可以配置)。解决方案符合我的需求,但我想知道是否有其他更好的解决方案。关键在于我希望BufferBlock在达到BoundinCapacity和超时时能够以双重模式运行。 - user2757350
一旦您将项目传递给数据流块,您需要忽略该项目。数据流块应该是数据驱动的。外部控制是不被赞同的。老实说,我会修改您的代码,以便“超时触发器”和批处理块包含在单个IPropagatingBlock中。 - VoteCoffee
1
@VeteCoffee - 这很有道理;这将把块分隔开来,我以后可以用其他东西替换... 谢谢 - user2757350
踩一下是可以的,但请留下反馈帮助我修改我的答案。 - VoteCoffee

2
作为一种过度简化,DataFlow 是使用一组方法处理一堆对象的方式。它不提供或期望任何特定的创建这些对象的方式。
如果要让管道保持活动状态,只需不终止应用程序即可。如果不想使用控制台应用程序,请创建一个服务来构建管道并将对象发送到它直到关闭为止。
消息只是您通过阅读数据、响应事件(无论意味着什么)或任何其他方式创建的对象。
至于外部事件,您指的是什么?有许多方法可以实现这一点:
- 如果数据来自另一个控制台应用程序,可以将一个应用程序的结果导出到另一个应用程序,解析从命令行应用程序的输入流中传入的数据,创建消息并将其传递给管道。 - 如果要监听请求的服务,可以托管 .NET Pipe、WCF 或 Web API 服务来侦听调用并将发布的数据传递给管道。 - 如果数据来自数据库,则可以轮询更改并将任何更改的数据发送到管道。
关键是,Dataflow 是关于处理数据,而不是关于监听事件。如果您正在寻找完全成熟的分布式代理系统,那么 DataFlow 不是您要找的东西。

外部事件,即来自不同渠道的通知更新;我正在将它们发布到缓冲块。我有24/7的服务在运行,监听这些传入的位置更新,并根据此进行一些下游处理... - user2757350

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接