TPL数据流循环中的完成处理

4

我有一个关于如何在循环TPL数据流中检测完成的问题。

我的数据流中有一个反馈循环,它向远程服务器发出GET请求并处理数据响应(使用更多的数据流进行转换,然后提交结果)。

数据源将其结果拆分为每页1000条记录,并不告诉我有多少可用页面。我必须一直读取,直到获取到不足一页的数据。

通常页面数量为1,经常达到10,偶尔会有数千个页面。

一开始我有很多请求需要获取。
我希望能够使用线程池来处理这些请求,这一切都没问题,我可以排队多个数据请求并同时请求它们。如果我遇到需要获取大量页面的情况,我希望使用所有线程来处理。我不想留下一个线程在忙碌而其他线程已经完成的情况。

我遇到的问题是当我将这个逻辑放入数据流中时,例如:

//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));

//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
    var resp = await Fetch(req);

    if (resp.Results.Count == 1000)
        await fetch.SendAsync(QueueAnotherRequest(req));

    return resp;
}
, new ExecutionDataflowBlockOptions {  MaxDegreeOfParallelism = 10 });

//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));

request.LinkTo(fetch);
fetch.LinkTo(commit);

//when are we complete?
QueueRequests生成了一个IEnumerable<DataRequest>。我一次性排队了下一个N个页面的请求,接受这意味着我发送了比我需要的多一点的调用。 DataRequest实例共享一个LastPage计数器,以避免不必要地进行我们知道在最后一页之后的请求。这一切都很好。
问题是: 如果像我在这个例子中展示的那样通过反馈更多的请求到fetch的输入缓冲区来循环,那么我就有一个关于如何信号化(甚至检测)完成的问题。 我不能从请求设置完成到fetch,因为一旦设置完成,我就不能再反馈任何内容了。
我可以监视fetch的输入和输出缓冲区是否为空,但我认为当我设置完成时,我会冒险fetch仍然忙于处理请求,从而阻止为其他页面排队请求。
我需要一些方法来知道fetch正在繁忙(具有输入或正忙于处理输入)。
我是否忽略了解决此问题的明显/简单方法?
- 我可以在fetch内部循环,而不是排队更多的请求。问题在于我想能够使用一组最大线程数来限制我对远程服务器所做的事情。 块内部的并行循环能否与块本身共享调度程序,并通过调度程序控制结果线程计数? - 我可以为fetch创建一个自定义转换块来处理完成信号。 似乎对于这样一个简单的场景来说,需要很多工作。
非常感谢您提供的任何帮助!

你知道所有请求都在第一个块中生成的那一刻吗? - VMAtm
是的,为了启动管道,我调用 foreach (var c in todolist) { request.Post(c); };。然后,我可以调用 request.Complete(); 因为我不会再添加任何请求了。 - ajk
@ajk,如果你正在做这件事,为什么不直接在所有块链接上使用a.LinkTo(b, new DataflowLinkOptions { PropagateCompletion = true })呢?然后调用request.Complete()将会导致commit.Completion在所有项目通过管道的所有阶段后自然地转换为已完成状态。 - Kirill Shlenskiy
@KirillShlenskiy。是的,那会很好,但是在fetch处于完成状态之后,它将不再接受任何消息,这就是fetch本身正在产生的。因此,行await fetch.SendAsync无法成功。 - ajk
2个回答

1
在TPL Dataflow中,您可以使用DataflowLinkOptions链接块,并指定块完成的传播propagation点击此处了解如何链接块
request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });

之后,您只需为request块调用Complete()方法,就完成了!

// the completion will be propagated to all the blocks
request.Complete();

您应该使用最后一个块的Completion任务属性作为最终工具:

commit.Completion.ContinueWith(t =>
    {
        /* check the status of the task and correctness of the requests handling */
    });

嗨@VMAtm,是的,如上面评论中所讨论的那样,这是可以理解的。但是一旦完成已传播到Fetch,Fetch就不能再向其输入缓冲区发布更多消息了。如果Fetch在收到响应时发现有更多数据可用,则会将消息反馈给自己。当在Fetch上设置完成时,此反馈方法不再允许。 - ajk
那么,你只需将完成状态从 fetch 传递到 commit,并使用 request.Completion.ContinueWith 循环来检查 fetch 的状态,就像你在答案中所做的那样。 - VMAtm
非常感谢。我不确定是否有更好的方法来知道获取已完成,但如果没有,我可以接受这个! - ajk

0

目前我已经在抓取块中加入了一个简单的忙状态计数器:

int fetch_busy = 0;

TransformBlock<DataRequest, DataResponse>  fetch_activity=null;
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req => 
    {
        try
        {
            Interlocked.Increment(ref fetch_busy);
            var resp = await Fetch(req);

            if (resp.Results.Count == 1000)
            {
                await fetch.SendAsync( QueueAnotherRequest(req) );
            }

            Interlocked.Decrement(ref fetch_busy);
            return resp;
        }
        catch (Exception ex)
        {
            Interlocked.Decrement(ref fetch_busy);
            throw ex;
        }
    }
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

然后我使用它来发出完成信号,如下所示:

request.Completion.ContinueWith(async _ =>
    {
        while ( fetch.InputCount > 0 || fetch_busy > 0 )
        {
            await Task.Delay(100);
        }

        fetch.Complete();
    });

这似乎不太优雅,但我认为应该能行。


2
我的理解是,SendAsync 会在验证下一个块是否可以接受新项后立即返回。因此,在“内部”转换有机会再次增加 fetch_busy 之前,await fetch.SendAsync 可能已经完成(随后减少 fetch_busy)。在此期间,如果 fetch_busyfetch.InputCount 都恰好为零,则您的继续可能会将 fetch 块标记为已完成。如果正在飞行的内部 Fetch 任务然后生成 1000 个项目并尝试另一个 SendAsync,它将悄悄地失败。 - Kirill Shlenskiy
这显然是一个相当牵强但不难想象的情况,因此如果await fetch.SendAsync返回false,也许你应该抛出异常。还要记住:使用异步lambda作为参数的ContinueWith会返回一个Task<Task>(如果你决定对结果进行任何操作,这可能会导致意外)。 - Kirill Shlenskiy
@KirillShlenskiy,谢谢你的回复,我会进行调查。我已经尝试通过在ContinueWith中检查fetch.InputCount > 0来缓解这个问题。你是说await fetch.SendAsync可能会在新排队的请求显示在fetch.InputCount之前返回吗? - ajk
不,那是不太可能的。但是,在内部获取要处理的项目后(从而减少InputCount-潜在地降为零),可以立即进行fetch.InputCount/fetch_busy检查,但在执行任何工作之前(即增加fetch_busy)之前。 - Kirill Shlenskiy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接