正确的实现无限任务的方法(定时器 vs 任务)

102
因此,我的应用程序需要几乎持续地执行一个操作(每次运行之间暂停约10秒),只要应用程序正在运行或请求取消。它需要执行的工作可能需要最多30秒钟。
使用System.Timers.Timer并使用AutoReset是否更好,以确保在前一个“tick”完成之前不执行操作?
还是应该使用带有取消令牌的LongRunning模式中的一般任务,并在其中使用常规无限循环来调用执行工作的操作,并在调用之间使用10秒的Thread.Sleep?至于async/await模型,我不确定在这里是否合适,因为我没有任何来自工作的返回值。
CancellationTokenSource wtoken;
Task task;

void StopWork()
{
    wtoken.Cancel();

    try 
    {
        task.Wait();
    } catch(AggregateException) { }
}

void StartWork()
{
    wtoken = new CancellationTokenSource();

    task = Task.Factory.StartNew(() =>
    {
        while (true)
        {
            wtoken.Token.ThrowIfCancellationRequested();
            DoWork();
            Thread.Sleep(10000);
        }
    }, wtoken, TaskCreationOptions.LongRunning);
}

void DoWork()
{
    // Some work that takes up to 30 seconds but isn't returning anything.
}

或者只需使用一个简单的计时器,同时使用其AutoReset属性,并调用.Stop()来取消它?

考虑到您想要实现的目标,这个任务似乎有些过度。请参考“KISS原则”(保持简单原则):http://en.wikipedia.org/wiki/KISS_principle。在OnTick()开始时停止计时器,检查一个布尔值以确定是否需要执行任何操作,完成工作后重新启动计时器。 - Mike Trusov
3个回答

98
我会尽力为您进行翻译。这段内容是关于编程的,建议使用TPL Dataflow(因为您正在使用.NET 4.5,并且它在内部使用Task)。您可以轻松地创建一个ActionBlock<TInput>,在处理完其操作并等待适当的时间后,将项目发布到自身。
首先,创建一个工厂来创建您的无休止任务:
ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Action<DateTimeOffset> action, CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action.
        action(now);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here.  As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions { 
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

I've chosen the ActionBlock<TInput> to take a DateTimeOffset structure; you have to pass a type parameter, and it might as well pass some useful state (you can change the nature of the state, if you want).
Also, note that the ActionBlock<TInput> by default processes only one item at a time, so you're guaranteed that only one action will be processed (meaning, you won't have to deal with reentrancy when it calls the Post extension method back on itself).
I've also passed the CancellationToken结构 to both the constructor of the ActionBlock<TInput> and to the Task.Delay方法 call; if the process is cancelled, the cancellation will take place at the first possible opportunity.
From there, it's an easy refactoring of your code to store the ITargetBlock<DateTimeoffset>接口 implemented by ActionBlock<TInput>(this is the higher-level abstraction representing blocks that are consumers, and you want to be able to trigger the consumption through a call to the Post extension method):
CancellationTokenSource wtoken;
ActionBlock<DateTimeOffset> task;

你的StartWork方法:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask(now => DoWork(), wtoken.Token);

    // Start the task.  Post the time.
    task.Post(DateTimeOffset.Now);
}

接下来是你的StopWork方法:

void StopWork()
{
    // CancellationTokenSource implements IDisposable.
    using (wtoken)
    {
        // Cancel.  This will cancel the task.
        wtoken.Cancel();
    }

    // Set everything to null, since the references
    // are on the class level and keeping them around
    // is holding onto invalid state.
    wtoken = null;
    task = null;
}

为什么你想在这里使用TPL数据流?有几个原因:

关注点分离

CreateNeverEndingTask方法现在是一个工厂,创建你的“服务”。你控制它何时启动和停止,它是完全自包含的。你不需要将定时器的状态控制与代码的其他方面交织在一起。你只需创建该块,启动它,并在完成后停止它。

更有效地使用线程/任务/资源

TPL数据流块的默认调度程序与Task相同,即线程池。通过使用ActionBlock<TInput>来处理你的操作,以及调用Task.Delay,当你实际上没有做任何事情时,你正在让你使用的线程放弃控制权。当然,当你生成处理连续体的新Task时,这实际上会导致一些开销,但考虑到你不在紧密循环中处理它(你在调用之间等待十秒钟),这应该是很小的。

如果DoWork函数实际上可以被设置为可等待的(也就是说,它返回一个Task),那么你可能可以通过调整上面的工厂方法,使其接受Func<DateTimeOffset, CancellationToken, Task>而不是Action<DateTimeOffset>来进一步优化它,如下所示:

ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Func<DateTimeOffset, CancellationToken, Task> action, 
    CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action.  Wait on the result.
        await action(now, cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here.  As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Same as above.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions { 
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

当然,将CancellationToken传递到您的方法(如果它接受一个)是一个好的实践方法,在这里完成该操作。
这意味着您将有一个DoWorkAsync方法,其签名如下:
Task DoWorkAsync(CancellationToken cancellationToken);

您需要略微修改(不用过分关注关注责任分离)StartWork方法以适应传递给CreateNeverEndingTask方法的新签名,如下所示:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask((now, ct) => DoWorkAsync(ct), wtoken.Token);

    // Start the task.  Post the time.
    task.Post(DateTimeOffset.Now, wtoken.Token);
}

你好,我正在尝试这个实现,但是遇到了问题。如果我的DoWork不带参数,task = CreateNeverEndingTask(now => DoWork(), wtoken.Token); 就会出现构建错误(类型不匹配)。另一方面,如果我的DoWork需要一个DateTimeOffset参数,那么同样的代码行会给我一个不同的构建错误,告诉我没有重载函数接受0个参数。请问你能帮我解决这个问题吗? - Bovaz
1
实际上,我通过在分配任务的行中添加一个转换并将参数传递给DoWork来解决了我的问题:task = (ActionBlock<DateTimeOffset>)CreateNeverEndingTask(now => DoWork(now), wtoken.Token); - Bovaz
你也可以将 "ActionBlock<DateTimeOffset> task;" 的类型更改为 "ITargetBlock<DateTimeOffset> task;"。 - XOR
1
我相信这很可能会永久分配内存,最终导致溢出。 - Nate Gardner
@NateGardner 在哪个部分? - casperOne

83

我觉得新的基于任务的界面非常适合像这样的任务 - 甚至比使用计时器类更简单。

你可以对示例进行一些小的调整,而不是:

task = Task.Factory.StartNew(() =>
{
    while (true)
    {
        wtoken.Token.ThrowIfCancellationRequested();
        DoWork();
        Thread.Sleep(10000);
    }
}, wtoken, TaskCreationOptions.LongRunning);

你可以这样做:

task = Task.Run(async () =>  // <- marked async
{
    while (true)
    {
        DoWork();
        await Task.Delay(10000, wtoken.Token); // <- await with cancellation
    }
}, wtoken.Token);

如果在Task.Delay中,取消将立即发生,而不必等待Thread.Sleep完成。

此外,使用Task.Delay而不是Thread.Sleep意味着您不会在整个休眠期间占用一个线程做无用的事情。

如果可能,您还可以使DoWork()接受取消标记,这样取消将更加及时响应。


1
当您将异步lambda作为Task.Factory.StartNew的参数时,请注意您将获得什么任务 - http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx 当您在请求取消后执行task.Wait();,您将等待错误的任务。 - Lukas Pirkl
是的,现在应该使用Task.Run,它有正确的重载。 - porges
根据http://blogs.msdn.com/b/pfxteam/archive/2011/10/24/10229468.aspx的说法,Task.Run使用线程池,因此你使用Task.Run代替Task.Factory.StartNewTaskCreationOptions.LongRunning的示例并不完全相同——如果我需要任务使用LongRunning选项,我不能像你展示的那样使用Task.Run吗?还是我漏掉了什么? - Jeff
@Porges 理解了。我的使用情况是运行一个无限循环的任务,在每次迭代中都会完成一块工作,并在下一次迭代时“休息”2秒钟,然后再做另一块工作。它永远运行,但会定期休息2秒钟。不过,我的评论更多地是关于您是否可以使用Task.Run语法指定它为“LongRunning”。从文档上看,只要您满意它使用的默认设置,Task.Run就是更简洁的语法。似乎没有重载它,以接受TaskCreationOptions参数。 - Jeff
1
@Lumirris:你说得对,没有办法指定这个;Task.Run (http://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/Task.cs,89fc01f3bb88eed9) 本质上与 Task.Factory.StartNew (http://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/TaskFactory.cs,fb6e45941d931629) 使用默认选项是一样的。(但它确实指定了 DenyChildAttach。) - porges
显示剩余2条评论

8
这里是我的建议:
  • 继承NeverEndingTask类并重写ExecutionCore方法以完成所需工作。
  • 更改ExecutionLoopDelayMs可以调整循环之间的时间,例如如果您想使用退避算法。
  • Start/Stop提供同步接口以启动/停止任务。
  • LongRunning意味着您将获得每个NeverEndingTask一个专用线程。
  • 与上面基于ActionBlock的解决方案不同,该类不会在循环中分配内存。
  • 下面的代码仅为草图示例,不一定适用于生产环境 :)
public abstract class NeverEndingTask
{
    // Using a CTS allows NeverEndingTask to "cancel itself"
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    protected NeverEndingTask()
    {
         TheNeverEndingTask = new Task(
            () =>
            {
                // Wait to see if we get cancelled...
                while (!_cts.Token.WaitHandle.WaitOne(ExecutionLoopDelayMs))
                {
                    // Otherwise execute our code...
                    ExecutionCore(_cts.Token);
                }
                // If we were cancelled, use the idiomatic way to terminate task
                _cts.Token.ThrowIfCancellationRequested();
            },
            _cts.Token,
            TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning);

        // Do not forget to observe faulted tasks - for NeverEndingTask faults are probably never desirable
        TheNeverEndingTask.ContinueWith(x =>
        {
            Trace.TraceError(x.Exception.InnerException.Message);
            // Log/Fire Events etc.
        }, TaskContinuationOptions.OnlyOnFaulted);

    }

    protected readonly int ExecutionLoopDelayMs = 0;
    protected Task TheNeverEndingTask;

    public void Start()
    {
       // Should throw if you try to start twice...
       TheNeverEndingTask.Start();
    }

    protected abstract void ExecutionCore(CancellationToken cancellationToken);

    public void Stop()
    {
        // This code should be reentrant...
        _cts.Cancel();
        TheNeverEndingTask.Wait();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接