I am trying to implement a lock-free multiple-producer-single-consumer ring buffer in C++. Here is the full definition and the test code.

#include <chrono>
#include <iostream>
#include <memory>
#include <atomic>
#include <thread>
#include <string>

template<typename T>
class RingBuffer
{
    public:
    RingBuffer(size_t size);
    bool push(const T& t);
    bool pop(T& t);
    private:
    size_t m_size;
    std::unique_ptr<T[]> m_buffer;
    std::atomic<size_t> m_writeIndex;
    std::atomic<size_t> m_endOfReadIndex;
    std::atomic<size_t> m_readIndex;
};

template<typename T>
RingBuffer<T>::RingBuffer(size_t size):m_size(size+1),m_buffer(new T[size+1]),m_writeIndex(0),m_endOfReadIndex(0),m_readIndex(0)
{
}

template<typename T>
bool RingBuffer<T>::push(const T& t)
{
    size_t readIndex;
    size_t writeIndex=m_writeIndex.load(std::memory_order_relaxed);
    do
    {
        readIndex=m_readIndex.load(std::memory_order_acquire);
        if((writeIndex+1)%m_size==readIndex)
        {
            return false;
        }
    } while (!m_writeIndex.compare_exchange_strong(writeIndex,(writeIndex+1)%m_size),std::memory_order_release,std::memory_order_relaxed);
    //std::cout<<std::to_string(writeIndex)+": "+t+"\n";
    m_buffer[writeIndex]=std::move(t);
    size_t endOfReadIndex;
    do
    {
        endOfReadIndex=writeIndex;
    } while (!m_endOfReadIndex.compare_exchange_strong(endOfReadIndex,(endOfReadIndex+1)%m_size),std::memory_order_release,std::memory_order_relaxed);

    return true;
}

template<typename T>
bool RingBuffer<T>::pop(T& t)
{
    size_t readIndex=m_readIndex.load(std::memory_order_relaxed);
    size_t endOfReadIndex=m_endOfReadIndex.load(std::memory_order_acquire);
    if(readIndex==endOfReadIndex)
    {
        return false;
    }
    t=m_buffer[readIndex];
    m_readIndex.store((readIndex+1)%m_size,std::memory_order_release);
    return true;
}

void produce(const std::string& prefix, RingBuffer<std::string>& rb)
{
    for(int i=1;i<=10;i++)
    {
        rb.push(prefix+std::to_string(i));
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

int main()
{
    RingBuffer<std::string> rb(30);
    std::thread t1(produce,"A",std::ref(rb));
    std::thread t2(produce,"B",std::ref(rb));
    std::thread t3(produce,"C",std::ref(rb));
    
    int i=0;
    while(i<30)
    {
        std::string value;
        if(rb.pop(value))
        {
            //std::cout<<value<<std::endl;
            ++i;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    t1.join();
    t2.join();
    t3.join();
    return 0;
}

It appears that the m_writeIndex.compare_exchange_strong call in the push function occasionally fails to update the writeIndex variable when there is a mismatch, which means different threads can add a new element to the same slot. If I uncomment the line right after it, I see output like this:

0: B1
1: A1
2: C1
3: C2
4: B2
5: A2
7: B3
7: A3 //Duplicated index used
6: C3
7: A4 //Duplicated index used
8: B4
9: C4
10: C5
11: B5
12: A5
13: A6
14: B6
15: C6
16: A7
17: C7
18: B7
18: B8 //Duplicated index used
19: C8
20: A8
21: A9
22: B9
23: C9
24: C10
25: B10
26: A10

However, if I remove the two memory order parameters from the m_writeIndex.compare_exchange_strong call, it works as expected. Any idea why this happens? I am running this in VS Code on Windows. The compiler is cl.exe.

  • I'm not sure if I'm missing something, but where is A3, A4 and B8 duplicated in the list? Commented Nov 18 at 5:21
  • @NathanOliver It is not A3, A4 and B8 themselves duplicated but the index before them. A3 and A4 are both added to slot with index 7 which is also occupied by value B3, and B8 and B7 occupy the same slot with index 18. All indices should be unique in theory. Commented Nov 18 at 5:25
  • Please remember to paste code which compiles. Had to fix a lot to make it compile. Please check my fixes are aligned with your actual code. Commented Nov 18 at 10:36
  • @MarekR Indeed there are two parenthesis mismatches in the push function, but surprisingly VS Code would not throw me any compilation error, though the result was not quite right. After fixing the parenthesis the code produces the correct result. I guess the problem is with VS Code then. It should have shouted out in my face that I got the code wrong. Commented Nov 18 at 12:51
  • Please don't edit answers into the question. Post it as an answer by clicking the "answer my own question" button, and roll back the question edit which turns it into a non-question. Commented Nov 18 at 17:31

2 Answers

2

Thanks to @MarekR I have figured out the root cause. The two compare_exchange_strong calls in the push function have a misplaced parenthesis: the right parenthesis sits after the second parameter when it should be at the end of the argument list. Because of the comma operator, the two memory order arguments become part of the do-while condition instead of arguments to the call. A comma expression takes the value of its last operand, here std::memory_order_relaxed, which converts to false when std::memory_order is a plain enum (as it is before C++20). The do-while loop therefore terminates after one iteration even if compare_exchange_strong returns false. Moving the right parenthesis to the end of the argument list solves the problem.
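
To make the effect of the parenthesis placement concrete, here is a minimal sketch of the slot-claiming loop with both memory orders inside the call. It is not the full class from the question: claim_slot and all_claims_unique are hypothetical helpers written only for this illustration, and the buffer size in the check is chosen so the index never wraps. The static_assert records why the misplaced form exits early: the comma expression evaluates to std::memory_order_relaxed, whose underlying value is 0. As far as I can tell, from C++20 onward std::memory_order is a scoped enumeration, so the misplaced form should be rejected at compile time there; cl.exe defaults to an older standard, which may be why it compiled for the asker but not for others.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <set>
#include <thread>
#include <vector>

// With the misplaced parenthesis the loop condition becomes
//   (!cas(a, b)), std::memory_order_release, std::memory_order_relaxed
// and a comma expression yields its last operand. relaxed is the first
// enumerator, so its underlying value is 0.
static_assert(static_cast<int>(std::memory_order_relaxed) == 0,
              "memory_order_relaxed has underlying value 0");

// Hypothetical helper (not from the question): claim the next slot index,
// wrapping at `size`. Both memory orders are arguments of the call.
std::size_t claim_slot(std::atomic<std::size_t>& write_index, std::size_t size)
{
    assert(size > 0);
    std::size_t current = write_index.load(std::memory_order_relaxed);
    // On failure compare_exchange_strong stores the observed value in
    // `current`, so the loop retries until this thread wins the exchange.
    while (!write_index.compare_exchange_strong(current, (current + 1) % size,
                                                std::memory_order_release,
                                                std::memory_order_relaxed))
    {
    }
    return current;
}

// Hypothetical check: every thread claims `per_thread` slots; returns true
// if no index was handed out twice. `size` is large enough to avoid wrapping.
bool all_claims_unique(std::size_t threads, std::size_t per_thread)
{
    const std::size_t total = threads * per_thread;
    std::atomic<std::size_t> write_index{0};
    std::vector<std::vector<std::size_t>> claimed(threads);
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < threads; ++t) {
        workers.emplace_back([&, t] {
            // Each worker appends only to its own vector, so no locking is needed.
            for (std::size_t i = 0; i < per_thread; ++i)
                claimed[t].push_back(claim_slot(write_index, total + 1));
        });
    }
    for (auto& w : workers) w.join();

    std::set<std::size_t> unique;
    for (const auto& v : claimed) unique.insert(v.begin(), v.end());
    return unique.size() == total;
}
```

Calling all_claims_unique(4, 1000) should return true with this loop. With the parenthesis misplaced as in the question, a failed exchange is not retried, so two threads can walk away with the same index.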


2 Comments

Advice: enable as many warnings as possible (for example /W4) and treat them as errors (/WX).
Your answer does not add up. The current version of the broken code still does not compile, even with warnings disabled. Unfortunately, the fact that you no longer see the bad result does not prove anything, because of the nondeterministic nature of multithreaded code. I'm not saying the fixed version is wrong; I'm saying that the code which produced this invalid result had some other bug than the misplaced parenthesis.
-5
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>   // intptr_t, used in push() and pop()
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include <chrono>
#include <string>

// ---------- MPMC/MPSC bounded ring (Vyukov algorithm) ----------
template<typename T>
class RingBuffer {
public:
    explicit RingBuffer(size_t capacity)
    {
        // round up to power of two
        size_t cap = 1;
        while (cap < capacity) cap <<= 1;
        capacity_ = cap;
        mask_ = capacity_ - 1;

        buffer_ = std::unique_ptr<Node[]>(new Node[capacity_]);
        for (size_t i = 0; i < capacity_; ++i) {
            buffer_[i].sequence.store(i, std::memory_order_relaxed);
        }

        enqueuePos_.store(0, std::memory_order_relaxed);
        dequeuePos_.store(0, std::memory_order_relaxed);
    }

    // Non-copyable
    RingBuffer(const RingBuffer&) = delete;
    RingBuffer& operator=(const RingBuffer&) = delete;

    // Try to push. Returns false if buffer was full.
    bool push(const T& item)
    {
        Node* node;
        size_t pos = enqueuePos_.load(std::memory_order_relaxed);

        for (;;) {
            node = &buffer_[pos & mask_];
            size_t seq = node->sequence.load(std::memory_order_acquire);
            intptr_t dif = (intptr_t)seq - (intptr_t)pos;
            if (dif == 0) {
                // attempt to claim this slot by advancing enqueuePos_
                if (enqueuePos_.compare_exchange_weak(pos, pos + 1,
                                                      std::memory_order_relaxed,
                                                      std::memory_order_relaxed)) {
                    break; // we have claimed node
                }
                // else CAS failed, pos updated with current enqueuePos_ (weak semantics),
                // try again with new pos
            } else if (dif < 0) {
                // seq < pos => queue is full (another writer hasn't advanced seq)
                return false;
            } else {
                // seq > pos => some other producer has already advanced; try new pos
                pos = enqueuePos_.load(std::memory_order_relaxed);
            }
        }

        // We own the slot at node. Write data then publish by updating sequence.
        node->storage = item; // copy-assign the item
        node->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    // Try to pop. Returns false if buffer empty.
    bool pop(T& item)
    {
        Node* node;
        size_t pos = dequeuePos_.load(std::memory_order_relaxed);
        node = &buffer_[pos & mask_];
        size_t seq = node->sequence.load(std::memory_order_acquire);
        intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
        if (dif == 0) {
            // slot is ready for consumption
            // read item
            item = node->storage;
            // mark slot as free for producers: set sequence to pos + capacity
            node->sequence.store(pos + capacity_, std::memory_order_release);
            dequeuePos_.store(pos + 1, std::memory_order_relaxed);
            return true;
        } else if (dif < 0) {
            // empty
            return false;
        }
        // otherwise seq > pos+1 indicates producer not yet published? Shouldn't happen for single consumer
        return false;
    }

    size_t capacity() const noexcept { return capacity_; }

private:
    struct Node {
        std::atomic<size_t> sequence;
        T storage;
    };

    size_t capacity_;
    size_t mask_;
    std::unique_ptr<Node[]> buffer_;
    std::atomic<size_t> enqueuePos_; // producers advance
    std::atomic<size_t> dequeuePos_; // consumer advances
};

// ---------------------- Test program ----------------------

void producer(RingBuffer<std::string>& rb, const std::string& prefix, int count, int sleep_ms)
{
    for (int i = 1; i <= count; ++i) {
        std::string s = prefix + std::to_string(i);
        // spin until pushed (drop or backoff policy could be used instead)
        while (!rb.push(s)) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        if (sleep_ms > 0)
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
    }
}

int main()
{
    constexpr int num_producers = 3;
    constexpr int per_producer = 10;
    RingBuffer<std::string> rb(32); // capacity >= total in-flight

    std::vector<std::thread> producers;
    producers.reserve(num_producers);
    producers.emplace_back(producer, std::ref(rb), "A", per_producer, 100);
    producers.emplace_back(producer, std::ref(rb), "B", per_producer, 120);
    producers.emplace_back(producer, std::ref(rb), "C", per_producer, 80);

    int total_to_consume = num_producers * per_producer;
    int consumed = 0;
    auto start = std::chrono::steady_clock::now();
    while (consumed < total_to_consume) {
        std::string value;
        if (rb.pop(value)) {
            std::cout << "Consumed: " << value << "\n";
            ++consumed;
        } else {
            // no item right now - wait a bit
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    for (auto& t : producers) t.join();
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> dur = end - start;
    std::cout << "Done. Consumed " << consumed << " items in " << dur.count() << "s\n";
    return 0;
}

6 Comments

you can also check this
How and where would you check this?
While this code may solve the problem the answer would be a lot better with an explanation on how/why it does. Remember that your answer is not just for the user that asked the question but also for all the other people that find it.
Where is your explanation? I had an architect once.. he would swoop in, modify my code... say "it works" now and would leave. It might have been quick, but I didn't learn anything that way. In the end the best way should be to talk and ask questions to the person with a question so he can figure it out himself. That's not possible here so at least explain what the problem was in the original question and how your code fixes that.
A good question has three parts: an explanation of what went wrong for the asker, a solution, and an explanation of how the proposed solution solves the asker's problem. one out of three rarely cuts it.
