AutoResetEvent和多个Sets

6
我正在尝试设计一种围绕堆栈的数据结构,当堆栈有可用项目时会阻塞。我尝试使用AutoResetEvent,但我认为我误解了同步过程的工作方式。基本上,查看以下代码,我正在尝试从堆栈弹出一个元素,但没有可用项目。
似乎AutoResetEvent的行为类似于信号量。这是正确的吗?我可以在BlockingStack.Get()中摆脱Set()并完成它吗?还是这会导致我只使用一个堆栈项的情况?
public class BlockingStack
{
    private Stack<MyType> _internalStack;
    private AutoResetEvent _blockUntilAvailable;

    public BlockingStack()
    {
        _internalStack = new Stack<MyType>(5);
        _blockUntilAvailable = new AutoResetEvent(false);

        for (int i = 0; i < 5; ++i)
        {
            var obj = new MyType();
            Add(obj);
        }
    }

    public MyType Get()
    {
        _blockUntilAvailable.WatiOne();

        lock (_internalStack)
        {
            var obj = _internalStack.Pop();
            if (_internalStack.Count > 0)
            {
                _blockUntilAvailable.Set(); // do I need to do this?
            }

            return obj;
        }
    }

    public void Add(MyType obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            _blockUntilAvailable.Set();
        }
    }
}

我的假设是当一个线程通过WaitOne()函数调用时,AutoResetEvent会为所有等待的线程重置。然而,似乎有多个线程正在进入。除非我在逻辑上搞错了什么。
编辑:这是针对Silverlight的。

相关:https://dev59.com/RFDTa4cB1Zd3GeqPLrtL#3798033 - Hans Passant
3个回答

6

除非你只是想了解线程如何工作,否则最好使用阻塞集合。这将为您提供由堆栈支持的阻塞集合:

ConcurrentStack<SomeType> MyStack = new ConcurrentStack<SomeType>();
BlockingCollection<SomeType> SharedStack = new BlockingCollection<SomeType>(MyStack)

您可以通过调用sharedStack.Take()来以线程安全的方式访问它,所有阻塞都会被正确地处理。请参见这里
编辑: 我花了一段时间(尝试了两次),但我想我已经解决了你的问题。
考虑一个有3个线程等待事件的空堆栈。
调用Add,堆栈有一个对象,并允许一个线程通过事件。
立即再次调用Add。
现在第一个线程等待从Add获取锁。
Add将第二个对象添加到堆栈中,并允许另一个线程通过事件。
现在堆栈上有两个对象和2个线程通过事件,都在等待锁。
第一个Get线程现在获取锁并弹出。看到堆栈上仍有一个对象并调用SET。
第三个线程允许通过事件。
第二个Get线程现在获取锁并弹出。看到堆栈中没有任何内容,不会调用set。
但是,为时已晚。第三个线程已经被允许通过,因此当第二个线程放弃锁时,第三个线程尝试从空堆栈中弹出并抛出异常。

这并没有回答 OP 的问题 - 我正在尝试设计一个围绕堆栈的数据结构,它会阻塞直到堆栈有可用的项。 - oleksii
不,它并没有特别的 - 这就是为什么我指出他/她可能只是想要理解正在发生的事情。但如果他们实际上正在尝试创建这个结构以供使用,更好的选择是使用阻塞集合(如果使用.net4)。创建阻塞集合很难做到正确,并且非常容易以微妙的方式出错。 - Russell Troywest
1
这正是阻塞集合所做的。您在其上调用Take(),它会一直阻塞,直到底层集合中有可用的内容可以取出。 - Russell Troywest
1
关于Silverlight,真是太遗憾了——BlockingCollection非常有用。我在盯着你的代码看了一会儿后运行了它,没有看到您如何在堆栈为空时获取某些内容。对我来说运行得很好。你是否遇到了异常?按照现有的方式,您将无法检索堆栈中的初始值,直到第一次添加东西,并且AutoResetEvents以不符合预期而著称,但似乎应该可以工作。 - Russell Troywest
1
唉,意识到第一次并没有完全正确。我实际上不得不起床好好想一想,因为这个问题一直在我的脑海中盘旋。我相当确定这次是对的。希望如此,我真的需要一些睡眠 :) - Russell Troywest
显示剩余3条评论

1

我没有验证基于Monitor的解决方案,但我编写了一个基于信号量的解决方案,看起来是有效的:

public class Semaphore
{
    private int _count;
    private int _maximum;
    private object _countGuard;

    public Semaphore(int maximum)
    {
        _count = 0;
        _maximum = maximum;
        _countGuard = new object();
    }

    public void WaitOne()
    {
        while (true)
        {
            lock (_countGuard)
            {
                if (_count < _maximum)
                {
                    _count++;
                    return;
                }
            }
            Thread.Sleep(50);
        }
    }

    public void ReleaseOne()
    {
        lock (_countGuard)
        {
            if (_count > 0)
            {
                _count--;
            }
        }
    }
}

public class BlockingStack
{
    private Stack<MyType> _internalStack;
    private Semaphore _blockUntilAvailable;

    public BlockingStack()
    {
        _internalStack = new Stack<MyType>(5);
        _blockUntilAvailable = new Semaphore(5);

        for (int i = 0; i < 5; ++i)
        {
            var obj = new MyType();
            Add(obj);
        }
    }

    public MyType Get()
    {
        _blockUntilAvailable.WaitOne();

        lock (_internalStack)
        {
            var obj = _internalStack.Pop();
            return obj;
        }
    }

    public void Add(MyType obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            _blockUntilAvailable.ReleaseOne();
        }
    }
}

1

不,你当前的代码毫无意义。目前每次调用Get方法时(.WaitOne调用),你都会阻塞线程。

你可能想要这样的东西:

public class BlockingStack<T>
{
    private Stack<T> _internalStack;
    private AutoResetEvent _blockUntilAvailable;

    public BlockingStack()
    {
        _internalStack = new Stack<T>(5);
        _blockUntilAvailable = new AutoResetEvent(false);
    }

    public T Pop()
    {
        lock (_internalStack)
        {
            if (_internalStack.Count == 0)
                _blockUntilAvailable.WaitOne();

            return _internalStack.Pop();
        }
    }

    public void Push(T obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);

            if(_internalStack.Count == 0)
                _blockUntilAvailable.Set();
        }
    }
}

这个想法是,如果_internalStack中当前的项目数为0,则应等待来自Push方法的信号。一旦收到信号,它就会继续并从堆栈中弹出一个项目。


编辑: 以上代码存在2个问题:

  1. 每当Pop.WaitOne上阻塞时,它都不会释放锁定的 _internalStack,因此Push永远无法获得锁。

  2. 当同一线程多次调用Pop时,它们共享 AutoResetEvent的初始状态 - 例如,当添加项目时, Push会发出信号AutoResetEvent。现在,当我弹出一个项时, 第一次正常工作,因为实际上有一个项目在 Stack中。但是第二次,Stack中没有值, 它通过在AutoResetEvent上调用.WaitOne来等待-但由于 调用Push已对此事件发出信号,它将返回true,并且 不像预期那样等待。

(可行的)替代方案:

public class BlockingStack<T>
{
    private Stack<T> _internalStack;

    public BlockingStack()
    {
        _internalStack = new Stack<T>(5);
    }

    public T Pop()
    {
        lock (_internalStack)
        {
            if (_internalStack.Count == 0)
                Monitor.Wait(_internalStack);

            return _internalStack.Pop();
        }
    }

    public void Push(T obj)
    {
        lock (_internalStack)
        {
            _internalStack.Push(obj);
            Monitor.Pulse(_internalStack);
        }
    }
}

1
不起作用,调用 Push、Pop、Pop、Push,并观察死锁。 - Hans Passant
@HansPassant,你为什么要在单线程上调用Push、Pop、Pop、Push呢?难道你期望它在同一线程上以某种方式调用Push,尽管Pop正在阻塞该线程吗? - ebb
@HansPassant,请查看更新,看看这次我是否做对了 :) - ebb
1
请注意我链接中的 while 循环,在存在多个消费者线程时是必须的。 - Hans Passant
@HansPassant,我不确定为什么你想要一个while循环... 你能详细说明一下吗? - ebb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接