TPL数据流 - 并行和异步处理,同时保持顺序

5

我创建了一个TPL数据流管道,其中包括3个TransformBlock和一个ActionBlock。

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

每个块都会使用处理后的数据填充Job对象,因为我不仅需要数据本身,还需要一般信息,这些信息是我需要用于响应消息的。我基本上将路径添加到XML中,并获取一个Response对象,其中包含有关是否一切正常的信息。
如何实现同时读取两个或更多需要从HDD读取一段时间的文件,并保持它们来的顺序?如果file1花费更多时间,则file2需要等待file1完成,然后将数据传递给下一个块,然后也会开始并行和异步验证数据,但在此处也保持下一块的顺序?
现在看起来即使我调用SendAsync到headblock,它也会按顺序处理所有文件。
编辑:所以我为我的管道编写了一个小测试类。它有3个阶段。我想要实现的是第一个TransformBlock随着它们进入(从FileSystemWatcher发送异步)而持续读取文件,并按它们进入的顺序输出。这意味着如果File1是一个大文件,而File2+3进入,两者都将被读取,而File1仍在处理中,但File2+3将不得不等到它可以发送到第二个TransformBlock,因为File1仍在被读取。第二阶段应该也是这样工作的。另一方面,第三阶段需要将从File1生成的对象保存到数据库中,这可以并行和异步完成。然而,需要在处理文件2和文件3之前处理来自file1的对象。因此,整个文件内容需要按它们进入的顺序依次处理。我尝试通过将第三个TransformBlock限制为MaxDegreeOfParallelism和BoundedCapacity都设置为1来实现,但这似乎失败了,而且Console.WriteLine's没有真正保持顺序。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);

            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}
编辑2: 我将BoundedCapacity设置为无限后,我得到了按照发送到管道中的顺序的所有内容。所以之前并不是真的没有顺序,但我想消息可能被丢弃了?

如果我确保EnsureOrdered为真,并在最后一个TransformBlock中使用MaxDegreeOfParallelism为8,那么输出结果的一部分将不再按顺序排列。但这里需要按顺序,因为我要保存数据到数据库中,它需要按照输入顺序进行排序。当离开最后一个TransformBlock时,如果未按顺序排列,实际上并不重要,所以我想在这里不能保持并行吗?

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14

编辑3: 使用@JSteward最新的代码后的输出结果。

ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100
3个回答

5
如果您将TransformBlockActionBlock链接起来,就可以实现这一点。

最简单的方法是通过一个可编译的控制台应用程序进行演示。

此应用程序处理一个整数序列,但您也可以使用自定义工作单元类替换整数。

(我修改了此代码,从我编写的一个实用程序中,该实用程序使用相对较慢的LZMA压缩算法进行多线程文件压缩。该实用程序必须按顺序从文件中顺序读取输入数据,然后将其分块传递到队列中,该队列使用多个线程以任何顺序处理数据,最后将压缩块输出到一个队列中,该队列必须保留数据块的原始顺序。)

示例代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);

            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);

            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }


        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}

输出结果:
Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .

请注意,“块”是如何由多个线程以任意顺序处理的,但输出顺序与输入顺序相同。
非常重要的是根据actionBlockOptions()方法设置输出操作块选项,其中MaxDegreeOfParallelismBoundedCapacity都设置为1。
这就是导致输出按正确顺序串行化的原因。如果您将输出的BoundedCapacityMaxDegreeOfParallelism设置为大于1,则可能会以错误的顺序输出。

感谢@Matthew Watson。据我所知,我可以简单地使用ActionBlock输出所有已读取的文件(按它们进入的顺序),并将其发布到同一个BufferBlock/TransformBlock/ActionBlock组合中,您认为在链接这些块时是否存在问题? - Peter
@Peter 我认为那应该没问题。这与我的文件压缩器代码的工作方式类似。 - Matthew Watson

5

@Matthew Watson提供了很好的建议,我想补充一点,除非您使用Microsoft.Tpl.Dataflow软件包,否则无需将MaxDegreeOfParallelism和BoundedCapacity限制为1。更新且正确的System.Threading.Tasks.Dataflow添加了属性EnsureOrdered到执行块选项。虽然这似乎没有在MSDN上记录,但您可以在TPL Dataflow Source中找到此属性及其用法。

以下是演示此行为的示例和测试,将执行选项中的EnsureOrdered更改为false将导致测试失败。默认值为true,不需要显式设置以获得有序行为。

编辑: 正如@Matthew Watson在下面指出的那样,虽然EnsureOrdered可以保持传播器块之间的顺序,但一旦进入操作块,消息就可以以任何顺序处理。

编辑2:注意:如果ActionBlock的MaxDegreeOfParllelismBoundedCapacity设置为1,但EnsureOrdered为false,则测试将失败并且结果将无序。

[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private ActionBlock<Message> action;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 2,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        action = new ActionBlock<Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Output  Id: {x.Id} Value: {x.Value}");

            //this delay will cause the messages to be unordered
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            OutputMessages.Add(x);
        }, options);
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);
        xForm1.LinkTo(action, options);
    }

    public Task PostData(IEnumerable<Message> data) {

        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return action.Completion;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}
编辑:不幸的是,我们无法直接访问内部的ReorderingBuffer。因此,将有界容量和一致的MaxDegreeOfParallelismActionBlock替换为将TransformBlock的有序输出链接到流的一个备选方案。请注意上面代码中并行启用的ActionBlock的延迟导致结果被打乱,但在下面代码中,对流进行处理的延迟不会影响顺序。基本上提供了与同步ActionBlock相同的行为,并可以供应另一个网格等部分。
[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private IObservable<Message> output;
    private TaskCompletionSource<bool> areWeDoneYet;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 13,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        output = xForm1.AsObservable<Message>();

        areWeDoneYet = new TaskCompletionSource<bool>();
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);

        var subscription = output.Subscribe(msg => {
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            OutputMessages.Add(msg);
        }, () => areWeDoneYet.SetResult(true));            
    }

    public Task<bool> PostData(IEnumerable<Message> data) {            
        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return areWeDoneYet.Task;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}
编辑2: 另外,我的管道应该有3个这样的阶段,我该如何链接它们?因此,当第一个块完成第一个文件时,它开始向下一个块输出数据,后者将再次并行和异步工作。

这不是由它们如何链接驱动的,而是由ExecutionDataflowBlockOptions决定的。使用下面显示的选项,第一个块将根据发布的文件数量和其给定的处理时间启动任务,随着它们完成,它们将被输出到下一个处理阶段或基于您的Job.ReturnCode谓词的故障处理ActionBlock,接下来的阶段也将遵循相同的流程。您还可以修改您的ActionBlock选项来处理来自TransformBlocks的多个成功/失败情况。

var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 10,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };
var loadXml = new TransformBlock<Job, Job>(job => { ... }, options); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }, options); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }, options); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

编辑3 针对OP添加的源代码做出回应: 通过将MaxDegreeOfParallelismBoundedCapcity设置为1,您失去了最后一个转换块中的有序行为。让我明确一点,不要这样做来“确保顺序”,这只是在与库进行斗争。以下是TransformBlock的相关片段:

            // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
            // However, a developer can override this with EnsureOrdered == false.
            if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
            {
                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
            }

这是使用您修改后的代码在最终TBlock中使用并行处理完成的20个数据点的运行结果。修改为基本csv以便在Excel中查看,即将“ ”替换为“,”。
Function,TimeStamp/Inserted JobId,Other,Other,Other,Other,Other,Other,Other,JobId From functions
ReadDocument,04:54.0,|,Thread,6,is,processing,Job,Id:,1
ReadDocument,04:54.0,|,Thread,11,is,processing,Job,Id:,2
ReadDocument,04:56.0,|,Thread,13,is,processing,Job,Id:,3
ReadDocument,04:56.0,|,Thread,6,is,processing,Job,Id:,4
ReadDocument,04:57.0,|,Thread,11,is,processing,Job,Id:,5
ReadDocument,04:57.0,|,Thread,14,is,processing,Job,Id:,6
ReadDocument,04:58.0,|,Thread,15,is,processing,Job,Id:,7
ReadDocument,04:58.0,|,Thread,6,is,processing,Job,Id:,8
ReadDocument,04:59.0,|,Thread,11,is,processing,Job,Id:,9
ReadDocument,04:59.0,|,Thread,16,is,processing,Job,Id:,10
ReadDocument,05:00.0,|,Thread,17,is,processing,Job,Id:,12
ReadDocument,05:00.0,|,Thread,15,is,processing,Job,Id:,11
ReadDocument,05:01.0,|,Thread,16,is,processing,Job,Id:,13
ReadDocument,05:01.0,|,Thread,18,is,processing,Job,Id:,14
ReadDocument,05:02.0,|,Thread,15,is,processing,Job,Id:,15
ReadDocument,05:02.0,|,Thread,17,is,processing,Job,Id:,20
ValidateXml,05:02.0,|,Thread,19,is,processing,Job,Id:,1
ReadDocument,05:02.0,|,Thread,14,is,processing,Job,Id:,17
ReadDocument,05:02.0,|,Thread,13,is,processing,Job,Id:,16
ReadDocument,05:02.0,|,Thread,11,is,processing,Job,Id:,18
ReadDocument,05:02.0,|,Thread,6,is,processing,Job,Id:,19
ValidateXml,05:03.0,|,Thread,16,is,processing,Job,Id:,2
ValidateXml,05:03.0,|,Thread,20,is,processing,Job,Id:,3
ValidateXml,05:04.0,|,Thread,11,is,processing,Job,Id:,4
ValidateXml,05:04.0,|,Thread,21,is,processing,Job,Id:,7
ValidateXml,05:04.0,|,Thread,18,is,processing,Job,Id:,5
ValidateXml,05:04.0,|,Thread,15,is,processing,Job,Id:,6
ValidateXml,05:04.5,|,Thread,16,is,processing,Job,Id:,8
ValidateXml,05:04.5,|,Thread,6,is,processing,Job,Id:,9
ValidateXml,05:04.6,|,Thread,19,is,processing,Job,Id:,10
ProcessJob,05:04.6,|,Thread,14,is,processing,Job,Id:,2
ProcessJob,05:04.6,|,Thread,22,is,processing,Job,Id:,1
ValidateXml,05:05.5,|,Thread,18,is,processing,Job,Id:,11
ValidateXml,05:05.6,|,Thread,20,is,processing,Job,Id:,12
ProcessJob,05:05.6,|,Thread,23,is,processing,Job,Id:,3
ValidateXml,05:06.5,|,Thread,6,is,processing,Job,Id:,13
ValidateXml,05:06.5,|,Thread,21,is,processing,Job,Id:,15
ID,1,was,successfully,imported.,,,,,
ValidateXml,05:06.5,|,Thread,16,is,processing,Job,Id:,14
ValidateXml,05:06.8,|,Thread,15,is,processing,Job,Id:,17
ProcessJob,05:06.8,|,Thread,24,is,processing,Job,Id:,4
ValidateXml,05:06.8,|,Thread,11,is,processing,Job,Id:,16
ProcessJob,05:06.8,|,Thread,22,is,processing,Job,Id:,5
ProcessJob,05:07.5,|,Thread,17,is,processing,Job,Id:,6
ProcessJob,05:07.5,|,Thread,25,is,processing,Job,Id:,8
ValidateXml,05:07.5,|,Thread,19,is,processing,Job,Id:,18
ProcessJob,05:07.5,|,Thread,14,is,processing,Job,Id:,7
ValidateXml,05:08.5,|,Thread,16,is,processing,Job,Id:,19
ProcessJob,05:08.5,|,Thread,23,is,processing,Job,Id:,9
ValidateXml,05:08.5,|,Thread,18,is,processing,Job,Id:,20
ProcessJob,05:09.5,|,Thread,19,is,processing,Job,Id:,10
ID,2,was,successfully,imported.,,,,,
ProcessJob,05:09.5,|,Thread,15,is,processing,Job,Id:,11
ID,3,was,successfully,imported.,,,,,
ProcessJob,05:10.6,|,Thread,14,is,processing,Job,Id:,12
ProcessJob,05:10.9,|,Thread,25,is,processing,Job,Id:,13
ProcessJob,05:11.0,|,Thread,24,is,processing,Job,Id:,14
ID,4,was,successfully,imported.,,,,,
ProcessJob,05:11.1,|,Thread,17,is,processing,Job,Id:,15
ProcessJob,05:11.3,|,Thread,22,is,processing,Job,Id:,16
ID,5,was,successfully,imported.,,,,,
ID,6,was,successfully,imported.,,,,,
ID,7,was,successfully,imported.,,,,,
ID,8,was,successfully,imported.,,,,,
ProcessJob,05:11.6,|,Thread,19,is,processing,Job,Id:,17
ProcessJob,05:11.7,|,Thread,23,is,processing,Job,Id:,18
ID,9,was,successfully,imported.,,,,,
ID,10,was,successfully,imported.,,,,,
ProcessJob,05:12.0,|,Thread,14,is,processing,Job,Id:,19
ProcessJob,05:12.4,|,Thread,15,is,processing,Job,Id:,20
ID,11,was,successfully,imported.,,,,,
ID,12,was,successfully,imported.,,,,,
ID,13,was,successfully,imported.,,,,,
ID,14,was,successfully,imported.,,,,,
ID,15,was,successfully,imported.,,,,,
ID,16,was,successfully,imported.,,,,,
ID,17,was,successfully,imported.,,,,,
ID,18,was,successfully,imported.,,,,,
ID,19,was,successfully,imported.,,,,,
ID,20,was,successfully,imported.,,,,,

最后一点说明:返回bool类型的函数和将异常映射为返回代码可能会出现问题,但这已超出了本问题的范围。您可以通过在Stack Exchange Code Review发布代码来获取很多关于最佳实践方面的建议。


1
我尝试过这个,但如果你将 BoundedCapacity 增加到比如 8,它实际上并不起作用(即结果不是按正确顺序)。对于较小的 BoundedCapacity 值似乎可以,但对于更高的值而言,它无法正常工作似乎表明 EnsureOrdered 并没有像我们想象的那样完全达到效果。 - Matthew Watson
1
@Matthew Watson 很好的发现,进一步检查后发现EnsureOrdered仅在块之间相关。最终的ActionBlock或任何ActionBlock都会忽略该选项。几个注意点:通过在操作块中插入延迟,我使无序行为出现了BoundedCapacity的小值。因此,尽管消息以正确的顺序离开TransformBlock,但可以以任何顺序由ActionBlock处理。也许,我们只需打开tBlock上的内部ReorderingBuffer,就可以完成所有设置 ;) - JSteward
@Peter(1)哪一个包?我看到System包最后更新于2016年11月,而Microsoft包在2014年进行了更新。在哪里推荐使用Microsoft包?[链接](https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/)[链接](https://www.nuget.org/packages/Microsoft.Tpl.Dataflow)(2)Rx订阅; 您需要安装相关的Rx包以使用正确的订阅重载。我将尝试在编辑中回答您的其他问题。 - JSteward
(1) 我指的是这个链接:https://msdn.microsoft.com/de-de/library/system.threading.tasks.dataflow(v=vs.110).aspx。它说需要 Microsoft.Tpl.Dataflow,这让我感到困惑。 (2) 明白了,谢谢! - Peter
关于您的编辑,我需要从Subscribe方法中实际进行Post/SendAsync到第二个和第三个BufferBlock/TransformBlock结构(用于第2和第3阶段),而不是使用LinkTo函数吗?因为如果我只是链接它,我会失去顺序。 - Peter
显示剩余4条评论

2

原始答案内容过长

编辑4:回应OP编辑2 我不确定到底进行了哪些更改以产生所提供的输出,但这里是您修改后的源代码和结果,显示了所有100个输入的有序行为。

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing {
    public class Job {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<Object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job) {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job) {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} JobId: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job) {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO => {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o) {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job) {
            if (job.ReturnCode == 100) {
                Console.WriteLine($"CreateResponse {DateTime.Now.TimeOfDay} JobId: {job.ID}");

            }
            else {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline() {
            var loadXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ReadDocument(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job => {
                try {
                    if (ValidateXml(job)) {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job => {
                try {
                    if (ProcessJob(job)) {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch (OperationCanceledException) {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            //importJob.LinkTo(reportImport);

            var output = importJob.AsObservable();
            var subscription = output.Subscribe(x => {
            if (x.ReturnCode == 100) {
                //job success
                Console.WriteLine($"SendToDataBase {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }
            else {
                //handle fault
                Console.WriteLine($"Job Failed {DateTime.Now.TimeOfDay} JobId: {x.ID}");
            }                
        });

            // Return the head of the network.
            return loadXml;
        }

        public void Start() {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id) {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions() {
            return new ExecutionDataflowBlockOptions {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel() {
            if (cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args) {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach (var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}

结果

Function,Timestamp,Other,JobId
ReadDocument,08:11:27.2200011,JobId:,1
ReadDocument,08:11:27.2240007,JobId:,2
ReadDocument,08:11:29.7562763,JobId:,3
ReadDocument,08:11:29.7662792,JobId:,4
ReadDocument,08:11:30.7013793,JobId:,5
ReadDocument,08:11:31.7024931,JobId:,6
ReadDocument,08:11:31.7034925,JobId:,7
ReadDocument,08:11:32.7306060,JobId:,9
ReadDocument,08:11:32.7306060,JobId:,8
ReadDocument,08:11:33.7027033,JobId:,10
ReadDocument,08:11:33.7027033,JobId:,11
ReadDocument,08:11:34.7018217,JobId:,12
ReadDocument,08:11:34.7028153,JobId:,13
ReadDocument,08:11:35.7019214,JobId:,14
ReadDocument,08:11:35.7069235,JobId:,15
ReadDocument,08:11:35.7069235,JobId:,16
ReadDocument,08:11:35.7069235,JobId:,17
ReadDocument,08:11:35.7079221,JobId:,18
ValidateXml,08:11:35.7119363,JobId:,1
ValidateXml,08:11:36.7060334,JobId:,2
ReadDocument,08:11:36.7060334,JobId:,19
ReadDocument,08:11:36.7070332,JobId:,20
ReadDocument,08:11:37.7071383,JobId:,21
ReadDocument,08:11:37.7071383,JobId:,22
ReadDocument,08:11:37.7081392,JobId:,23
ValidateXml,08:11:37.7091421,JobId:,3
ReadDocument,08:11:38.7032496,JobId:,24
ValidateXml,08:11:38.7052496,JobId:,6
ValidateXml,08:11:38.7042513,JobId:,4
ReadDocument,08:11:38.7052496,JobId:,27
ValidateXml,08:11:38.7042513,JobId:,5
ReadDocument,08:11:38.7052496,JobId:,28
ReadDocument,08:11:38.7042513,JobId:,26
ReadDocument,08:11:38.7032496,JobId:,25
ValidateXml,08:11:39.7023545,JobId:,7
ReadDocument,08:11:39.7023545,JobId:,29
ValidateXml,08:11:39.7023545,JobId:,8
ReadDocument,08:11:40.7064634,JobId:,30
ReadDocument,08:11:40.7064634,JobId:,31
ValidateXml,08:11:40.7084642,JobId:,9
ValidateXml,08:11:41.7045755,JobId:,10
ReadDocument,08:11:41.7085762,JobId:,33
ValidateXml,08:11:41.7105750,JobId:,11
ValidateXml,08:11:41.7115767,JobId:,12
ValidateXml,08:11:41.7135740,JobId:,13
ValidateXml,08:11:41.7155790,JobId:,14
ReadDocument,08:11:41.7085762,JobId:,34
ReadDocument,08:11:41.7045755,JobId:,32
ReadDocument,08:11:41.7105750,JobId:,35
ReadDocument,08:11:41.7135740,JobId:,36
ReadDocument,08:11:42.7086844,JobId:,37
ValidateXml,08:11:42.7116926,JobId:,15
ValidateXml,08:11:42.7126878,JobId:,16
ReadDocument,08:11:42.7116926,JobId:,38
ValidateXml,08:11:43.7027911,JobId:,17
ValidateXml,08:11:43.7027911,JobId:,18
ValidateXml,08:11:43.7068030,JobId:,20
ProcessJob,08:11:43.7097908,JobId:,1
ValidateXml,08:11:43.7057897,JobId:,19
ReadDocument,08:11:43.7057897,JobId:,39
ReadDocument,08:11:43.7077893,JobId:,40
ReadDocument,08:11:44.7038990,JobId:,41
ProcessJob,08:11:44.7059002,JobId:,2
ValidateXml,08:11:44.7049004,JobId:,21
ReadDocument,08:11:44.7038990,JobId:,42
ValidateXml,08:11:44.7059002,JobId:,22
ReadDocument,08:11:44.7089023,JobId:,44
ReadDocument,08:11:44.7049004,JobId:,43
ReadDocument,08:11:45.7030090,JobId:,45
ValidateXml,08:11:45.7030090,JobId:,23
ValidateXml,08:11:45.7120179,JobId:,24
ValidateXml,08:11:45.7120179,JobId:,25
ReadDocument,08:11:45.7140087,JobId:,46
ValidateXml,08:11:45.7170104,JobId:,26
ReadDocument,08:11:45.7190107,JobId:,47
ProcessJob,08:11:45.7200086,JobId:,3
ValidateXml,08:11:45.7170104,JobId:,27
ReadDocument,08:11:46.7071167,JobId:,48
ReadDocument,08:11:46.7101161,JobId:,50
ProcessJob,08:11:46.7111152,JobId:,4
ValidateXml,08:11:46.7111152,JobId:,28
ReadDocument,08:11:46.7071167,JobId:,49
ValidateXml,08:11:47.7032249,JobId:,29
ReadDocument,08:11:47.7062243,JobId:,51
ReadDocument,08:11:47.7072261,JobId:,52
ReadDocument,08:11:47.7092253,JobId:,53
ProcessJob,08:11:47.7102243,JobId:,5
ProcessJob,08:11:47.7112241,JobId:,7
ReadDocument,08:11:47.7102243,JobId:,55
ValidateXml,08:11:47.7062243,JobId:,30
ProcessJob,08:11:47.7102243,JobId:,6
ValidateXml,08:11:47.7072261,JobId:,31
ReadDocument,08:11:47.7092253,JobId:,54
ReadDocument,08:11:48.7063329,JobId:,56
ProcessJob,08:11:48.7073331,JobId:,8
ValidateXml,08:11:48.7063329,JobId:,32
ValidateXml,08:11:48.7063329,JobId:,33
ValidateXml,08:11:49.7074443,JobId:,34
ReadDocument,08:11:49.7104422,JobId:,59
ReadDocument,08:11:49.7124418,JobId:,60
ProcessJob,08:11:49.7124418,JobId:,9
ValidateXml,08:11:49.7144433,JobId:,36
ValidateXml,08:11:49.7114420,JobId:,35
ReadDocument,08:11:49.7074443,JobId:,57
ReadDocument,08:11:49.7084468,JobId:,58
ValidateXml,08:11:50.7065604,JobId:,37
ReadDocument,08:11:50.7095502,JobId:,61
ProcessJob,08:11:50.7105504,JobId:,10
ReadDocument,08:11:50.7115502,JobId:,63
ValidateXml,08:11:50.7125515,JobId:,40
ReadDocument,08:11:50.7105504,JobId:,62
ValidateXml,08:11:50.7095502,JobId:,39
ValidateXml,08:11:50.7075518,JobId:,38
ReadDocument,08:11:50.7115502,JobId:,64
ReadDocument,08:11:51.7076596,JobId:,65
ReadDocument,08:11:51.7086597,JobId:,66
ProcessJob,08:11:51.7116603,JobId:,13
ProcessJob,08:11:51.7106605,JobId:,12
ProcessJob,08:11:51.7086597,JobId:,11
ValidateXml,08:11:51.7076596,JobId:,41
SendToDataBase,08:11:51.7366672,JobId:,1
SendToDataBase,08:11:51.7416631,JobId:,2
SendToDataBase,08:11:51.7496646,JobId:,3
CreateResponse,08:11:51.7546639,JobId:,56
ValidateXml,08:11:52.7037712,JobId:,42
ValidateXml,08:11:52.7037712,JobId:,43
ValidateXml,08:11:52.7077662,JobId:,44
ReadDocument,08:11:52.7107675,JobId:,69
ProcessJob,08:11:52.7077662,JobId:,14
ProcessJob,08:11:52.7077662,JobId:,15
ProcessJob,08:11:52.7087683,JobId:,16
ProcessJob,08:11:52.7087683,JobId:,17
ValidateXml,08:11:52.7097669,JobId:,45
ReadDocument,08:11:52.7097669,JobId:,67
ValidateXml,08:11:52.7097669,JobId:,46
ReadDocument,08:11:52.7107675,JobId:,68
ValidateXml,08:11:53.7069300,JobId:,47
ReadDocument,08:11:53.7078801,JobId:,70
ValidateXml,08:11:53.7108792,JobId:,48
SendToDataBase,08:11:53.7118774,JobId:,4
SendToDataBase,08:11:53.7208818,JobId:,5
SendToDataBase,08:11:53.7228802,JobId:,6
SendToDataBase,08:11:53.7238781,JobId:,7
SendToDataBase,08:11:53.7258800,JobId:,8
ReadDocument,08:11:53.7118774,JobId:,73
ReadDocument,08:11:53.7098805,JobId:,71
ReadDocument,08:11:53.7118774,JobId:,72
ValidateXml,08:11:54.7059933,JobId:,49
ValidateXml,08:11:54.7069847,JobId:,50
ValidateXml,08:11:54.7089874,JobId:,51
CreateResponse,08:11:54.7109862,JobId:,41
CreateResponse,08:11:54.7169842,JobId:,42
SendToDataBase,08:11:54.7149888,JobId:,9
SendToDataBase,08:11:54.7259874,JobId:,10
SendToDataBase,08:11:54.7269883,JobId:,11
ProcessJob,08:11:54.7119868,JobId:,18
ReadDocument,08:11:54.7059933,JobId:,74
ValidateXml,08:11:54.7109862,JobId:,53
ProcessJob,08:11:54.7119868,JobId:,19
ProcessJob,08:11:54.7129854,JobId:,20
ValidateXml,08:11:54.7099852,JobId:,52
ReadDocument,08:11:54.7129854,JobId:,76
ReadDocument,08:11:54.7069847,JobId:,75
ReadDocument,08:11:55.7090940,JobId:,77
ReadDocument,08:11:55.7140926,JobId:,78
ValidateXml,08:11:55.7140926,JobId:,54
SendToDataBase,08:11:55.7180953,JobId:,12
CreateResponse,08:11:55.7180953,JobId:,43
ProcessJob,08:11:55.7180953,JobId:,21
SendToDataBase,08:11:55.7230962,JobId:,13
ValidateXml,08:11:55.7170947,JobId:,55
ReadDocument,08:11:55.7160937,JobId:,79
ReadDocument,08:11:55.7170947,JobId:,80
ValidateXml,08:11:55.8111031,JobId:,57
ReadDocument,08:11:55.8111031,JobId:,81
ProcessJob,08:11:55.8451120,JobId:,22
ProcessJob,08:11:56.1251577,JobId:,23
ReadDocument,08:11:56.2531569,JobId:,82
ReadDocument,08:11:56.3441756,JobId:,83
ProcessJob,08:11:56.3571695,JobId:,24
ValidateXml,08:11:56.3851785,JobId:,58
ReadDocument,08:11:56.4061804,JobId:,84
ValidateXml,08:11:56.6222012,JobId:,59
CreateResponse,08:11:56.6222012,JobId:,49
ProcessJob,08:11:56.9112320,JobId:,25
ValidateXml,08:11:56.9412405,JobId:,60
ProcessJob,08:11:57.0002533,JobId:,26
ValidateXml,08:11:57.2352587,JobId:,61
ProcessJob,08:11:57.4852908,JobId:,27
ReadDocument,08:11:58.2093656,JobId:,85
SendToDataBase,08:11:58.2163692,JobId:,14
ReadDocument,08:11:58.2113664,JobId:,87
SendToDataBase,08:11:58.2203645,JobId:,15
SendToDataBase,08:11:58.2293743,JobId:,16
SendToDataBase,08:11:58.2303706,JobId:,17
SendToDataBase,08:11:58.2313662,JobId:,18
SendToDataBase,08:11:58.2333692,JobId:,19
SendToDataBase,08:11:58.2353681,JobId:,20
SendToDataBase,08:11:58.2373688,JobId:,21
SendToDataBase,08:11:58.2383671,JobId:,22
SendToDataBase,08:11:58.2393673,JobId:,23
ValidateXml,08:11:58.2123658,JobId:,63
CreateResponse,08:11:58.2163692,JobId:,50
CreateResponse,08:11:58.2543716,JobId:,51
CreateResponse,08:11:58.2643699,JobId:,52
CreateResponse,08:11:58.2663730,JobId:,53
ProcessJob,08:11:58.2143646,JobId:,31
ProcessJob,08:11:58.2123658,JobId:,29
ReadDocument,08:11:58.2093656,JobId:,86
ReadDocument,08:11:58.2123658,JobId:,88
ProcessJob,08:11:58.2133656,JobId:,30
ProcessJob,08:11:58.2103650,JobId:,28
ValidateXml,08:11:58.2113664,JobId:,62
ReadDocument,08:11:58.2123658,JobId:,89
ValidateXml,08:11:58.2133656,JobId:,64
ValidateXml,08:11:59.7055294,JobId:,65
ReadDocument,08:11:59.7065300,JobId:,91
ValidateXml,08:11:59.7065300,JobId:,66
SendToDataBase,08:11:59.7115275,JobId:,24
SendToDataBase,08:11:59.7195324,JobId:,25
SendToDataBase,08:11:59.7205330,JobId:,26
ProcessJob,08:11:59.7085277,JobId:,33
ValidateXml,08:11:59.7085277,JobId:,68
ReadDocument,08:11:59.7095263,JobId:,93
ValidateXml,08:11:59.7085277,JobId:,67
ReadDocument,08:11:59.7095263,JobId:,92
ProcessJob,08:11:59.7095263,JobId:,34
ProcessJob,08:11:59.7075275,JobId:,32
ReadDocument,08:11:59.7055294,JobId:,90
ValidateXml,08:11:59.7105265,JobId:,70
ValidateXml,08:11:59.7095263,JobId:,69
ReadDocument,08:11:59.7105265,JobId:,94
ValidateXml,08:12:00.7146358,JobId:,71
SendToDataBase,08:12:00.7176364,JobId:,27
ReadDocument,08:12:00.7156372,JobId:,97
ProcessJob,08:12:00.7146358,JobId:,35
ProcessJob,08:12:00.7156372,JobId:,36
ReadDocument,08:12:00.7146358,JobId:,95
ReadDocument,08:12:00.7156372,JobId:,96
ReadDocument,08:12:00.8616797,JobId:,98
ValidateXml,08:12:00.8796565,JobId:,72
ReadDocument,08:12:00.9066595,JobId:,99
ReadDocument,08:12:00.9786697,JobId:,100
ValidateXml,08:12:00.9866692,JobId:,73
ProcessJob,08:12:01.0766830,JobId:,37
ValidateXml,08:12:01.1176829,JobId:,74
ProcessJob,08:12:01.1176829,JobId:,38
ProcessJob,08:12:01.2167037,JobId:,39
SendToDataBase,08:12:01.2167037,JobId:,28
SendToDataBase,08:12:01.2216970,JobId:,29
SendToDataBase,08:12:01.2236923,JobId:,30
SendToDataBase,08:12:01.2246914,JobId:,31
ValidateXml,08:12:01.2327001,JobId:,75
ValidateXml,08:12:01.5447286,JobId:,76
ProcessJob,08:12:01.6567738,JobId:,40
ValidateXml,08:12:01.9347686,JobId:,77
ProcessJob,08:12:02.2498041,JobId:,44
ProcessJob,08:12:02.4448257,JobId:,45
SendToDataBase,08:12:02.4458286,JobId:,32
ValidateXml,08:12:02.5469861,JobId:,78
ProcessJob,08:12:02.6268456,JobId:,46
SendToDataBase,08:12:02.6278997,JobId:,33
SendToDataBase,08:12:02.6378977,JobId:,34
SendToDataBase,08:12:02.6398461,JobId:,35
ValidateXml,08:12:02.6538506,JobId:,79
ProcessJob,08:12:03.1399063,JobId:,47
SendToDataBase,08:12:03.1489053,JobId:,36
ValidateXml,08:12:03.2979184,JobId:,80
ProcessJob,08:12:03.4959402,JobId:,48
ValidateXml,08:12:03.6259629,JobId:,81
ValidateXml,08:12:03.6769676,JobId:,82
ProcessJob,08:12:03.7719693,JobId:,54
ProcessJob,08:12:03.8519797,JobId:,55
ProcessJob,08:12:03.9689901,JobId:,57
SendToDataBase,08:12:04.0079945,JobId:,37
SendToDataBase,08:12:04.0099953,JobId:,38
SendToDataBase,08:12:04.0109931,JobId:,39
SendToDataBase,08:12:04.0119941,JobId:,40
ValidateXml,08:12:04.0299989,JobId:,84
ValidateXml,08:12:04.0089966,JobId:,83
ProcessJob,08:12:04.3350372,JobId:,58
ValidateXml,08:12:04.6541474,JobId:,85
ProcessJob,08:12:04.8791864,JobId:,59
SendToDataBase,08:12:04.8791864,JobId:,44
SendToDataBase,08:12:05.0252098,JobId:,45
SendToDataBase,08:12:05.0757198,JobId:,46
ProcessJob,08:12:05.0757198,JobId:,60
ValidateXml,08:12:05.1527328,JobId:,86
ProcessJob,08:12:05.1532325,JobId:,61
ValidateXml,08:12:05.2762716,JobId:,87
ValidateXml,08:12:05.3793706,JobId:,88
ValidateXml,08:12:05.5953056,JobId:,89
ValidateXml,08:12:05.6453136,JobId:,90
ProcessJob,08:12:05.8313378,JobId:,62
SendToDataBase,08:12:05.8313378,JobId:,47
ValidateXml,08:12:06.1573930,JobId:,91
ValidateXml,08:12:06.2043839,JobId:,92
ProcessJob,08:12:06.4384015,JobId:,63
SendToDataBase,08:12:06.4384015,JobId:,48
ProcessJob,08:12:06.6554190,JobId:,64
ProcessJob,08:12:06.7494355,JobId:,65
SendToDataBase,08:12:06.7494355,JobId:,54
SendToDataBase,08:12:06.7594308,JobId:,55
SendToDataBase,08:12:06.7624294,JobId:,57
ProcessJob,08:12:06.9254482,JobId:,66
SendToDataBase,08:12:06.9254482,JobId:,58
ValidateXml,08:12:07.0154624,JobId:,93
ValidateXml,08:12:07.0975086,JobId:,94
ProcessJob,08:12:07.1925138,JobId:,67
ValidateXml,08:12:07.2724877,JobId:,95
ProcessJob,08:12:07.6385268,JobId:,68
ProcessJob,08:12:07.7705429,JobId:,69
ValidateXml,08:12:07.8315476,JobId:,96
ProcessJob,08:12:07.8905526,JobId:,70
SendToDataBase,08:12:07.8905526,JobId:,59
SendToDataBase,08:12:07.8965534,JobId:,60
SendToDataBase,08:12:07.8975535,JobId:,61
ValidateXml,08:12:08.1306009,JobId:,97
ValidateXml,08:12:08.2065895,JobId:,98
ValidateXml,08:12:08.3106332,JobId:,99
ProcessJob,08:12:08.3296082,JobId:,71
ValidateXml,08:12:08.4406159,JobId:,100
ProcessJob,08:12:08.8396557,JobId:,72
SendToDataBase,08:12:08.8446570,JobId:,62
SendToDataBase,08:12:08.8806613,JobId:,63
SendToDataBase,08:12:08.8946619,JobId:,64
ProcessJob,08:12:09.0076746,JobId:,73
SendToDataBase,08:12:09.0086763,JobId:,65
ProcessJob,08:12:09.0996850,JobId:,74
ProcessJob,08:12:09.1106847,JobId:,75
SendToDataBase,08:12:09.1106847,JobId:,66
SendToDataBase,08:12:09.1136860,JobId:,67
ProcessJob,08:12:09.6547630,JobId:,76
SendToDataBase,08:12:09.6557462,JobId:,68
ProcessJob,08:12:09.9218032,JobId:,77
ProcessJob,08:12:10.2218075,JobId:,78
ProcessJob,08:12:10.4288308,JobId:,79
SendToDataBase,08:12:10.4288308,JobId:,69
SendToDataBase,08:12:10.4408307,JobId:,70
SendToDataBase,08:12:10.4448318,JobId:,71
ProcessJob,08:12:10.6858596,JobId:,80
SendToDataBase,08:12:10.6858596,JobId:,72
ProcessJob,08:12:11.4049481,JobId:,81
ProcessJob,08:12:11.7039814,JobId:,82
ProcessJob,08:12:11.8272054,JobId:,83
ProcessJob,08:12:11.9930072,JobId:,84
SendToDataBase,08:12:11.9930072,JobId:,73
SendToDataBase,08:12:11.9979988,JobId:,74
SendToDataBase,08:12:11.9989983,JobId:,75
SendToDataBase,08:12:11.9989983,JobId:,76
ProcessJob,08:12:12.3460366,JobId:,85
ProcessJob,08:12:12.4520491,JobId:,86
SendToDataBase,08:12:12.4520491,JobId:,77
ProcessJob,08:12:12.8810952,JobId:,87
ProcessJob,08:12:13.1443167,JobId:,88
SendToDataBase,08:12:13.1443167,JobId:,78
SendToDataBase,08:12:13.1471282,JobId:,79
ProcessJob,08:12:13.2041414,JobId:,89
SendToDataBase,08:12:13.2081302,JobId:,80
SendToDataBase,08:12:13.2101309,JobId:,81
ProcessJob,08:12:13.4381566,JobId:,90
SendToDataBase,08:12:13.4392215,JobId:,82
ProcessJob,08:12:13.6411889,JobId:,91
SendToDataBase,08:12:13.6411889,JobId:,83
ProcessJob,08:12:13.9472212,JobId:,92
SendToDataBase,08:12:13.9472212,JobId:,84
ProcessJob,08:12:14.3122494,JobId:,93
ProcessJob,08:12:14.7053031,JobId:,94
SendToDataBase,08:12:14.7053031,JobId:,85
SendToDataBase,08:12:14.7092946,JobId:,86
ProcessJob,08:12:14.9393634,JobId:,95
ProcessJob,08:12:15.4103709,JobId:,96
SendToDataBase,08:12:15.4113707,JobId:,87
ProcessJob,08:12:15.9355263,JobId:,97
ProcessJob,08:12:15.9724349,JobId:,98
SendToDataBase,08:12:15.9724349,JobId:,88
SendToDataBase,08:12:15.9774350,JobId:,89
ProcessJob,08:12:15.9724349,JobId:,99
SendToDataBase,08:12:15.9784371,JobId:,90
SendToDataBase,08:12:15.9834330,JobId:,91
ProcessJob,08:12:16.6175125,JobId:,100
SendToDataBase,08:12:16.6175125,JobId:,92
SendToDataBase,08:12:16.6555160,JobId:,93
SendToDataBase,08:12:17.5005984,JobId:,94
SendToDataBase,08:12:17.8846409,JobId:,95
SendToDataBase,08:12:17.8886408,JobId:,96
SendToDataBase,08:12:18.1186677,JobId:,97
SendToDataBase,08:12:18.7557365,JobId:,98
SendToDataBase,08:12:18.7567394,JobId:,99
SendToDataBase,08:12:19.5488221,JobId:,100

编辑 新的订阅可以将您的项目发送到数据库,或以您选择的方式处理故障作业。

进一步的资源:

Stack Exchange Code Review

Dataflow源代码


我使用了你的代码,发现两个问题。其中一个是它会将作业传递到CreateResponse,但由于选项中的“BoundedCapacity”被锁定为1或32,所以不应该这样做。只有在returnCode不是100时,作业才应该进入那里。 如果我将其设置为Unbounded,“ProcessJob”的输出顺序就不正确,但SendToDataBase的输出顺序是正确的,这让我想到它会重新排序当前正在处理的元素,然后从“TransformBlock”返回它,或者等待按顺序处理下一个元素,但块内部的处理本身并不按顺序进行? - Peter
@Peter 在你修改的代码中,我已经将最终输出重定向到一个流中,每个作业都按顺序在该流的处理程序中处理,这可以在订阅中看到。取消和故障不是这个问题的重点,这是你的返回代码映射所做的全部。我无法确定你设置了什么为“Unbounded”,但如果你将具有大于一个MaxDegreeOfParallelismActionBlock设置为无限容量,则会无序处理你的项目。请参见快速修改,它在流中处理返回代码。 - JSteward
所以我将你的代码一对一地复制并粘贴到另一个编辑器中。正如你所看到的,即使我还没有进行错误处理并且在每个函数中只返回true(尚未发生错误),CreateResponse仍然被调用了几次。仍然有一些作业被传递到Error-ActionBlock(CreateResponse)。我猜这是因为TransformBlock已经达到了32的BoundedCapacity,并在此之后拒绝了传入的作业。我需要确保所有内容都得到处理,这就是为什么我将所有块的BoundedCapacity设置为Unbounded的原因。 - Peter
1
我猜这是因为TransformBlock已经达到了32的有界容量。对的。我可以使用订阅保存到数据库,或者只是使用最后一个MaxDegreeOfParallelism= 1的ActionBlock,所以一切都按顺序处理?应该没问题。注意,如你所见,你需要一个更健壮的方式来处理故障/取消,而不是基于ReturnCode的谓词。 - JSteward
仅使用Unbounded有问题吗?能否推荐一个处理故障和取消的最佳实践的好来源? - Peter
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接