Goal

Implement an MPSC-like queue with mutexes and semaphores.

Issue

Eventually, the consumer attempts to dequeue from an empty queue, and the assertion in dequeue fails.

Minimal Reproducible Example

I have edited the question to include a minimal reproducible example. Five producers are created, and each producer enqueues its id 4 times. The queue capacity is 4.

#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
        int capacity;
        int* data;
        int length;
        int offset;
} Queue;

typedef struct {
        int id;
        Queue* queue;
        pthread_mutex_t* queue_lock;
        sem_t* queue_sem;
} ProducerArgs;

int dequeue(Queue*);
void enqueue(Queue*, int);
void init_queue(Queue*, int);
bool is_queue_empty(Queue*);
bool is_queue_full(Queue*);
void* producer(void*);

int main(int argc, char* argv[]) {
        pthread_mutex_t queue_lock;
        pthread_mutex_init(&queue_lock, NULL);  /* a mutex must be initialized before it is locked */

        sem_t queue_sem;
        sem_init(&queue_sem, 0, 0);

        Queue queue;
        init_queue(&queue, 4);

        ProducerArgs args[5];
        pthread_t producers[5];
        for (int i = 0; i < 5; i++) {
                args[i].id = i;
                args[i].queue = &queue;
                args[i].queue_lock = &queue_lock;
                args[i].queue_sem = &queue_sem;
                pthread_create(&producers[i], NULL, producer, (void*)&args[i]);
        }

        for (int j = 0; j < 20; j++) {
                sem_wait(&queue_sem);
                pthread_mutex_lock(&queue_lock);
                int value = dequeue(&queue);
                pthread_mutex_unlock(&queue_lock);
                (void)value;
        }

        return 0;
}

int dequeue(Queue* queue) {
        assert(!is_queue_empty(queue));
        int value = queue->data[queue->offset];
        queue->offset = (queue->offset + 1) % queue->capacity;
        queue->length--;
        return value;
}

void enqueue(Queue* queue, int value) {
        assert(!is_queue_full(queue));
        queue->data[(queue->offset + queue->length) % queue->capacity] = value;
        queue->length++;
}

bool is_queue_empty(Queue* queue) {
        return queue->length == 0;
}

bool is_queue_full(Queue* queue) {
        return queue->length == queue->capacity;
}

void init_queue(Queue* queue, int capacity) {
        queue->capacity = capacity;
        queue->data = malloc(sizeof(int) * capacity);
        queue->length = 0;
        queue->offset = 0;
}

void* producer(void* ptr) {
        ProducerArgs* args = (ProducerArgs*)ptr;

        int completed = 0;
        while (completed < 4) {
                pthread_mutex_lock(args->queue_lock);
                if (is_queue_full(args->queue)) {
                        pthread_mutex_unlock(args->queue_lock);
                } else {
                        enqueue(args->queue, args->id);
                        pthread_mutex_unlock(args->queue_lock);
                        sem_post(args->queue_sem);
                        completed++;
                }
        }
        return NULL;
}

Notes

The semaphore was initialized with a value of 0. As far as I understand, this should give me the semantics I desire. The consumer should wait for sem_posts before entering its dequeue step, and each sem_post should indicate another value has been added to the queue. Are my assumptions wrong?
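The semantics described in these notes match the usual counting-semaphore pattern. As a minimal sketch, assuming a platform where unnamed semaphores work (for example Linux), the same idea can be extended with a second semaphore so that producers block while the queue is full instead of retrying in a loop. The names BoundedQueue, WorkerArgs, bq_init, bq_push, bq_pop, bq_producer and run_demo are hypothetical and not part of the original program, and error handling for sem_wait is omitted.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define CAPACITY 4      /* queue capacity from the example */
#define PRODUCERS 5     /* number of producer threads */
#define PER_PRODUCER 4  /* values enqueued by each producer */

/* Hypothetical type: not part of the original program. */
typedef struct {
        int data[CAPACITY];
        int length;
        int offset;
        pthread_mutex_t lock;
        sem_t items;       /* number of filled slots, starts at 0 */
        sem_t free_slots;  /* number of empty slots, starts at CAPACITY */
} BoundedQueue;

typedef struct {
        int id;
        BoundedQueue* queue;
} WorkerArgs;

/* Returns 0 on success, -1 if a primitive could not be initialized. */
int bq_init(BoundedQueue* q) {
        q->length = 0;
        q->offset = 0;
        if (pthread_mutex_init(&q->lock, NULL) != 0) return -1;
        if (sem_init(&q->items, 0, 0) != 0) return -1;
        if (sem_init(&q->free_slots, 0, CAPACITY) != 0) return -1;
        return 0;
}

void bq_push(BoundedQueue* q, int value) {
        sem_wait(&q->free_slots);       /* blocks while the queue is full */
        pthread_mutex_lock(&q->lock);
        assert(q->length < CAPACITY);
        q->data[(q->offset + q->length) % CAPACITY] = value;
        q->length++;
        pthread_mutex_unlock(&q->lock);
        sem_post(&q->items);            /* one more value is available */
}

int bq_pop(BoundedQueue* q) {
        sem_wait(&q->items);            /* blocks while the queue is empty */
        pthread_mutex_lock(&q->lock);
        assert(q->length > 0);
        int value = q->data[q->offset];
        q->offset = (q->offset + 1) % CAPACITY;
        q->length--;
        pthread_mutex_unlock(&q->lock);
        sem_post(&q->free_slots);       /* one more slot is free */
        return value;
}

static void* bq_producer(void* ptr) {
        WorkerArgs* args = ptr;
        for (int i = 0; i < PER_PRODUCER; i++) {
                bq_push(args->queue, args->id);
        }
        return NULL;
}

/* Runs the 5 x 4 scenario. Returns the sum of all dequeued ids,
   or -1 if initialization or thread creation failed. */
int run_demo(void) {
        /* static, so an early return cannot leave threads with dangling pointers */
        static BoundedQueue q;
        static WorkerArgs args[PRODUCERS];
        pthread_t threads[PRODUCERS];

        if (bq_init(&q) != 0) return -1;
        for (int i = 0; i < PRODUCERS; i++) {
                args[i].id = i;
                args[i].queue = &q;
                if (pthread_create(&threads[i], NULL, bq_producer, &args[i]) != 0) return -1;
        }

        int sum = 0;
        for (int j = 0; j < PRODUCERS * PER_PRODUCER; j++) {
                sum += bq_pop(&q);
        }
        for (int i = 0; i < PRODUCERS; i++) {
                pthread_join(threads[i], NULL);
        }

        sem_destroy(&q.items);
        sem_destroy(&q.free_slots);
        pthread_mutex_destroy(&q.lock);
        return sum;
}
```

In this sketch the mutex only protects the queue fields, while the two semaphores carry the "not empty" and "not full" conditions, so neither side ever sees an empty or full queue after its sem_wait returns.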

  • @Yunnosch I have added an MRE. Hopefully it's not too long. Commented Nov 14, 2024 at 10:44
  • :-) Do not overstress the "minimal", the "reproducible" is equally important. Commented Nov 14, 2024 at 11:20

1 Answer

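I am on macOS, where sem_init is not supported (it was returning -1). Because the semaphore was never initialized, the sem_wait calls presumably failed as well and returned immediately, which would explain why the consumer reached dequeue on an empty queue. I ran the same code on a Linux machine and it worked fine.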
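One possible workaround, sketched here under the assumption that named POSIX semaphores (sem_open) are available on the target platform, is to open a named semaphore and unlink its name right away so that it acts like a private one. The helper open_private_sem and the name prefix "/mpsc_sem_" are hypothetical and were not part of the original program.

```c
#include <assert.h>
#include <fcntl.h>      /* O_CREAT, O_EXCL */
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>     /* getpid */

/* Hypothetical helper: opens a named semaphore with the given initial value
   and unlinks the name immediately, so no other process can open it later.
   Returns NULL on failure, after printing the reason. */
sem_t* open_private_sem(unsigned int value) {
        char name[32];
        int n = snprintf(name, sizeof name, "/mpsc_sem_%ld", (long)getpid());
        assert(n > 0 && (size_t)n < sizeof name);

        sem_unlink(name);  /* drop a stale name left behind by an earlier crash, if any */
        sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0600, value);
        if (sem == SEM_FAILED) {
                perror("sem_open");
                return NULL;
        }
        sem_unlink(name);  /* the open handle remains usable until sem_close */
        return sem;
}
```

With this approach the returned pointer would be stored in ProducerArgs.queue_sem directly, the consumer would call sem_wait on that pointer rather than on &queue_sem, and the semaphore would be released with sem_close at the end.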

Moral of the story: I'm a goober and should check my return codes.
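A minimal sketch of what checking the return code could look like. The wrapper name init_sem_checked is hypothetical; it only wraps the standard sem_init call and reports errno when the call fails.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical wrapper: initializes an unnamed, process-private semaphore
   and reports the reason on failure instead of continuing silently.
   Returns 0 on success, -1 on failure. */
int init_sem_checked(sem_t* sem, unsigned int value) {
        assert(sem != NULL);
        if (sem_init(sem, 0, value) != 0) {
                fprintf(stderr, "sem_init failed: %s\n", strerror(errno));
                return -1;
        }
        return 0;
}
```

In main, the call sem_init(&queue_sem, 0, 0) would become a check such as `if (init_sem_checked(&queue_sem, 0) != 0) return EXIT_FAILURE;`, so the program stops before any producer is created when the semaphore is unusable. The same treatment applies to the return values of sem_wait, sem_post and pthread_create.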
