我有一个关于如何在循环TPL数据流中检测完成的问题。
我的数据流中有一个反馈循环,它向远程服务器发出GET
请求并处理数据响应(使用更多的数据流进行转换,然后提交结果)。
数据源将其结果拆分为每页1000条记录,并不告诉我有多少可用页面。我必须一直读取,直到获取到不足一页的数据。
通常页面数量为1,经常达到10,偶尔会有数千个页面。
一开始我有很多请求需要获取。
我希望能够使用线程池来处理这些请求,这一切都没问题,我可以排队多个数据请求并同时请求它们。如果我遇到需要获取大量页面的情况,我希望使用所有线程来处理。我不想留下一个线程在忙碌而其他线程已经完成的情况。
我遇到的问题是当我将这个逻辑放入数据流中时,例如:
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests
生成了一个IEnumerable<DataRequest>
。我一次性排队了下一个N个页面的请求,接受这意味着我发送了比我需要的多一点的调用。 DataRequest实例共享一个LastPage计数器,以避免不必要地进行我们知道在最后一页之后的请求。这一切都很好。问题是: 如果像我在这个例子中展示的那样通过反馈更多的请求到fetch的输入缓冲区来循环,那么我就有一个关于如何信号化(甚至检测)完成的问题。 我不能从请求设置完成到fetch,因为一旦设置完成,我就不能再反馈任何内容了。
我可以监视fetch的输入和输出缓冲区是否为空,但我认为当我设置完成时,我会冒险fetch仍然忙于处理请求,从而阻止为其他页面排队请求。
我需要一些方法来知道fetch正在繁忙(具有输入或正忙于处理输入)。
我是否忽略了解决此问题的明显/简单方法?
- 我可以在fetch内部循环,而不是排队更多的请求。问题在于我想能够使用一组最大线程数来限制我对远程服务器所做的事情。 块内部的并行循环能否与块本身共享调度程序,并通过调度程序控制结果线程计数? - 我可以为fetch创建一个自定义转换块来处理完成信号。 似乎对于这样一个简单的场景来说,需要很多工作。
非常感谢您提供的任何帮助!
foreach (var c in todolist) { request.Post(c); };
。然后,我可以调用request.Complete();
因为我不会再添加任何请求了。 - ajka.LinkTo(b, new DataflowLinkOptions { PropagateCompletion = true })
呢?然后调用request.Complete()
将会导致commit.Completion
在所有项目通过管道的所有阶段后自然地转换为已完成状态。 - Kirill Shlenskiy