命名互斥锁和await

27
因此,我无法在使用async时使用线程亲和锁 - 当运行多个进程时,如何保护我的资源?
例如,我有两个使用下面任务的进程:
 public async Task<bool> MutexWithAsync()
 {
     using (Mutex myMutex = new Mutex(false, "My mutex Name"))
     {
         try
         {
             myMutex.WaitOne();
             await DoSomething();
             return true;
         }
         catch { return false; }
         finally { myMutex.ReleaseMutex(); }
     }
 }
如果由 Mutex 保护的方法是同步的,那么上面的代码将会起作用,但是使用 async 后我会得到以下错误信息:

从未同步的代码块中调用了对象同步方法。

因此,在异步代码中使用命名 Mutex 是否无用?


4
高度相关:https://dev59.com/en7aa4cB1Zd3GeqPu8s4#23122566 - noseratio - open to work
2
@Romasz:是否考虑采用不同的架构?我在想一个专门的资源所有者进程,可以代表其他进程执行请求。 - Stephen Cleary
1
@Noseratio 感谢您提供的链接,它帮助我理解了一些问题,但是也毁了我的信号量想法;) - Romasz
@StephenCleary 这可能是一个解决方案,但目前我正在使用WP,它的后台任务功能有限。也许我可以伪造一个代理来实现这个目的,但我想避免这种情况。感谢您的帮助,您的话帮助我得到了另一个想法。尽管如此,我仍会尝试了解更多关于Mutex和async的内容。 - Romasz
5个回答

16

在某个特定线程上一致地访问互斥锁是必须确保的。你可以通过以下几种方式实现:

  1. 在持有互斥锁的关键部分不使用 await
  2. 在只有一个线程的 TaskScheduler 上调用互斥锁调用

可以像这样实现:

await Task.Factory.StartNew(() => mutex.WaitOne(), myCustomTaskScheduler);

或者,您可以使用同步代码并将所有内容移动到线程池中。如果您只能访问DoSomething的异步版本,请考虑仅对其结果调用Task.Wait。这里可能会有一些小的效率问题。也许还可以接受。


我必须考虑你的答案并尝试一些东西。问题是:我必须在持有Mutex时调用await(它是一个应该受到保护的关键部分 - 例如访问文件),而且我没有访问“DoSomething”的权限。我认为当我调用await DoSomething时,使用默认的ConfigureAwait(true)会捕获上下文,并在完成任务后继续在该捕获的上下文中进行其余操作。 - Romasz
2
正确,但是所捕获的上下文可能意味着“任何线程池线程”。根据我所拥有的信息,我认为这3个提议的解决方案都适用于您。如果您需要更多信息,请告诉我。 - usr
据我现在的理解,当我们从await返回时,我们回到了先前的上下文,但我们可能在不同的线程中。我对这个主题进行了一些研究:至于你的第一个方法-我尝试使用await打开流,并且我想要保护该打开,因此必须(在我看来)同时持有Mutex。第二个提案存在不同的问题 - 在RT内,很难将TaskScheduler限制为一个线程 - 我考虑过AutoResteEvent =,但我不确定它是否保证一个线程... - Romasz
至于第三个选项 - 我已经实现了它,似乎可以工作。据我理解,这使得我的代码更加同步,但打开流不需要太多时间,因此正如你所说,这只是一个小问题。 - Romasz
@Romasz 关于(2):您可以使用任务而不是线程来实现相同的效果。您需要编写一个由线程池任务支持的TaskScheduler(设置了LongRunning选项)。关于(3):您只需使用Task.Run包装所有同步等待即可将其移出UI线程。这样,您可以保持100%的响应性。唯一的缺点是会消耗一个线程池线程的内存,但这并不太糟糕。 - usr
至于(2)-我需要更深入地研究一下,但可能要在完成当前工作后才能进行。至于(3)-如果我没有错的话,我不能简单地用Task.Run包装同步等待-我正在打开流,所以我必须等待它-如果它是同步的,它会阻塞我,但我仍然在一个线程内,如果我用Run包装它,我将不得不使用await(这是我不想要的-> Mutex和其他线程当ruturned)。或者我可能理解错了什么。 - Romasz

8

我在异步方法中使用命名互斥锁来控制仅有一个进程可以调用它。另一个进程检查命名互斥锁并在无法创建新的命名互斥锁时退出。

我可以在异步方法中使用命名互斥锁,因为操作系统保证/控制操作系统中命名对象只有一个实例。此外,我不使用WaitOne/Release,这应该在线程上调用。

public async Task<bool> MutexWithAsync()
{
    // Create OS-wide named object. (It will not use WaitOne/Release)
    using (Mutex myMutex = new Mutex(false, "My mutex Name", out var owned))
    {
        if (owned)
        {
            // New named-object was created. We own it.
            try
            {
                await DoSomething();
                return true;
            }
            catch
            {
                return false;
            }
        }
        else
        {
            // The mutex was created by another process.
            // Exit this instance of process.
        }
    }
}

问题在于如果使用默认同步上下文,await DoSomething(); 可能会在不同的线程上继续执行。你尝试在非 UI 线程上运行过吗? - Romasz
是的,它可以在不同的线程上继续执行。我的主要想法是不同步同一进程的线程。我创建了命名内核对象。命名内核对象只能创建一次,第二次尝试创建将失败。就C#而言:如果已创建命名对象,则“owned”将为“true”。如果创建命名对象失败,则“owned”将为“false”。我可以使用/创建任何内核命名对象来实现此目的。https://learn.microsoft.com/en-us/windows/desktop/sysinfo/kernel-objects 事件、互斥体和信号量都很好。 - Sergei Zinovyev
现在我明白你的观点了,好主意。我只是在想这里是否有竞态条件的可能性,尽管找不到任何证据。 - Romasz
太好了!看起来当进程退出时,互斥锁会自动释放? - Felix
Felix,是的,因为它是从一个具有“finalizer”的类派生而来的。 - Sergei Zinovyev

2
您可以使用二进制信号量来代替互斥锁。信号量不需要由获取它的线程释放。这里的一个巨大劣势是,如果应用程序在DoSomething()内崩溃或被杀死,则信号量将不会被释放,下一个应用程序实例将会挂起。请参见被遗弃的命名信号量未被释放。"最初的回答"
 public async Task<bool> MutexWithAsync()
 {
     using (Semaphore semaphore = new Semaphore(1, 1, "My semaphore Name"))
     {
         try
         {
             semaphore.WaitOne();
             await DoSomething();
             return true;
         }
         catch { return false; }
         finally { semaphore.Release(); }
     }
 }

这是wekempf在他的回答中建议的。 - Romasz
1
很遗憾,命名信号量在所有操作系统上似乎都无法工作。 Stack Trace: at System.Threading.Semaphore.CreateSemaphoreCore(Int32 initialCount, Int32 maximumCount, String name, Boolean& createdNew) at System.Threading.Semaphore..ctor(Int32 initialCount, Int32 maximumCount, String name, Boolean& createdNew) at System.Threading.Semaphore..ctor(Int32 initialCount, Int32 maximumCount, String name)``` - Steve R.

1
我有一个有趣的解决方案提供给您。现在没有时间提供代码示例,如果我的描述不够,请告诉我,我会尝试提供代码。
您面临两个问题。首先,AsyncMutex没有线程亲和性,正如您所指出的那样。因此,您不能使用Mutex构建它。但是,您可以使用计数为1的Semaphore来构建它,因为Semaphore也没有线程亲和性。在C#中,Semaphore类可以命名并跨进程边界使用。因此,第一个问题很容易解决。
第二个问题是不想在“锁定”此AsyncMutex时使用阻塞调用。好吧,您可以使用ThreadPool.RegisterWaitForSingleObject注册回调函数,在Semaphore(WaitHandle)被标记时执行该回调函数。这会进行“异步等待”。使用TaskCompletionSource的一小段代码包装它,您就可以相当轻松地构建一个返回任务的WaitAsync方法,用于您的AsyncMutex。这两个想法应该使得实现可在C#中使用的跨进程命名AsyncMutex变得相当容易。
请记住,与其他AsyncMutex实现一样,这不是一个递归锁(同一个“线程”可以多次锁定互斥量,只要它解锁互斥量相同的次数),因此在代码中必须小心,以避免死锁。

你说得对,Semaphore 可以用于这个目的。然而,对于我的简单需求和短进程来说,让它在单线程上同步运行更容易些。 - Romasz
1
如果您需要更轻量级的东西,并且不需要命名或跨进程同步,您也可以使用SemaphoreSlim。 - Lucas Martins Juviniano

-4

这个守卫与async/await完美配合:

public sealed class AsyncLock : IDisposable
{
    readonly AutoResetEvent Value;

    public AsyncLock(AutoResetEvent value, int milliseconds = Timeout.Infinite)
    {
        if (value == null)
            throw new ArgumentNullException("value");

        value.WaitOne(milliseconds);
        Value = value;
    }

    void IDisposable.Dispose()
    {
        Value.Set();
    }
}

private async Task TestProc()
{
    var guard = new AutoResetEvent(true); // Guard for resource

    using (new AsyncLock(guard)) // Lock resource
    {
        await ProcessNewClientOrders(); // Use resource
    } // Unlock resource
}

1
这是同步等待获取锁,而不是异步等待获取锁。 - Servy
Servy 2:请认真阅读任务要求。这不涉及异步等待。如果你需要,可以向我提出,我会给你。 - Alexander Yudakov
这是一个异步方法;当然它应该异步地进行等待。如果它同步等待以取出锁,该方法将不再是异步的。 - Servy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接