如何防止任务中的同步继续?

86

我有一些库(socket networking)代码,为请求的待处理响应提供了基于TaskCompletionSource<T>的基于任务的API。然而,在TPL中存在一个烦恼,即似乎无法防止同步延续。我希望能够:

  • 告诉TaskCompletionSource<T>不允许调用者附加TaskContinuationOptions.ExecuteSynchronously选项,或
  • 以一种指定TaskContinuationOptions.ExecuteSynchronously应被忽略并使用线程池的方式设置结果(SetResult / TrySetResult

具体来说,我面临的问题是,传入的数据正在由专用阅读器处理,如果调用者可以附加TaskContinuationOptions.ExecuteSynchronously选项,它们可以阻止阅读器(这会影响更多的操作)。之前,我通过一些技巧解决了这个问题,检测是否存在任何继续操作,如果存在,则将完成推送到ThreadPool上,但是如果调用者已经饱和其工作队列,则会对其产生重大影响,因为完成操作将无法及时得到处理。如果他们使用Task.Wait()(或类似),他们将会陷入死锁。同样,这就是为什么阅读器在专用线程上而不是使用工作者的原因。

所以,在我尝试催促TPL团队之前:

  • 我不希望外部调用者能够劫持我的线程
  • 我不能将ThreadPool作为实现,因为它需要在池饱和时工作

以下示例会生成输出(顺序可能因时间而异):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

问题在于一个随机的呼叫者成功地获得了“主线程”的延续。在实际代码中,这将会打断主要的读取器;糟糕的事情发生了!

代码:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

2
我会尝试使用自己的API封装TaskCompletionSource,以防止直接调用ContinueWith,因为TaskCompletionSourceTask都不适合从它们继承。 - Dennis
2
@MattH 不是很准确 - 它只是重新表述了问题:要么你使用 ThreadPool 来做这件事(我已经提到过了 - 它会引起问题),要么你有一个专门的“待处理继续”线程,然后它们(指定了 ExecuteSynchronously 的继续)可以劫持那个线程 - 这会导致完全相同的问题,因为它意味着其他消息的继续可能会被阻塞,这又会影响多个调用者。 - Marc Gravell
1
@Hans 不行;TaskCompletionSource<T> 不允许您指定调度程序,因为它从未在调度程序上运行;附加任务可以在ContinueWith中指定调度程序,但如果我们知道所有附加任务都会正确处理事情,那么问题就没有意义了。 - Marc Gravell
3
@Andrey的意思是,想要达到所有调用者都使用ContinueWith而没有exec-sync的效果,这正是我想要实现的。问题在于,如果我的库将一个Task交给其他人,他们可能会做一些非常不良的事情:他们可以使用exec-sync来中断我的读取器。这非常危险,因此我希望从库的内部防止它发生。 - Marc Gravell
2
@Andrey,这是因为:a. 许多任务从一开始就没有继续处理(特别是批处理工作),这会强制每个任务都必须有一个;b. 即使那些本来会有继续处理的任务现在也变得更加复杂、繁琐,增加了工人操作。这很重要。 - Marc Gravell
显示剩余22条评论
6个回答

52

.NET 4.6的新特性:

.NET 4.6中包含一个新的TaskCreationOptions: RunContinuationsAsynchronously


如果您愿意使用Reflection访问私有字段......

您可以使用TASK_STATE_THREAD_WAS_ABORTED标志标记TCS的任务,这将导致所有后续操作不被内联。

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

编辑:

建议您使用表达式而不是使用反射发射。这种方法更易读,并且具有与PCL兼容的优势:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

不使用反射:

如果有人感兴趣,我已经找到了一种不使用反射的方法来做这件事,但它有点“不干净”,当然也带来了不可忽视的性能损失:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

3
请使用这个来为TPL团队创建一些伪示例,并提出关于能否通过构造函数选项或其他方式实现此功能的更改请求。 - Adam Houldsworth
1
@Adam 如果你必须称呼这个标志为“它的作用”而不是“引起它的原因”,那么它会像TaskCreationOptions.DoNotInline一样,甚至不需要更改TaskCompletionSource的构造函数签名。 - Marc Gravell
2
@AdamHouldsworth,别担心,我已经给他们发了同样的邮件;p - Marc Gravell
1
对于您的兴趣:这是经过ILGenerator等优化的代码:https://github.com/StackExchange/StackExchange.Redis/blob/master/StackExchange.Redis/StackExchange/Redis/TaskSource.cs - Marc Gravell
1
PCL目前不是一个紧急关注的问题,但还是谢谢:我熟悉Expression和reflection-emit API(以及许多其他元编程包装器)。此外:大多数我关心PCL的框架都会阻止非公共反射。 - Marc Gravell
显示剩余11条评论

9
我认为TPL中没有任何提供对TaskCompletionSource.SetResult继续执行的显式API控制的方法。我决定保留我的初始答案,用于控制async/await场景下的行为。
以下是另一种解决方案,如果tcs.SetResult触发的继续执行发生在调用SetResult的同一线程上,则会强制执行异步操作:ContinueWith
public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

更新以回应评论:

我无法控制调用者-我不能让他们使用特定的continue-with变体:如果我能,问题在第一次出现时就不存在了

我不知道你不能控制调用者。尽管如此,如果你不能控制它,你可能也没有直接传递TaskCompletionSource对象给调用者。从逻辑上讲,你会传递它的token部分,即tcs.Task。在这种情况下,通过向上面添加另一个扩展方法,解决方案可能会更加容易:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

使用:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

这实际上适用于 awaitContinueWithfiddle),并且不需要使用反射技巧。


1
我无法控制调用者 - 我不能让他们使用特定的continue-with变体:如果我能够这样做,问题在第一时间就不会存在。 - Marc Gravell
@MarcGravell,我不知道你无法控制调用者。我发布了一个关于如何处理它的更新。 - noseratio - open to work
图书馆作者的两难选择;p 注意,有人找到了一种更简单、更直接的方法来实现所需的结果。 - Marc Gravell

4

模拟中止方法看起来非常不错,但在某些情况下导致TPL劫持线程。

然后我有了一个类似于检查连续对象的实现,但只是检查任何连续体,因为实际上有太多的情况适用于给定的代码,但这意味着即使像Task.Wait之类的事情也会导致线程池查找。

最终,在检查了大量的IL之后,唯一安全且有用的情况是SetOnInvokeMres情况(手动重置事件薄连续体)。还有很多其他情况:

  • 一些不安全,会导致线程劫持
  • 其余的不太有用,因为它们最终会导致线程池
所以最终,我选择检查非空的继续对象;如果它是 null,那就好了(没有继续);如果它是非空的,则特殊情况检查 SetOnInvokeMres - 如果是这样:那就好了(可以安全调用);否则,让线程池执行 TrySetComplete,不要告诉任务做任何特殊的事情,比如欺骗中止。 Task.Wait 使用 SetOnInvokeMres 方法,这是我们想尽可能避免死锁的特定场景。
Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

4

如果不是这个方法而是

var task = source.Task;

你可以这样做代替。
var task = source.Task.ContinueWith<Int32>( x => x.Result );

因此,您始终会添加一个将异步执行的续集,那么订阅者是否希望在相同的上下文中进行续集就无关紧要了。这有点像柯里化任务,不是吗?


1
这在评论中提到了(请参见Andrey); 问题在那里是它强制所有任务都有一个续集,而否则它们不会有,这是ContinueWithawait通常努力避免的事情(通过检查已经完成等)-并且由于这将强制所有东西进入工人,它实际上会加剧情况。 这是一个积极的想法,我感谢您的贡献:但它在这种情况下无济于事。 - Marc Gravell

3
更新:我发布了一个单独的答案来处理ContinueWith而不是await(因为ContinueWith不关心当前同步上下文)。

您可以使用一个愚蠢的同步上下文来施加异步性,以便在调用TaskCompletionSource上的SetResult/SetCancelled/SetException时触发连续。 我相信TPL在await tcs.Task点处使用当前同步上下文(作为判断是否将此类连续设置为同步或异步的标准)。

以下对我有效:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync的实现如下:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext 在添加开销方面非常便宜。事实上,WPF Dispatcher.BeginInvoke的实现采用了非常相似的方法。

TPL将await时的目标同步上下文与tcs.SetResult时的同步上下文进行比较。如果同步上下文相同(或两个地方都没有同步上下文),则直接同步调用继续执行。否则,使用SynchronizationContext.Post在目标同步上下文中排队,即正常的await行为。这种方法总是强制实施SynchronizationContext.Post行为(或者如果没有目标同步上下文,则使用线程池继续执行)。

更新,这种方法对于task.ContinueWith无效,因为ContinueWith不关心当前的同步上下文。但它适用于await task (fiddle)。它也适用于await task.ConfigureAwait(false)
另一方面,这种方法适用于ContinueWith

很诱人,但更改同步上下文几乎肯定会影响调用应用程序 - 例如,只是使用我的库的 Web 或 Windows 应用程序不应该发现同步上下文每秒更改数百次。 - Marc Gravell
然而,这可能无法满足您的第二个要求:不使用ThreadPool。使用此解决方案,如果在await tcs.Task时没有同步上下文(或者它是基本默认值),TPL确实会使用ThreadPool。但这是标准TPL行为。 - noseratio - open to work
然而,正如所示,它对我实际上并没有起作用;使用您的代码,并使用source.SetResultAsync(123);而不是source.TrySetResult(123),我仍然看到输出“Continuation on: Main thread”。 - Marc Gravell
@MarcGravell,TPL在await点比较目标s.context tcs.SetResult点的s.context。如果s.context相同(或两个地方都没有上下文),则直接同步调用继续执行。否则,使用目标s.contextSynchronizationContext.Post将其排队,即正常的await行为。这种方法所做的是强制执行Post(如果没有目标s.context,则使用池线程)。我不明白为什么它会破坏任意调用者的代码。您能解释一下吗? - noseratio - open to work
1
@Noseration 啊,对了:关键点不清楚是它们不同。我会查看的。谢谢。 - Marc Gravell
显示剩余4条评论

3

如果您能并且已经准备好使用反射技术,那么这应该可以解决问题:

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}

@Noseratio,没错,但现在它可以工作了,而且他们可能会在下一个版本中实现一种正确的方法来做这件事。 - Fredou
但是如果你只需要执行 Task.Run(() => tcs.SetResult(result)),那么为什么还需要这个呢? - noseratio - open to work
@Noseratio,我不知道,去问Marc吧 :-) ,我只是在所有连接到TaskCompletionSource的任务上删除了TaskContinuationOptions.ExecuteSynchronously标志,以确保它们都使用线程池而不是主线程。 - Fredou
m_continuationObject hack实际上是我已经用来识别可能存在问题的任务的欺骗方式,所以这并不超出考虑范围。有趣,谢谢。到目前为止,这是看起来最有用的选项。 - Marc Gravell
这是一种很好的(滥用)反射方式(不要误解我:这是我很乐意做的事情),但这是一种更好的(滥用)反射方式:https://dev59.com/iGEh5IYBdhLWcg3wTB1c#22588431 - Marc Gravell
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接