为什么块按此顺序运行?

5

这是一个简短的代码示例,快速介绍我的问题所涉及的内容:

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var secondBlock = new TransformBlock<int,string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }

                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });
            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s), new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4
            });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });

            populateTask.Wait();
            secondBlock.Completion.Wait();
        }
    }
}

输出为:
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:13: Message is 12 (This is delayed message!)
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14

为什么会出现这个顺序,我该如何更改网络以获得下面的输出?
09.08.2016 15:03:08: Message is 1
09.08.2016 15:03:08: Message is 5
09.08.2016 15:03:08: Message is 6
09.08.2016 15:03:08: Message is 7
09.08.2016 15:03:08: Message is 8
09.08.2016 15:03:08: Message is 9
09.08.2016 15:03:08: Message is 10
09.08.2016 15:03:08: Message is 11
09.08.2016 15:03:08: Message is 3
09.08.2016 15:03:08: Message is 2
09.08.2016 15:03:08: Message is 4
09.08.2016 15:03:08: Message is 15
09.08.2016 15:03:08: Message is 13
09.08.2016 15:03:08: Message is 14
09.08.2016 15:03:13: Message is 12 (This is delayed message!)

所以我想知道为什么所有其他块(或任务)都需要等待延迟块?

更新

由于大家要求我更详细地解释我的问题,我制作了这个更接近我正在工作的真实流程的示例。假设应用程序下载一些数据,并根据返回的响应计算哈希值。

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

让我们来看一下请求的顺序:

requests

这绝对是有道理的。所有请求都尽可能快地完成。缓慢的第四个请求在列表末尾。

现在让我们看看我们有什么输出:

09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

你可以看到,在第四个响应之后,所有的哈希值都被计算了。
因此,基于这两个事实,我们可以说所有下载的页面都在等待第四个请求完成。最好不要等待第四个请求,而是在数据下载完毕后立即计算哈希值。我有什么办法可以做到这一点吗?

请具体描述您的问题。 - progyammer
2
在你的列表中,时间戳不匹配 - 这表明这是虚假数据...但是,让我们假装时间戳都是有意义的,并且在你的5秒延迟之后出现了3条消息 - 这表明在延迟消息出现之前,有更多的消息无法处理...所以看起来一切都正常。 - BugFinder
@progy_rock 我的问题是在延迟项未处理之前,所有处理都会停止。在我的另一个网络中,有一个使用 HttpClient 并从 Web 服务器获取数据的块,在某些情况下,它会停止整个网络等待某个页面完成。 - kseen
2
就我看来,你的延迟消息正在阻塞一个线程的队列,所以你只剩下了3个线程。虽然我没有测试过,但是与其中带有await的foreach相比,你不会用enumerable.Range(1,15)中的foreach更好吗?在这种情况下,你可以使用Task.WhenAll(numlist.Select(i => Firstblock.SendAsync(i)))。 - BugFinder
@BugFinder 那并没有帮助。请查看我的问题更新。 - kseen
显示剩余2条评论
3个回答

5

好的,从@SirRufo的参考开始,我开始考虑实现自己的TransformBlock以满足我的需求,处理传入的项目时无需考虑顺序。这样它就不会破坏网络,在下载的某个部分建立块之间的间隔,并且是优雅的方法。

所以我开始看看我可以做什么以及如何做到。查看TransformBlock本身的源代码似乎是一个很好的起点,所以我在Github上打开了TransformBlock源代码并开始分析。

就在类的开头,我发现了这个有趣的东西: // 如果采用并行处理,则需要支持重新排列按顺序完成的消息。

// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
    _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}

看起来就是我们想要的东西!让我们来看一下GithubDataflowBlockOptions 类中的 EnsureOrdered 选项:

/// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes 
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer).  Some blocks may allow this to be relaxed,
/// however.  Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it's able to do so.  This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
    get { return _ensureOrdered; }
    set { _ensureOrdered = value; }
}

看起来非常不错,所以我立即切换到IDE进行设置。不幸的是,没有这样的设置:

No EnsureOrdered

我继续搜索并找到了这个注释

4.5.25-beta-23019

包已被重命名为System.Threading.Tasks.Dataflow

当我谷歌搜索时,发现了名为 System.Threading.Tasks.Dataflow!于是我卸载了 Microsoft.Tpl.Dataflow 包,并通过输入以下命令安装了 System.Threading.Tasks.Dataflow

Install-Package System.Threading.Tasks.Dataflow

还有一个EnsureOrdered选项。我更新了代码,并将EnsureOrdered设置为false

using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);

                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, options);

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
             {
                 using (var algorithm = SHA256.Create())
                 {
                     var bytes = Encoding.UTF8.GetBytes(x.Item2);
                     var hash = algorithm.ComputeHash(bytes);

                     return new Tuple<string, byte[]>(x.Item1, hash);
                 }
             }, options);

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";

                Console.WriteLine(output);
            }, options);

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);
            thirdBlock.LinkTo(fourthBlock);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            int i;
            for (i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }

            return sb.ToString();
        }
    }
}

而且输出的结果正是我想要的:

10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA

3

这是有意设计的,且已经记录

因为每个预定义的源数据流块类型都保证按照接收顺序传播消息,...

证明:

var ts = Environment.TickCount;

var firstBlock = new TransformBlock<int, int>(
    x => x,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
    } );

var secondBlock = new TransformBlock<int, string>(
    x =>
    {
        var start = Environment.TickCount;

        if ( x == 3 )
        {
            Thread.Sleep( 5000 );
            return $"Start {start-ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }

        return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        // limit the internal queue to 10 items
        BoundedCapacity = 10,
    } );

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 1,
    } );

firstBlock.LinkTo( secondBlock, new DataflowLinkOptions { PropagateCompletion = true, } );
secondBlock.LinkTo( thirdBlock, new DataflowLinkOptions { PropagateCompletion = true, } );

foreach ( var x in Enumerable.Range( 1, 15 ) )
{
    // to ensure order of items
    firstBlock.Post( x );
}

firstBlock.Complete();
thirdBlock.Completion.Wait();

输出:

开始 31 结束 31: 消息为 1
开始 31 结束 31: 消息为 2
开始 31 结束 5031: 消息为 3 (这是延迟的消息!)
开始 31 结束 31: 消息为 4
开始 31 结束 31: 消息为 5
开始 31 结束 31: 消息为 6
开始 31 结束 31: 消息为 7
开始 31 结束 31: 消息为 8
开始 31 结束 31: 消息为 9
开始 31 结束 31: 消息为 10
开始 31 结束 31: 消息为 11
开始 31 结束 31: 消息为 12
开始 5031 结束 5031: 消息为 13
开始 5031 结束 5031: 消息为 14
开始 5031 结束 5031: 消息为 15

Solution 1

不要使用DataFlow进行下载部分,因为顺序保证会阻塞您正在寻找的处理。

var ts = Environment.TickCount;

var thirdBlock = new ActionBlock<string>(
    s =>
    {
        Console.WriteLine( s );
    },
    new ExecutionDataflowBlockOptions
    {
        // limit to a single task to watch the order
        MaxDegreeOfParallelism = 4,
    } );

Parallel.ForEach(
    Enumerable.Range( 1, 15 ),
    new ParallelOptions { MaxDegreeOfParallelism = 4, },
    x =>
    {
        var start = Environment.TickCount;
        string result;

        if ( x == 12 )
        {
            Thread.Sleep( 5000 );
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
        }
        else
            result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
        thirdBlock.Post( result );
    } );

thirdBlock.Complete();
thirdBlock.Completion.Wait();

输出结果如下:
开始32完成32:消息为2
开始32完成32:消息为6
开始32完成32:消息为5
开始32完成32:消息为8
开始32完成32:消息为9
开始32完成32:消息为10
开始32完成32:消息为11
开始32完成32:消息为7
开始32完成32:消息为13
开始32完成32:消息为14
开始32完成32:消息为15
开始32完成32:消息为3
开始32完成32:消息为4
开始32完成32:消息为1
开始32完成5032:消息为12(这是延迟的消息!)

解决方案2

当然,您可以在自定义类中实现IPropagatorBlock<TInput,TOutput>,但不能保证项目的顺序。


无法更改基本特征(保持顺序)。 - Sir Rufo
你有读文档吗?这里有一个关于顺序的保证,而你想绕过这个保证。int类型保证包含一个int值。如果你想存储一个字符串值,请使用不同于int的类型。没有任何解决方法。 - Sir Rufo
但是也许你想使用Parallel.For来下载并将数据发布到DataFlow中。 - Sir Rufo
请查看我的问题更新。我希望我已经让它更加清晰了一些。 - kseen
3
问题已经被清楚描述,我们不需要关于问题本身以及由此引起的更多和更深入的信息。解决方案是:不要使用DataFlow来进行下载部分。 - Sir Rufo
显示剩余6条评论

1

从时间戳来看,第二个代码块的输出按照您的预期工作-延迟的TransformBlock在所有其他TransformBlock之后运行。似乎是ActionBlock中的Console.WriteLine没有按照您的期望顺序调用。

您的代码secondBlock.Completion.Wait();是否不正确-应该是thirdBlock.Completion.Wait();才能获得您期望的结果?


请不要关注这些时间戳。期望的输出在问题的结尾有所说明。我该如何实现这个?我尝试等待第三个块完成,但这种方式行不通。 - kseen
时间戳告诉您重要的事情 - 问题在于第三个块中的ActionBlocks执行顺序 - TransformBlocks按您预期的顺序执行,这可以从时间戳中看出。虽然Console.WriteLine是线程安全的,但这并不意味着它处理传入请求的顺序是可预测的。问题在于ActionBlock,而ActionBlock依赖于Console.WriteLine。我会消除Console并写入一个线程安全的集合+使用秒表记录ActionBlocks实际被调用的顺序。 - PhillipH
我有另外一个网络,使用httpclients获取页面而不是控制台写入,并且它的工作方式与此处相同:在延迟请求完成之前,不会进行进一步的请求。在这个延迟请求完成后(带有错误),一切都运行良好:请求正在并发运行。 - kseen
请查看我的问题更新。我发布了一些解释性的代码。 - kseen
我无法回答框架为什么会这样工作的问题。你还问:“有没有办法实现这个?”显然,一个更简单的任务集,每个任务都是完整的 - 即获取字符串、计算哈希值并输出结果,将更容易理解,并避免任何框架的奇怪之处。这些单独的步骤可以使用Await/Asynch编程技术,但可以避免您看到的链接问题。 - PhillipH
事实上,我真正拥有的Dataflow网格大约有50个块,并且在网格的某个特定部分,我遇到了这个问题。我只是为了简单起见而为这个问题制作了应用程序。相信我,Dataflow方式非常适合我正在开发的真实应用程序。没有办法将其重写为纯异步/等待。此外,该问题已得到解决,我已发布了带有解决方案的答案。 - kseen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接