这是一个简短的代码示例,快速介绍我的问题所涉及的内容:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var secondBlock = new TransformBlock<int,string>(async x =>
{
if (x == 12)
{
await Task.Delay(5000);
return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
}
return $"{DateTime.Now}: Message is {x}";
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4
});
firstBlock.LinkTo(secondBlock);
secondBlock.LinkTo(thirdBlock);
var populateTask = Task.Run(async () =>
{
foreach (var x in Enumerable.Range(1, 15))
{
await firstBlock.SendAsync(x);
}
});
populateTask.Wait();
secondBlock.Completion.Wait();
}
}
}
输出为:
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
为什么会出现这个顺序,我该如何更改网络以获得下面的输出?
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
所以我想知道为什么所有其他块(或任务)都需要等待延迟块?
更新
由于大家要求我更详细地解释我的问题,我制作了这个更接近我正在工作的真实流程的示例。假设应用程序下载一些数据,并根据返回的响应计算哈希值。
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
class Program
{
static void Main(string[] args)
{
var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
{
using (var httpClient = new HttpClient())
{
if (x == "4") await Task.Delay(5000);
var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
return new Tuple<string, string>(x, result);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
{
using (var algorithm = SHA256.Create())
{
var bytes = Encoding.UTF8.GetBytes(x.Item2);
var hash = algorithm.ComputeHash(bytes);
return new Tuple<string, byte[]>(x.Item1, hash);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
{
var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
Console.WriteLine(output);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
firstBlock.LinkTo(secondBlock);
secondBlock.LinkTo(thirdBlock);
thirdBlock.LinkTo(fourthBlock);
var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();
fourthBlock.Completion.Wait();
}
private static string GetHashAsString(byte[] bytes)
{
var sb = new StringBuilder();
int i;
for (i = 0; i < bytes.Length; i++)
{
sb.AppendFormat("{0:X2}", bytes[i]);
if (i % 4 == 3) sb.Append(" ");
}
return sb.ToString();
}
}
}
让我们来看一下请求的顺序:
这绝对是有道理的。所有请求都尽可能快地完成。缓慢的第四个请求在列表末尾。
现在让我们看看我们有什么输出:
09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
你可以看到,在第四个响应之后,所有的哈希值都被计算了。
因此,基于这两个事实,我们可以说所有下载的页面都在等待第四个请求完成。最好不要等待第四个请求,而是在数据下载完毕后立即计算哈希值。我有什么办法可以做到这一点吗?
HttpClient
并从 Web 服务器获取数据的块,在某些情况下,它会停止整个网络等待某个页面完成。 - kseen