在每种情况下,信号量至少具有 互斥锁, 一个 值 和一些账务。如果 值 低于零,它的绝对值就是等待线程的数量(例如,value==-3 表示有 3 个线程正在等待)。
在任何给定优先级下,信号量跟踪等待者的数量以及已释放的等待者的数量。伪代码如下:
typedef struct priority_sem_s {
int value; // if negative, abs(sem->value) == no. of waiting threads
pthread_mutex_t mutex;
pthread_cond_t cv;
int n_waiting[N_PRIORITIES]; // no. waiting (blocked) at each priority
int n_released[N_PRIORITIES]; // no. waiters released (unblocked) at each priority
} priosem_t;
void post(priosem_t *sem):
lock(sem->mutex);
sem->value++;
if (sem->value <= 0 && prio_waiting_is_NOT_empty(sem)):
// someone was waiting; release one of the highest prio
int prio = fetch_highest_prio_waiting(sem);
sem->prio_waiting[prio]--;
sem->prio_released[prio]++;
cond_broadcast(sem->cv, sem->mutex);
unlock(sem->mutex);
void wait(priosem_t *sem, int prio):
lock(sem->mutex);
sem->value--;
if (sem->value < 0):
// get in line
sem->prio_waiting[prio]++;
while (sem->prio_released[prio] < 0):
cond_wait(sem->cv, sem->mutex);
// ok to leave
sem->prio_released[prio]--;
unlock(sem->mutex);
优点:可以在进程间共享(使用共享内存实现)。
缺点:唤醒每个等待者以释放一个信号量。Martin James建议每个优先级使用一个条件变量,这将减少“不必要”的唤醒,但代价是需要更多的同步原语。
使用sigsuspend和带有noop处理器的实时信号来暂停和恢复等待者。伪C代码如下:
typedef struct priority_sem_s {
int value; // if negative, abs(value) == no. of waiting threads
pthread_mutex_t mutex;
void *waiting; // ordered list of [priority, thread-id] pairs
} priosem_t;
void post(priosem_t *sem):
lock(sem->mutex);
sem->value++;
if (sem->value <= 0 && waiting_queue_is_NOT_empty(sem)):
pthread_t tid = pop_highest_prio_waiter(sem);
pthread_kill(tid, SIGRTMIN+n);
unlock(sem->mutex);
void wait(priosem_t *sem, int prio):
// XXX --> PRECONDITION: SIGRTMIN+n is SIG_BLOCK'd <-- XXX
// XXX --> PRECONDITION: SIGRTMIN+n has a no-op handler installed <-- XXX
lock(sem->mutex);
sem->value--;
if (sem->value < 0):
// get in line
add_me_to_wait_list(sem, pthread_self(), prio);
unlock(sem->mutex);
sigsuspend(full_mask_except_sigrtmin_plus_n);
return; // OK!
unlock(sem->mutex);
优点:概念上更简单;没有不必要的唤醒。
缺点:不能在进程之间共享。必须选择一个可用的实时信号或动态选择(查找具有SIG_DFL状态的未屏蔽信号?),并尽早屏蔽它。
我需要开发一个具有以下特征的信号量结构:
Capacity
个线程。执行后,线程退出关键部分;点1-2通常描述了一个理论的信号量数据类型,而点3-4则规定了其他行为/API约束和功能。毫不意外,这种结构可以仅使用互斥锁和条件变量原语来构建,尽管信号量经常被错误地表示为同步原语。它遵循C++11实现,也可以移植到提供上述原语的任何语言/环境中。由于要求通知机制不保持忙碌的信号量锁,因此实现并不完全简单。自定义优先级和优先级编辑未实现,因为我不需要类似调度程序的功能,但它们也应该是可能的。
Semaphore.h
#pragma once
#include <condition_variable>
#include <mutex>
#include <thread>
#include <functional>
#include <list>
namespace usr
{
typedef std::function<void(unsigned processIndex)> SemaphoreNotifier;
class Semaphore;
class SemaphoreToken final
{
friend class Semaphore;
public:
SemaphoreToken();
private:
SemaphoreToken(Semaphore &semaphore);
private:
void Invalidate();
private:
Semaphore *Parent;
std::thread::id ThreadId;
};
class SemaphoreCounter final
{
friend class Semaphore;
public:
SemaphoreCounter();
private:
void Increment();
public:
unsigned GetCount() const { return m_count; }
private:
unsigned m_count;
};
class Semaphore final
{
class Process
{
public:
Process(unsigned index);
public:
void Wait();
void Set();
void Decrement();
void Detach();
public:
bool IsDetached() const { return m_detached; }
unsigned GetIndex() const { return m_index; }
private:
std::mutex m_mutex;
unsigned m_index; // Guarded by m_mutex
bool m_detached; // Guarded by m_mutex
std::unique_lock<std::mutex> m_lock;
std::condition_variable m_cond;
};
public:
Semaphore(unsigned capacity = 1);
public:
SemaphoreToken Enter();
SemaphoreToken Enter(SemaphoreCounter &counter, unsigned &id);
SemaphoreToken Enter(const SemaphoreNotifier ¬ifier);
SemaphoreToken Enter(const SemaphoreNotifier ¬ifier, SemaphoreCounter &counter, unsigned &id);
bool TryEnter(SemaphoreToken &token);
bool TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token);
void Exit(SemaphoreToken &token);
private:
bool enter(bool tryEnter, const SemaphoreNotifier ¬ifier, SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token);
private:
// Disable copy constructor and assign operator
Semaphore(const Semaphore &);
Semaphore & operator=(const Semaphore &);
public:
unsigned GetCapacity() const { return m_capacity; }
private:
mutable std::mutex m_mutex;
unsigned m_capacity;
unsigned m_leftCapacity; // Guarded by m_mutex
std::list<Process *> m_processes; // Guarded by m_mutex
};
}
Semaphore.cpp
#include "Semaphore.h"
#include <cassert>
#include <limits>
#include <algorithm>
using namespace std;
using namespace usr;
Semaphore::Semaphore(unsigned capacity)
{
if (capacity == 0)
throw runtime_error("Capacity must not be zero");
m_capacity = capacity;
m_leftCapacity = capacity;
}
SemaphoreToken Semaphore::Enter()
{
unsigned id;
SemaphoreToken token;
enter(false, nullptr, nullptr, id, token);
return token;
}
SemaphoreToken Semaphore::Enter(SemaphoreCounter &counter, unsigned &id)
{
SemaphoreToken token;
enter(false, nullptr, &counter, id, token);
return token;
}
SemaphoreToken Semaphore::Enter(const SemaphoreNotifier ¬ifier)
{
unsigned id;
SemaphoreToken token;
enter(false, notifier, nullptr, id, token);
return token;
}
SemaphoreToken Semaphore::Enter(const SemaphoreNotifier ¬ifier,
SemaphoreCounter &counter, unsigned &id)
{
SemaphoreToken token;
enter(false, notifier, &counter, id, token);
return token;
}
bool Semaphore::TryEnter(SemaphoreToken &token)
{
unsigned id;
return enter(true, nullptr, nullptr, id, token);
}
bool Semaphore::TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token)
{
return enter(true, nullptr, &counter, id, token);
}
bool Semaphore::enter(bool tryEnter, const SemaphoreNotifier ¬ifier,
SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token)
{
unique_lock<mutex> lock(m_mutex);
if (counter != nullptr)
{
id = counter->GetCount();
counter->Increment();
}
if (m_leftCapacity > 0)
{
// Semaphore is availabile without accessing queue
assert(m_processes.size() == 0);
m_leftCapacity--;
}
else
{
if (tryEnter)
return false;
Process process((unsigned)m_processes.size());
unsigned previousIndex = numeric_limits<unsigned>::max();
m_processes.push_back(&process);
// Release semaphore unlock
lock.unlock();
NotifyAndWait:
unsigned index = process.GetIndex();
if (notifier != nullptr && index != 0 && index != previousIndex)
{
try
{
// Notify the caller on progress
notifier(index);
}
catch (...)
{
// Retake Semaphore lock
lock.lock();
// Remove the failing process
auto found = std::find(m_processes.begin(), m_processes.end(), &process);
auto it = m_processes.erase(found);
for (; it != m_processes.end(); it++)
{
// Decrement following processes
auto &otherProcess = **it;
otherProcess.Decrement();
otherProcess.Set();
}
// Rethrow. NOTE: lock will be unlocked by RAII
throw;
}
previousIndex = index;
}
process.Wait();
if (!process.IsDetached())
goto NotifyAndWait;
}
token = SemaphoreToken(*this);
return true;
}
void Semaphore::Exit(SemaphoreToken &token)
{
if (this != token.Parent || token.ThreadId != this_thread::get_id())
throw runtime_error("Exit called from wrong semaphore or thread");
{
unique_lock<mutex> lock(m_mutex);
if (m_processes.size() == 0)
{
m_leftCapacity++;
}
else
{
auto front = m_processes.front();
m_processes.pop_front();
front->Detach();
front->Set();
for (auto process : m_processes)
{
process->Decrement();
process->Set();
}
}
token.Invalidate();
}
}
SemaphoreToken::SemaphoreToken() :
Parent(nullptr)
{
}
SemaphoreToken::SemaphoreToken(usr::Semaphore &semaphore) :
Parent(&semaphore),
ThreadId(this_thread::get_id())
{
}
void SemaphoreToken::Invalidate()
{
Parent = nullptr;
ThreadId = thread::id();
}
SemaphoreCounter::SemaphoreCounter()
: m_count(0)
{
}
void SemaphoreCounter::Increment()
{
m_count++;
}
Semaphore::Process::Process(unsigned index) :
m_index(index),
m_detached(false),
m_lock(m_mutex)
{
}
void Semaphore::Process::Wait()
{
m_cond.wait(m_lock);
}
void Semaphore::Process::Set()
{
m_cond.notify_one();
}
void Semaphore::Process::Decrement()
{
unique_lock<mutex> lock(m_mutex);
assert(m_index > 0);
m_index--;
}
void Semaphore::Process::Detach()
{
unique_lock<mutex> lock(m_mutex);
assert(m_index == 0);
m_detached = true;
}
SemaphoreCounter counter;
Semaphore semaphore(4); // Up to 4 threads can execute simultaneously
vector<shared_ptr<thread>> threads;
int threadCount = 300;
for (int i = 0; i < threadCount; i++)
{
threads.push_back(std::make_shared<thread>([&semaphore, &counter]
{
unsigned threadId;
auto token = semaphore.Enter([&threadId](unsigned index) {
cout << "Thread " << threadId << " has " << index << " processes ahead before execution" << endl;
}, counter, threadId);
cout << "EXECUTE Thread " << threadId << endl;
std::this_thread::sleep_for(15ms);
semaphore.Exit(token);
}));
}
for (int i = 0; i < threadCount; i++)
threads[i]->join();
我认为你需要构建自己的“PrioritySemaphore”(PS)类,其中包含post()和wait(priority)方法。你需要一个互斥锁来保护内部数据,一个'totalCount'整数和一个包含线程等待信号量和'PriorityCount'整数的结构体数组[priority]。
wait(priority):锁定互斥锁。如果totalCount>0,则将其减少,解锁互斥锁并返回。如果totalCount=0,则使用(priority)索引数组,增加PriorityCount,解锁互斥锁并在信号量上等待。
post():锁定互斥锁。如果totalCount=0,则增加它,解锁互斥锁并返回。如果totalCount>0,则从最高优先级端迭代数组,查找非零PriorityCount。如果没有找到,则增加totalCount,解锁互斥锁并返回。如果找到非零PriorityCount,则将其减少,在该优先级处发出信号量,解锁互斥锁并返回。