I use two mutexes for enqueueTS() and dequeueTS(), so that writer and reader threads can run simultaneously without waiting for each other. However, both enqueueTS() and dequeueTS() can affect member numOfElements in struct queue_t, eventually causing race condition.
queue.c:
struct queue_t {
unsigned head;
unsigned tail;
unsigned size;
unsigned numOfElements;
unsigned elementSize;
char data[];
};
int enqueue(queue_t *me, const void *item) {
if (!me || !item) {
queue_error_callback("%s: invalid input\n", __func__);
return -1;
}
if (me->numOfElements == me->size) {
queue_error_callback("%s: queue is full\n", __func__);
return -1;
}
memcpy(&me->data[me->tail * me->elementSize], item, me->elementSize);
if (++me->tail == me->size) {
me->tail = 0;
}
me->numOfElements++;
return 0;
}
int dequeue(queue_t *me, void *item) {
if (!me || !item) {
queue_error_callback("%s: invalid input\n", __func__);
return -1;
}
if (me->numOfElements == 0) {
queue_error_callback("%s: queue is empty\n", __func__);
return -1;
}
memcpy(item, &me->data[me->head * me->elementSize], me->elementSize);
if (++me->head == me->size) {
me->head = 0;
}
me->numOfElements--;
return 0;
}
thread-safety queue.c
struct thread_safe_queue {
queue_t *queue;
pthread_mutex_t enqueue_mutex;
pthread_mutex_t dequeue_mutex;
pthread_cond_t queue_not_full;
pthread_cond_t queue_not_empty;
};
int enqueueTS(ts_queue_t *tsq, const void *item) {
if (!tsq || !item) {
queue_error_callback("%s: invalid input\n", __func__);
return -1;
}
pthread_mutex_lock(&tsq->enqueue_mutex);
while (numOfEmptySlots(tsq->queue) == 0) {
pthread_cond_wait(&tsq->queue_not_full, &tsq->enqueue_mutex);
}
int ret = enqueue(tsq->queue, item);
pthread_cond_signal(&tsq->queue_not_empty);
pthread_mutex_unlock(&tsq->enqueue_mutex);
return ret;
}
int dequeueTS(ts_queue_t *tsq, void *item) {
if (!tsq || !item) {
queue_error_callback("%s: invalid input\n", __func__);
return -1;
}
pthread_mutex_lock(&tsq->dequeue_mutex);
while (numOfElements(tsq->queue) == 0) {
pthread_cond_wait(&tsq->queue_not_empty, &tsq->dequeue_mutex);
}
int ret = dequeue(tsq->queue, item);
pthread_cond_signal(&tsq->queue_not_full);
pthread_mutex_unlock(&tsq->dequeue_mutex);
return ret;
}
One straightforward solution is to make numOfElements atomic. However, I don't want to do this because
- I want to use pure lock mechanism.
- I want to separate pure queue module from thread-related function, so that open-close principle is followed.
My question: Are there other ways to make enqueueTS() and dequeueTS() run simultaneously without race conditions, while avoiding modifications to queue.c?
pthread_mutex_t *argument toenqueue()anddequeue()pointing to a mutex used to control access to thenumOfElementsmember. Theenqueue()anddequeue()functions could allow the mutex pointer to beNULLto skip locking/unlocking.