任务排序和可重入性

16
我有一个场景,可能非常普遍:
1. 有一个任务(一个 UI 命令处理器),可以同步或异步完成。 2. 命令可能会比它们被处理的速度更快。 3. 如果已经有一个挂起的命令任务,则应将新的命令处理程序任务排队并顺序处理。 4. 每个新任务的结果可能取决于前一个任务的结果。
取消应该被观察,但为了简单起见,我希望将其放在问题范围之外。同时,线程安全(并发)不是必需的,但必须支持重入。
下面是我尝试实现这一目标的基本示例(作为控制台应用程序,以简化操作)。
using System;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var asyncOp = new AsyncOp<int>();

            Func<int, Task<int>> handleAsync = async (arg) =>
            {
                Console.WriteLine("this task arg: " + arg);

                //await Task.Delay(arg); // make it async

                return await Task.FromResult(arg); // sync
            };

            Console.WriteLine("Test #1...");
            asyncOp.RunAsync(() => handleAsync(1000));
            asyncOp.RunAsync(() => handleAsync(900));
            asyncOp.RunAsync(() => handleAsync(800));
            asyncOp.CurrentTask.Wait();

            Console.WriteLine("\nPress any key to continue to test #2...");
            Console.ReadLine();

            asyncOp.RunAsync(() =>
            {
                asyncOp.RunAsync(() => handleAsync(200));
                return handleAsync(100);
            });

            asyncOp.CurrentTask.Wait();
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadLine();
        }

        // AsyncOp
        class AsyncOp<T>
        {
            Task<T> _pending = Task.FromResult(default(T));

            public Task<T> CurrentTask { get { return _pending; } }

            public Task<T> RunAsync(Func<Task<T>> handler)
            {
                var pending = _pending;
                Func<Task<T>> wrapper = async () =>
                {
                    // await the prev task
                    var prevResult = await pending;
                    Console.WriteLine("\nprev task result:  " + prevResult);
                    // start and await the handler
                    return await handler();
                };

                _pending = wrapper();
                return _pending;
            }
        }

    }
}

输出结果:
测试#1...
上一个任务的结果:0 这个任务的参数:1000
上一个任务的结果:1000 这个任务的参数:900
上一个任务的结果:900 这个任务的参数:800
按任意键继续进行测试#2...
上一个任务的结果:800
上一个任务的结果:800 这个任务的参数:200 这个任务的参数:100
按任意键退出...
asyncOp.RunAsync(() =>
{
    asyncOp.RunAsync(() => handleAsync(200));
    return handleAsync(100);
});

期望的输出应为100200,而不是200100,这是因为已经有一个挂起的外部任务100。这显然是因为内部任务同步执行,破坏了逻辑var pending = _pending; /*...*/_pending = wrapper()的外部任务。
如何让测试#2也能工作?
一种解决方法是强制对每个任务进行异步处理,使用Task.Factory.StartNew(…,TaskScheduler.FromCurrentSynchronizationContext()。然而,我不想强制要求命令处理程序在内部是同步执行的。此外,我不想依赖于任何特定的同步上下文的行为(即依赖于Task.Factory.StartNew应该在创建的任务实际启动之前返回)。
在实际项目中,我负责上述的AsyncOp,但无法控制命令处理程序(即handleAsync中的任何内容)。

2
最近我注意到你的名字出现在很多好问题和答案上,有新的高质量用户来到这个网站是很好的。 - Scott Chamberlain
1
我知道你已经回答了自己,但这似乎是使用 Rx 的好地方。另外,在您的代码执行第一个调用期间如果有第三个调用会发生什么?使用 Rx,您可以在一行中限制调用速率,甚至在第二行中批量执行它们。此外,使用 Rx 取消操作非常容易。 - Krzysztof Skowronek
3个回答

14
我几乎忘记了可以手动构造一个Task,而不启动或安排它。然后,““Task.Factory.StartNew” vs “new Task(...).Start””让我回到正轨。我认为这是少数几种情况之一,Task<TResult> 构造函数可能实际有用,以及嵌套任务(Task<Task<T>>)和 Task.Unwrap()
// AsyncOp
class AsyncOp<T>
{
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask { get { return _pending; } }

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
    {
        var pending = _pending;
        Func<Task<T>> wrapper = async () =>
        {
            // await the prev task
            var prevResult = await pending;
            Console.WriteLine("\nprev task result:  " + prevResult);
            // start and await the handler
            return await handler();
        };

        var task = new Task<Task<T>>(wrapper);
        var inner = task.Unwrap();
        _pending = inner;

        task.RunSynchronously(useSynchronizationContext ?
            TaskScheduler.FromCurrentSynchronizationContext() :
            TaskScheduler.Current);

        return inner;
    }
}

输出:
测试 #1...
上一个任务的结果:0 这个任务的参数:1000
上一个任务的结果:1000 这个任务的参数:900
上一个任务的结果:900 这个任务的参数:800
按任意键继续进行测试 #2...
上一个任务的结果:800 这个任务的参数:100
上一个任务的结果:100 这个任务的参数:200
现在,通过添加锁以保护 _pending,如果需要,可以非常容易地使 AsyncOp 线程安全。
更新,这已经进一步改进了 取消/重新启动逻辑

1
Task 构造函数大获成功! :-) - Theodor Zoulias
1
你的解决方案在我看来更好。Rx 的学习曲线太陡峭了,而且它的语义太细腻了。例如,Enigmativity 经常提供没有 onError 处理程序的 Subscribe 示例,如果出现任何问题,这将导致进程崩溃。 - Theodor Zoulias
1
@谢谢 :) 这正是我一直对 Rx 的感觉。它非常酷,但它就像是一个完全转变了人们应该如何看待异步工作流程的东西。我曾经试图将其推广到一个前端项目中,以便更好地学习它,但其他开发人员决定反对 :) - noseratio - open to work
1
@TheodorZoulias,这篇论文是金子般的存在,我相信背后的大脑是Erik Meyer和Bart De Smet。 - noseratio - open to work
1
Erik Meijer 是一个很酷的人!我刚刚看了他在一段15分钟视频中讲解 Rx。 - Theodor Zoulias
显示剩余2条评论

1

这里有一个解决方案,与被接受的答案相比,在所有方面都更糟,除了具有线程安全性(这不是问题的要求)。缺点:

  1. 所有lambda表达式都是异步执行的(没有快速路径)。
  2. executeOnCurrentContext配置影响所有lambda表达式(它不是每个lambda表达式的配置)。

此解决方案使用TPL Dataflow库中的ActionBlock作为处理引擎。

public class AsyncOp<T>
{
    private readonly ActionBlock<Task<Task<T>>> _actionBlock;

    public AsyncOp(bool executeOnCurrentContext = false)
    {
        var options = new ExecutionDataflowBlockOptions();
        if (executeOnCurrentContext)
            options.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

        _actionBlock = new ActionBlock<Task<Task<T>>>(async taskTask =>
        {
            try
            {
                taskTask.RunSynchronously();
                await await taskTask;
            }
            catch { } // Ignore exceptions
        }, options);
    }

    public Task<T> RunAsync(Func<Task<T>> taskFactory)
    {
        var taskTask = new Task<Task<T>>(taskFactory);
        if (!_actionBlock.Post(taskTask))
            throw new InvalidOperationException("Not accepted"); // Should never happen
        return taskTask.Unwrap();
    }
}

非常有趣!你能否加入取消功能?我发现在实践中,我几乎总是想要在启动新的操作实例之前取消先前挂起的操作,就像我在这里做的那样(https://stackoverflow.com/a/21639075/1768303)。 - noseratio - open to work
1
@noseratio 制作一个线程安全的可取消操作本质上是很棘手的,因为 CancellationTokenSource 类的 Dispose 方法不是线程安全的,并且文档强烈要求必须对该类进行处理。我在这里尝试了一个实现 [here] (https://dev59.com/nGw05IYBdhLWcg3w4125#61681938),为这样一个简单的要求准备了很多代码。我现在发现,[你自己的尝试] (https://stackoverflow.com/a/21639075/1768303) 也有相当多的代码。 :-) - Theodor Zoulias

1

微软的Rx确实提供了一种简单的方法来实现这种操作。以下是一种简单的(可能过于简单)的方法:

var subject = new BehaviorSubject<int>(0);

IDisposable subscription =
    subject
        .Scan((x0, x1) =>
        {
            Console.WriteLine($"previous value {x0}");
            return x1;
        })
        .Skip(1)
        .Subscribe(x => Console.WriteLine($"current value {x}\r\n"));

subject.OnNext(1000);
subject.OnNext(900);
subject.OnNext(800);

Console.WriteLine("\r\nPress any key to continue to test #2...\r\n");
Console.ReadLine();

subject.OnNext(200);
subject.OnNext(100);

Console.WriteLine("\r\nPress any key to exit...");
Console.ReadLine();

我得到的输出是这样的:
上一个值 0
当前值 1000
上一个值 1000 当前值 900
上一个值 900 当前值 800
按任意键继续测试#2...
上一个值 800 当前值 200
上一个值 200 当前值 100
按任意键退出...
可以通过调用subscription.Dispose()随时取消。
Rx中的错误处理通常比普通情况更具有定制性。这不仅仅是在事情周围抛出try/catch。在像IO错误这样的情况下,您还可以使用Retry运算符重复失败的步骤。
在这种情况下,因为我使用了BehaviorSubject(每当订阅时都会重复其上一个值),所以您可以轻松地使用Catch操作符重新订阅。
var subject = new BehaviorSubject<int>(0);
var random = new Random();

IDisposable subscription =
    subject
        .Select(x =>
        {
            if (random.Next(10) == 0)
                throw new Exception();
            return x;
        })
        .Catch<int, Exception>(ex => subject.Select(x => -x))
        .Scan((x0, x1) =>
        {
            Console.WriteLine($"previous value {x0}");
            return x1;
        })
        .Skip(1)
        .Subscribe(x => Console.WriteLine($"current value {x}\r\n"));

现在使用 .Catch<int, Exception>(ex => subject.Select(x => -x)),当出现异常时它会反转查询的值。
一个典型的输出可能是这样的:
上一个值 0
当前值 1000
上一个值 1000 当前值 900
上一个值 900 当前值 800
按任意键继续测试 #2...
上一个值 800 当前值 -200
上一个值 -200 当前值 -100
按任意键退出...
请注意第二半部分中的负数。异常已被处理,查询能够继续执行。

有多种选择来解决这个问题是很好的 :) 为了完整性,你能加上一些错误处理吗?例如,如何在异常情况下记录和重复执行。谢啦! - noseratio - open to work
2
@noseratio - 我已经添加了一个错误处理的例子。 - Enigmativity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接