以非阻塞方式调用TaskCompletionSource.SetResult

36
我发现 TaskCompletionSource.SetResult(); 在返回之前会调用等待任务的代码。在我的情况下,这导致了死锁。
以下是一个在普通线程中启动的简化版本。
void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

代码中的"async"部分大概长这样。
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

等待实际上嵌套在非异步调用中。
发送等待响应(简化)。
public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

我的假设是第二个SendAwaitResponse将在线程池线程中执行,但它继续在ReceiverRun创建的线程中执行。

有没有一种方法可以设置任务的结果而不继续等待它的代码?

该应用程序是一个控制台应用程序


5
一个非常简单的解决方法是使用 Task.Run(() => task.SetResult(msg));(类似地使用 SetException())。但我希望有更好的选择。 - svick
1
这篇来自2018年的博客文章详细讨论了该问题及其解决方法:https://devblogs.microsoft.com/premier-developer/the-danger-of-taskcompletionsourcet-class/ - C-F
4个回答

39
我发现TaskCompletionSource.SetResult();在返回之前会调用等待任务的代码。在我的情况下,这导致了死锁。
是的,我有一篇博客文章记录了这个问题(据我所知,MSDN上没有记录)。死锁发生的原因有两个:
1.存在异步和阻塞代码的混合(即,一个异步方法正在调用Wait)。 2.使用TaskContinuationOptions.ExecuteSynchronously安排任务延续。
我建议从最简单的解决方案开始:删除第一件事(1)。也就是说,不要混合使用async和Wait调用:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

相反,始终使用await

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");

如果需要的话,您可以在调用栈的另一个点(而不是async方法中) Wait

这是我最推荐的解决方案。但是,如果您想尝试删除第二个事情(2),您可以使用一些技巧:要么将SetResult包装在Task.Run中以强制它运行在单独的线程上(我的AsyncEx库*WithBackgroundContinuations扩展方法可以做到这一点),要么为您的线程提供实际的上下文(例如我的AsyncContext类型)并指定ConfigureAwait(false),这将导致继续忽略ExecuteSynchronously标志

但是,这些解决方案比仅仅分离异步和阻塞代码要复杂得多。
另外,看一下TPL Dataflow;听起来你可能会觉得它很有用。

将SetResult包装在Task.Run中是有效的!设置上下文看起来很有趣,我会研究一下。 调用SendAwaitResponse的代码可能被认为不在我的控制范围内,因此我的目标是保持ReceiverRun继续传递消息,即使单个await continuation决定使用Wait或任何其他耗时操作。 - hultqvist
16
现在还有一个名为TaskCreationOptions.RunContinuationsAsynchronously的方法,它允许您指定(在TaskCompletionSource构造函数中)延续应异步执行。然而,这仅适用于.NET 4.6及以上版本。 - zastrowm
5
SetResult只应标记任务为已完成,其后续应该在线程池线程可用时运行。如果SetResult正在阻塞并等待此操作完成,则应将其视为严重错误。 - Triynko
2
@Triynko:我在7年前将其报告为一个bug。如果我没记错的话,它被关闭并标记为“按设计要求”。 - Stephen Cleary
如果SetResult正在阻塞等待它发生,那也会让我感到困扰。这个7年前设计的bug真是太可怕了。 - quetzalcoatl

7
作为控制台应用程序,您的应用程序在默认的同步上下文上运行,在此上下文中,await继续回调将在等待任务已完成的相同线程上调用。如果您想在await SendAwaitResponse之后切换线程,可以使用await Task.Yield()来实现:
await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock

您可以通过将Thread.CurrentThread.ManagedThreadId存储在Task.Result中,并在await之后将其与当前线程的id进行比较,从而进一步改进此功能。如果仍然在同一线程上,则执行await Task.Yield()

虽然我理解SendAwaitResponse是您实际代码的简化版本(就像您在问题中展示的那样),但它仍然完全在内部同步。为什么您希望在其中进行任何线程切换呢?

无论如何,您可能应该重新设计逻辑,使其不会对您当前所在的线程做出任何假设。避免混合使用awaitTask.Wait(),并使所有代码都异步化。通常,在顶级别(例如在Main内部)只需要使用一个Wait()

[编辑]ReceiverRun中调用task.SetResult(msg)实际上将控制流转移到您在taskawait的点,由于默认同步上下文的行为,没有线程切换。因此,处理实际消息的代码接管了ReceiverRun线程。最终,在同一线程上调用SendAwaitResponse("second message").Wait(),导致死锁。

以下是一个控制台应用程序代码,模拟您的示例。它在ProcessAsync内使用await Task.Yield()来在单独的线程上调度继续,因此控制流返回到ReceiverRun,并且没有死锁。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}

这与在ReceiverRun内执行Task.Run(() => task.SetResult(msg))没有太大区别。我能想到的唯一优点是你可以明确控制何时切换线程。这样,你可以尽可能长时间地保持在同一线程上(例如,对于task2task3task4),但在task5.Wait()上避免死锁仍需要另一个线程切换。

两种解决方案最终都会使线程池增长,这对性能和可扩展性来说是不好的。

现在,如果我们在上面的代码中ProcessAsync内部到处使用await task替换task.Wait(),我们将不必使用await Task.Yield,并且仍然不会出现死锁。但是,在ProcessAsync内部的第一个await task1之后的整个await调用链实际上将在ReceiverRun线程上执行。只要我们不使用其他Wait()风格的调用阻塞此线程,并且在处理消息时不进行大量的CPU绑定工作,这种方法可能可以正常工作(异步IO-bound await风格的调用仍然应该可以正常工作,并且它们可能实际上会触发隐式线程切换)。
说到这里,我认为你需要一个单独的线程,并安装一个串行同步上下文来处理消息(类似于WindowsFormsSynchronizationContext)。这就是包含awaits的异步代码应该运行的地方。你仍然需要避免在该线程上使用Task.Wait。如果个别消息处理需要大量的CPU绑定工作,则应该使用Task.Run来进行此类工作。对于异步IO绑定调用,可以保持在同一线程上。
你可能需要查看@StephenClearyNito Asynchronous Library中的ActionDispatcher/ActionDispatcherSynchronizationContext,以获取异步消息处理逻辑。希望Stephen能够提供更好的答案。

0
有点晚了,但这是我认为能增加价值的解决方案。
我也一直在苦恼这个问题,我通过捕获等待方法的同步上下文来解决它。
代码大致如下:
// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();

void ReceiverRun()
{
    while (true)    // <-- i would replace this with a cancellation token
    {
        var msg = ReadNextMessage();
        TaskWithContext<TResult> task = requests[msg.RequestID];

        // if it wasn't a winforms/wpf thread, it would be null
        // we choose our default context (threadpool)
        var context = task.Context ?? _defaultContext;

        // execute it on the context which was captured where it was added. So it won't get completed on this thread.
        context.Post(state =>
        {
            if (msg.Error == null)
                task.TaskCompletionSource.SetResult(msg);
            else
                task.TaskCompletionSource.SetException(new Exception(msg.Error));
        });
    }
}

public static Task<Response> SendAwaitResponse(string msg)
{
    // The key is here! Save the current synchronization context.
    var t = new TaskWithContext<Response>(SynchronizationContext.Current); 

    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.TaskCompletionSource.Task;
}

// class to hold a task and context
public class TaskWithContext<TResult>
{
    public SynchronizationContext Context { get; }

    public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();

    public TaskWithContext(SynchronizationContext context)
    {
        Context = context;
    }
}

0
“我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它继续在ReceiverRun创建的线程中执行。”
“这完全取决于您在SendAwaitResponse中执行的操作。 异步和并发不是同一件事。”
“请查看:C#5 Async / Await-它是否*并发*?

我添加了我的应用程序是一个控制台应用程序,但最佳答案建议它会在线程池线程中继续运行,但实际上并没有发生。 - hultqvist
Stream.Write是同步方法,你可能想使用WriteAsync。 - Slugart
@Slugart 当然可以,但使用 WriteAsync() 并不能解决这个问题。 - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接