如何聚合异步生产者的数据并将其写入文件?

7
我正在学习C#中的异步/等待模式。目前我试图解决这样一个问题:
  • 有一个生产者(硬件设备),每秒生成1000个数据包。我需要将这些数据记录到文件中。

  • 该设备只有一个ReadAsync()方法,用于报告单个数据包。

  • 我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒钟只写一次。

  • 如果在下一批数据包准备好写入时,写操作没有及时完成,则写操作应失败。

到目前为止,我已经编写了以下代码。它可以工作,但我不确定这是否是解决问题的最佳方法。有什么建议或意见吗?在这种生产者/消费者问题中,消费者需要聚合从生产者接收到的数据的最佳实践是什么?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

1
缓冲区的准备就绪是由什么确定的:数据包的数量还是时间框架? - avo
1
基本上,这并不重要:这是一个硬件设备,在非常规律的间隔内每秒生成1000个数据包。 - AlefSin
1
从代码审查的角度来看,我很难验证您的代码在竞态条件方面是否正确。这表明您尚未找到一个很好的设计。良好的代码易于审查。 - usr
2
@usr 当然,但这基本上就是我要问的问题:上面的代码具有基本功能。现在,以清晰、可维护的方式实现它的最佳方法是什么? - AlefSin
1
@AlefSin 我不知道。只是我的印象。你问这个问题是正确的。 - usr
显示剩余2条评论
3个回答

1
在我看来,更好的方法是有两个"工作者",一个生产者和一个消费者。生产者从设备中读取数据并简单地填充到一个列表中。消费者每秒钟"唤醒"一次,并将批处理数据写入文件。
List<byte[]> _data = new List<byte[]>();

async Task Producer(Device device)
{
    while (true)
    {
        _data.Add(await device.ReadAsync());
    }
}

async Task Consumer(Device device)
{
    using (var writer = new StreamWriter("test.log")))
    {
        while (true)
        {
            Stopwatch watch = Stopwatch.StartNew();

            var batch = _data;
            _data = new List<byte[]>();
            foreach (var packet in batch)
            {
                writer.WriteLine(ToHexString(packet));

                if (watch.Elapsed >= TimeSpan.FromSeconds(1))
                {
                    throw new Exception("Write Time Out!");
                }
            }

            await Task.Delay(TimeSpan.FromSeconds(1) - watch.Elapsed);
        }
    }
}

"while (true)" 应该被替换为一个系统范围的取消令牌。

谢谢。我会等待,看是否有其他不同或更好的想法。如果没有什么发生,我会接受您的答案,因为我觉得它比我的解决方案更简单明了。 - AlefSin

1
如果刷新的标准是数据包的数量(最多1000个),则可以使用以下想法。我没有测试过它。它利用了Stephen Cleary在这个问题中提到的AsyncProducerConsumerQueue<T>
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token);
    }
}

如果您不想让记录器失败,而是更喜欢取消刷新并继续进行下一批处理,则可以对此代码进行最小更改。
回应@l3arnon评论:
  1. 一个数据包不是一个字节,而是byte[]。2. 您没有使用OP的ToHexString。3. AsyncProducerConsumerQueue比.Net的TPL Dataflow不太稳健且经过测试。4. 在抛出异常后,您等待previousFlush以获取错误,这使得该行多余。等等。简而言之:我认为可能增加的价值无法证明这个非常复杂的解决方案。
  1. “数据包不是一个字节,而是byte[]” - 数据包本身是一个字节,这从OP的代码显而易见:buffer[i] = await device.ReadAsync()。然后,一批数据包就是byte[]
  2. “您没有使用OP的ToHexString。”-目标是展示如何使用原生支持取消令牌的Stream.WriteAsync,而不是不允许取消的WriteLineAsync。使用ToHexStringStream.WriteAsync并且仍能利用取消支持是微不足道的:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
        Environment.NewLine);
    _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
    
  3. “AsyncProducerConsumerQueue比.Net的TPL Dataflow要不可靠和测试” - 我认为这不是确定的事实。然而,如果OP担心这个问题,他可以使用常规的BlockingCollection,它不会阻塞生产线程。在等待下一批数据时,阻塞消费线程是可以接受的,因为写入是并行进行的。相比之下,您的TPL Dataflow版本带有一个冗余的CPU和锁密集型操作:通过logAction.Post(packet)一字节一字节地将数据从生产者管道移动到编写者管道。我的代码不会这样做。

  4. “您在抛出异常后立即等待previousFlush以检查错误,这使该行变得多余。” - 此行不多余。也许,您忽略了这一点:previousFlush.IsCompleted可以是true,当previousFlush.IsFaultedpreviousFlush.IsCancelled也是true时。因此,await previousFlush在那里是相关的,以观察任何已完成任务(例如,写入失败)上的错误,否则将会丢失。


@AlefSin,它是异步的,因此按设计不会阻塞线程。但是,await _queue.EnqueueAsync() 的继续可能会延迟,直到AsyncProducerConsumerQueue内部的异步锁可用。我没有研究过Stephen的代码,但我认为这是一个非常快的操作,也许只需要几毫秒。 - avo
@AlefSin,Windows 不是实时操作系统。无论你采取什么方法,你仍然可能会以高达每秒 1000 个样本的速率丢失单个数据包。 - avo
1
@avo,为什么你在这里需要AsyncProducerConsumerQueue?我个人认为,你可以使用标准的阻塞BlockingCollection<T>,它可能有助于减少任何额外的延迟。另外,在你的代码中,Packet是否与byte相同? - noseratio - open to work
1
@Noseratio,BlockingCollection<T>也可以使用。可以使用BlockingCollection.Take(token)代替await _queue.DequeueAsync(token)。我用byte替换了Packet并进一步简化了代码。我将让@AlefSin进行性能测试,看看BlockingCollection是否更好。 - avo
1
@avo 很抱歉让你等了这么久。经过一些测试和修改以满足我的其他错误处理需求后,我决定你的答案提供了最好的解决方案构建块。 - AlefSin
显示剩余8条评论

1
假设你可以按数量(1000)而不是时间(1秒)进行批处理,最简单的解决方案可能是使用TPL DataflowBatchBlock,它会自动按大小批处理项目流:
async Task TestLogger(Device device, int seconds)
{
    var writer = new StreamWriter("test.log");
    var batch = new BatchBlock<byte[]>(1000);
    var logAction = new ActionBlock<byte[]>(
        packet =>
        {
            return writer.WriteLineAsync(ToHexString(packet));
        });
    ActionBlock<byte[]> transferAction;
    transferAction = new ActionBlock<byte[][]>(
        bytes =>
        {
            foreach (var packet in bytes)
            {
                if (transferAction.InputCount > 0)
                {
                    return; // or throw new Exception("Write Time Out!");
                }
                logAction.Post(packet);
            }
        }
    );

    batch.LinkTo(transferAction);
    logAction.Completion.ContinueWith(_ => writer.Dispose());

    while (true)
    {
        batch.Post(await device.ReadAsync());
    }
}

@avo 我看法不是这样的,但实际上更简单。只需查询 InputCount - i3arnon
1
@I3arnon 我以前从未使用过TPL。这是一个非常有趣的解决方案。我需要稍微研究一下,因为我的生产质量代码显然还有其他考虑因素。总体而言,我非常喜欢这种方法。 - AlefSin
1
@AlefSin TPL == 任务并行库。每当您使用任务时,您都在使用它。另一方面,TPL数据流是一个不太知名的库,令人遗憾。 - i3arnon
@l3arnon,在我的代码中确实是这样的。有一个Task.WhenAny分支用于获取下一批数据并写入上一批数据。 - avo
@avo 不是这样的。在 WhenAny 之后,您需要等待 previousFlush,只有当其中的代码(您忽略实现的代码)检查令牌参数时,它才会结束。 - i3arnon
显示剩余13条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接