Controlling the dequeue order of a semaphore queue in Linux

3
I want to implement a piece of code where I need to assign different "priority numbers" to different threads. Some of these threads may wait on the same semaphore. Say several threads are queued on semaphore S and another thread does a sem_post on S. Once the sem_post has executed, I want the thread with the highest "priority number" in S's queue to get the semaphore, and no other. As far as I know there is no direct way to achieve this, because which queued thread gets the semaphore is arbitrary (not necessarily FIFO, etc.). I did try raising the threads' pthread priorities, but realized that does not work either. Could someone guide me on how to design manual control over a semaphore's queue in C? Thanks.
3 Answers

1
I can think of two approaches:
  • use condition variables to "wake up some or all waiters" and let them sort out the priority release among themselves; or
  • use (realtime) signals to "wake up a single, specific waiter" in priority order

In either case, the semaphore has at least a mutex, a value, and some bookkeeping. If the value drops below zero, its absolute value is the number of waiting threads (e.g., value == -3 means 3 threads are waiting).

Condition variable approach

At each priority, the semaphore tracks the number of waiters and the number of waiters that have been released. In pseudocode:

typedef struct priority_sem_s {
  int              value;     // if negative, abs(sem->value) == no. of waiting threads
  pthread_mutex_t  mutex;
  pthread_cond_t   cv;
  int              n_waiting[N_PRIORITIES];  // no. waiting (blocked) at each priority
  int              n_released[N_PRIORITIES]; // no. waiters released (unblocked) at each priority
} priosem_t;

void post(priosem_t *sem):
  lock(sem->mutex);
  sem->value++;

  if (sem->value <= 0 && prio_waiting_is_NOT_empty(sem)):
    // someone was waiting; release one of the highest prio
    int prio = fetch_highest_prio_waiting(sem);
    sem->n_waiting[prio]--;
    sem->n_released[prio]++;
    cond_broadcast(sem->cv);

  unlock(sem->mutex);

void wait(priosem_t *sem, int prio):
  lock(sem->mutex);
  sem->value--;

  if (sem->value < 0):
    // get in line
    sem->n_waiting[prio]++;
    while (sem->n_released[prio] < 1):
      cond_wait(sem->cv, sem->mutex);
    // ok to leave
    sem->n_released[prio]--;

  unlock(sem->mutex);

Pro: can be shared between processes (implement it in shared memory).

Con: every waiter is woken up in order to release just one. Martin James suggested one condition variable per priority, which would cut down on the "unnecessary" wakeups at the cost of more synchronization primitives.
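
If it helps to see the above as real code, here is a minimal, untested C sketch of the condition-variable approach. The function names (priosem_init, priosem_wait, priosem_post) and N_PRIORITIES are my own choices rather than an existing API, and a higher index is assumed to mean a higher priority:

#include <pthread.h>

#define N_PRIORITIES 4                         /* assumption: small, fixed number of levels */

typedef struct priosem {
    int             value;                     /* if negative, -value == number of waiters  */
    pthread_mutex_t mutex;
    pthread_cond_t  cv;
    int             n_waiting[N_PRIORITIES];   /* blocked waiters per priority              */
    int             n_released[N_PRIORITIES];  /* released (unblocked) waiters per priority */
} priosem_t;

void priosem_init(priosem_t *sem, int initial_value)
{
    sem->value = initial_value;
    pthread_mutex_init(&sem->mutex, NULL);
    pthread_cond_init(&sem->cv, NULL);
    for (int i = 0; i < N_PRIORITIES; i++)
        sem->n_waiting[i] = sem->n_released[i] = 0;
}

void priosem_post(priosem_t *sem)
{
    pthread_mutex_lock(&sem->mutex);
    sem->value++;

    if (sem->value <= 0) {
        /* Someone may be blocked: release one waiter at the highest waiting priority. */
        for (int prio = N_PRIORITIES - 1; prio >= 0; prio--) {
            if (sem->n_waiting[prio] > 0) {
                sem->n_waiting[prio]--;
                sem->n_released[prio]++;
                pthread_cond_broadcast(&sem->cv);  /* wake everybody; only one will proceed */
                break;
            }
        }
    }
    pthread_mutex_unlock(&sem->mutex);
}

void priosem_wait(priosem_t *sem, int prio)
{
    pthread_mutex_lock(&sem->mutex);
    sem->value--;

    if (sem->value < 0) {
        /* Get in line and sleep until a post() releases this priority level. */
        sem->n_waiting[prio]++;
        while (sem->n_released[prio] < 1)
            pthread_cond_wait(&sem->cv, &sem->mutex);
        sem->n_released[prio]--;
    }
    pthread_mutex_unlock(&sem->mutex);
}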

Signal approach

Use sigsuspend and realtime signals with a no-op handler to suspend and resume the waiters. Pseudo-C:

typedef struct priority_sem_s {
  int              value;    // if negative, abs(value) == no. of waiting threads
  pthread_mutex_t  mutex;
  void            *waiting;  // ordered list of [priority, thread-id] pairs
} priosem_t;

void post(priosem_t *sem):
  lock(sem->mutex);
  sem->value++;

  if (sem->value <= 0 && waiting_queue_is_NOT_empty(sem)):
    pthread_t tid = pop_highest_prio_waiter(sem);
    pthread_kill(tid, SIGRTMIN+n);

  unlock(sem->mutex);

void wait(priosem_t *sem, int prio):
  // XXX --> PRECONDITION:  SIGRTMIN+n is SIG_BLOCK'd <-- XXX
  // XXX --> PRECONDITION:  SIGRTMIN+n has a no-op handler installed <-- XXX
  lock(sem->mutex);
  sem->value--;

  if (sem->value < 0):
    // get in line
    add_me_to_wait_list(sem, pthread_self(), prio);
    unlock(sem->mutex);
    sigsuspend(full_mask_except_sigrtmin_plus_n);
    return;  // OK!

  unlock(sem->mutex);

Pro: conceptually simpler; no unnecessary wakeups.

Con: cannot be shared across processes. You must find a free realtime signal or determine one dynamically (look for an unmasked signal whose disposition is SIG_DFL?), and block it as early as possible.
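
To illustrate the two preconditions flagged in wait() above, the following untested C fragment shows one way a program might block the chosen realtime signal and install a no-op handler before creating any threads (so every thread inherits the mask). RESUME_SIG is my own placeholder for SIGRTMIN+n, and wait() would later pass suspend_mask to sigsuspend():

#include <pthread.h>
#include <signal.h>
#include <stddef.h>

#define RESUME_SIG (SIGRTMIN + 1)   /* assumption: this realtime signal is otherwise unused */

static void noop_handler(int sig) { (void)sig; }

/* Call once in main(), before spawning threads, so they all inherit the signal mask. */
static int setup_resume_signal(sigset_t *suspend_mask)
{
    struct sigaction sa;
    sigset_t block;

    /* A no-op handler, so delivery interrupts sigsuspend() instead of killing the process. */
    sa.sa_handler = noop_handler;
    sa.sa_flags = 0;
    sigemptyset(&sa.sa_mask);
    if (sigaction(RESUME_SIG, &sa, NULL) != 0)
        return -1;

    /* Keep RESUME_SIG blocked during normal execution... */
    sigemptyset(&block);
    sigaddset(&block, RESUME_SIG);
    if (pthread_sigmask(SIG_BLOCK, &block, NULL) != 0)
        return -1;

    /* ...and build the mask for sigsuspend(): everything blocked except RESUME_SIG. */
    sigfillset(suspend_mask);
    sigdelset(suspend_mask, RESUME_SIG);
    return 0;
}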


+1 for actual code, (though I do think it's pretty brave :) - Martin James
@MartinJames, "reckless", you might say :) - pilcrow

0

I needed to develop a semaphore construct with the following characteristics:

  1. There is a critical section that at most Capacity threads can enter and execute in at the same time; after executing, a thread exits the critical section;
  2. When the semaphore is at full capacity and all execution slots are taken: queuing threads are put to sleep and are woken up when other threads exit the critical section;
  3. The execution queue has FIFO semantics;
  4. There is a notification mechanism that tells a waiting thread its position in the queue;
  5. Only threads that have entered the critical section are allowed to exit it.

Points 1–2 describe a textbook semaphore data type, while points 3–5 dictate additional behavioral/API constraints and features. Perhaps unsurprisingly, this construct can be built with only the mutex and condition-variable primitives, even though the semaphore is itself often (incorrectly) presented as a basic synchronization primitive. A C++11 implementation follows; it can be ported to any language/environment that provides those primitives. Because the notification mechanism is required not to hold the busy semaphore lock while it runs, the implementation is not entirely trivial. Custom priorities and priority editing are not implemented, since I did not need scheduler-like features, but they should be possible as well.

Semaphore.h

#pragma once

#include <condition_variable>
#include <mutex>
#include <thread>
#include <functional>
#include <list>

namespace usr
{
    typedef std::function<void(unsigned processIndex)> SemaphoreNotifier;

    class Semaphore;

    class SemaphoreToken final
    {
        friend class Semaphore;
    public:
        SemaphoreToken();
    private:
        SemaphoreToken(Semaphore &semaphore);
    private:
        void Invalidate();
    private:
        Semaphore *Parent;
        std::thread::id ThreadId;
    };

    class SemaphoreCounter final
    {
        friend class Semaphore;
    public:
        SemaphoreCounter();
    private:
        void Increment();
    public:
        unsigned GetCount() const { return m_count; }
    private:
        unsigned m_count;
    };

    class Semaphore final
    {
        class Process
        {
        public:
            Process(unsigned index);
        public:
            void Wait();
            void Set();
            void Decrement();
            void Detach();
        public:
            bool IsDetached() const { return m_detached; }
            unsigned GetIndex() const { return m_index; }
        private:
            std::mutex m_mutex;
            unsigned m_index;                   // Guarded by m_mutex
            bool m_detached;                    // Guarded by m_mutex
            std::unique_lock<std::mutex> m_lock;
            std::condition_variable m_cond;
        };
    public:
        Semaphore(unsigned capacity = 1);
    public:
        SemaphoreToken Enter();
        SemaphoreToken Enter(SemaphoreCounter &counter, unsigned &id);
        SemaphoreToken Enter(const SemaphoreNotifier &notifier);
        SemaphoreToken Enter(const SemaphoreNotifier &notifier, SemaphoreCounter &counter, unsigned &id);
        bool TryEnter(SemaphoreToken &token);
        bool TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token);
        void Exit(SemaphoreToken &token);
    private:
        bool enter(bool tryEnter, const SemaphoreNotifier &notifier, SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token);
    private:
        // Disable copy constructor and assign operator
        Semaphore(const Semaphore &);
        Semaphore & operator=(const Semaphore &);
    public:
        unsigned GetCapacity() const { return m_capacity; }
    private:
        mutable std::mutex m_mutex;
        unsigned m_capacity;
        unsigned m_leftCapacity;               // Guarded by m_mutex
        std::list<Process *> m_processes;      // Guarded by m_mutex
    };
}

Semaphore.cpp

#include "Semaphore.h"
#include <cassert>
#include <limits>
#include <algorithm>

using namespace std;
using namespace usr;

Semaphore::Semaphore(unsigned capacity)
{
    if (capacity == 0)
        throw runtime_error("Capacity must not be zero");

    m_capacity = capacity;
    m_leftCapacity = capacity;
}

SemaphoreToken Semaphore::Enter()
{
    unsigned id;
    SemaphoreToken token;
    enter(false, nullptr, nullptr, id, token);
    return token;
}

SemaphoreToken Semaphore::Enter(SemaphoreCounter &counter, unsigned &id)
{
    SemaphoreToken token;
    enter(false, nullptr, &counter, id, token);
    return token;
}

SemaphoreToken Semaphore::Enter(const SemaphoreNotifier &notifier)
{
    unsigned id;
    SemaphoreToken token;
    enter(false, notifier, nullptr, id, token);
    return token;
}

SemaphoreToken Semaphore::Enter(const SemaphoreNotifier &notifier,
    SemaphoreCounter &counter, unsigned &id)
{
    SemaphoreToken token;
    enter(false, notifier, &counter, id, token);
    return token;
}

bool Semaphore::TryEnter(SemaphoreToken &token)
{
    unsigned id;
    return enter(true, nullptr, nullptr, id, token);
}

bool Semaphore::TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token)
{
    return enter(true, nullptr, &counter, id, token);
}

bool Semaphore::enter(bool tryEnter, const SemaphoreNotifier &notifier,
    SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token)
{
    unique_lock<mutex> lock(m_mutex);
    if (counter != nullptr)
    {
        id = counter->GetCount();
        counter->Increment();
    }

    if (m_leftCapacity > 0)
    {
        // Semaphore is available without accessing the queue
        assert(m_processes.size() == 0);
        m_leftCapacity--;
    }
    else
    {
        if (tryEnter)
            return false;

        Process process((unsigned)m_processes.size());
        unsigned previousIndex = numeric_limits<unsigned>::max();
        m_processes.push_back(&process);

        // Release semaphore unlock
        lock.unlock();

    NotifyAndWait:
        unsigned index = process.GetIndex();
        if (notifier != nullptr && index != 0 && index != previousIndex)
        {
            try
            {
                // Notify the caller on progress
                notifier(index);
            }
            catch (...)
            {
                // Retake Semaphore lock
                lock.lock();

                // Remove the failing process
                auto found = std::find(m_processes.begin(), m_processes.end(), &process);
                auto it = m_processes.erase(found);
                for (; it != m_processes.end(); it++)
                {
                    // Decrement following processes
                    auto &otherProcess = **it;
                    otherProcess.Decrement();
                    otherProcess.Set();
                }

                // Rethrow. NOTE: lock will be unlocked by RAII
                throw;
            }
            previousIndex = index;
        }

        process.Wait();
        if (!process.IsDetached())
            goto NotifyAndWait;
    }

    token = SemaphoreToken(*this);
    return true;
}

void Semaphore::Exit(SemaphoreToken &token)
{
    if (this != token.Parent || token.ThreadId != this_thread::get_id())
        throw runtime_error("Exit called from wrong semaphore or thread");

    {
        unique_lock<mutex> lock(m_mutex);
        if (m_processes.size() == 0)
        {
            m_leftCapacity++;
        }
        else
        {
            auto front = m_processes.front();
            m_processes.pop_front();
            front->Detach();
            front->Set();

            for (auto process : m_processes)
            {
                process->Decrement();
                process->Set();
            }
        }

        token.Invalidate();
    }
}

SemaphoreToken::SemaphoreToken() :
    Parent(nullptr)
{
}

SemaphoreToken::SemaphoreToken(usr::Semaphore &semaphore) :
    Parent(&semaphore),
    ThreadId(this_thread::get_id())
{
}

void SemaphoreToken::Invalidate()
{
    Parent = nullptr;
    ThreadId = thread::id();
}

SemaphoreCounter::SemaphoreCounter()
    : m_count(0)
{
}

void SemaphoreCounter::Increment()
{
    m_count++;
}

Semaphore::Process::Process(unsigned index) :
    m_index(index),
    m_detached(false),
    m_lock(m_mutex)
{
}

void Semaphore::Process::Wait()
{
    m_cond.wait(m_lock);
}

void Semaphore::Process::Set()
{
    m_cond.notify_one();
}

void Semaphore::Process::Decrement()
{
    unique_lock<mutex> lock(m_mutex);
    assert(m_index > 0);
    m_index--;
}

void Semaphore::Process::Detach()
{
    unique_lock<mutex> lock(m_mutex);
    assert(m_index == 0);
    m_detached = true;
}

I tested it with the following sample code:
#include "Semaphore.h"
#include <chrono>
#include <iostream>
#include <memory>
#include <vector>

using namespace std;
using namespace usr;

int main()
{
    SemaphoreCounter counter;
    Semaphore semaphore(4);  // Up to 4 threads can execute simultaneously

    vector<shared_ptr<thread>> threads;
    int threadCount = 300;
    for (int i = 0; i < threadCount; i++)
    {
        threads.push_back(std::make_shared<thread>([&semaphore, &counter]
        {
            unsigned threadId;
            auto token = semaphore.Enter([&threadId](unsigned index) {
                cout << "Thread " << threadId << " has " << index << " processes ahead before execution" << endl;
            }, counter, threadId);

            cout << "EXECUTE Thread " << threadId << endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(15));
            semaphore.Exit(token);
        }));
    }

    for (int i = 0; i < threadCount; i++)
        threads[i]->join();

    return 0;
}

0

I think you need to build your own "PrioritySemaphore" (PS) class with post() and wait(priority) methods. You need a mutex to protect the internal data, a 'totalCount' int, and an array, indexed by [priority], of structs each holding a semaphore for threads to wait on and a 'PriorityCount' int.

wait(priority): lock the mutex. If totalCount > 0, decrement it, unlock the mutex and return. If totalCount == 0, index the array with (priority), increment PriorityCount, unlock the mutex and wait on the semaphore.

post(): lock the mutex. If totalCount == 0, increment it, unlock the mutex and return. If totalCount > 0, iterate the array from the highest-priority end looking for a non-zero PriorityCount. If none is found, increment totalCount, unlock the mutex and return. If a non-zero PriorityCount is found, decrement it, signal the semaphore at that priority, unlock the mutex and return.
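
Below is a minimal, untested C sketch along these lines, using one counting POSIX semaphore per priority slot. It deviates from the description above in one detail: post() always scans for waiters first, regardless of totalCount, which sidesteps the "extra post needed" issue raised in the comments below. The names prio_sem_t, N_PRIORITIES and the ps_* functions are mine, not an existing API:

#include <pthread.h>
#include <semaphore.h>

#define N_PRIORITIES 4                 /* assumption: higher index == higher priority */

typedef struct {
    pthread_mutex_t mutex;             /* protects totalCount and the waiting counts  */
    int             totalCount;        /* permits posted but not yet consumed         */
    struct {
        sem_t sem;                     /* waiters at this priority block here         */
        int   waiting;                 /* the per-priority "PriorityCount"            */
    } slot[N_PRIORITIES];
} prio_sem_t;

int ps_init(prio_sem_t *ps, int initial)
{
    pthread_mutex_init(&ps->mutex, NULL);
    ps->totalCount = initial;
    for (int p = 0; p < N_PRIORITIES; p++) {
        if (sem_init(&ps->slot[p].sem, 0, 0) != 0)
            return -1;
        ps->slot[p].waiting = 0;
    }
    return 0;
}

void ps_wait(prio_sem_t *ps, int prio)
{
    pthread_mutex_lock(&ps->mutex);
    if (ps->totalCount > 0) {
        ps->totalCount--;              /* a permit is available: take it and go */
        pthread_mutex_unlock(&ps->mutex);
        return;
    }
    ps->slot[prio].waiting++;          /* register before unlocking... */
    pthread_mutex_unlock(&ps->mutex);
    sem_wait(&ps->slot[prio].sem);     /* ...then block; sem_t counts, so no wakeup is lost */
}

void ps_post(prio_sem_t *ps)
{
    pthread_mutex_lock(&ps->mutex);
    /* Hand the permit directly to the highest-priority waiter, if any. */
    for (int p = N_PRIORITIES - 1; p >= 0; p--) {
        if (ps->slot[p].waiting > 0) {
            ps->slot[p].waiting--;
            sem_post(&ps->slot[p].sem);
            pthread_mutex_unlock(&ps->mutex);
            return;
        }
    }
    ps->totalCount++;                  /* nobody waiting: bank the permit */
    pthread_mutex_unlock(&ps->mutex);
}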


If I understand you, what you are calling a "semaphore" above is what we would call a "condition variable" in pthreads. I'd suggest a per-priority-slot "PendingReleaseCount" — incremented by post() when a waiter is found, decremented in wait() after waking up — if more than one waiter can be at the same priority. Otherwise there is a race in which a new post() signal can be "lost". - pilcrow
Could you please elaborate a bit more? I think the solution proposed above has a race condition in the wait function: it releases the mutex and then waits on the semaphore. Also, suppose one thread first acquires the PrioritySemaphore (initialized to 1, so it acts as a mutex) and another thread then queues up waiting on it. When the former exits the critical section and calls "post", totalCount = 0, so the semaphore is simply released, but the latter thread does not get control. We need one more "post" to get the latter going. - swarnim_narayan
@wang 'It releases the mutex and then waits on the semaphore.' Sure. If it waited on the semaphore before releasing the mutex, the class would deadlock because the mutex would never be released. Still thinking about your other point :) - Martin James
@wang - second point - you want a way to initialize the PS at creation time, yes? I must admit I only considered a PS with an initial count of 0. - Martin James
