
I use two mutexes for enqueueTS() and dequeueTS(), so that writer and reader threads can run simultaneously without waiting for each other. However, both enqueueTS() and dequeueTS() modify the numOfElements member of struct queue_t, causing a race condition.

queue.c:

#include <string.h>

typedef struct queue_t {
  unsigned head;
  unsigned tail;
  unsigned size;
  unsigned numOfElements;
  unsigned elementSize;
  char data[];  /* flexible array member: size * elementSize bytes */
} queue_t;

int enqueue(queue_t *me, const void *item) {
  if (!me || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  if (me->numOfElements == me->size) {
    queue_error_callback("%s: queue is full\n", __func__);
    return -1;
  }

  memcpy(&me->data[me->tail * me->elementSize], item, me->elementSize);

  if (++me->tail == me->size) {
    me->tail = 0;
  }

  me->numOfElements++;

  return 0;
}

int dequeue(queue_t *me, void *item) {
  if (!me || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  if (me->numOfElements == 0) {
    queue_error_callback("%s: queue is empty\n", __func__);
    return -1;
  }

  memcpy(item, &me->data[me->head * me->elementSize], me->elementSize);

  if (++me->head == me->size) {
    me->head = 0;
  }

  me->numOfElements--;

  return 0;
}

thread_safe_queue.c:

#include <pthread.h>

typedef struct thread_safe_queue {
  queue_t *queue;
  pthread_mutex_t enqueue_mutex;
  pthread_mutex_t dequeue_mutex;
  pthread_cond_t queue_not_full;
  pthread_cond_t queue_not_empty;
} ts_queue_t;

int enqueueTS(ts_queue_t *tsq, const void *item) {
  if (!tsq || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  pthread_mutex_lock(&tsq->enqueue_mutex);
  while (numOfEmptySlots(tsq->queue) == 0) {
    pthread_cond_wait(&tsq->queue_not_full, &tsq->enqueue_mutex);
  }
  int ret = enqueue(tsq->queue, item);
  pthread_cond_signal(&tsq->queue_not_empty);
  pthread_mutex_unlock(&tsq->enqueue_mutex);
  return ret;
}

int dequeueTS(ts_queue_t *tsq, void *item) {
  if (!tsq || !item) {
    queue_error_callback("%s: invalid input\n", __func__);
    return -1;
  }

  pthread_mutex_lock(&tsq->dequeue_mutex);
  while (numOfElements(tsq->queue) == 0) {
    pthread_cond_wait(&tsq->queue_not_empty, &tsq->dequeue_mutex);
  }
  int ret = dequeue(tsq->queue, item);
  pthread_cond_signal(&tsq->queue_not_full);
  pthread_mutex_unlock(&tsq->dequeue_mutex);
  return ret;
}

One straightforward solution is to make numOfElements atomic. However, I don't want to do this because

  1. I want to use a pure locking mechanism.
  2. I want to keep the pure queue module separate from the thread-related functions, so that the open-closed principle is followed.

My question: Are there other ways to make enqueueTS() and dequeueTS() run simultaneously without race conditions, while avoiding modifications to queue.c?

  • You could perhaps pass a pthread_mutex_t * argument to enqueue() and dequeue(), pointing to a mutex used to control access to the numOfElements member. The enqueue() and dequeue() functions could allow the mutex pointer to be NULL to skip locking/unlocking. Commented Jul 23, 2024 at 13:26
  • How is the number of elements in a queue relevant at all when it can change (i.e., become invalid) at any time? Commented Jul 23, 2024 at 13:49

1 Answer


I use two mutexes for enqueueTS() and dequeueTS(), so that writer and reader threads can run simultaneously without waiting for each other. However, both enqueueTS() and dequeueTS() can affect member numOfElements in struct queue_t, eventually causing race condition.

No "eventually" about it. Your code indeed has a data race involving the numOfElements member.

Are there [...] ways to make enqueueTS() and dequeueTS() run simultaneously without race conditions, while avoiding modifications to queue.c?

No.

A data race arises from two threads accessing the same non-atomic object, with at least one of the accesses being a write ("conflicting accesses"), without appropriate synchronization measures. The enqueue() and dequeue() functions you are starting with perform conflicting accesses to the numOfElements member.

If you cannot modify them, then it doesn't matter that they do more work than just those accesses. Your only alternative is to synchronize enqueue() and dequeue() with respect to each other, not just each with respect to itself. Gating access to both via the same mutex is the most natural way for you to do that. You should continue to use two separate condition variables, though, unless you switch to pthread_cond_broadcast.
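A single-mutex version of the wrapper might look like the following sketch. To keep it self-contained, the queue state is inlined as a fixed int buffer; the names (ts_queue_t, ts_init, CAP, the int payload) are illustrative, not the question's actual queue_t API:

```c
#include <pthread.h>
#include <string.h>

#define CAP 4  /* illustrative fixed capacity */

typedef struct {
    int data[CAP];
    unsigned head, tail, count;
    pthread_mutex_t mutex;               /* ONE lock guarding all queue state */
    pthread_cond_t  not_full, not_empty; /* two CVs, as in the question       */
} ts_queue_t;

static void ts_init(ts_queue_t *q) {
    memset(q, 0, sizeof *q);
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

static void ts_enqueue(ts_queue_t *q, int v) {
    pthread_mutex_lock(&q->mutex);
    while (q->count == CAP)                    /* wait for a free slot */
        pthread_cond_wait(&q->not_full, &q->mutex);
    q->data[q->tail] = v;
    q->tail = (q->tail + 1) % CAP;
    q->count++;                                /* protected by the one mutex */
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

static int ts_dequeue(ts_queue_t *q) {
    pthread_mutex_lock(&q->mutex);
    while (q->count == 0)                      /* wait for an item */
        pthread_cond_wait(&q->not_empty, &q->mutex);
    int v = q->data[q->head];
    q->head = (q->head + 1) % CAP;
    q->count--;                                /* protected by the one mutex */
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mutex);
    return v;
}
```

Because every access to head, tail, and count happens under the same mutex, producers and consumers can no longer race on the element count; the two condition variables still ensure that a signal wakes only threads that can actually make progress.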

Note also that just making numOfElements atomic would not be a solution. Although you could eliminate the data race that way, that would still leave you with a generalized race condition that you would need to address with suitable modifications to the enqueue and dequeue functions.
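To see why an atomic counter alone is not enough, consider that enqueue() performs a check (is there room?) followed by a commit (copy and increment), and those two steps are not atomic together. The following sketch replays the bad interleaving deterministically in one thread; counter_only_queue, has_room, commit, and replay_bad_interleaving are hypothetical names invented for this illustration:

```c
#include <stdatomic.h>

/* A queue reduced to just its (atomic) element counter. */
typedef struct {
    unsigned size;
    atomic_uint numOfElements;
} counter_only_queue;

/* Step 1 of enqueue(): the fullness check. */
static int has_room(counter_only_queue *q) {
    return atomic_load(&q->numOfElements) < q->size;
}

/* Step 2 of enqueue(): the commit (element copy elided for brevity). */
static void commit(counter_only_queue *q) {
    atomic_fetch_add(&q->numOfElements, 1);
}

/* Replay the interleaving where threads A and B both run the check
   before either commits. Each atomic operation is individually fine,
   but the check-then-act sequence is not. */
static unsigned replay_bad_interleaving(void) {
    counter_only_queue q = { .size = 1, .numOfElements = 0 };
    int a_ok = has_room(&q);  /* A sees 0 < 1: room */
    int b_ok = has_room(&q);  /* B also sees 0 < 1: room */
    if (a_ok) commit(&q);
    if (b_ok) commit(&q);
    return atomic_load(&q.numOfElements);  /* 2 elements in a size-1 queue */
}
```

The data race on the counter is gone, but the queue still overflows: that is the generalized race condition (check-then-act) that only a lock spanning both steps, or a redesign of enqueue()/dequeue(), can eliminate.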

Finally, I observe that your thread-safe version has a design vulnerability that the underlying queue does not: it will deadlock at least one thread if the enqueues and dequeues are sufficiently imbalanced. If there are threads that perform both enqueues and dequeues, then deadlocks can arise just from the enqueues getting too far ahead of the dequeues, or vice versa, even if the overall numbers of enqueues and dequeues are expected to match.


2 Comments

I'd think there's likely some way to use a semaphore to prevent race conditions caused by enqueue and dequeue operating on the same element, since semaphores are counting objects. But that might be even slower than simply sharing a mutex.
@AndrewHenle, the main issue is not enqueue and dequeue operating on the same element. They would not likely do that unless the queue size were 1, but they always have conflicting accesses to the numOfElements member. The OP could use a semaphore as a replacement for that member, but they don't want to modify the queue data structure, and that would still leave them with much the same coherency issues that converting that member to atomic would do. They could also use a semaphore as a mutex, but for their purposes, that's not better than using an actual mutex.
