How to share semaphores between processes using shared memory

I have to synchronize N client processes with one server. These processes are forked by a main function in which I declared 3 semaphores. I decided to use POSIX semaphores but I don't know how to share them between these processes. I thought that shared memory should work correctly, but I have some questions:

  • How can I allocate the right space of memory in my segment?
  • Can I pass sizeof(sem_t) as the size argument of shmget in order to allocate exactly the space I need?
  • Does anyone have some examples similar to this situation?
Brandish answered 2/12, 2011 at 16:19 Comment(0)
B
68

It's easy to share named POSIX semaphores

  • Choose a name for your semaphore

    #define SNAME "/mysem"
    
  • Use sem_open with O_CREAT in the process that creates them

    sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */
    
  • Open semaphores in the other processes

    sem_t *sem = sem_open(SNAME, 0); /* Open a preexisting semaphore. */
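
To put the three steps together, here is a minimal sketch of a server that forks its clients and waits for each of them on a named semaphore. The function name named_sem_demo and its parameters are made up for illustration, and error handling is kept short. It assumes a platform with POSIX named semaphores such as Linux, where you typically build with -pthread.

```c
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: the server creates a named semaphore with value 0,
 * every forked client opens it by name and posts once, and the server
 * waits for all posts. Returns 0 on success, -1 on error. */
int named_sem_demo(const char *name, int nclients)
{
    /* Assumption: a leftover semaphore with this name is stale and may go. */
    sem_unlink(name);

    sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0644, 0);
    if (sem == SEM_FAILED) {
        perror("sem_open");
        return -1;
    }

    int started = 0;
    for (int i = 0; i < nclients; i++) {
        pid_t pid = fork();
        if (pid == 0) {
            /* Client: open the existing semaphore by name and signal. */
            sem_t *s = sem_open(name, 0);
            if (s == SEM_FAILED)
                _exit(1);
            sem_post(s);
            sem_close(s);
            _exit(0);
        }
        if (pid > 0)
            started++;
    }

    /* Server: block until every started client has posted. */
    for (int i = 0; i < started; i++)
        sem_wait(sem);

    int rc = (started == nclients) ? 0 : -1;
    for (int i = 0; i < started; i++) {
        int status;
        if (wait(&status) == -1 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
            rc = -1;
    }

    sem_close(sem);
    sem_unlink(name); /* the name persists until it is unlinked */
    return rc;
}
```

Calling named_sem_demo("/mysem", 3) would run three clients against the semaphore name used above. A forked child could also keep using the sem_t pointer inherited from its parent; opening by name is shown because it also works for unrelated processes.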
    

If you insist on using shared memory, it's certainly possible.

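/* Needs <fcntl.h>, <sys/mman.h>, <unistd.h>, <semaphore.h>. */
int fd = shm_open("/shmname", O_CREAT | O_RDWR, 0644); /* flags, then mode */
ftruncate(fd, sizeof(sem_t));
sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
    MAP_SHARED, fd, 0);

sem_init(sem, 1, 1); /* pshared = 1: shared between processes */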

I haven't tested the above so it could be completely bonkers.
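
For processes that are all forked from the same parent, as in the question, a rough sketch of the shared memory route could look like the following. It swaps shm_open for an anonymous shared mapping (MAP_ANONYMOUS, which is widely available on Linux and the BSDs but is an assumption about your platform), and sizes the mapping with sizeof, which is what the question asked about. The struct, the function name shared_sem_demo, and the counter are invented for the example.

```c
#define _DEFAULT_SOURCE   /* exposes MAP_ANONYMOUS on glibc in strict modes */
#include <semaphore.h>
#include <stddef.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical layout: the semaphore and the data it protects live together. */
struct shared_area {
    sem_t lock;
    int   counter;
};

/* Forks nclients children; each adds 1 to the shared counter `rounds` times
 * while holding the semaphore. Returns the final counter, or -1 on error. */
int shared_sem_demo(int nclients, int rounds)
{
    struct shared_area *area = mmap(NULL, sizeof *area,
                                    PROT_READ | PROT_WRITE,
                                    MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (area == MAP_FAILED)
        return -1;

    area->counter = 0;
    /* Second argument (pshared) must be nonzero for use across processes. */
    if (sem_init(&area->lock, 1, 1) == -1) {
        munmap(area, sizeof *area);
        return -1;
    }

    int started = 0;
    for (int i = 0; i < nclients; i++) {
        pid_t pid = fork();
        if (pid == 0) {
            for (int r = 0; r < rounds; r++) {
                sem_wait(&area->lock);
                area->counter++;        /* critical section */
                sem_post(&area->lock);
            }
            _exit(0);
        }
        if (pid > 0)
            started++;
    }

    /* Reap every child before reading the result. */
    for (int i = 0; i < started; i++)
        wait(NULL);

    int result = (started == nclients) ? area->counter : -1;
    sem_destroy(&area->lock);
    munmap(area, sizeof *area);
    return result;
}
```

With 4 clients and 1000 rounds each, the function should return 4000 if the semaphore really serializes the increments. For three semaphores, the struct could simply hold three sem_t members, each initialized with its own sem_init call.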

Brazee answered 2/12, 2011 at 16:25 Comment(6)
Using named semaphores I don't have to worry about memory allocation! I didn't know how to do it. Thanks a lot. - Brandish
Yes, but using named semaphores seems more elegant in my opinion. - Brandish
@cnicutar, your answer is great. I realize you have not tested it yet. Could you please compare it with: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attribute); pthread_mutexattr_setpshared(attribute, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, attribute); - Neoterize
Don't named semaphores mean that you can't have multiple instances of your program open at the same time without them interfering? - Magog
@DavidRoundy The name can be dynamic (for example a command line argument), in which case one can run multiple instances. - Brazee
I tried your snippet with shm_open and it failed. According to this blog post, blog.superpat.com/semaphores-on-linux-sem_init-vs-sem_open, we should use shm_open("shmname", O_RDWR | O_CREAT, S_IRWXU). - Trestlework
B
5

I wrote a sample application in which a parent (producer) spawns N child (consumer) threads and uses a global array to pass data to them. The global array is a circular buffer, and semaphores protect each slot while data is written to it. Because the consumers are threads of one process, the semaphores are initialized with pshared set to 0.

Header File:

#define TRUE 1
#define FALSE 0

#define SUCCESS 0
#define FAILURE -1

/*
 * Function Declaration.
 */
void Parent( void );
int CalcBitmap(unsigned int Num);
void InitSemaphore( void );
void DeInitSemaphore( void );
int ComputeComplexCalc( int op1, int op2);

/*
 * Thread functions.
 */
void *Child( void *arg );
void *Report( void *arg1 );

/*
 * Macro Definition.
 */
#define INPUT_FILE  "./input.txt"
#define NUM_OF_CHILDREN 10
#define SIZE_CIRCULAR_BUF 256
#define BUF_SIZE 128
//Set the bits corresponding to all threads.
#define SET_ALL_BITMAP( a ) ( (a) = uiBitmap )
//Clears the bit corresponding to a thread after
//every read.
#define CLEAR_BIT( a, b ) ( (a) &= ~( 1u << (b) ) )

/*
 * Have a dedicated semaphore for each buffer
 * to synchronize the access to the shared memory
 * between the producer (parent) and the consumers. 
 */
sem_t ReadCntrMutex;
sem_t semEmptyNode[SIZE_CIRCULAR_BUF];

/*
 * Global Variables.
 */
unsigned int uiBitmap = 0;
//Counter to track the number of processed buffers.
unsigned int Stats;                           
unsigned int uiParentCnt = 0;
char inputfile[BUF_SIZE];
int EndOfFile = FALSE;

/*
 * Data Structure Definition. ( Circular Buffer )
 */
struct Message
{
    int id;
    unsigned int BufNo1;
    unsigned int BufNo2;
    /* Counter to check if all threads read the buffer. */
    unsigned int uiBufReadCntr;
};
struct Message ShdBuf[SIZE_CIRCULAR_BUF];

Source code:

/*
 * ipc_communicator is an IPC binary in which a parent
 * creates ten child threads, reads the input parameters
 * from a file, and sends those parameters to all the children.
 * Each child performs the computation on those input parameters
 * and writes the results to its own file.
 *
 * Author: Ashok Vairavan              Date: 8/8/2014
 */

/*
 * Generic Header Files.
 */
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <stdlib.h>
#include <math.h>
#include <semaphore.h>

/*
 * Private Header Files.
 */
#include "ipc*.h"

/*
 * Function to calculate the bitmap based on the 
 * number of threads. This bitmap is used to 
 * track which thread read the buffer and the
 * pending read threads.
 */
int CalcBitmap(unsigned int Num)
{
   unsigned int uiIndex;

   for( uiIndex = 0; uiIndex < Num; uiIndex++ )
   {
      uiBitmap |= 1 << uiIndex;
   }
   return uiBitmap;
}
/*
 * Function that performs complex computation on
 * numbers.
 */
int ComputeComplexCalc( int op1, int op2)
{
    return sqrt(op1) + log(op2);
}

/*
 * Function to initialise the semaphores. 
 * semEmptyNode[i] indicates whether buffer i is available
 * for writing. ReadCntrMutex protects the per-buffer
 * read bitmaps.
 */
void InitSemaphore( void )
{
   unsigned int uiIndex=0;

   CalcBitmap(NUM_OF_CHILDREN);

   /* pshared = 0: the semaphores are shared between threads only. */
   sem_init( &ReadCntrMutex, 0, 1);
   while( uiIndex<SIZE_CIRCULAR_BUF )
   {
      sem_init( &semEmptyNode[uiIndex], 0, 1 );
      uiIndex++;
   }
   return;
}

/* 
 * Function to de-initialise the semaphores.
 */
void DeInitSemaphore( void )
{
   unsigned int uiIndex=0;

   sem_destroy( &ReadCntrMutex);
   while( uiIndex<SIZE_CIRCULAR_BUF )
   {
      sem_destroy( &semEmptyNode[uiIndex] );
      uiIndex++;
   }
   return;
}

/* Returns TRUE if the parent had reached end of file */
int isEOF( void )
{
   return __sync_fetch_and_add(&EndOfFile, 0);
}

/*
 * Thread function that prints the number of buffers
 * processed per second.
 */ 
void *Report( void *arg1 )
{
   unsigned int CumStats = 0;
   unsigned int PerSecond;

   while( TRUE )
   {
      sleep( 1 );
      /* Atomically read the counter and reset it to zero. */
      PerSecond = __sync_fetch_and_and( &Stats, 0 );
      CumStats += PerSecond;
      printf("Processed Per Second: %u Cumulative Stats: %u\n", 
               PerSecond, CumStats);
   }
   return NULL;
}

/*
 * Function that reads the data from the input file
 * fills the shared buffer and signals the children to
 * read the data.
 */
void Parent( void )
{
   unsigned int op1, op2;
   unsigned int uiCnt = 0;

   /* Read the input parameters and process it in
    * while loop till the end of file.
    */

   FILE *fp = fopen( inputfile, "r" );
   if( fp == NULL )
   {
      perror( "fopen" );
      exit( EXIT_FAILURE );
   }
   while( fscanf( fp, "%u %u", &op1, &op2 ) == 2 )
   {
     /* Wait for the buffer to become empty */
     sem_wait(&semEmptyNode[uiCnt]);

     /* Access the shared buffer */
     ShdBuf[uiCnt].BufNo1 = op1;
     ShdBuf[uiCnt].BufNo2 = op2;

     /* Bitmap to track which thread read the buffer */
     sem_wait( &ReadCntrMutex );
     SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
     sem_post( &ReadCntrMutex );

     __sync_fetch_and_add( &uiParentCnt, 1);
     uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;
   }

   /* The loop ends at end of file or on malformed input. Either
    * way, tell the children through the global variable that no
    * more data is coming.
    */ 
   __sync_add_and_fetch(&EndOfFile, TRUE);
   fclose( fp ); 

   return;
}

/* 
 * Child thread function which takes threadID as an
 * argument, creates a file using threadID, fetches the
 * parameters from the shared memory, computes the calculation,
 * writes the output to their own file and will release the 
 * semaphore when all the threads read the buffer.
 */
void *Child( void *ThreadPtr )
{
   unsigned int uiCnt = 0, uiTotalCnt = 0;
   unsigned int op1, op2;
   char filename[BUF_SIZE];
   int ThreadID;

   ThreadID = *((int *) ThreadPtr);
   free( ThreadPtr );

   snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );

   FILE *fp = fopen( filename, "w" );
   if( fp == NULL )
   {
      perror( "fopen" );
      return NULL;
   }

   while (TRUE)
   {
      /* Wait till the parent fills the buffer */
      if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
      {
         /* Access the shared memory */
         op1 = ShdBuf[uiCnt].BufNo1;
         op2 = ShdBuf[uiCnt].BufNo2;

         fprintf(fp, "%u %u = %d\n", op1, op2, 
                               ComputeComplexCalc( op1, op2) ); 

         sem_wait( &ReadCntrMutex );
         ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
         if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
         {
            __sync_add_and_fetch(&Stats, 1);

            /* Release the semaphore lock if 
               all readers read the data */
            sem_post(&semEmptyNode[uiCnt]);
         }
         sem_post( &ReadCntrMutex );

         uiTotalCnt++;
         uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;

      }
      else
      {
         /* Nothing new to read. If the parent has reached the end
            of the file and this thread has read all the data, break
            out. Checking here, and not only right after a read,
            avoids waiting forever when the parent sets the flag
            after this thread has consumed the last buffer. */
         if( isEOF() &&
             ( uiTotalCnt == __sync_fetch_and_add( &uiParentCnt, 0 ) ) )
         {
            break;
         }

         /* Sleep for ten microseconds before checking 
            the shared memory again */
         usleep(10);
      }
   }
   fclose( fp );
   return NULL;
}

void usage( void )
{
    printf(" Usage:\n");
    printf("         -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt\n");
    printf("         -?  - Help Menu. \n" );
    exit(1);
}

int main( int argc, char *argv[])
{
   pthread_attr_t ThrAttr;
   pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
   unsigned int uiThdIndex = 0;
   unsigned int *pThreadID;
   int c;

   strncpy( inputfile, INPUT_FILE, BUF_SIZE );

   while( ( c = getopt( argc, argv, "f:" )) != -1 )
   {
       switch( c )
       {
           case 'f':
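                /* Leave room for the terminating NUL; inputfile is a
                 * zero-initialized global, so its last byte stays 0. */
                strncpy( inputfile, optarg, BUF_SIZE - 1 );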
                break;

           default:
                usage();
        }
   }

   if( access( inputfile, F_OK ) == -1 )
   {
      perror( "access" );
      return FAILURE;
   } 

   InitSemaphore();

   pthread_attr_init( &ThrAttr );
   while( uiThdIndex< NUM_OF_CHILDREN )
   {
      /* Allocate the memory from the heap and pass it as an argument
       * to each thread to avoid race condition and invalid reference
       * issues with the local variables.
       */
      pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
      if( pThreadID == NULL )
      {
         perror( "malloc" );
         /* Cancel pthread operation */
         return FAILURE;
      }
      *pThreadID = uiThdIndex;
      if( pthread_create( 
             &ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
      {
         printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno);
         perror( "pthread_create" );
         return FAILURE;
      }
      uiThdIndex++;
   }

   /* Report thread reports the statistics of IPC communication
    * between the parent and the child threads.
    */
   if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
   {
      perror( "pthread_create" );
      return FAILURE;
   }

   Parent();

   uiThdIndex = 0;
   while( uiThdIndex < NUM_OF_CHILDREN )
   {
      pthread_join( ThrChild[uiThdIndex], NULL );
      uiThdIndex++;
   }

   /*
    * Cancel the report thread function when all the 
    * children exit.
    */
   pthread_cancel( ThrReport );

   DeInitSemaphore();

   return SUCCESS;
}
Breastsummer answered 21/8, 2014 at 18:6 Comment(0)
W
3

Yes, a named semaphore can be shared between the two processes. Each process opens it by name, and the first one to find it missing creates it.

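/* Needs <errno.h>, <fcntl.h>, <semaphore.h>, <stdio.h>. */
char sem_open_file[128];
sem_t *ret_value;

snprintf(sem_open_file, sizeof(sem_open_file), "/sem_16");

/* First try to open an existing semaphore. */
ret_value = sem_open(sem_open_file, 0);

if (ret_value == SEM_FAILED) {
    /*
     * No such file or directory: the semaphore does not
     * exist yet, so create it with an initial value of 1.
     */
    if (errno == ENOENT) {
        ret_value = sem_open(sem_open_file,
                           O_CREAT | O_EXCL, 0777, 1);
    }
}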

Other processes can then open the same semaphore by its name.
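
One possible variation, sketched here under my own assumptions, is to try the exclusive create first and fall back to a plain open. That way two processes starting at the same moment cannot both conclude that the semaphore is missing. The helper name open_or_create_sem and its created out-parameter are hypothetical, and the sketch still assumes nobody unlinks the semaphore between the two calls.

```c
#include <errno.h>
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>

/* Hypothetical helper: returns the named semaphore, creating it with
 * `initial` if it does not exist yet. If `created` is not NULL, it is set
 * to 1 when this call created the semaphore and to 0 when it already
 * existed. Returns SEM_FAILED on error. */
sem_t *open_or_create_sem(const char *name, unsigned int initial, int *created)
{
    /* O_EXCL makes creation fail with EEXIST if the name is taken. */
    sem_t *sem = sem_open(name, O_CREAT | O_EXCL, 0644, initial);
    if (sem != SEM_FAILED) {
        if (created)
            *created = 1;
        return sem;
    }
    if (errno != EEXIST)
        return SEM_FAILED;

    /* Somebody else created it first: open it without touching its value. */
    if (created)
        *created = 0;
    return sem_open(name, 0);
}
```

The created flag can be handy for deciding which process is responsible for calling sem_unlink when the work is done.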

Wallywalnut answered 4/3, 2013 at 9:17 Comment(0)
C
-2

I really doubt if the shared memory approach would ever work.

AFAIK the semaphore is actually just a handle, valid for the process it is opened in. The real semaphore info lies in the kernel memory.

So using the same handle value in another process is not going to work.

Catchy answered 17/10, 2012 at 9:35 Comment(2)
Thanks for your answer. Could you please compare it with: int fd = shm_open("shmname", O_CREAT, O_RDWR); ftruncate(fd, sizeof(pthread_mutex_t)); pthread_mutex_t *mutex = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); pthread_mutexattr_init(attribute); pthread_mutexattr_setpshared(attribute, PTHREAD_PROCESS_SHARED); pthread_mutex_init(mutex, attributes); - Neoterize
This answer is just plain wrong: the semaphore can be shared between processes. Check the documentation of the second argument of sem_init. - Calley
