我希望等待一个特定规则的Task<T> 执行完成: 如果它在X毫秒后还没有完成,我想向用户显示一条消息。 如果它在Y毫秒后仍未完成,我想自动请求取消。
我可以使用Task.ContinueWith异步等待任务完成(即在完成任务时安排要执行的操作),但是无法指定超时时间。 我可以使用Task.Wait同步等待任务以设置超时时间,但会阻塞线程。 如何异步等待任务完成并设置超时时间呢?
我希望等待一个特定规则的Task<T> 执行完成: 如果它在X毫秒后还没有完成,我想向用户显示一条消息。 如果它在Y毫秒后仍未完成,我想自动请求取消。
我可以使用Task.ContinueWith异步等待任务完成(即在完成任务时安排要执行的操作),但是无法指定超时时间。 我可以使用Task.Wait同步等待任务以设置超时时间,但会阻塞线程。 如何异步等待任务完成并设置超时时间呢?
public static Task TimeoutAfter(this Task task, TimeSpan timeout, IScheduler scheduler)
{
return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}
在你的单元测试中使用以下代码进行测试,这对我有效:
TestScheduler scheduler = new TestScheduler();
Task task = Task.Run(() =>
{
int i = 0;
while (i < 5)
{
Console.WriteLine(i);
i++;
Thread.Sleep(1000);
}
})
.TimeoutAfter(TimeSpan.FromSeconds(5), scheduler)
.ContinueWith(t => { }, TaskContinuationOptions.OnlyOnFaulted);
scheduler.AdvanceBy(TimeSpan.FromSeconds(6).Ticks);
using System.Threading.Tasks;
using System.Reactive.Subjects;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using Microsoft.Reactive.Testing;
using System.Threading;
using System.Reactive.Concurrency;
出于好玩,我为任务创建了一个“OnTimeout”扩展。在超时的情况下,任务会执行所需的内联lambda Action()并返回true,否则返回false。
public static async Task<bool> OnTimeout<T>(this T t, Action<T> action, int waitms) where T : Task
{
if (!(await Task.WhenAny(t, Task.Delay(waitms)) == t))
{
action(t);
return true;
} else {
return false;
}
}
OnTimeout扩展功能返回一个布尔值结果,可以像在此示例中调用UDP套接字异步操作时那样将其赋值给变量:
var t = UdpSocket.ReceiveAsync();
var timeout = await t.OnTimeout(task => {
Console.WriteLine("No Response");
}, 5000);
'task'变量可以在timeout lambda中访问以进行更多处理。
使用接收对象的Action可能会激发出各种其他扩展设计。
public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, IScheduler scheduler)
{
return task.ToObservable().Timeout(timeout, scheduler).ToTask();
}
带有可选的调度程序:
public static Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout, Scheduler scheduler = null)
{
return scheduler is null
? task.ToObservable().Timeout(timeout).ToTask()
: task.ToObservable().Timeout(timeout, scheduler).ToTask();
}
public static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
if (await Task.WhenAny(task, Task.Delay(timeout)) != task)
throw new TimeoutException();
return await task;
}
这个方法非常古老,但是现代有更好的解决方案。不确定需要哪个版本的c#/.NET,但这是我做的方式:
... Other method code not relevant to the question.
// a token source that will timeout at the specified interval, or if cancelled outside of this scope
using var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token);
async Task<MessageResource> FetchAsync()
{
try
{
return await MessageResource.FetchAsync(m.Sid);
} catch (TaskCanceledException e)
{
if (timeoutTokenSource.IsCancellationRequested)
throw new TimeoutException("Timeout", e);
throw;
}
}
return await Task.Run(FetchAsync, linkedTokenSource.Token);
CancellationTokenSource
构造函数接受一个TimeSpan
参数,该参数将导致令牌在经过该间隔后取消。然后,您可以在另一个对Task.Run
的调用中包装您的异步(或同步)代码,并传递超时令牌。
这假设您正在传递取消令牌(变量token
)。如果您没有单独取消任务的需要,您可以直接使用timeoutTokenSource
。否则,您创建linkedTokenSource
,如果超时发生,它将被取消,或者如果它被取消。
然后,我们只是捕获OperationCancelledException
并检查哪个令牌引发了异常,如果超时导致引发此异常,则抛出TimeoutException
。否则,我们重新抛出异常。
此外,我在这里使用局部函数,这是在C#7中引入的,但您可以轻松地使用lambda或实际函数来达到相同的效果。同样,c#8引入了更简单的语法以使用语句,但那些很容易重写。
在紧密的网络循环中,我觉得其他答案中的Task.Delay()
任务和CancellationTokenSource
有点过于繁琐。
虽然Joe Hoag's Crafting a Task.TimeoutAfter Method on MSDN blogs很有启发性,但出于与上述相同的原因,我有点不敢使用TimeoutException
来进行流程控制,因为超时更多时候是预期的。
因此,我选择了这个方案,它还处理了博客中提到的优化:
public static async Task<bool> BeforeTimeout(this Task task, int millisecondsTimeout)
{
if (task.IsCompleted) return true;
if (millisecondsTimeout == 0) return false;
if (millisecondsTimeout == Timeout.Infinite)
{
await Task.WhenAll(task);
return true;
}
var tcs = new TaskCompletionSource<object>();
using (var timer = new Timer(state => ((TaskCompletionSource<object>)state).TrySetCanceled(), tcs,
millisecondsTimeout, Timeout.Infinite))
{
return await Task.WhenAny(task, tcs.Task) == task;
}
}
var receivingTask = conn.ReceiveAsync(ct);
while (!await receivingTask.BeforeTimeout(keepAliveMilliseconds))
{
// Send keep-alive
}
// Read and do something with data
var data = await receivingTask;
If you want to wait for an existing task and find out whether it completed or timed out, but don't want to cancel it if the timeout occurs:
public static async Task<bool> TimedOutAsync(this Task task, int timeoutMilliseconds)
{
if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
if (timeoutMilliseconds == 0) {
return !task.IsCompleted; // timed out if not completed
}
var cts = new CancellationTokenSource();
if (await Task.WhenAny( task, Task.Delay(timeoutMilliseconds, cts.Token)) == task) {
cts.Cancel(); // task completed, get rid of timer
await task; // test for exceptions or task cancellation
return false; // did not timeout
} else {
return true; // did timeout
}
}
If you want to start a work task and cancel the work if the timeout occurs:
public static async Task<T> CancelAfterAsync<T>( this Func<CancellationToken,Task<T>> actionAsync, int timeoutMilliseconds)
{
if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
var taskCts = new CancellationTokenSource();
var timerCts = new CancellationTokenSource();
Task<T> task = actionAsync(taskCts.Token);
if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
timerCts.Cancel(); // task completed, get rid of timer
} else {
taskCts.Cancel(); // timer completed, get rid of task
}
return await task; // test for exceptions or task cancellation
}
If you have a task already created that you want to cancel if a timeout occurs:
public static async Task<T> CancelAfterAsync<T>(this Task<T> task, int timeoutMilliseconds, CancellationTokenSource taskCts)
{
if (timeoutMilliseconds < 0 || (timeoutMilliseconds > 0 && timeoutMilliseconds < 100)) { throw new ArgumentOutOfRangeException(); }
var timerCts = new CancellationTokenSource();
if (await Task.WhenAny(task, Task.Delay(timeoutMilliseconds, timerCts.Token)) == task) {
timerCts.Cancel(); // task completed, get rid of timer
} else {
taskCts.Cancel(); // timer completed, get rid of task
}
return await task; // test for exceptions or task cancellation
}
public static async Task<bool> TryWithTimeoutAfter<TResult>(this Task<TResult> task,
TimeSpan timeout, Action<TResult> successor)
{
using var timeoutCancellationTokenSource = new CancellationTokenSource();
var completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token))
.ConfigureAwait(continueOnCapturedContext: false);
if (completedTask == task)
{
timeoutCancellationTokenSource.Cancel();
// propagate exception rather than AggregateException, if calling task.Result.
var result = await task.ConfigureAwait(continueOnCapturedContext: false);
successor(result);
return true;
}
else return false;
}
async Task Example(Task<string> task)
{
string result = null;
if (await task.TryWithTimeoutAfter(TimeSpan.FromSeconds(1), r => result = r))
{
Console.WriteLine(result);
}
}
int timeout = 5000;
var actualTask = new Task(() =>
{
// Do your stuff here
});
Task.Run(() =>
{
actualTask.Start();
if (!actualTask.Wait(timeout))
{
return false;
// or throw new TimeoutException("Operation timed out!");
}
return true;
}).ContinueWith((timedTaskResult) =>
{
if (!timedTaskResult.Result)
{
// tell user it timed out!
}
if (timedTaskResult.IsFaulted)
{
// Tell the user about the error/s via the timedTaskResult.Exception
}
});