如何在任何时候暂停一个pthread?

7

最近我开始尝试将ucos-ii移植到Ubuntu PC。

众所周知,通过在pthread的回调函数中添加一个标志来执行暂停和恢复(如下面的解决方案)无法模拟ucos-ii中的“进程”,因为ucos-ii中的“进程”可以随时暂停或恢复!

如何在Linux上使用C睡眠或暂停PThread

我在下面的网站上找到了一个解决方案,但它无法构建,因为它已过时。它使用Linux中的进程来模拟ucos-ii中的任务(类似于我们Linux中的进程)。

http://www2.hs-esslingen.de/~zimmerma/software/index_uk.html

如果pthread能像进程一样随时暂停和恢复,请告诉我一些相关的函数,我可以自己解决。如果不能,我想我应该专注于旧的解决方案。非常感谢。

3
为什么不能只使用条件变量和共享原子标志来暂停/恢复进程? - user406009
1
让我们想象一下您的 CPU 如何调度系统中的进程。例如,当一个进程在 CPU 中运行完自己的时间片时,它会被要求休眠。即使您的程序中没有与休眠或同步相关的语句,操作系统也会坚持要求您的进程这样做。现在,我要做的是使用 pthreads 来模拟操作系统中的进程。 - fish47
4个回答

7
Modula-3的垃圾收集器需要在任意时刻挂起pthread,而不仅仅是在它们等待条件变量或互斥量时。它通过注册一个(Unix)信号处理程序来挂起线程,然后使用pthread_kill向目标线程发送信号来实现此操作。我认为它可以工作(对其他人来说已经很可靠了,但我现在正在调试它的问题...)虽然有点笨拙...请搜索ThreadPThread.m3并查看“StopWorld”和“StartWorld”例程。句柄本身在ThreadPThreadC.c中。

5
如果使用条件变量停留在特定点不足够,那么您无法使用pthread完成此操作。 pthread接口不包括挂起/恢复功能。
例如,请参见E.4答案,位于此处:这里
“POSIX标准没有提供一种机制,使线程A可以暂停另一个线程B的执行,而不需要B的协作。实现挂起/重启机制的唯一方法是让B定期检查一些全局变量以获取挂起请求,然后将自己挂起在条件变量上,另一个线程稍后可以发出信号以重新启动B。”
该FAQ答案继续介绍了一些非标准的方法,在Solaris和LinuxThreads中介绍了其中的一种(现已过时;请勿将其与Linux上的当前线程混淆);这些都不适用于您的情况。

4
在Linux上,您可以设置自定义信号处理程序(例如使用signal()),其中包含等待另一个信号(例如使用sigsuspend())。然后,您可以使用pthread_kill()或tgkill()发送信号。重要的是要使用所谓的“实时信号”,因为正常的信号如SIGUSR1和SIGUSR2不会排队,这意味着它们在高负载条件下可能会丢失。您可以多次发送信号,但只接收一次,因为在信号处理程序运行期间,相同类型的新信号将被忽略。因此,如果有并发线程执行PAUSE/RESUME操作,则可能会丢失RESUME事件并导致死锁。另一方面,挂起的实时信号(例如SIGRTMIN+1和SIGRTMIN+2)不会被去重,因此队列中可能存在多个相同的rt信号。
免责声明:我还没有尝试过这个方法。但理论上应该能够工作。
此外,请参阅man 7 signal-safety。其中有一份调用列表,您可以安全地在信号处理程序中调用。幸运的是,sigsuspend()似乎是其中之一。
更新:我在这里有正在工作的代码。
//Filename: pthread_pause.c
//Author: Tomas 'Harvie' Mudrunka 2021
//Build: CFLAGS=-lpthread make pthread_pause; ./pthread_pause
//Test: valgrind --tool=helgrind ./pthread_pause

//I've wrote this code as excercise to solve following stack overflow question:
// https://dev59.com/Y2DVa4cB1Zd3GeqPgLtR#68119116

#define _GNU_SOURCE //pthread_yield() needs this
#include <signal.h>
#include <pthread.h>
//#include <pthread_extra.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <sys/resource.h>
#include <time.h>

#define PTHREAD_XSIG_STOP (SIGRTMIN+0)
#define PTHREAD_XSIG_CONT (SIGRTMIN+1)
#define PTHREAD_XSIGRTMIN (SIGRTMIN+2) //First unused RT signal

pthread_t main_thread;
sem_t pthread_pause_sem;
pthread_once_t pthread_pause_once_ctrl = PTHREAD_ONCE_INIT;

void pthread_pause_once(void) {
    sem_init(&pthread_pause_sem, 0, 1);
}

#define pthread_pause_init() (pthread_once(&pthread_pause_once_ctrl, &pthread_pause_once))

#define NSEC_PER_SEC (1000*1000*1000)
// timespec_normalise() from https://github.com/solemnwarning/timespec/
struct timespec timespec_normalise(struct timespec ts)
{
    while(ts.tv_nsec >= NSEC_PER_SEC) {
        ++(ts.tv_sec); ts.tv_nsec -= NSEC_PER_SEC;
    }
    while(ts.tv_nsec <= -NSEC_PER_SEC) {
        --(ts.tv_sec); ts.tv_nsec += NSEC_PER_SEC;
    }
    if(ts.tv_nsec < 0) { // Negative nanoseconds isn't valid according to POSIX.
        --(ts.tv_sec); ts.tv_nsec = (NSEC_PER_SEC + ts.tv_nsec);
    }
    return ts;
}

void pthread_nanosleep(struct timespec t) {
    //Sleep calls on Linux get interrupted by signals, causing premature wake
    //Pthread (un)pause is built using signals
    //Therefore we need self-restarting sleep implementation
    //IO timeouts are restarted by SA_RESTART, but sleeps do need explicit restart
    //We also need to sleep using absolute time, because relative time is paused
    //You should use this in any thread that gets (un)paused

    struct timespec wake;
    clock_gettime(CLOCK_MONOTONIC, &wake);

    t = timespec_normalise(t);
    wake.tv_sec += t.tv_sec;
    wake.tv_nsec += t.tv_nsec;
    wake = timespec_normalise(wake);

    while(clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &wake, NULL)) if(errno!=EINTR) break;
    return;
}
void pthread_nsleep(time_t s, long ns) {
    struct timespec t;
    t.tv_sec = s;
    t.tv_nsec = ns;
    pthread_nanosleep(t);
}

void pthread_sleep(time_t s) {
    pthread_nsleep(s, 0);
}

void pthread_pause_yield() {
    //Call this to give other threads chance to run
    //Wait until last (un)pause action gets finished
    sem_wait(&pthread_pause_sem);
    sem_post(&pthread_pause_sem);
        //usleep(0);
    //nanosleep(&((const struct timespec){.tv_sec=0,.tv_nsec=1}), NULL);
    //pthread_nsleep(0,1); //pthread_yield() is not enough, so we use sleep
    pthread_yield();
}

void pthread_pause_handler(int signal) {
    //Do nothing when there are more signals pending (to cleanup the queue)
    //This is no longer needed, since we use semaphore to limit pending signals
    /*
    sigset_t pending;
    sigpending(&pending);
    if(sigismember(&pending, PTHREAD_XSIG_STOP)) return;
    if(sigismember(&pending, PTHREAD_XSIG_CONT)) return;
    */

    //Post semaphore to confirm that signal is handled
    sem_post(&pthread_pause_sem);
    //Suspend if needed
    if(signal == PTHREAD_XSIG_STOP) {
        sigset_t sigset;
        sigfillset(&sigset);
        sigdelset(&sigset, PTHREAD_XSIG_STOP);
        sigdelset(&sigset, PTHREAD_XSIG_CONT);
        sigsuspend(&sigset); //Wait for next signal
    } else return;
}

void pthread_pause_enable() {
    //Having signal queue too deep might not be necessary
    //It can be limited using RLIMIT_SIGPENDING
    //You can get runtime SigQ stats using following command:
    //grep -i sig /proc/$(pgrep binary)/status
    //This is no longer needed, since we use semaphores
    //struct rlimit sigq = {.rlim_cur = 32, .rlim_max=32};
    //setrlimit(RLIMIT_SIGPENDING, &sigq);

    pthread_pause_init();

    //Prepare sigset
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, PTHREAD_XSIG_STOP);
    sigaddset(&sigset, PTHREAD_XSIG_CONT);

    //Register signal handlers
    //signal(PTHREAD_XSIG_STOP, pthread_pause_handler);
    //signal(PTHREAD_XSIG_CONT, pthread_pause_handler);
    //We now use sigaction() instead of signal(), because it supports SA_RESTART
    const struct sigaction pause_sa = {
        .sa_handler = pthread_pause_handler,
        .sa_mask = sigset,
        .sa_flags = SA_RESTART,
        .sa_restorer = NULL
    };
    sigaction(PTHREAD_XSIG_STOP, &pause_sa, NULL);
    sigaction(PTHREAD_XSIG_CONT, &pause_sa, NULL);

    //UnBlock signals
    pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
}

void pthread_pause_disable() {
    //This is important for when you want to do some signal unsafe stuff
    //Eg.: locking mutex, calling printf() which has internal mutex, etc...
    //After unlocking mutex, you can enable pause again.

    pthread_pause_init();

    //Make sure all signals are dispatched before we block them
    sem_wait(&pthread_pause_sem);

    //Block signals
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, PTHREAD_XSIG_STOP);
    sigaddset(&sigset, PTHREAD_XSIG_CONT);
    pthread_sigmask(SIG_BLOCK, &sigset, NULL);

    sem_post(&pthread_pause_sem);
}


int pthread_pause(pthread_t thread) {
    sem_wait(&pthread_pause_sem);
    //If signal queue is full, we keep retrying
    while(pthread_kill(thread, PTHREAD_XSIG_STOP) == EAGAIN) usleep(1000);
    pthread_pause_yield();
    return 0;
}

int pthread_unpause(pthread_t thread) {
    sem_wait(&pthread_pause_sem);
    //If signal queue is full, we keep retrying
    while(pthread_kill(thread, PTHREAD_XSIG_CONT) == EAGAIN) usleep(1000);
    pthread_pause_yield();
    return 0;
}

void *thread_test() {
    //Whole process dies if you kill thread immediately before it is pausable
    //pthread_pause_enable();
    while(1) {
    //Printf() is not async signal safe (because it holds internal mutex),
    //you should call it only with pause disabled!
    //Will throw helgrind warnings anyway, not sure why...
    //See: man 7 signal-safety
    pthread_pause_disable();
        printf("Running!\n");
    pthread_pause_enable();

    //Pausing main thread should not cause deadlock
    //We pause main thread here just to test it is OK
    pthread_pause(main_thread);
    //pthread_nsleep(0, 1000*1000);
    pthread_unpause(main_thread);

    //Wait for a while
    //pthread_nsleep(0, 1000*1000*100);
    pthread_unpause(main_thread);
    }
}

int main() {
    pthread_t t;
    main_thread = pthread_self();
    pthread_pause_enable(); //Will get inherited by all threads from now on
    //you need to call pthread_pause_enable (or disable) before creating threads,
    //otherwise first (un)pause signal will kill whole process
    pthread_create(&t, NULL, thread_test, NULL);

    while(1) {
        pthread_pause(t);
        printf("PAUSED\n");
        pthread_sleep(3);

        printf("UNPAUSED\n");
        pthread_unpause(t);
        pthread_sleep(1);

    /*
    pthread_pause_disable();
        printf("RUNNING!\n");
    pthread_pause_enable();
    */
    pthread_pause(t);
    pthread_unpause(t);
    }

    pthread_join(t, NULL);
    printf("DIEDED!\n");
}

我正在开发一个名为“pthread_extra”的库,其中包括像这样的内容和更多的功能。很快会发布。

更新2:当快速调用pause/unpause时,仍然会导致死锁(已删除sleep()调用)。glibc中的Printf()实现有互斥锁,因此如果您挂起打印Printf()中间的线程,然后想要从您计划稍后取消暂停该线程的线程进行printf(),它将永远不会发生,因为printf()被锁定。不幸的是,我已经删除了Printf()并在线程中只运行了空的while循环,但在高暂停/取消暂停率下仍然出现死锁。我不知道为什么。也许(即使是实时)Linux信号不是100%安全的。有实时信号队列,也许它会溢出或类似的问题...

更新3:我认为我已经成功解决了死锁问题,但必须完全重写大部分代码。现在每个线程都有一个(sig_atomic_t)变量,用于保存该线程是否应该运行的状态。工作方式有点像条件变量。pthread_(un)pause()自动透明地记住每个线程的这个变量。我没有两个信号。现在我只有一个信号。该信号的处理程序查看该变量,并且仅在该变量表明该线程不应该运行时才在sigsuspend()上阻塞。否则它从信号处理程序返回。为了挂起/恢复线程,我现在设置sig_atomic_t变量以期望的状态并调用那个信号(这对于暂停和恢复是相同的)。重要的是使用实时信号来确保在修改状态变量后处理程序将实际运行。由于线程状态数据库,代码有点复杂。尽快简化后,我将在单独的解决方案中共享代码。但我想保留此处的两个信号版本,因为它可以工作,我喜欢它的简单性,也许人们会给我们更多优化的见解。

更新4:我已经修复了原始代码中的死锁(不需要辅助变量持有状态),通过使用两个信号的单个处理程序以及稍微优化信号队列。使用helgrind显示Printf()仍然存在一些问题,但它并非由我的信号引起,即使我根本不调用pause/unpause也会发生。总体而言,这仅在LINUX上进行了测试,不确定代码的可移植性如何,因为似乎存在一些未记录的信号处理程序行为最初导致死锁。

请注意,pause/unpause不能嵌套。如果您暂停了3次,并取消暂停1次,则线程将运行。如果需要这样的行为,则应创建某种包装器,该包装器将计算嵌套级别并相应地发出信号。

更新5:我通过以下更改提高了代码的鲁棒性:使用信号量确保暂停/恢复调用的正确序列化。这将解决所有剩余的死锁问题,现在您可以确信当暂停调用返回时,目标线程已经实际上被暂停了。这也解决了信号队列溢出的问题。我还添加了SA_RESTART标志,它可以防止内部信号中断IO等待。休眠/延迟仍然需要手动重新启动,但我提供了一个方便的封装函数pthread_nanosleep()来做到这一点。

更新6:我意识到简单地重启nanosleep()是不够的,因为这样超时时间不会在线程暂停时运行。因此,我修改了pthread_nanosleep()函数,将超时间隔转换为未来的绝对时间点,并休眠至该时间点。同时,我隐藏了信号量初始化,这样用户就不需要自己处理。


1

以下是带有暂停/恢复功能的类中线程函数的示例...

class SomeClass
{
public:
    // ... construction/destruction

    void Resume();
    void Pause();
    void Stop();

private:
    static void* ThreadFunc(void* pParam);

    pthread_t thread;
    pthread_mutex_t mutex;
    pthread_cond_t cond_var;
    int command;
};

SomeClass::SomeClass()
{
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond_var, NULL);

    // create thread in suspended state..
    command = 0;
    pthread_create(&thread, NULL, ThreadFunc, this);
}

SomeClass::~SomeClass()
{
    // we should stop the thread and exit ThreadFunc before calling of blocking pthread_join function
    // also it prevents the mutex staying locked..
    Stop();
    pthread_join(thread, NULL);

    pthread_cond_destroy(&cond_var);
    pthread_mutex_destroy(&mutex);
}

void* SomeClass::ThreadFunc(void* pParam)
{
    SomeClass* pThis = (SomeClass*)pParam;
    timespec time_ns = {0, 50*1000*1000};   // 50 milliseconds

    while(1)
    {
        pthread_mutex_lock(&pThis->mutex);

        if (pThis->command == 2) // command to stop thread..
        {
            // be sure to unlock mutex before exit..
            pthread_mutex_unlock(&pThis->mutex);
            return NULL;
        }
        else if (pThis->command == 0) // command to pause thread..
        {
            pthread_cond_wait(&pThis->cond_var, &pThis->mutex);
            // dont forget to unlock the mutex..
            pthread_mutex_unlock(&pThis->mutex);
            continue;
        }

        if (pThis->command == 1) // command to run..
        {
            // normal runing process..
            fprintf(stderr, "*");
        }

        pthread_mutex_unlock(&pThis->mutex);

        // it's important to give main thread few time after unlock 'this'
        pthread_yield();
        // ... or...
        //nanosleep(&time_ns, NULL);
    }
    pthread_exit(NULL);
}

void SomeClass::Stop()
{
    pthread_mutex_lock(&mutex);
    command = 2;
    pthread_cond_signal(&cond_var);
    pthread_mutex_unlock(&mutex);
}

void SomeClass::Pause()
{
    pthread_mutex_lock(&mutex);
    command = 0;
    // in pause command we dont need to signal cond_var because we not in wait state now..
    pthread_mutex_unlock(&mutex);
}

void SomeClass::Resume()
{
    pthread_mutex_lock(&mutex);
    command = 1;
    pthread_cond_signal(&cond_var);
    pthread_mutex_unlock(&mutex);
}

2
这不是所问问题的解决方案。 - Chris Stratton

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接