使用async/await运行多个无限循环

4

我正在开发一款基于Xamarin和.NET 5 Async/Await的Android通讯应用。

在我的应用中,我采用了生产者/消费者模式来处理消息,该模式是基于无限循环实现的。

例如,ReadTcpClientAsync生产者:

async Task ReadTcpClientAsync(CancellationToken cancellationToken)
{
    cde.Signal();
    while (!cancellationToken.IsCancellationRequested)
    {
        byte[] buffer = await atc.ReadAsync(cancellationToken);
        // queue message...
    }
}

或者使用SendStatementsAsync消费者,该消费者将出队消息并等待WriteAsync。
private async Task SendStatementsAsync(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var nextItem = await _outputStatements.Take();
        cancellationToken.ThrowIfCancellationRequested();
        // misc ...
        await atc.WriteAsync(call.Serialize());
    }
}

一些消费者只是依赖于接听电话。
 var update = await _inputUpdateStatements.Take();

这个构造在测试中运行得很好,但是有一个方法我认为犯了一个巨大的错误。

这个方法旨在同时启动3个pro/con while (true) 循环,以运行整个客户端后端。

public async Task RunAsync()
{
   _isRunning = true;
   _progress.ProgressChanged += progress_ProgressChanged;
    await InitMTProto(_scheme).ConfigureAwait(false); // init smth...
    // various init stuf...     
    await atc.ConnectAsync().ConfigureAwait(false); // open connection async
    // IS IT WRONG?
    try
    {                                   
        await Task.WhenAny(SendStatementsAsync(_cts.Token),
                               ReadTcpClientAsync(_cts.Token),
                               ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);
    }
    catch (OperationCanceledException oce)
    {   

    }
    catch (Exception ex)
    {

    }
}

暂时忘记Android,想象任何UI(WinForm,WPF等)的OnCreate方法在UI上下文中调用RunAsync。
protected async override void OnCreate(Bundle bundle)
{
    // start RA 
    await client.RunAsync()
    // never gets here - BAD, but nonblock UI thread - good
    Debug.WriteLine("nevar");
}

所以,正如您所见,存在问题。在 RunAsync await 调用之后,我无法执行任何操作,因为它永远不会从 Task.WhenAny(...) 中返回。我需要在那里执行状态检查,但我需要启动此 pro/cons 方法,因为我的检查等待 ManualResetEvent:

if (!cde.Wait(15000))
{
    throw new TimeoutException("Init too long");
}

此外,我的检查也是异步的,并且运行得非常顺利 :)
public async Task<TLCombinatorInstance> PerformRpcCall(string combinatorName, params object[] pars)
{
    // wait for init on cde ...
    // prepare call ...

    // Produce
    ProduceOutput(call);

    // wait for answer
    return await _inputRpcAnswersStatements.Take();
}

我认为我应该使用另一种方法来启动这个无限循环,但是我已经一路使用了异步任务方法 - 所以我真的不知道该怎么做了。 请帮忙吗?


不要使用 await 来调用 RunAsync() 吗? - svick
@Blam 我已经有了自己的 IProducerConsumer 实现,它运行得非常好,我不需要另一个。 - xakpc
@svick 哦,你说不要。该死的夜晚 :) - xakpc
@svick 但它会产生什么结果?Task.WhenAny是否按预期启动异步任务?我在官方指南“基于任务的异步模式”中看到了一些Poll示例,但没有任何示例说明如何启动该Poll任务。 - xakpc
你自己的实现不能满足你的需求,这就是不考虑现有的生产者消费者集合的原因吗? - paparazzo
显示剩余4条评论
1个回答

3

好的,在大量阅读(没有找到任何信息)和 @svick 的建议后,我决定将这些方法作为单独的 Task.Run 调用而不需要 "await"。同时,我决定在线程池中运行它。

我的最终代码是:

try
{                                   
    /*await Task.WhenAny(SendStatementsAsync(_cts.Token), 
           ReadTcpClientAsync(_cts.Token),
           ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);*/
    Task.Run(() => SendStatementsAsync(_cts.Token)).ConfigureAwait(false);
    Task.Run(() => ReadTcpClientAsync(_cts.Token)).ConfigureAwait(false);
    Task.Run(() => ProcessUpdateAsync(_cts.Token, _progress)).ConfigureAwait(false);
    Trace.WriteLineIf(clientSwitch.TraceInfo, "Worker threads started", "[Client.RunAsync]");
}

一切都像预期的那样正常运作。我不确定在异常处理中会带来什么问题,因为我知道它们将会丢失。

当然,这样的调用会产生警告。

由于此调用未被等待,因此在调用完成之前,当前方法的执行将继续。考虑对调用结果应用“await”运算符。

可以通过以下方式轻松抑制这种警告。

// just save task into variable
var send = Task.Run(() => SendStatementsAsync(_cts.Token)).ConfigureAwait(false); 

此外,如果有人知道更好的解决方案,我将不胜感激。

这是一个非常古老的帖子,但我正面临类似的情况,只是想知道如果你有远见的话,你会做出不同的选择吗? - undefined
1
@tank104 是的,很可能。利用 Xamarin/Droid/iOS 提供的后台工作解决方案。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接