Goal
Implement a MPSC-like queue with mutexes and sempahores.
Issue
Eventually, the consumer will attempt to dequeue an empty queue.
Minimal Reproducible Example
I have edited the question to include a minimal reproducible example. 5 producers are created, and each producer will enqueue their id 4 times. The queue size is 4.
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdlib.h>
typedef struct {
int capacity;
int* data;
int length;
int offset;
} Queue;
typedef struct {
int id;
Queue* queue;
pthread_mutex_t* queue_lock;
sem_t* queue_sem;
} ProducerArgs;
int dequeue(Queue*);
void enqueue(Queue*, int);
void init_queue(Queue*, int);
bool is_queue_empty(Queue*);
bool is_queue_full(Queue*);
void* producer(void*);
int main(int argc, char* argv[]) {
pthread_mutex_t queue_lock;
sem_t queue_sem;
sem_init(&queue_sem, 0, 0);
Queue queue;
init_queue(&queue, 4);
ProducerArgs args[5];
pthread_t producers[5];
for (int i = 0; i < 5; i++) {
args[i].id = i;
args[i].queue = &queue;
args[i].queue_lock = &queue_lock;
args[i].queue_sem = &queue_sem;
pthread_create(&producers[i], NULL, producer, (void*)&args[i]);
}
for (int j = 0; j < 20; j++) {
sem_wait(&queue_sem);
pthread_mutex_lock(&queue_lock);
int value = dequeue(&queue);
pthread_mutex_unlock(&queue_lock);
(void)value;
}
return 0;
}
int dequeue(Queue* queue) {
assert(!is_queue_empty(queue));
int value = queue->data[queue->offset];
queue->offset = (queue->offset + 1) % queue->capacity;
queue->length--;
return value;
}
void enqueue(Queue* queue, int value) {
assert(!is_queue_full(queue));
queue->data[(queue->offset + queue->length) % queue->capacity] = value;
queue->length++;
}
bool is_queue_empty(Queue* queue) {
return queue->length == 0;
}
bool is_queue_full(Queue* queue) {
return queue->length == queue->capacity;
}
void init_queue(Queue* queue, int capacity) {
queue->capacity = capacity;
queue->data = malloc(sizeof(int) * capacity);
queue->length = 0;
queue->offset = 0;
}
void* producer(void* ptr) {
ProducerArgs* args = (ProducerArgs*)ptr;
int completed = 0;
while (completed < 4) {
pthread_mutex_lock(args->queue_lock);
if (is_queue_full(args->queue)) {
pthread_mutex_unlock(args->queue_lock);
} else {
enqueue(args->queue, args->id);
pthread_mutex_unlock(args->queue_lock);
sem_post(args->queue_sem);
completed++;
}
}
return NULL;
}
Notes
The semaphore was initialized with a value of 0. As far as I understand, this should give me the semantics I desire. The consumer should wait for sem_posts before entering its dequeue step, and each sem_post should indicate another value has been added to the queue. Are my assumptions wrong?