异步等待 Task<T> 完成并设置超时时间

528

我希望等待一个特定规则的Task<T> 执行完成: 如果它在X毫秒后还没有完成,我想向用户显示一条消息。 如果它在Y毫秒后仍未完成,我想自动请求取消

我可以使用Task.ContinueWith异步等待任务完成(即在完成任务时安排要执行的操作),但是无法指定超时时间。 我可以使用Task.Wait同步等待任务以设置超时时间,但会阻塞线程。 如何异步等待任务完成并设置超时时间呢?


3
没错,我很惊讶它没有提供超时功能。也许在.NET 5.0中会提供吧……当然我们可以将超时功能集成到任务本身,但那样不太好,这类功能应该是免费的。 - Aliostad
5
虽然你描述的双层超时仍需要逻辑,但.NET 4.5确实提供了一种简单的方法来创建基于超时的CancellationTokenSource。构造函数有两个重载,一个接受以毫秒为单位的整数延迟,另一个接受TimeSpan延迟。 - patridge
完整的简单库源代码在这里:http://stackoverflow.com/questions/11831844/unobservedtaskexception-being-throw-but-it-is-handled-by-a-taskscheduler-unobser - user1997529
有没有完整的源代码可用的最终解决方案?也许可以提供更复杂的示例,以便在每个线程中通知错误,并在 WaitAll 后显示摘要? - Kiquenet
1
除了@patridge建议的方法外,还可以使用CancellationTokenSource.CancelAfter(<时间段或毫秒数>)来实现。 - maicalal
我认为 Vijay Nirmal 提供的答案(链接:https://dev59.com/um855IYBdhLWcg3woVw_#68998339),展示了最新的 .NET 6 Task.WaitAsync API,应该被采纳。 - Theodor Zoulias
20个回答

7
另一种解决此问题的方法是使用响应式扩展(Reactive Extensions):
public static Task TimeoutAfter(this Task task, TimeSpan timeout, IScheduler scheduler)
{
        return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

在你的单元测试中使用以下代码进行测试,这对我有效:

TestScheduler scheduler = new TestScheduler();
Task task = Task.Run(() =>
                {
                    int i = 0;
                    while (i < 5)
                    {
                        Console.WriteLine(i);
                        i++;
                        Thread.Sleep(1000);
                    }
                })
                .TimeoutAfter(TimeSpan.FromSeconds(5), scheduler)
                .ContinueWith(t => { }, TaskContinuationOptions.OnlyOnFaulted);

scheduler.AdvanceBy(TimeSpan.FromSeconds(6).Ticks);

你可能需要以下命名空间:

using System.Threading.Tasks;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using Microsoft.Reactive.Testing;
using System.Threading;
using System.Reactive.Concurrency;

5

出于好玩,我为任务创建了一个“OnTimeout”扩展。在超时的情况下,任务会执行所需的内联lambda Action()并返回true,否则返回false。

public static async Task<bool> OnTimeout<T>(this T t, Action<T> action, int waitms) where T : Task
{
    if (!(await Task.WhenAny(t, Task.Delay(waitms)) == t))
    {
        action(t);
        return true;
    } else {
        return false;
    }
}

OnTimeout扩展功能返回一个布尔值结果,可以像在此示例中调用UDP套接字异步操作时那样将其赋值给变量:

var t = UdpSocket.ReceiveAsync();

var timeout = await t.OnTimeout(task => {
    Console.WriteLine("No Response");
}, 5000);

'task'变量可以在timeout lambda中访问以进行更多处理。

使用接收对象的Action可能会激发出各种其他扩展设计。


5
@Kevan的答案可以使用响应式扩展进行泛化。
(Note: HTML标签已保留,仅做参考)
public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, IScheduler scheduler)
{
    return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

带有可选的调度程序:

public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, Scheduler scheduler = null)
{
    return scheduler is null 
       ? task.ToObservable().Timeout(timeout).ToTask() 
       : task.ToObservable().Timeout(timeout, scheduler).ToTask();
}

顺便提一下:当超时发生时,将抛出一个超时异常。

3
创建一个扩展程序,等待任务完成或延迟完成,以先到者为准。如果延迟获胜,则抛出异常。请保留 HTML 标签。
public static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
    if (await Task.WhenAny(task, Task.Delay(timeout)) != task)
        throw new TimeoutException();
    return await task;
}

1

这个方法非常古老,但是现代有更好的解决方案。不确定需要哪个版本的c#/.NET,但这是我做的方式:


... Other method code not relevant to the question.

// a token source that will timeout at the specified interval, or if cancelled outside of this scope
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token);

async Task<MessageResource> FetchAsync()
{
    try
    {
        return await MessageResource.FetchAsync(m.Sid);
    } catch (TaskCanceledException e)
    {
        if (timeoutTokenSource.IsCancellationRequested)
            throw new TimeoutException("Timeout", e);
        throw;
    }
}

return await Task.Run(FetchAsync, linkedTokenSource.Token);

CancellationTokenSource构造函数接受一个TimeSpan参数,该参数将导致令牌在经过该间隔后取消。然后,您可以在另一个对Task.Run的调用中包装您的异步(或同步)代码,并传递超时令牌。

这假设您正在传递取消令牌(变量token)。如果您没有单独取消任务的需要,您可以直接使用timeoutTokenSource。否则,您创建linkedTokenSource,如果超时发生,它将被取消,或者如果它被取消。

然后,我们只是捕获OperationCancelledException并检查哪个令牌引发了异常,如果超时导致引发此异常,则抛出TimeoutException。否则,我们重新抛出异常。

此外,我在这里使用局部函数,这是在C#7中引入的,但您可以轻松地使用lambda或实际函数来达到相同的效果。同样,c#8引入了更简单的语法以使用语句,但那些很容易重写。


1

在紧密的网络循环中,我觉得其他答案中的Task.Delay()任务和CancellationTokenSource有点过于繁琐。

虽然Joe Hoag's Crafting a Task.TimeoutAfter Method on MSDN blogs很有启发性,但出于与上述相同的原因,我有点不敢使用TimeoutException来进行流程控制,因为超时更多时候是预期的。

因此,我选择了这个方案,它还处理了博客中提到的优化:

public static async Task<bool> BeforeTimeout(this Task task, int millisecondsTimeout)
{
    if (task.IsCompleted) return true;
    if (millisecondsTimeout == 0) return false;

    if (millisecondsTimeout == Timeout.Infinite)
    {
        await Task.WhenAll(task);
        return true;
    }

    var tcs = new TaskCompletionSource<object>();

    using (var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), tcs,
        millisecondsTimeout, Timeout.Infinite))
    {
        return await Task.WhenAny(task, tcs.Task) == task;
    }
}

一个例子用例如下所示:

var receivingTask = conn.ReceiveAsync(ct);

while (!await receivingTask.BeforeTimeout(keepAliveMilliseconds))
{
    // Send keep-alive
}

// Read and do something with data
var data = await receivingTask;

1
安德鲁·阿诺特的答案有几个变体:

  1. If you want to wait for an existing task and find out whether it completed or timed out, but don't want to cancel it if the timeout occurs:

    public static async Task<bool> TimedOutAsync(this Task task, int timeoutMilliseconds)
    {
        if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
        if (timeoutMilliseconds == 0) {
            return !task.IsCompleted; // timed out if not completed
        }
        var cts = new CancellationTokenSource();
        if (await Task.WhenAny( task, Task.Delay(timeoutMilliseconds, cts.Token)) == task) {
            cts.Cancel(); // task completed, get rid of timer
            await task; // test for exceptions or task cancellation
            return false; // did not timeout
        } else {
            return true; // did timeout
        }
    }
    
  2. If you want to start a work task and cancel the work if the timeout occurs:

    public static async Task<T> CancelAfterAsync<T>( this Func<CancellationToken,Task<T>> actionAsync, int timeoutMilliseconds)
    {
        if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
        var taskCts = new CancellationTokenSource();
        var timerCts = new CancellationTokenSource();
        Task<T> task = actionAsync(taskCts.Token);
        if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
            timerCts.Cancel(); // task completed, get rid of timer
        } else {
            taskCts.Cancel(); // timer completed, get rid of task
        }
        return await task; // test for exceptions or task cancellation
    }
    
  3. If you have a task already created that you want to cancel if a timeout occurs:

    public static async Task<T> CancelAfterAsync<T>(this Task<T> task, int timeoutMilliseconds, CancellationTokenSource taskCts)
    {
        if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
    
        var timerCts = new CancellationTokenSource();
        if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
            timerCts.Cancel(); // task completed, get rid of timer
        } else {
            taskCts.Cancel(); // timer completed, get rid of task
        }
        return await task; // test for exceptions or task cancellation
    }
    
另一个评论,这些版本将在超时未发生时取消计时器,因此多次调用不会导致计时器堆积。
sjb

0
如果您使用BlockingCollection来调度任务,生产者可以运行潜在的长时间运行的任务,而消费者可以使用TryTake方法,该方法具有超时和取消令牌。

我需要写一些内容(不想在这里放置专有代码),但场景是这样的。生产者将是执行可能超时的方法的代码,并在完成后将结果放入队列中。消费者将使用超时调用trytake(),并在超时时接收令牌。生产者和消费者都将是后台任务,并在需要时使用UI线程调度程序向用户显示消息。 - kns98

0
我正在整合其他答案的想法,包括另一个线程上的这个答案,并将其转化为Try-style扩展方法。如果您想要一个扩展方法,但又想避免超时异常,这样做会有好处。
public static async Task<bool> TryWithTimeoutAfter<TResult>(this Task<TResult> task,
    TimeSpan timeout, Action<TResult> successor)
{

    using var timeoutCancellationTokenSource = new CancellationTokenSource();
    var completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token))
                                  .ConfigureAwait(continueOnCapturedContext: false);

    if (completedTask == task)
    {
        timeoutCancellationTokenSource.Cancel();

        // propagate exception rather than AggregateException, if calling task.Result.
        var result = await task.ConfigureAwait(continueOnCapturedContext: false);
        successor(result);
        return true;
    }
    else return false;        
}     

async Task Example(Task<string> task)
{
    string result = null;
    if (await task.TryWithTimeoutAfter(TimeSpan.FromSeconds(1), r => result = r))
    {
        Console.WriteLine(result);
    }
}    

-1
如果有人在寻找类似的内容(12年后原帖的问题)...另一个选项是在另一个Task.Run()中使用Task.Wait(timeout)。这是如果你想以某种方式避免使用Task.WaitAny()或甚至await调用。或者,对于我的情况,只是为了与我正在处理的其他.cs文件保持一致性。代码示例如下:
        int timeout = 5000;
        var actualTask = new Task(() =>
        {
            // Do your stuff here
        });

        Task.Run(() =>
        {
            actualTask.Start();
            if (!actualTask.Wait(timeout))
            {
                return false;
                // or throw new TimeoutException("Operation timed out!");
            }

            return true;
        }).ContinueWith((timedTaskResult) =>
        {
            if (!timedTaskResult.Result)
            {
                // tell user it timed out!
            }

            if (timedTaskResult.IsFaulted)
            {
                // Tell the user about the error/s via the timedTaskResult.Exception
            }
        });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接