I wouldn’t necessarily trust this implementation. It is lock-free(ish), lightly tested, and was implemented just for fun. The key insight for this sort of queue is that owning a slot takes two steps: seeing that a slot is available, and then successfully grabbing it.
If I want to push an element as a producer I have to do the following:
- see an empty slot
- successfully reserve the slot
- commit the write
The argument is analogous for a consumer: see a filled slot, successfully reserve it, then commit the read.
Extending naturally from a single-producer single-consumer (SPSC) queue, we keep one atomic index for writes and one for reads. However, unlike in an SPSC queue, the write and read indexes alone aren’t sufficient for a complete algorithm. In an SPSC queue, the producer’s atomic increment of the write index is what commits a push and makes it visible to the consumer; similarly, the consumer’s atomic increment of the read index is what commits a pop and makes it visible to the producer.
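This doesn’t work for an MPMC queue, because several threads compete for the same index: you have to synchronize both grabbing a slot AND changing the state of that slot, which is why each slot carries its own atomic sequence number in addition to the two shared indexes.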
#pragma once
#include <atomic>
#include <cassert>
#include <immintrin.h>
using namespace std;
namespace ab
{
/// Bounded multi-producer / multi-consumer FIFO queue over a fixed ring of slots.
///
/// Producers and consumers each draw monotonically increasing "tickets" from an
/// atomic index; ticket `t` maps to slot `t & (size - 1)`. Every slot carries a
/// sequence number that says which ticket may touch it next, so a thread owns a
/// slot exactly when (1) the slot's sequence matches its ticket and (2) it wins
/// the compare-exchange that advances the shared index past that ticket.
///
/// Requirements on T (from how slots are used below): default-constructible
/// (each slot holds a `T Data{}`) and copy-assignable (items are copied in and out).
///
/// NOTE: a thread that stalls between claiming a ticket and publishing its slot
/// holds up every thread waiting on that slot, so this is not strictly lock-free.
/// NOTE: if T's copy assignment throws after a ticket has been claimed, the slot
/// is never published and that position in the ring stays blocked.
template<typename T>
class mpmcqueue
{
private:
    static auto constexpr kCacheLineSize = 64;
    std::size_t _buffer_size{};

    struct alignas(kCacheLineSize) Slot
    {
        T Data{};
        // NOTE: Seq encodes whose turn it is on this slot, in ticket units
        // (N = number of slots):
        //   Seq == t      := WriteAvailable for the producer holding ticket t
        //   Seq == t + 1  := ReadAvailable  for the consumer holding ticket t
        //   Seq == t + N  := WriteAvailable again, one lap later
        // Seq starts at the slot's own position in the ring (set by the
        // constructor). Comparing against the ticket itself, rather than a
        // lap-independent state, keeps a thread holding a stale ticket (or one
        // a full lap ahead) from mistaking the slot for its own. This needs
        // N >= 2, otherwise t + 1 and t + N would coincide.
        std::atomic<std::size_t> Seq{};
    };

    Slot* _buf;
    // Each index sits on its own cache line so producers and consumers do not
    // false-share.
    alignas(kCacheLineSize) std::atomic<std::size_t> _writeIndex{};
    alignas(kCacheLineSize) std::atomic<std::size_t> _readIndex{};

public:
    /// @param size Number of slots. Must be a power of two and at least 2; this
    ///             is checked with `assert` only, so it is unchecked under NDEBUG.
    mpmcqueue(std::size_t size) : _buffer_size(size), _buf(new Slot[size])
    {
        // Parenthesised explicitly: `==` binds tighter than `&`, which binds
        // tighter than `&&`, so the unparenthesised form accepted 0 and 1.
        auto isPowerOf2 = size >= 2 && (size & (size - 1)) == 0;
        assert(isPowerOf2);
        (void)isPowerOf2; // silence unused-variable warnings when NDEBUG is set

        // Slot i is first writable by the producer that draws ticket i.
        for(std::size_t i = 0; i < size; ++i)
        {
            _buf[i].Seq.store(i, std::memory_order_relaxed);
        }
    }

    ~mpmcqueue()
    {
        delete[] _buf;
    }

    mpmcqueue(mpmcqueue const&) = delete;
    mpmcqueue& operator=(mpmcqueue const&) = delete;

    /// Copies `item` into the queue, spinning until a slot becomes writable.
    void push(T const& item)
    {
        auto ticket = _writeIndex.load(std::memory_order_acquire);
        while(!push_once(item, ticket))
        {
            _mm_pause();
        }
    }

    /// Tries to copy `item` into the queue.
    /// @param attempts Maximum number of attempts before giving up.
    /// @return true if the item was enqueued, false if every attempt failed
    ///         (queue full, or the slot was lost to another producer each time).
    bool try_push(T const& item, std::size_t attempts = 10)
    {
        auto ticket = _writeIndex.load(std::memory_order_acquire);
        for(std::size_t attempt = 0; attempt < attempts; ++attempt)
        {
            if(push_once(item, ticket))
            {
                return true;
            }
            _mm_pause();
        }
        return false;
    }

    /// Copies the oldest element into `item`, spinning until one is readable.
    void pop(T& item)
    {
        auto ticket = _readIndex.load(std::memory_order_acquire);
        while(!pop_once(item, ticket))
        {
            _mm_pause();
        }
    }

    /// Tries to copy the oldest element into `item`.
    /// @param attempts Maximum number of attempts before giving up.
    /// @return true if an element was dequeued into `item`; false if every
    ///         attempt failed, in which case `item` is left untouched.
    bool try_pop(T& item, std::size_t attempts = 10)
    {
        auto ticket = _readIndex.load(std::memory_order_acquire);
        for(std::size_t attempt = 0; attempt < attempts; ++attempt)
        {
            if(pop_once(item, ticket))
            {
                return true;
            }
            _mm_pause();
        }
        return false;
    }

private:
    // Maps a ticket to its slot position in the ring.
    std::size_t idx(std::size_t i) const
    {
        // NOTE: _buffer_size is power of 2
        return i & (_buffer_size - 1);
    }

    // Makes a single enqueue attempt for `ticket`. On failure `ticket` is
    // refreshed to the current write index so the caller can simply retry.
    bool push_once(T const& item, std::size_t& ticket)
    {
        Slot& slot = _buf[idx(ticket)];
        if(slot.Seq.load(std::memory_order_acquire) != ticket)
        {
            // Not this ticket's turn: the ring is full or the ticket is stale.
            ticket = _writeIndex.load(std::memory_order_acquire);
            return false;
        }
        // Winning this exchange is the one and only claim step. The strong form
        // is used so an uncontended attempt cannot fail spuriously; on failure
        // `ticket` receives the current index.
        if(!_writeIndex.compare_exchange_strong(ticket, ticket + 1))
        {
            return false;
        }
        slot.Data = item;
        slot.Seq.store(ticket + 1, std::memory_order_release); // publish to the consumer
        return true;
    }

    // Makes a single dequeue attempt for `ticket`. On failure `ticket` is
    // refreshed to the current read index so the caller can simply retry.
    bool pop_once(T& item, std::size_t& ticket)
    {
        Slot& slot = _buf[idx(ticket)];
        if(slot.Seq.load(std::memory_order_acquire) != ticket + 1)
        {
            // Nothing published for this ticket yet, or the ticket is stale.
            ticket = _readIndex.load(std::memory_order_acquire);
            return false;
        }
        if(!_readIndex.compare_exchange_strong(ticket, ticket + 1))
        {
            return false;
        }
        item = slot.Data;
        // Hand the slot to the producer one lap ahead.
        slot.Seq.store(ticket + _buffer_size, std::memory_order_release);
        return true;
    }
};
} // namespace ab