无锁队列实现
Bounded MPMC queue
参考:https://blog.dbplayer.org/crossbeam-note/
要讲明白这种实现,需要说明更多的细节:
head 和 tail 一般不存储 array 的下标,而是一个无限递增的序列号,用于避免 ABA 问题(恰好循环了一圈还是同一个节点导致的判断错误)
从序列号到下标,就是对 capacity(array size) 的取余,为了加速取余过程,capacity 会取 2 的幂次,这样 index = seq & (capacity - 1)
具体实现:
每个节点存了一个 stamp 表示节点的状态
如果 stamp 执行了下一圈对应的序列号,表示节点是空闲状态
默认即空闲状态,因此所有节点的 stamp 默认初始化为节点对应的下标 i
pop 释放后节点会指向下一个圈的 i,即 capacity * cycle_count + i
pop 时取到 tail 节点时,只要检查节点 stamp 是否和 tail 相等,即可确认节点是否空闲
节点被占用时,stamp 修改为原 stamp + 1 即可
stamp 表示的是节点状态,应该最后操作,push 时要先修改 head 再填充值再改 stamp,pop 时要先修改 tail 缓存值再修改 stamp
pop 时修改 stamp 不需要知道 cycle_count,只需 stamp = stamp + (capacity - 1)
改进点:
相比于三种状态,只需要一次 cas 操作
某种程度上可以保证 queue 有序
绝大部分地方的内存序都是 relaxed,只有两次存储需要是 release
不涉及 head 和 tail 之间的关系,可以使用 cacheline padding 的技巧加速算法
| Text Only |
|---|
| template<typename T>
class mpmc_bounded_queue
{
public:
mpmc_bounded_queue(size_t buffer_size)
: buffer_(new cell_t [buffer_size])
, buffer_mask_(buffer_size - 1)
{
// 要求 buffer_size 至少是 2(因为序列号改状态要 +1) 大并且是 2 的倍数
assert((buffer_size >= 2) &&
((buffer_size & (buffer_size - 1)) == 0));
// 初始化每个节点中的 sequence_,设置为 i,即第一轮的空闲状态
for (size_t i = 0; i != buffer_size; i += 1)
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
// 初始化首尾指针
enqueue_pos_.store(0, std::memory_order_relaxed);
dequeue_pos_.store(0, std::memory_order_relaxed);
}
~mpmc_bounded_queue()
{
delete [] buffer_;
}
bool enqueue(T const& data)
{
cell_t* cell;
// 取 tail 序列号
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
// 找到 tail 节点,注意,无锁算法中这些快照值都可能是过期的
cell = &buffer_[pos & buffer_mask_];
// 取节点中存的序列号
size_t seq =
cell->sequence_.load(std::memory_order_acquire);
// 如果和 tail 的序列号一致,说明节点是空闲状态
// 这里保证了 enqueue(push) 操作和 dequeue(pop) 操作是互斥的
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
if (dif == 0)
{
// CAS 操作修改 tail 的值
// 这里保证 enqueue(push) 操作之间是互斥的
if (enqueue_pos_.compare_exchange_weak
(pos, pos + 1, std::memory_order_relaxed))
break;
}
// 这说明节点值还是上一轮的值,队列已满
else if (dif < 0)
return false;
// 走到这里说明节点已经被占用了,tail 值不是最新的,需要更新
else
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
// 填充值、修改 stamp 以允许被 pop
cell->data_ = data;
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T& data)
{
cell_t* cell;
// 取 head 的序列号
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
// 取 head 节点和对应序列号
cell = &buffer_[pos & buffer_mask_];
size_t seq =
cell->sequence_.load(std::memory_order_acquire);
// 通过序列号和确认节点是可 pop 的,保证 dequeue 和 enqueue 互斥
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
if (dif == 0)
{
// 修改 head,保证 dequeue 操作之间是互斥的
if (dequeue_pos_.compare_exchange_weak
(pos, pos + 1, std::memory_order_relaxed))
break;
}
// 队列空了
else if (dif < 0)
return false;
// 更新已经被更改的 head
else
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
// 存数据快照,修改节点的序列号到下一圈空闲状态的值
data = cell->data_;
cell->sequence_.store
(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
private:
struct cell_t
{
std::atomic<size_t> sequence_;
T data_;
};
// 保证 head / tail 在不同的 cacheline 中,修改更快
static size_t const cacheline_size = 64;
typedef char cacheline_pad_t [cacheline_size];
cacheline_pad_t pad0_;
cell_t* const buffer_;
size_t const buffer_mask_;
cacheline_pad_t pad1_;
std::atomic<size_t> enqueue_pos_;
cacheline_pad_t pad2_;
std::atomic<size_t> dequeue_pos_;
cacheline_pad_t pad3_;
mpmc_bounded_queue(mpmc_bounded_queue const&);
void operator = (mpmc_bounded_queue const&);
};
|