How to share semaphores between processes using shared memory

36

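I need to synchronize N client processes with one server. These processes are forked by a main function in which I declared 3 semaphores. I decided to use POSIX semaphores, but I don't know how to share them between these processes. I think shared memory should work for this, but I have some questions:

  • How can I allocate the right amount of memory in my segment?
  • Can I use sizeof(sem_t) in the size_t field of shmget in order to allocate exactly the space I need?
  • Does anyone have an example similar to this situation?
4 Answers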

67

Sharing named POSIX semaphores is easy

  • Choose a name for your semaphore

    #define SNAME "/mysem"

  • Use sem_open with O_CREAT in the process that creates them

    sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */

  • Open semaphores in the other processes

    sem_t *sem = sem_open(SNAME, 0); /* Open a preexisting semaphore. */
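The steps above can be put together in a small sketch. It is only an illustration under some assumptions: the name `/demo_named_sem` and the function `named_sem_demo` are made up for this example, the child is created with fork for brevity (an unrelated process could open the semaphore by name in the same way), and on older glibc versions the program may need to be linked with -pthread.

```c
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define DEMO_SEM_NAME "/demo_named_sem"   /* hypothetical name */

/* Returns 0 if the parent consumed the child's sem_post, -1 on error. */
int named_sem_demo(void)
{
    sem_unlink(DEMO_SEM_NAME);  /* drop a stale semaphore from an earlier run */

    /* Creator: initial value 0, so sem_wait blocks until someone posts. */
    sem_t *sem = sem_open(DEMO_SEM_NAME, O_CREAT | O_EXCL, 0644, 0);
    if (sem == SEM_FAILED) {
        perror("sem_open (create)");
        return -1;
    }

    pid_t pid = fork();
    if (pid < 0) {
        perror("fork");
        sem_close(sem);
        sem_unlink(DEMO_SEM_NAME);
        return -1;
    }
    if (pid == 0) {
        /* Child: open the existing semaphore by name only. */
        sem_t *child_sem = sem_open(DEMO_SEM_NAME, 0);
        if (child_sem == SEM_FAILED)
            _exit(1);
        sem_post(child_sem);
        sem_close(child_sem);
        _exit(0);
    }

    /* Parent: wait only if the child really posted, to avoid blocking forever. */
    int status = 0;
    int rc = -1;
    if (waitpid(pid, &status, 0) == pid &&
        WIFEXITED(status) && WEXITSTATUS(status) == 0) {
        rc = sem_wait(sem);
    }
    sem_close(sem);
    sem_unlink(DEMO_SEM_NAME);  /* remove the name once nobody needs it */
    return rc;
}
```

Calling `named_sem_demo()` from `main` should return 0 on a system where named semaphores are available. A named semaphore persists until sem_unlink is called, which is why the sketch unlinks it at both ends.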
    

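    If you insist on using shared memory, it's certainly possible.

    /* O_CREAT | O_RDWR are the open flags; the third argument is the mode. */
    int fd = shm_open("/shmname", O_CREAT | O_RDWR, 0644);
    ftruncate(fd, sizeof(sem_t));
    sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
        MAP_SHARED, fd, 0);
    
    sem_init(sem, 1, 1); /* Second argument 1: shared between processes. */
    

    I haven't tested the above code, so it could be completely wrong.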
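    For readers who want something closer to a complete program, here is a hedged sketch of the shared memory variant with error checks and cleanup. The object name `/demo_sem_shm` and the function `shm_sem_demo` are invented for illustration. The segment holds a single semaphore, so sizeof(sem_t) is the size passed to ftruncate and mmap. On older glibc versions linking may require -lrt and -pthread.

```c
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define DEMO_SHM_NAME "/demo_sem_shm"   /* hypothetical object name */

/* Returns 0 if the parent was released by the child's sem_post, -1 on error. */
int shm_sem_demo(void)
{
    int rc = -1;

    shm_unlink(DEMO_SHM_NAME);          /* ignore failure: object may not exist */
    int fd = shm_open(DEMO_SHM_NAME, O_CREAT | O_EXCL | O_RDWR, 0600);
    if (fd == -1) {
        perror("shm_open");
        return -1;
    }
    /* The segment only has to hold one semaphore. */
    if (ftruncate(fd, sizeof(sem_t)) == -1) {
        perror("ftruncate");
        close(fd);
        shm_unlink(DEMO_SHM_NAME);
        return -1;
    }
    sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
                      MAP_SHARED, fd, 0);
    close(fd);                          /* the mapping stays valid after close */
    if (sem == MAP_FAILED) {
        perror("mmap");
        shm_unlink(DEMO_SHM_NAME);
        return -1;
    }
    if (sem_init(sem, 1, 0) == 0) {     /* pshared = 1, initial value 0 */
        pid_t pid = fork();
        if (pid == 0) {
            sem_post(sem);              /* child releases the parent */
            _exit(0);
        }
        if (pid > 0) {
            rc = sem_wait(sem);         /* blocks until the child posts */
            waitpid(pid, NULL, 0);
        }
        sem_destroy(sem);
    }
    munmap(sem, sizeof(sem_t));
    shm_unlink(DEMO_SHM_NAME);
    return rc;
}
```

    Calling `shm_sem_demo()` from `main` should return 0. A process that is not a child of the creator would call shm_open without O_CREAT and mmap the same object, but it must not call sem_init again.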


    1
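    Using named semaphores I don't have to care about memory allocation! I didn't know how to do that. Thank you so much. - Sicioldr
    1
    Yes, but in my opinion using named semaphores is more elegant. - Sicioldr
    1
    @cnicutar, nice answer. I realize you haven't tested it. Could you please compare it with the following code: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attribute); pthread_mutexattr_setpshared(attribute, PROCESS_SHARED); pthread_mutex_init(mutex, attributes); - Frank
    Doesn't using named semaphores mean that you cannot have multiple instances of your program open at once without them interfering with each other? - David Roundy
    1
    @DavidRoundy The name can be dynamic (e.g. a command-line argument), in which case multiple instances can run. - cnicutar
    I tried your code snippet and it failed at shm_open. According to this blog post, https://blog.superpat.com/semaphores-on-linux-sem_init-vs-sem_open, we should use `shm_open("shmname", O_RDWR | O_CREAT, S_IRWXU)`. - undefined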
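    The point about dynamic names in the comments can be sketched as follows. This is only one possible scheme: the helper names, the `"/<prefix>_sem_<pid>"` format and the use of the process ID are assumptions made for this example (the comment itself suggests a command-line argument, which would work the same way).

```c
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Builds a per-instance name such as "/myapp_sem_1234" (hypothetical format).
 * Returns 0 on success, -1 if the buffer is too small. */
int make_instance_sem_name(char *buf, size_t len, const char *prefix, pid_t pid)
{
    int n = snprintf(buf, len, "/%s_sem_%ld", prefix, (long)pid);
    return (n > 0 && (size_t)n < len) ? 0 : -1;
}

/* Creates (or opens) the semaphore that belongs to this program instance.
 * Returns SEM_FAILED on error, like sem_open itself. */
sem_t *open_instance_sem(const char *name, unsigned int initial)
{
    return sem_open(name, O_CREAT, 0644, initial);
}
```

    The server instance would build the name once, for example with `make_instance_sem_name(name, sizeof name, "myapp", getpid())`, and hand it to its clients (forked children inherit the buffer, unrelated processes could receive it as an argument), so that two instances never open the same semaphore.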

    5
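    I wrote a sample application in which a parent (producer) spawns N child threads (consumers) and uses a global array to pass data to them. The global array is a circular buffer and is protected while data is written to it.
    Header file: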
    #define TRUE 1
    #define FALSE 0
    
    #define SUCCESS 0
    #define FAILURE -1
    
    /*
     * Function Declaration.
     */
    void Parent( void );
    int CalcBitmap(unsigned int Num);
    void InitSemaphore( void );
    void DeInitSemaphore( void );
    int ComputeComplexCalc( int op1, int op2);
    
    /*
     * Thread functions.
     */
    void *Child( void *arg );
    void *Report( void *arg1 );
    
    /*
     * Macro Definition.
     */
    #define INPUT_FILE  "./input.txt"
    #define NUM_OF_CHILDREN 10
    #define SIZE_CIRCULAR_BUF 256
    #define BUF_SIZE 128
    //Set the bits corresponding to all threads.
    #define SET_ALL_BITMAP( a ) a = uiBitmap;     
    //Clears the bit corresponding to a thread after
    //every read. 
    #define CLEAR_BIT( a, b ) a &= ~( 1 << b );   
    
    /*
     * Have a dedicated semaphore for each buffer
     * to synchronize the access to the shared memory
     * between the producer (parent) and the consumers. 
     */
    sem_t ReadCntrMutex;
    sem_t semEmptyNode[SIZE_CIRCULAR_BUF];
    
    /*
     * Global Variables.
     */
    unsigned int uiBitmap = 0;
    //Counter to track the number of processed buffers.
    unsigned int Stats;                           
    unsigned int uiParentCnt = 0;
    char inputfile[BUF_SIZE];
    int EndOfFile = FALSE;
    
    /*
     * Data Structure Definition. ( Circular Buffer )
     */
    struct Message
    {
        int id;
        unsigned int BufNo1;
        unsigned int BufNo2;
        /* Counter to check if all threads read the buffer. */
        unsigned int uiBufReadCntr;
    };
    struct Message ShdBuf[SIZE_CIRCULAR_BUF];
    

    Source code:

    /*
     * ipc_communicator is an IPC binary in which a parent
     * creates ten child threads, reads the input parameters
     * from a file and sends those parameters to all the children.
     * Each child performs the computation on those input parameters
     * and writes the results to its own file.
     *
     * Author: Ashok Vairavan              Date: 8/8/2014
     */
    
    /*
     * Generic Header Files.
     */
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <pthread.h>
    #include <errno.h>
    #include <malloc.h>
    #include <fcntl.h>
    #include <getopt.h>
    #include <stdlib.h>
    #include <math.h>
    #include <semaphore.h>
    
    /*
     * Private Header Files.
     */
    #include "ipc*.h"
    
    /*
     * Function to calculate the bitmap based on the 
     * number of threads. This bitmap is used to 
     * track which thread read the buffer and the
     * pending read threads.
     */
    int CalcBitmap(unsigned int Num)
    {
       unsigned int uiIndex;
    
       for( uiIndex = 0; uiIndex < Num; uiIndex++ )
       {
          uiBitmap |= 1 << uiIndex;
       }
       return uiBitmap;
    }
    /*
     * Function that performs complex computation on
     * numbers.
     */
    int ComputeComplexCalc( int op1, int op2)
    {
        return sqrt(op1) + log(op2);
    }
    
    /*
     * Function to initialise the semaphores.
     * semEmptyNode[i] indicates that buffer i is available
     * for write. ReadCntrMutex protects the per-buffer read
     * bitmap. The second argument of sem_init is 0 because
     * the semaphores are shared between threads, not processes.
     */
    void InitSemaphore( void )
    {
       unsigned int uiIndex=0;
    
       CalcBitmap(NUM_OF_CHILDREN);
    
       sem_init( &ReadCntrMutex, 0, 1);
       while( uiIndex<SIZE_CIRCULAR_BUF )
       {
          sem_init( &semEmptyNode[uiIndex], 0, 1 );
          uiIndex++;
       }
       return;
    }
    
    /* 
     * Function to de-initialise the semaphores.
     */
    void DeInitSemaphore( void )
    {
       unsigned int uiIndex=0;
    
       sem_destroy( &ReadCntrMutex);
       while( uiIndex<SIZE_CIRCULAR_BUF )
       {
          sem_destroy( &semEmptyNode[uiIndex] );
          uiIndex++;
       }
       return;
    }
    
    /* Returns TRUE if the parent had reached end of file */
    int isEOF( void )
    {
       return __sync_fetch_and_add(&EndOfFile, 0);
    }
    
    /*
     * Thread functions that prints the number of buffers
     * processed per second.
     */ 
    void *Report( void *arg1 )
    {
       int CumStats = 0;
    
       while( TRUE )
       {
          sleep( 1 );
          CumStats += __sync_fetch_and_sub(&Stats, 0);
          printf("Processed Per Second: %d Cumulative Stats: %d\n", 
                   __sync_fetch_and_sub(&Stats, Stats), CumStats);
       }
       return NULL;
    }
    
    /*
     * Function that reads the data from the input file
     * fills the shared buffer and signals the children to
     * read the data.
     */
    void Parent( void )
    {
       unsigned int op1, op2;
       unsigned int uiCnt = 0;
    
       /* Read the input parameters and process it in
        * while loop till the end of file.
        */
    
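       FILE *fp = fopen( inputfile, "r" );
       if( fp == NULL )
       {
          /* Nothing can be produced without the input file. */
          perror( "fopen" );
          exit( FAILURE );
       }
       /* op1 and op2 are unsigned, so read them with %u. */
       while( fscanf( fp, "%u %u", &op1, &op2 ) == 2 )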
       {
         /* Wait for the buffer to become empty */
         sem_wait(&semEmptyNode[uiCnt]);
    
         /* Access the shared buffer */
         ShdBuf[uiCnt].BufNo1 = op1;
         ShdBuf[uiCnt].BufNo2 = op2;
    
         /* Bitmap to track which thread read the buffer */
         sem_wait( &ReadCntrMutex );
         SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
         sem_post( &ReadCntrMutex );
    
         __sync_fetch_and_add( &uiParentCnt, 1);
         uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
       }
    
       /* Whether the loop stopped at end of file or on a malformed
        * line, tell the children through the global flag that no
        * more data is coming, and close the input file.
        */
       __sync_add_and_fetch(&EndOfFile, TRUE);
       fclose( fp );
    
       return;
    }
    
    /* 
     * Child thread function which takes threadID as an
     * argument, creates a file using threadID, fetches the
     * parameters from the shared memory, computes the calculation,
     * writes the output to their own file and will release the 
     * semaphore when all the threads read the buffer.
     */
    void *Child( void *ThreadPtr )
    {
       unsigned int uiCnt = 0, uiTotalCnt = 0;
       unsigned int op1, op2;
       char filename[BUF_SIZE];
       int ThreadID;
    
       ThreadID = *((int *) ThreadPtr);
       free( ThreadPtr );
    
       snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );
    
       FILE *fp = fopen( filename, "w" );
       if( fp == NULL )
       {
          perror( "fopen" );
          return NULL;
       }
    
       while (TRUE)
       {
          /* Wait till the parent fills the buffer */
          if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
          {
             /* Access the shared memory */
             op1 = ShdBuf[uiCnt].BufNo1;
             op2 = ShdBuf[uiCnt].BufNo2;
    
             fprintf(fp, "%d %d = %d\n", op1, op2, 
                                   ComputeComplexCalc( op1, op2) ); 
    
             sem_wait( &ReadCntrMutex );
             ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
             if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
             {
                __sync_add_and_fetch(&Stats, 1);
    
                /* Release the semaphore lock if 
                   all readers read the data */
                sem_post(&semEmptyNode[uiCnt]);
             }
             sem_post( &ReadCntrMutex );
    
             uiTotalCnt++;
             uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;
    
             /* If the parent reaches the end of file and 
                if the child thread read all the data then break out */
             if( isEOF() && ( uiTotalCnt ==  uiParentCnt ) )
             {
                //printf("Thid %d p %d c %d\n", ThreadID, uiParentCnt, uiCnt );
                break;
             }
          }
          else
          {
             /* Sleep for ten micro seconds before checking 
                the shared memory */
             usleep(10);
          }
       }
       fclose( fp );
       return NULL;
    }
    
    void usage( void )
    {
        printf(" Usage:\n");
        printf("         -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt");
        printf("         -?  - Help Menu. \n" );
        exit(1);
    }
    
    int main( int argc, char *argv[])
    {
       pthread_attr_t ThrAttr;
       pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
       unsigned int uiThdIndex = 0;
       unsigned int *pThreadID;
       int c;
    
       strncpy( inputfile, INPUT_FILE, BUF_SIZE );
    
       while( ( c = getopt( argc, argv, "f:" )) != -1 )
       {
           switch( c )
           {
               case 'f':
                    /* Leave the last byte untouched so the name stays NUL-terminated. */
                    strncpy( inputfile, optarg, BUF_SIZE - 1 );
                    break;
    
               default:
                    usage();
            }
       }
    
       if( access( inputfile, F_OK ) == -1 )
       {
          perror( "access" );
          return FAILURE;
       } 
    
       InitSemaphore();
    
       pthread_attr_init( &ThrAttr );
       while( uiThdIndex< NUM_OF_CHILDREN )
       {
          /* Allocate the memory from the heap and pass it as an argument
           * to each thread to avoid race condition and invalid reference
           * issues with the local variables.
           */
          pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
          if( pThreadID == NULL )
          {
             perror( "malloc" );
             /* Cancel pthread operation */
             return FAILURE;
          }
          *pThreadID = uiThdIndex;
          if( pthread_create(
                 &ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
          {
             printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno);
             perror( "pthread_create" );
             return FAILURE;
          }
          uiThdIndex++;
       }
    
       /* Report thread reports the statistics of IPC communication
        * between the parent and the child threads.
        */
       if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
       {
          perror( "pthread_create" );
          return FAILURE;
       }
    
       Parent();
    
       uiThdIndex = 0;
       while( uiThdIndex < NUM_OF_CHILDREN )
       {
          pthread_join( ThrChild[uiThdIndex], NULL );
          uiThdIndex++;
       }
    
       /*
        * Cancel the report thread function when all the 
        * children exits.
        */
       pthread_cancel( ThrReport );
    
       DeInitSemaphore();
    
       return SUCCESS;
    }
    

    2

    Yes, we can use a named semaphore between two processes. We can open a file.

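    char sem_open_file[128];   /* name of the semaphore */
    sem_t *ret_value;          /* handle returned by sem_open */

    snprintf(sem_open_file, 128, "/sem_16");
    ret_value = sem_open((const char *)sem_open_file, 0, 0, 0);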
    
    if (ret_value == SEM_FAILED) {
        /*
         * No such file or directory
         */
        if (errno == ENOENT) {
            ret_value = sem_open((const char *)sem_open_file,
                               O_CREAT | O_EXCL, 0777, 1);
         }
    }
    

    Other processes can use it as well.
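
    The open-then-create sequence above takes two calls, and if two processes both see ENOENT, the slower one fails at the O_EXCL create with EEXIST. A possible alternative, sketched here with hypothetical helper names and a made-up semaphore name, is a single sem_open with O_CREAT and without O_EXCL: it opens the semaphore if it exists and creates it otherwise, and the mode and initial value are ignored when the semaphore already exists.

```c
#include <fcntl.h>
#include <semaphore.h>

/* Opens the named semaphore, creating it with `initial` only if needed. */
sem_t *open_or_create_sem(const char *name, unsigned int initial)
{
    return sem_open(name, O_CREAT, 0644, initial);
}

/* Opens the same name twice and returns the value seen through the second
 * handle, or -1 on error. */
int open_or_create_demo(void)
{
    const char *name = "/demo_open_or_create";   /* hypothetical name */
    int value = -1;

    sem_unlink(name);                             /* start from a clean state */
    sem_t *first = open_or_create_sem(name, 1);   /* creates it, value 1 */
    if (first == SEM_FAILED)
        return -1;

    sem_t *second = open_or_create_sem(name, 5);  /* opens it, 5 is ignored */
    if (second != SEM_FAILED) {
        if (sem_getvalue(second, &value) == -1)
            value = -1;
        sem_close(second);
    }
    sem_close(first);
    sem_unlink(name);
    return value;
}
```

    `open_or_create_demo()` should return 1, showing that the second call attached to the existing semaphore instead of reinitialising it.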

    -2
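    I really doubt that the shared memory approach will ever work. As far as I know, a semaphore is actually just a handle, valid only for the process that opened it. The real semaphore information lives in kernel memory. So using the same handle value in another process will not work.

    Thanks for your answer. Could you compare it with the following code: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attribute); pthread_mutexattr_setpshared(attribute, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, attributes); - Frank
    2
    This answer is completely wrong: semaphores can be shared between processes. See the documentation for the second argument of sem_init. - ntd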

    Content provided by Stack Overflow.