使用任务并行库时与对象同步

4
我有一个WPF应用程序,需要不时执行长时间运行的操作 - 或者说,许多小操作总共需要一段时间。我发现.Net 4中的Task Parallel Library可以很好地处理这个问题。然而,由于本质上这个操作在另一个相同类型的操作开始之前必须完成,因此只能同时运行一个操作。即使上一个操作仍在进行中,用户也有非常真实的机会执行需要该进程运行的操作。我想同步这个过程,以便始终只有一个操作在运行。当正在运行的实例完成后,另一个实例获得锁并继续运行,直到没有更多这样的操作为止。我有一个运行这个过程的类名为EntityUpdater。在这个类中,我认为定义同步对象是聪明的:
private static object _lockObject = new object();

使其静态化应确保只要锁定正确,任何EntityUpdater对象都将等待其轮到自己的时间,对吗?
因此,在启动任务之前(这将依次启动所有其他小子任务,并附加到它们的父任务),我进行了天真的第一次尝试:
Monitor.Enter(_lockObject, ref _lockAquired);

(_lockAquired只是一个本地的布尔值)

主任务(带有所有子任务的任务)有一个继续,其存在主要是为了执行以下操作

Monitor.Exit(_lockObject);

我知道我应该把这段代码放在finally块中,但它几乎是连续执行的唯一代码,所以我不知道这样做有什么区别。
无论如何,我认为这里涉及到一些线程魔法,导致我收到“从未同步的代码块调用了对象同步方法”SynchronizationLockException异常。我已确保_lockAquired实际上为true,并尝试在几个不同的位置使用Monitor.Enter,但我总是得到这个异常。
因此,我的问题基本上是如何同步访问一个对象(对象本身并不重要),以便在任何给定时间只运行一个进程副本,并且在一个已经运行时启动的任何其他进程都将被阻止?复杂性 - 我猜 - 出现在添加的要求上,即锁定应在将来的某个时候释放,当第一个TPL任务的所有子任务完成时。
更新:
下面是一些代码,展示了我目前正在做的事情。
public class EntityUpdater
{
    #region Fields

    private static object _lockObject = new object();
    private bool _lockAquired;

    private Stopwatch stopWatch;

    #endregion

    public void RunProcess(IEnumerable<Action<IEntity>> process, IEnumerable<IEntity> entities)
    {
        stopWatch = new Stopwatch();

        var processList = process.ToList();

        Monitor.Enter(_lockObject, ref _lockAquired);

        //stopWatch.Start();

        var task = Task.Factory.StartNew(() => ProcessTask(processList, entities), TaskCreationOptions.LongRunning);
        task.ContinueWith(t =>
        {
            if(_lockAquired)
                Monitor.Exit(_lockObject);
            //stopWatch.Stop();

        });
    }

    private void ProcessTask(List<Action<IEntity>> process, IEnumerable<IEntity> entities)
    {

        foreach (var entity in entities)
        {
            var switcheroo = entity; // To avoid closure or whatever
            Task.Factory.StartNew(() => RunSingleEntityProcess(process, switcheroo), TaskCreationOptions.AttachedToParent);
        }
    }

    private void RunSingleEntityProcess(List<Action<IEntity>> process, IEntity entity)
    {
        foreach (var step in process)
        {
            step(entity);
        }
    }
}

正如您所看到的,这并不复杂,而且这也可能远非生产所需 - 只是一次尝试,展示了我无法使其工作的内容。

我得到的异常当然是在任务继续中的Monitor.Exit()调用。

希望这能让事情变得更清晰一些。


抱歉,你能再解释清楚一下吗?我印象中同一个对象上的两个lock代码块不可能同时执行,所以从你写的内容来看似乎是正确的。或许提供更多的代码示例会有帮助。 - faester
嗨,好的,我会更新更多的源代码来展示我正在做什么。 - Rune Jacobsen
ContinueWith方法不具备线程安全性,并且过于复杂。建议在Parallel.ForEach()周围加上try/finally语句块。 - H H
@Henk - 好的,我会尝试这个方法- 但是,我的理解是,在这种情况下,续行将在所有子任务完成后运行 - 只要它不会做其他太多事情而实际上释放锁,那么这会引起问题吗? 我主要关注使呼叫者易于调用且非阻塞。谢谢! - Rune Jacobsen
不是完整的答案,但是在开始每个任务时小心阻塞获取锁。如果你这样做足够多次,你可能会使线程池饿死,发生各种糟糕的事情(例如,你的继续可能无法运行,因为它们无法被调度,因为其他任务已经启动并进入了阻塞状态)。如果你不释放直到继续执行,你现在就死锁了。 - Dan Bryant
显示剩余3条评论
1个回答

3
您可以使用队列并确保一次只执行一个任务,该任务将处理队列,例如:

您可以使用队列并确保一次只执行一个任务,该任务将处理队列,例如:

private readonly object _syncObj = new object();
private readonly ConcurrentQueue<Action> _tasks = new ConcurrentQueue<Action>();

public void QueueTask(Action task)
{
    _tasks.Enqueue(task);

    Task.Factory.StartNew(ProcessQueue);
}

private void ProcessQueue()
{
    while (_tasks.Count != 0 && Monitor.TryEnter(_syncObj))
    {
        try
        {
            Action action;

            while (_tasks.TryDequeue(out action))
            {
                action();
            }
        }
        finally
        {
            Monitor.Exit(_syncObj);
        }
    }
}

嗨,谢谢你的建议。当然,那是一个可能性 - 但这意味着我必须跟踪这个类来保持状态,对吧?那是一种可能性,但我试图避免这种情况。不过,我会记住它作为一个可能的解决方案! - Rune Jacobsen
1
@Rune 最好将这样的功能分离到它们自己的类中。这是一种被称为“单一职责原则”的软件工程实践。简而言之,它旨在使您的软件更模块化、更易于理解和维护。希望对您有所帮助。 - Tim Lloyd
嗨,是的,我知道这个原则 - 因此EntityUpdater是一个单独的类,只做这个。然而,我的观点是,我基本上希望我的代码的任何部分都可以新建一个EntityUpdater并启动一个进程,而不是必须定位整个系统的单个“全局”EntityUpdater,该EntityUpdater具有ConcurrentQueue<T>。如果可能的话,我更喜欢尽可能减少全局变量的使用。 :) - Rune Jacobsen
@Rune 你可能也知道依赖注入。 - Tim Lloyd
当 _tasks.Count == 0 时,忙等待的代码不太好。相反,应该使用 Monitor.Wait 将线程置于睡眠状态。 - h9uest
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接