I am trying to implement a lock-free multiple-producer-single-consumer ring buffer in C++. Here is the full definition and the test code.
#include <iostream>
#include <memory>
#include <atomic>
#include <thread>
#include <string>
template<typename T>
class RingBuffer
{
public:
RingBuffer(size_t size);
bool push(const T& t);
bool pop(T& t);
private:
size_t m_size;
std::unique_ptr<T[]> m_buffer;
std::atomic<size_t> m_writeIndex;
std::atomic<size_t> m_endOfReadIndex;
std::atomic<size_t> m_readIndex;
};
template<typename T>
RingBuffer<T>::RingBuffer(size_t size):m_size(size+1),m_buffer(new T[size+1]),m_writeIndex(0),m_endOfReadIndex(0),m_readIndex(0)
{
}
template<typename T>
bool RingBuffer<T>::push(const T& t)
{
size_t readIndex;
size_t writeIndex=m_writeIndex.load(std::memory_order_relaxed);
do
{
readIndex=m_readIndex.load(std::memory_order_acquire);
if((writeIndex+1)%m_size==readIndex)
{
return false;
}
} while (!m_writeIndex.compare_exchange_strong(writeIndex,(writeIndex+1)%m_size),std::memory_order_release,std::memory_order_relaxed);
//std::cout<<std::to_string(writeIndex)+": "+t+"\n";
m_buffer[writeIndex]=std::move(t);
size_t endOfReadIndex;
do
{
endOfReadIndex=writeIndex;
} while (!m_endOfReadIndex.compare_exchange_strong(endOfReadIndex,(endOfReadIndex+1)%m_size),std::memory_order_release,std::memory_order_relaxed);
return true;
}
template<typename T>
bool RingBuffer<T>::pop(T& t)
{
size_t readIndex=m_readIndex.load(std::memory_order_relaxed);
size_t endOfReadIndex=m_endOfReadIndex.load(std::memory_order_acquire);
if(readIndex==endOfReadIndex)
{
return false;
}
t=m_buffer[readIndex];
m_readIndex.store((readIndex+1)%m_size,std::memory_order_release);
return true;
}
void produce(const std::string& prefix, RingBuffer<std::string>& rb)
{
for(int i=1;i<=10;i++)
{
rb.push(prefix+std::to_string(i));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
int main()
{
RingBuffer<std::string> rb(30);
std::thread t1(produce,"A",std::ref(rb));
std::thread t2(produce,"B",std::ref(rb));
std::thread t3(produce,"C",std::ref(rb));
int i=0;
while(i<30)
{
std::string value;
if(rb.pop(value))
{
//std::cout<<value<<std::endl;
++i;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
t1.join();
t2.join();
t3.join();
return 0;
}
It appears that the m_writeIndex.compare_exchange_strong call in the push function would occasionally failed to update writeIndex variable when there is a mismatch, which means different threads could add new element to the same slot. If I uncomment the line right after it I could see output like this:
0: B1
1: A1
2: C1
3: C2
4: B2
5: A2
7: B3
7: A3 //Duplicated index used
6: C3
7: A4 //Duplicated index used
8: B4
9: C4
10: C5
11: B5
12: A5
13: A6
14: B6
15: C6
16: A7
17: C7
18: B7
18: B8 //Duplicated index used
19: C8
20: A8
21: A9
22: B9
23: C9
24: C10
25: B10
26: A10
However if I remove the two memory order paramenters in the m_writeIndex.compare_exchange_strong call then it would work as expected. Any idea why this happens? I am running this in VS Code on Windows. The compiler is cl.exe.