长时间运行的同步接口实现,返回一个任务。

10

我以这个问题为基础提出我的问题。


TL;DR: 如果不应该将同步代码包装在异步包装器中,那么如何处理长时间运行、线程阻塞的方法,这些方法实现了一个期望异步实现的接口方法?
假设我有一个应用程序,持续运行以处理工作队列。它是一个服务器端应用程序(大部分情况下无人值守运行),但它有一个UI客户端,可以根据业务流程要求提供更细粒度的对应用程序行为的控制:启动、停止、在执行过程中调整参数、获取进度等。
业务逻辑层中注入了服务依赖项。BLL为这些服务定义了一组接口。
我想保持客户端的响应性:允许UI客户端与正在运行的进程交互,同时我也希望线程能够高效利用,因为该进程需要可扩展性:根据队列中的工作可能会有任意数量的异步数据库或磁盘操作。因此,我使用async/await一路走来
为此,我在服务接口中有一些明显设计鼓励async/await和支持取消的方法,因为它们接受CancellationToken,以"Async"命名,并返回Task。
我有一个数据存储库服务,执行CRUD操作以持久化我的领域实体。假设在目前,我使用的API并不原生支持异步。将来,我可能会用支持异步的API替换它,但目前,数据存储库服务执行大部分操作都是同步的,其中许多是长时间运行的操作(因为API在数据库IO上阻塞)。
现在,我明白返回Task的方法可以同步运行。我的服务类中实现BLL接口的方法将像我解释的那样同步运行,但使用者(我的BLL、客户端等)将假设它们要么1:异步运行,要么2:同步运行很短的时间。方法不应该在async调用Task.Run内包装同步代码
我知道我可以在接口中定义同步和异步方法。在这种情况下,我不想这样做,因为我正在尝试使用"一路异步"语义,并且因为我不是编写供客户使用的API;如上所述,我不想以后从使用同步版本更改为使用异步版本来更改我的BLL代码。
以下是数据服务接口:
public interface IDataRepository
{
    Task<IReadOnlyCollection<Widget>> 
        GetAllWidgetsAsync(CancellationToken cancellationToken);
}

并且它的实现方式是:

public sealed class DataRepository : IDataRepository
{
    public Task<IReadOnlyCollection<Widget>> GetAllWidgetsAsync(
        CancellationToken cancellationToken)
    {
        /******* The idea is that this will 
        /******* all be replaced hopefully soon by an ORM tool. */

        var ret = new List<Widget>();

        // use synchronous API to load records from DB
        var ds = Api.GetSqlServerDataSet(
            "SELECT ID, Name, Description FROM Widgets", DataResources.ConnectionString);

        foreach (DataRow row in ds.Tables[0].Rows)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // build a widget for the row, add to return.  
        }

        // simulate long-running CPU-bound operation.
        DateTime start = DateTime.Now;
        while (DateTime.Now.Subtract(start).TotalSeconds < 10) { }

        return Task.FromResult((IReadOnlyCollection<Widget>) ret.AsReadOnly());
    }
}

BLL:

public sealed class WorkRunner
{
    private readonly IDataRepository _dataRepository;
    public WorkRunner(IDataRepository dataRepository) => _dataRepository = dataRepository;

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        var allWidgets = await _dataRepository
            .GetAllWidgetsAsync(cancellationToken).ConfigureAwait(false);

        // I'm using Task.Run here because I want this on 
        // another thread even if the above runs synchronously.
        await Task.Run(async () =>
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                foreach (var widget in allWidgets) { /* do something */ }
                await Task.Delay(2000, cancellationToken); // wait some arbitrary time.
            }
        }).ConfigureAwait(false);
    }
}

展示和展示逻辑:

private async void HandleStartStopButtonClick(object sender, EventArgs e)
{
    if (!_isRunning)
    {
        await DoStart();
    }
    else
    {
        DoStop();
    }
}

private async Task DoStart()
{
    _isRunning = true;          
    var runner = new WorkRunner(_dependencyContainer.Resolve<IDataRepository>());
    _cancellationTokenSource = new CancellationTokenSource();

    try
    {
        _startStopButton.Text = "Stop";
        _resultsTextBox.Clear();
        await runner.RunAsync(_cancellationTokenSource.Token);
        // set results info in UI (invoking on UI thread).
    }
    catch (OperationCanceledException)
    {
        _resultsTextBox.Text = "Canceled early.";
    }
    catch (Exception ex)
    {
        _resultsTextBox.Text = ex.ToString();
    }
    finally
    {
        _startStopButton.Text = "Start";
    }
}

private void DoStop()
{
    _cancellationTokenSource.Cancel();
    _isRunning = false;
}

所以问题是:如何处理长时间运行的、阻塞性方法,这些方法实现了一个期望异步实现的接口方法?这是否是打破“同步代码没有异步包装器”的规则的一个例子?

9
很可能是因为https://stackoverflow.com/help/mcve,第一个词是minimal...并且需要阅读外部博客。minimal很重要,这样回答问题的人可以回答,其他搜索问题的人也可以确定这是否与他们的问题相关。 - user6656930
8
这个问题在 meta 上进行了讨论。 - BDL
3
@ChadNedzlek -- [mcve] 中的“minimal”是指提供的示例代码。至于为我的问题提供“minimal”版本,我在两个地方清楚地指出了问题的最简形式,其中一个就在顶部(TL;DR)。 - rory.ap
3
第一个词可能很少,但第二个词是完整的……故事的寓意是——不要挑拣字眼,要将整个上下文考虑进去。 - RyanfaeScotland
4
"长时间运行的CPU绑定操作(因为API在数据库IO上阻塞)" - 这句话非常令人困惑...被阻塞在某些事情上的代码不应该消耗任何CPU,因此不应被称为“CPU绑定”(除非使用忙等待...) - Alexei Levenkov
显示剩余2条评论
1个回答

10
您没有为同步方法提供异步包装器。您不是外部库的作者,而是客户端。作为客户端,您正在将库API适配到您的服务接口中。
不建议使用异步包装器来处理同步方法的主要原因如下(总结自引用问题中的MSDN文章):
1. 确保客户端了解任何同步库函数的真实本质; 2. 让客户端控制如何调用该函数(异步或同步); 3. 避免通过拥有两个版本的每个函数来增加库的可扩展性。
关于您的服务接口,通过仅定义异步方法,您选择异步调用库操作,不管如何。您实际上在说:我已经做出了我的选择(2),而不考虑(1)。您已经给出了一个合理的理由-从长远来看,您知道您的同步库API将被替换。
此外,即使您的外部库API函数是同步的,它们也不会长时间运行CPU绑定。正如您所说,它们阻塞在IO上。它们实际上是IO绑定的。它们只是等待IO而不释放线程。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接