TPL数据流生产者消费者模式

3

我使用TPL DataFlow编写了一个生产者消费者模式的示例。这里有一些基本问题:

  1. 只有在生产者发布所有项目后,消费者才处于活动状态。异步是否意味着生产和消费任务都可以并行运行?

  2. 给消费者设定睡眠时间以验证它是否会阻塞其他数据项。看起来它是按顺序执行的,并没有获得任何并行性。

我在这里做错了什么吗?

class AscDataBlocks
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform) 
    // This is using TPL DataBlock producer consumer pattern.
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5) 
        // Testing if this will delay all the other data processing
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);                
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern. 
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Initialize a counter to track the number of bytes that are processed. 
        int status = 0;

        // Read from the source buffer until the source buffer has no  
        // available output data. 
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            source.TryReceive(out data);
            ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
            // Increment the count of bytes received.
            status = 1;
        }
        return status;
    }
}

static void Main(string[] args)
{
    AscDataBlocks ascb;
    BufferBlock<Int64> ascbuffer;
    System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
    CancellationToken ct = new CancellationToken();
    CancellationTokenSource cts = new CancellationTokenSource();
    ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;           
    try
    {
        ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            CancellationToken = ct
        });// Initiallize the buffer bloack
        ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
        //Post source data to the dataflow block.
        ascb.AscBufferProducer(ascbuffer);
        ascProcessingconsumer.Wait();
    }
    catch (Exception ex)
    {
        //foreach (var v in ex.InnerExceptions)
        //    Console.WriteLine("msg: " + v.Message);                
    }            
}  

顺便说一句,C# 不是 C,你不需要在方法的开头指定所有的本地变量。 - svick
1个回答

4
生产者发布所有项目后,消费者才会活跃。异步是否意味着生产和消费任务都可以并行运行。
这是因为您在消费者有机会开始之前非常快地发布了所有项目。如果添加了Thread.Sleep(100),您将看到它们实际上是并行工作的。
给定消费者的睡眠时间以验证其是否阻塞其他数据项。它似乎是按顺序执行而不得到任何并行性。
TPL Dataflow不是魔法:它不会修改您的代码以并行执行。是您调用了AscTransConsumerAsync()一次,所以不要惊讶它只会实际执行一次。
TDF支持并行处理,但您需要实际让它执行处理代码。为此,请使用其中一个执行块。在您的情况下,ActionBlock似乎是合适的。
如果你使用这个,你可以通过设置 MaxDegreeOfParallelism 来配置块以并行执行。当然,这意味着你需要确保处理委托是线程安全的。
有了这个,AscTransConsumerAsync() 现在可能看起来像这样:
public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
    // counter to track the number of items that are processed
    Int64 count = 0;

    var actionBlock = new ActionBlock<Int64>(
        data =>
        {
            ProcessDataBuffer(data);
            // count has to be accessed in a thread-safe manner
            // be careful about using Interlocked,
            // for more complicated computations, locking might be more appropriate
            Interlocked.Increment(ref count);
        },
        // some small constant might be better than Unbounded, depedning on circumstances
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this assumes source will be completed when done,
    // you need to call ascbuffer.Complete() after AscBufferProducer() for this
    await actionBlock.Completion;

    return count;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接