无锁队列实现

Bounded MPMC queue

参考:https://blog.dbplayer.org/crossbeam-note/

要讲明白这种实现,需要说明更多的细节:

head 和 tail 一般不存储 array 的下标,而是一个无限递增的序列号,用于避免 ABA 问题(恰好循环了一圈还是同一个节点导致的判断错误) 从序列号到下标,就是对 capacity(array size) 的取余,为了加速取余过程,capacity 会取 2 的幂次,这样 index = seq & (capacity - 1) 具体实现:

每个节点存了一个 stamp 表示节点的状态 如果 stamp 执行了下一圈对应的序列号,表示节点是空闲状态 默认即空闲状态,因此所有节点的 stamp 默认初始化为节点对应的下标 i pop 释放后节点会指向下一个圈的 i,即 capacity * cycle_count + i pop 时取到 tail 节点时,只要检查节点 stamp 是否和 tail 相等,即可确认节点是否空闲 节点被占用时,stamp 修改为原 stamp + 1 即可 stamp 表示的是节点状态,应该最后操作,push 时要先修改 head 再填充值再改 stamp,pop 时要先修改 tail 缓存值再修改 stamp pop 时修改 stamp 不需要知道 cycle_count,只需 stamp = stamp + (capacity - 1) 改进点:

相比于三种状态,只需要一次 cas 操作 某种程度上可以保证 queue 有序 绝大部分地方的内存序都是 relaxed,只有两次存储需要是 release 不涉及 head 和 tail 之间的关系,可以使用 cacheline padding 的技巧加速算法

Text Only
template<typename T>
class mpmc_bounded_queue
{
public:
  mpmc_bounded_queue(size_t buffer_size)
    : buffer_(new cell_t [buffer_size])
    , buffer_mask_(buffer_size - 1)
  {
    // 要求 buffer_size 至少是 2(因为序列号改状态要 +1) 大并且是 2 的倍数
    assert((buffer_size >= 2) &&
      ((buffer_size & (buffer_size - 1)) == 0));

    // 初始化每个节点中的 sequence_,设置为 i,即第一轮的空闲状态
    for (size_t i = 0; i != buffer_size; i += 1)
      buffer_[i].sequence_.store(i, std::memory_order_relaxed);

    // 初始化首尾指针
    enqueue_pos_.store(0, std::memory_order_relaxed);
    dequeue_pos_.store(0, std::memory_order_relaxed);
  }

  ~mpmc_bounded_queue()
  {
    delete [] buffer_;
  }

  bool enqueue(T const& data)
  {
    cell_t* cell;
    // 取 tail 序列号
    size_t pos = enqueue_pos_.load(std::memory_order_relaxed);

    for (;;)
    {
      // 找到 tail 节点,注意,无锁算法中这些快照值都可能是过期的
      cell = &buffer_[pos & buffer_mask_];

      // 取节点中存的序列号
      size_t seq = 
        cell->sequence_.load(std::memory_order_acquire);

      // 如果和 tail 的序列号一致,说明节点是空闲状态
      // 这里保证了 enqueue(push) 操作和 dequeue(pop) 操作是互斥的
      intptr_t dif = (intptr_t)seq - (intptr_t)pos;
      if (dif == 0)
      {
        // CAS 操作修改 tail 的值
        // 这里保证 enqueue(push) 操作之间是互斥的
        if (enqueue_pos_.compare_exchange_weak
            (pos, pos + 1, std::memory_order_relaxed))
          break;
      }
      // 这说明节点值还是上一轮的值,队列已满
      else if (dif < 0)
        return false;
      // 走到这里说明节点已经被占用了,tail 值不是最新的,需要更新
      else
        pos = enqueue_pos_.load(std::memory_order_relaxed);
    }

    // 填充值、修改 stamp 以允许被 pop
    cell->data_ = data;
    cell->sequence_.store(pos + 1, std::memory_order_release);
    return true;

  }

  bool dequeue(T& data)
  {
    cell_t* cell;
    // 取 head 的序列号
    size_t pos = dequeue_pos_.load(std::memory_order_relaxed);

    for (;;)
    {
      // 取 head 节点和对应序列号
      cell = &buffer_[pos & buffer_mask_];
      size_t seq = 
        cell->sequence_.load(std::memory_order_acquire);

      // 通过序列号和确认节点是可 pop 的,保证 dequeue 和 enqueue 互斥
      intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
      if (dif == 0)
      {
        // 修改 head,保证 dequeue 操作之间是互斥的
        if (dequeue_pos_.compare_exchange_weak
            (pos, pos + 1, std::memory_order_relaxed))
          break;
      }
      // 队列空了
      else if (dif < 0)
        return false;
      // 更新已经被更改的 head
      else
        pos = dequeue_pos_.load(std::memory_order_relaxed);
    }

    // 存数据快照,修改节点的序列号到下一圈空闲状态的值
    data = cell->data_;
    cell->sequence_.store
      (pos + buffer_mask_ + 1, std::memory_order_release);
    return true;
  }

private:

  struct cell_t
  {
    std::atomic<size_t>   sequence_;
    T                     data_;
  };

  // 保证 head / tail 在不同的 cacheline 中,修改更快
  static size_t const     cacheline_size = 64;
  typedef char            cacheline_pad_t [cacheline_size];
  cacheline_pad_t         pad0_;
  cell_t* const           buffer_;
  size_t const            buffer_mask_;
  cacheline_pad_t         pad1_;
  std::atomic<size_t>     enqueue_pos_;
  cacheline_pad_t         pad2_;
  std::atomic<size_t>     dequeue_pos_;
  cacheline_pad_t         pad3_;

  mpmc_bounded_queue(mpmc_bounded_queue const&);

  void operator = (mpmc_bounded_queue const&);
};