如何使Dispose等待所有异步方法?

11

我有一个带有异步方法的可处理类。

class Gateway : IDisposable {
  public Gateway() {}
  public void Dispose() {}

  public async Task<Data> Request1 () {...}
  public async Task<Data> Request2 () {...}
  public async Task<Data> Request3 () {...}
}

我希望Dispose能等待所有正在运行的请求完成。

因此,我需要追踪所有正在运行的任务,或者使用 AsyncEx 中的 AsyncLock,或者其他方法?

更新

我注意到有人担心阻塞 Dispose。那么我们可以创建 Task WaitForCompletionAsync()Task CancelAllAsync() 方法。


1
我认为你做不到。你必须依赖调用者在using语句中正确地await你的async方法。有关详细信息,请参阅此博客文章 - Simply Ged
5
这是一个本不应该出现的问题。但是这里没有足够的内容来构思出更好的解决方案。 - H H
实际上,我认为您需要重新考虑您的设计,或者至少向我们展示您如何使用它。 - TheGeneral
1
好文章:https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html - Denis535
4个回答

14

暂时,您需要添加一个CloseAsync方法,供用户调用。

一旦释放C# 8.0,您就可以依赖于IAsyncDisposable接口及其语言支持:

await using (var asyncDisposable = GetAsyncDisposable())
{
    // ...
} // await asyncDisposable.DisposeAsync()

我不确定ServiceProvider是否支持IAsyncDisposable。无论如何,这是另一个问题。 - Denis535
那是另一个问题。它还需要支持异步创建依赖项。 - Paulo Morgado
我现在正在一个新项目中采用这个。谢谢! - Machado
如果 GetAsyncDisposable 返回 null,会发生什么? - jjxtra
当任何可等待的返回方法返回“null”时,会发生相同的事情。将抛出System.NullReferenceException异常。 - Paulo Morgado

3
以下是可重复使用的异步处理支持解决方案。由于.NET Core 3.0尚未发布,我将为当前C#版本(7.3)和测试版(8.0)提供代码。
一旦在对象上调用, 它不会阻塞并将确保一旦所有任务完成就进行处理。
源代码(当前C#版本,不包括IAsyncDisposable):
相关使用:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

所有跟踪任务完成后可以处理的接口:
public interface ITrackingDisposable : IDisposable
{
    //The implementation of the actual disposings
    Task FinishDisposeAsync();
}

跟踪所有运行的任务并在适当的时机调用延迟处理的清除器:
public class TrackingDisposer : IDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    public bool IsDisposed { get; private set; } = false;

    //The supported class must implement ITrackingDisposable
    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    //Add a task to the tracking list, returns false if disposed
    //Without return value
    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
            }

            result = ending();
        }
        return true;
    }

    //With return value
    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    //The entry of applying for dispose
    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
        }
    }
}

一个简化实现的基类:
public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

演示和测试输出

测试类:

internal sealed class TestDisposingObject : TrackingDisposable
{
    public Task Job0Async() => Track(async () =>
    {
        await Task.Delay(200);
        Console.WriteLine("Job0 done.");
    });

    public Task<string> Job1Async(int ms) => Track(async () =>
    {
        await Task.Delay(ms);
        return "Job1 done.";
    });

    protected override void FinishDispose()
    => Console.WriteLine("Disposed.");
}

主要内容:

internal static class Program
{
    private static async Task Main()
    {
        var result0 = default(Task);
        var result1 = default(Task);
        var obj = new TestDisposingObject();
        result0 = obj.Job0Async();
        result1 = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        obj.Dispose();
        Console.WriteLine("Waiting For jobs done...");
        await Task.WhenAll(result0, result1);
    }
}

输出:

Waiting For jobs done...
Job1 done.
Job0 done.
Disposed.

此外,C# 8.0(带有 IAsyncDisposable

将类型定义替换为以下内容:

public interface ITrackingDisposable : IDisposable, IAsyncDisposable
{
    Task FinishDisposeAsync();
}

public class TrackingDisposer : IDisposable, IAsyncDisposable
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();

    private readonly ITrackingDisposable _target;

    private readonly TaskCompletionSource<object> _disposing = new TaskCompletionSource<object>();

    public bool IsDisposed { get; private set; } = false;

    public TrackingDisposer(ITrackingDisposable target)
    => _target = target ?? throw new ArgumentNullException();

    public bool Track(Func<Task> func, out Task result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task ending()
            {
                await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
            }

            result = ending();
        }
        return true;
    }

    public bool Track<TResult>(Func<Task<TResult>> func, out Task<TResult> result)
    {
        lock (_tasks)
        {
            if (IsDisposed)
            {
                result = null;
                return false;
            }

            var task = func();
            var node = _tasks.AddFirst(task);

            async Task<TResult> ending()
            {
                var result = await task;
                var dispose = false;
                lock (_tasks)
                {
                    _tasks.Remove(node);
                    dispose = IsDisposed && _tasks.Count == 0;
                }
                if (dispose)
                {
                    await _target.FinishDisposeAsync();
                    _disposing.SetResult(null);
                }
                return result;
            }

            result = ending();
        }
        return true;
    }

    public void Dispose()
    {
        var dispose = false;

        lock (_tasks)
        {
            if (IsDisposed)
            {
                return;
            }

            IsDisposed = true;
            dispose = _tasks.Count == 0;
        }

        if (dispose)
        {
            _target.FinishDisposeAsync();
            _disposing.SetResult(null);
        }
    }

    public ValueTask DisposeAsync()
    {
        Dispose();
        return new ValueTask(_disposing.Task);
    }
}

public abstract class TrackingDisposable : ITrackingDisposable
{
    private readonly TrackingDisposer _disposer;

    public TrackingDisposable()
    => _disposer = new TrackingDisposer(this);

    protected virtual void FinishDispose() { }

    protected virtual Task FinishDisposeAsync()
    => Task.CompletedTask;

    Task ITrackingDisposable.FinishDisposeAsync()
    {
        FinishDispose();
        return FinishDisposeAsync();
    }

    public void Dispose()
    => _disposer.Dispose();

    public ValueTask DisposeAsync() => _disposer.DisposeAsync();

    protected Task Track(Func<Task> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));

    protected Task<TResult> Track<TResult>(Func<Task<TResult>> func)
    => _disposer.Track(func, out var result)
        ? result
        : throw new ObjectDisposedException(nameof(TrackingDisposable));
}

测试主要功能:

internal static class Program
{
    private static async Task Main()
    {
        await using var obj = new TestDisposingObject();
        _ = obj.Job0Async();
        _ = obj.Job1Async(100).ContinueWith(r => Console.WriteLine(r.Result));
        Console.WriteLine("Waiting For jobs done...");
    }
}

1
问题在于目前没有Dispose()的异步版本。因此,当您调用Dispose()或者using块结束时,您需要问自己——您希望发生什么?换句话说,这是什么要求?
您可以要求Dispose等待所有未完成的任务,然后执行其工作。但Dispose不能使用await(它不是异步的)。它能做的最好的事情就是调用Result来强制任务完成,但这将是一个阻塞调用,如果任何异步任务正在等待其他东西,它很容易死锁。
相反,我建议以下要求:当调用者调用Dispose()时,调用将标记网关以进行处理,然后立即返回,确信处理机制将在最后一个任务完成时自动激活。
如果该要求是足够的,那么它是可能的,但有点混乱。以下是如何实现的:
  1. 每次调用一个方法(如Request),都要在返回的任务中“包装”另一个任务,其中包括检查调用者是否请求处理网关的释放。

  2. 如果已经请求释放,则立即进行释放并标记任务为完成。因此,当调用者等待任务时,它将强制释放。

这是我的实现。我告诉你它很丑陋。

class Gateway : IDisposable 
{
    protected readonly HttpClient _client = new HttpClient();  //an inner class that must be disposed when Gateway disposes
    protected bool _disposalRequested = false;
    protected bool _disposalCompleted = false;
    protected int _tasksRunning = 0;


    public void Dispose()
    {
        Console.WriteLine("Dispose() called.");
        _disposalRequested = true;  
        if (_tasksRunning == 0)
        {
            Console.WriteLine("No running tasks, so disposing immediately.");
            DisposeInternal();
        }
        else
        {
            Console.WriteLine("There are running tasks, so disposal shall be deferred.");
        }
    }

    protected void DisposeInternal()
    {
        if (!_disposalCompleted)
        {
            Console.WriteLine("Disposing");
            _client.Dispose();
            _disposalCompleted = true;
        }
    }

    protected async Task<T> AddDisposeWrapper<T>(Func<Task<T>> func)
    {
        if (_disposalRequested) throw new ObjectDisposedException("Disposal has already been requested. No new requests can be handled at this point.");

        _tasksRunning++;
        var result = await func();
        _tasksRunning--;
        await DisposalCheck();
        return result;
    }

    protected async Task DisposalCheck()
    {
        if (_disposalRequested) DisposeInternal();
    }

    public Task<Data> Request1()
    {
        return AddDisposeWrapper
        (
            Request1Internal
        );
    }

    public Task<Data> Request2()
    {
        return AddDisposeWrapper
        (
            Request2Internal
        );
    }

    protected async Task<Data> Request1Internal()
    {
        Console.WriteLine("Performing Request1 (slow)");
        await Task.Delay(3000);
        Console.WriteLine("Request1 has finished. Returning new Data.");
        return new Data();
    }

    protected async Task<Data> Request2Internal()
    {
        Console.WriteLine("Performing Request2 (fast)");
        await Task.Delay(1);
        Console.WriteLine("Request2 has finished. Returning new Data.");
        return new Data();
    }
}

Here's some test code:

public class Program
{
    public static async Task Test1()
    {
        Task<Data> task;
        using (var gateway = new Gateway())
        {
            task = gateway.Request1();
            await Task.Delay(1000);
        }
        var data = await task;
        Console.WriteLine("Test 1 is complete.");
    }

    public static async Task Test2()
    {
        Task<Data> task;
        using (var gateway = new Gateway())
        {
            task = gateway.Request2();
            await Task.Delay(1000);
        }
        var data = await task;
        Console.WriteLine("Test 2 is complete.");
    }

    public static async Task MainAsync()
    {
        await Test1();
        await Test2();
    }

    public static void Main()
    {
        MainAsync().GetAwaiter().GetResult();
        Console.WriteLine("Run completed at {0:yyyy-MM-dd HH:mm:ss}", DateTime.Now);
    }
}

这是输出内容:
Performing Request1 (slow)
Dispose() called.
There are running tasks, so disposal shall be deferred.
Request1 has finished. Returning new Data.
Disposing
Test 1 is complete.
Performing Request2 (fast)
Request2 has finished. Returning new Data.
Dispose() called.
No running tasks, so disposing immediately.
Disposing
Test 2 is complete.
Run completed at 2019-05-15 00:34:46

如果您想尝试,这是我的Fiddle链接: Link

我并不是真的推荐这样做(如果要处理某些内容,您应该更好地掌控它的生命周期),但为了让您开心,编写这段代码还是很有趣的。

注意:由于使用了引用计数,需要额外的工作才能使此解决方案线程安全或使其对网关请求方法抛出异常的情况具有弹性。


0

释放和等待完成是两回事。所以,当任务仍在运行时,我宁愿抛出异常。

我使用了 Nito.AsyncEx.AsyncConditionVariable 来编写示例。虽然我没有测试过,但我认为它应该可以工作。只需使用 Completion.WaitAsync()

此外,我推荐阅读这篇文章:https://blog.stephencleary.com/2013/03/async-oop-6-disposal.html

class Gateway : IDisposable {

  private int runningTaskCount;
  public AsyncConditionVariable Completion { get; } = new AsyncConditionVariable( new AsyncLock() );

  public Gateway() {
  }
  public void Dispose() {
    if (runningTaskCount != 0) throw new InvalidOperationException( "You can not call this method when tasks are running" );
  }

  public async Task<Data> Request1 () {
    BeginTask();
    ...
    EndTask();
  }

  private void BeginTask() {
    Interlocked.Increment( ref runningTaskCount );
  }
  private void EndTask() {
    var result = Interlocked.Decrement( ref runningTaskCount );
    if (result == 0) Completion.NotifyAll();
  }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接