UNIX Environment advanced programming - semaphores

1. General

A semaphore is a counter used to control access to a shared data object by multiple processes.

P operation:

In order to obtain shared resources, the process needs to perform the following operations:

  1. Test the semaphore that controls the resource.

  2. If the value of this semaphore is positive, the process can use the resource. The process decrements the semaphore value by 1, indicating that it has taken one unit of the resource.

  3. If the value of this semaphore is 0, the process enters the sleep state until the semaphore value is greater than 0. If the process is awakened, it returns to step 1.

V operation:

When a process is done with a shared resource controlled by a semaphore, the semaphore value is incremented by 1. If any processes are asleep waiting for this semaphore, they are awakened.

To implement semaphores correctly, the test of the semaphore value and the decrement of that value must be a single atomic operation. For this reason, semaphores are usually implemented in the kernel.

2. Acquire semaphore

When using XSI semaphores, you first need to call the function semget to obtain a semaphore ID.

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semget(key_t key, int nsems, int flag);
                       Return value: semaphore ID if successful; -1 on error

nsems is the number of semaphores in the set. If you are creating a new set (usually in the server), you must specify nsems. If you are referencing an existing set (a client), you can specify nsems as 0.

3. Basic operation of semaphore

semctl function contains a variety of semaphore operations.

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semctl(int semid, int semnum, int cmd, ... /* union semun arg */);

/*
 * Optional fourth argument of semctl(). Which member is read or written
 * depends on the cmd passed to semctl().
 */
union semun {
  int              val;    /* value for SETVAL */
  struct semid_ds *buf;    /* buffer for IPC_STAT & IPC_SET */
  unsigned short  *array;  /* array for GETALL & SETALL */
};

For all GET commands except GETALL, the semctl function returns the corresponding value. For the other commands, the return value is 0 on success. On error, semctl returns -1 and sets errno.

Command   Effect
IPC_STAT  Fetch the semid_ds structure for this set and store it in the structure pointed to by arg.buf.
IPC_SET   Set the related members of the structure associated with this set from the values in the structure pointed to by arg.buf.
IPC_RMID  Remove the semaphore set from the system.
GETVAL    Return the value of semval for the member semnum.
SETVAL    Set the value of semval for the member semnum. The value is specified by arg.val.
GETPID    Return the value of sempid for the member semnum.
GETNCNT   Return the value of semncnt for the member semnum.
GETZCNT   Return the value of semzcnt for the member semnum.
GETALL    Fetch all semaphore values in the set; they are stored in the array pointed to by arg.array.
SETALL    Set all semaphore values in the set to the values in the array pointed to by arg.array.

4. Semaphore PV operation

The function semop atomically performs an array of operations on a semaphore set.

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h>

int semop(int semid, struct sembuf semoparray[ ], size_t nops);
                         Return value: 0 if successful; -1 on error
                           
/* One element of the operation array (semoparray) passed to semop(). */
struct sembuf {
  unsigned short sem_num;   /* member # in set (0, 1, ..., nsems-1) */
  short sem_op;             /* operation (negative, 0, or positive) */
  short sem_flg;            /* IPC_NOWAIT, SEM_UNDO */
};

The parameter nops specifies the number of operations (primitives) in the array.

The operation on each member of the set is specified by the corresponding sem_op value. This value can be negative, 0, or positive.

  1. If sem_op is positive, the process is returning resources: sem_op is added to the semaphore's value.
  2. If sem_op is negative, the process wants to obtain resources controlled by the semaphore.
  3. If sem_op is 0, the process wants to wait until the semaphore's value becomes 0.

5. Packaging (wrapper functions)

Wrapping the related APIs in small helper functions makes them more convenient to use:

// Look up the one-member semaphore set named by `key`, asking semget() to
// create it with mode 0666 if necessary (IPC_CREAT).
//
// key     IPC key of the set.
// sem_id  out: receives whatever semget() returned (-1 on failure).
//
// Returns true on success; on failure prints a message and returns false.
bool sem_get(const key_t key, int *sem_id) {
    const int id = semget(key, 1, IPC_CREAT | 0666);

    *sem_id = id;
    if (id != -1) {
        return true;
    }

    printf("sem_get fail!\n");
    return false;
}

// Set semaphore 0 of the set `sem_id` to `val`.
//
// Returns true on success; on failure prints a message and returns false.
bool sem_init(const int sem_id, const int val) {
    // semctl() reads its optional fourth argument as a `union semun`, so a
    // bare int must not be passed through the variadic slot. The union is
    // declared at block scope because some systems leave its declaration to
    // the caller while others already provide it in <sys/sem.h>; a
    // block-scope tag is valid in both cases.
    union semun {
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    } arg;

    arg.val = val;
    if (semctl(sem_id, 0, SETVAL, arg) == -1) {
        printf("sem_init fail!\n");
        return false;
    }
    return true;
}

// Remove the semaphore set `sem_id` from the system.
//
// Returns true on success; on failure prints a message and returns false.
bool sem_unlink(const int sem_id) {
    // IPC_RMID takes no fourth argument, so none is passed.
    if (semctl(sem_id, 0, IPC_RMID) == -1) {
        printf("sem_unlink fail!\n");
        return false;
    }
    return true;
}

// P operation: decrement semaphore 0 of the set `sem_id` by 1, blocking
// until that is possible (sem_flg is 0, so IPC_NOWAIT is not set).
//
// Returns true once the decrement has been applied; on any error other than
// an interrupting signal, prints a message with errno and returns false.
bool sem_wait(int sem_id) {
    struct sembuf op = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };

    while (semop(sem_id, &op, 1) == -1) {
        if (errno == EINTR) {
            // A signal arrived while blocked; the semaphore was not
            // acquired, so simply try again.
            continue;
        }
        printf("sem_wait fail! %d\n", errno);
        return false;
    }
    return true;
}

// V operation: increment semaphore 0 of the set `sem_id` by 1.
//
// Returns true on success; on failure prints a message with errno and
// returns false.
bool sem_post(int sem_id) {
    struct sembuf release = { .sem_num = 0, .sem_op = +1, .sem_flg = 0 };
    const bool ok = semop(sem_id, &release, 1) != -1;

    if (!ok) {
        printf("sem_post fail! %d\n", errno);
    }
    return ok;
}

6. Example: solving producer consumer model with semaphore

Producer consumer problems can be solved with semaphores as follows:

The example uses the wrapper functions above; the code is as follows:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/wait.h>       
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

// Total number of items written by producer() (its loop bound).
#define NUMBER  50000
// Number of consumer processes forked by main(); each one consumes
// NUMBER / CHILD items.
#define CHILD   5
// Number of int slots in the ring buffer kept in buffer.dat; the int stored
// right after the last slot holds the consumers' shared read index.
#define BUFSIZE 10

// Semaphore wrapper functions; each returns true on success, false on failure.

// Get (creating if needed) the semaphore set for `key`; its ID goes to *sem_id.
bool sem_get(const key_t key, int* sem_id);
// Set semaphore 0 of the set to `val`.
bool sem_init(const int sem_id, const int val);
// Remove the semaphore set from the system.
bool sem_unlink(const int sem_id);
// P operation: decrement semaphore 0 by 1.
bool sem_wait(const int semid); // P
// V operation: increment semaphore 0 by 1.
bool sem_post(const int semid); // V

// Terminate the process with a failure status when `ret` is false/zero.
// The argument is parenthesised so that expressions such as `a || b` are
// negated as a whole, and the expansion deliberately has no trailing
// semicolon so that `IS_ERROR(x);` behaves like a single statement, also
// inside if/else.
#define IS_ERROR(ret)           \
    do {                        \
        if (!(ret)) {           \
            exit(EXIT_FAILURE); \
        }                       \
    } while (0)

// Critical resource: file descriptor of buffer.dat, opened in main() and
// accessed by producer() and comsumer() while they hold `mutex`.
int fd;

// Semaphore set IDs, filled in by sem_get() in main() (-1 until then).
// empty: initialised to BUFSIZE; waited on by the producer, posted by consumers.
int empty = -1;
// full: initialised to 0; posted by the producer, waited on by consumers.
int full = -1;
// mutex: initialised to 1; taken around every access to the buffer file.
int mutex = -1;

// Consumer loop: takes NUMBER / CHILD items out of the ring buffer stored in
// the file behind `fd`.
//
// For every item it waits on `full`, enters the critical section guarded by
// `mutex`, reads the shared read index (the int stored after the BUFSIZE
// data slots), reads and prints the item at that index, writes the advanced
// index back, then releases `mutex` and posts `empty`.
//
// A failing semaphore operation or file access ends the process with
// EXIT_FAILURE instead of continuing unsynchronised or with garbage data.
void comsumer(void) {
    const off_t index_pos = (off_t)(BUFSIZE * sizeof(int));
    int cnt = 0;

    for (int k = 0; k < NUMBER / CHILD; k++) {
        int buf_out = 0;
        int data = 0;
        bool io_ok;

        // Wait for an item, then enter the critical section.
        if (!sem_wait(full) || !sem_wait(mutex)) {
            exit(EXIT_FAILURE);
        }

        // Fetch the shared read index, then the item stored in that slot.
        io_ok = lseek(fd, index_pos, SEEK_SET) != (off_t)-1
             && read(fd, &buf_out, sizeof buf_out) == (ssize_t)sizeof buf_out
             && lseek(fd, (off_t)sizeof(int) * buf_out, SEEK_SET) != (off_t)-1
             && read(fd, &data, sizeof data) == (ssize_t)sizeof data;
        if (io_ok) {
            cnt++;
            printf("%d comsume %d %d\n", getpid(), data, cnt);
            fflush(stdout);

            // Advance the read index and write it back for the next consumer.
            buf_out = (buf_out + 1) % BUFSIZE;
            io_ok = lseek(fd, index_pos, SEEK_SET) != (off_t)-1
                 && write(fd, &buf_out, sizeof buf_out) == (ssize_t)sizeof buf_out;
        }
        if (!io_ok) {
            fprintf(stderr, "%d comsumer: buffer file I/O failed\n", getpid());
            sem_post(mutex);    // best effort: do not exit holding the lock
            exit(EXIT_FAILURE);
        }

        if (!sem_post(mutex) || !sem_post(empty)) {
            exit(EXIT_FAILURE);
        }
    }
    printf("%d total consume %d\n", getpid(), cnt);
}

// Producer loop: writes the integers 0 .. NUMBER-1 into the ring buffer
// stored in the file behind `fd`.
//
// For every item it waits on `empty`, enters the critical section guarded by
// `mutex`, stores the item at the current write slot, advances the (process
// local) write index, then releases `mutex` and posts `full`.
//
// A failing semaphore operation or file access ends the process with
// EXIT_FAILURE instead of continuing unsynchronised.
void producer(void) {
    int buf_in = 0;

    for (int i = 0; i < NUMBER; i++) {
        // Wait for a free slot, then enter the critical section.
        if (!sem_wait(empty) || !sem_wait(mutex)) {
            exit(EXIT_FAILURE);
        }

        if (lseek(fd, (off_t)sizeof(int) * buf_in, SEEK_SET) == (off_t)-1
            || write(fd, &i, sizeof i) != (ssize_t)sizeof i) {
            fprintf(stderr, "producer: buffer file I/O failed\n");
            sem_post(mutex);    // best effort: do not exit holding the lock
            exit(EXIT_FAILURE);
        }
        buf_in = (buf_in + 1) % BUFSIZE;
        printf("produce %d\n", i);
        fflush(stdout);

        if (!sem_post(mutex) || !sem_post(full)) {
            exit(EXIT_FAILURE);
        }
    }
}

// Entry point of the producer/consumer demo.
//
// Obtains three single-member semaphore sets (keys 123, 234, 345) used as
// `empty` (starts at BUFSIZE), `full` (starts at 0) and `mutex` (starts at
// 1), prepares buffer.dat with a read index of 0, forks one producer and
// CHILD consumers, waits for all of them and finally removes the semaphores.
//
// Returns 0 on success and -1 when the buffer file cannot be prepared or a
// fork fails; a failing semaphore call ends the process through IS_ERROR.
int main(void) {
    bool ret = sem_get(123, &empty);
    IS_ERROR(ret);
    ret = sem_get(234, &full);
    IS_ERROR(ret);
    ret = sem_get(345, &mutex);
    IS_ERROR(ret);

    ret = sem_init(empty, BUFSIZE);
    IS_ERROR(ret);
    ret = sem_init(full, 0);
    IS_ERROR(ret);
    ret = sem_init(mutex, 1);
    IS_ERROR(ret);

    // Create the buffer file and store the initial read index (0) in the int
    // that follows the BUFSIZE data slots.
    int out_index = 0;
    fd = open("buffer.dat", O_CREAT | O_RDWR | O_TRUNC, 0666);
    if (fd == -1
        || lseek(fd, (off_t)(BUFSIZE * sizeof(int)), SEEK_SET) == (off_t)-1
        || write(fd, &out_index, sizeof out_index) != (ssize_t)sizeof out_index) {
        printf("Fail to set up buffer.dat!\n");
        if (fd != -1) {
            close(fd);
        }
        // No child exists yet, so the semaphore sets can be removed safely.
        sem_unlink(empty);
        sem_unlink(full);
        sem_unlink(mutex);
        return -1;
    }

    pid_t p;
    // create producer
    if ((p = fork()) == 0) {
        producer();
        return 0;
    } else if (p < 0) {
        printf("Fail to fork!\n");
        return -1;
    }

    // create comsumer
    for (int j = 0; j < CHILD; j++) {
        if ((p = fork()) == 0) {
            comsumer();
            return 0;
        } else if (p < 0) {
            printf("Fail to fork!\n");
            return -1;
        }
    }

    int cnt = 0;
    printf("wait children!\n");
    for (;;) {
        // Decide on waitpid()'s own result: errno is only meaningful after
        // a failed call, and a failed call must not be counted as a child.
        pid_t pid = waitpid(-1, NULL, 0);
        if (pid > 0) {
            cnt++;
            printf("pid: %d end | sum: %d\n", pid, cnt);
        } else if (pid == -1 && errno == EINTR) {
            continue;   // interrupted by a signal; children may remain
        } else {
            break;      // no children left (ECHILD) or unexpected error
        }
    }

    close(fd);

    ret = sem_unlink(empty);
    IS_ERROR(ret);
    ret = sem_unlink(full);
    IS_ERROR(ret);
    ret = sem_unlink(mutex);
    IS_ERROR(ret);

    return 0;
}

// Look up the one-member semaphore set named by `key`, asking semget() to
// create it with mode 0666 if necessary (IPC_CREAT).
//
// key     IPC key of the set.
// sem_id  out: receives whatever semget() returned (-1 on failure).
//
// Returns true on success; on failure prints a message and returns false.
bool sem_get(const key_t key, int *sem_id) {
    const int id = semget(key, 1, IPC_CREAT | 0666);

    *sem_id = id;
    if (id != -1) {
        return true;
    }

    printf("sem_get fail!\n");
    return false;
}

// Set semaphore 0 of the set `sem_id` to `val`.
//
// Returns true on success; on failure prints a message and returns false.
bool sem_init(const int sem_id, const int val) {
    // semctl() reads its optional fourth argument as a `union semun`, so a
    // bare int must not be passed through the variadic slot. The union is
    // declared at block scope because some systems leave its declaration to
    // the caller while others already provide it in <sys/sem.h>; a
    // block-scope tag is valid in both cases.
    union semun {
        int val;
        struct semid_ds *buf;
        unsigned short *array;
    } arg;

    arg.val = val;
    if (semctl(sem_id, 0, SETVAL, arg) == -1) {
        printf("sem_init fail!\n");
        return false;
    }
    return true;
}

// Remove the semaphore set `sem_id` from the system.
//
// Returns true on success; on failure prints a message and returns false.
bool sem_unlink(const int sem_id) {
    // IPC_RMID takes no fourth argument, so none is passed.
    if (semctl(sem_id, 0, IPC_RMID) == -1) {
        printf("sem_unlink fail!\n");
        return false;
    }
    return true;
}

// P operation: decrement semaphore 0 of the set `sem_id` by 1, blocking
// until that is possible (sem_flg is 0, so IPC_NOWAIT is not set).
//
// Returns true once the decrement has been applied; on any error other than
// an interrupting signal, prints a message with errno and returns false.
bool sem_wait(int sem_id) {
    struct sembuf op = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };

    while (semop(sem_id, &op, 1) == -1) {
        if (errno == EINTR) {
            // A signal arrived while blocked; the semaphore was not
            // acquired, so simply try again.
            continue;
        }
        printf("sem_wait fail! %d\n", errno);
        return false;
    }
    return true;
}

// V operation: increment semaphore 0 of the set `sem_id` by 1.
//
// Returns true on success; on failure prints a message with errno and
// returns false.
bool sem_post(int sem_id) {
    struct sembuf release = { .sem_num = 0, .sem_op = +1, .sem_flg = 0 };
    const bool ok = semop(sem_id, &release, 1) != -1;

    if (!ok) {
        printf("sem_post fail! %d\n", errno);
    }
    return ok;
}

Posted by Akinraze on Mon, 18 Apr 2022 16:43:16 +0930