QUILL_NODISCARDstaticvoid*_alloc_aligned(size_tsize,size_talignment,QUILL_MAYBE_UNUSEDboolhuges_pages_enabled){// Calculate the total size including the metadata and alignment// metadata保存了total_size和offset// | total_size | offs | xxxx | storage area |// | 元数据 |未对齐区域 | 对齐的存储区域 |// | 分配的全部区域 |constexprsize_tmetadata_size{2u*sizeof(size_t)};size_tconsttotal_size{size+metadata_size+alignment};// Allocate the memory,匿名 私有内存区域,私有用到了写时拷贝机制,修改的内容其它进程看不到。intflags=MAP_PRIVATE|MAP_ANONYMOUS;// 宏定义开启huge page#if defined(__linux__)if(huges_pages_enabled){flags|=MAP_HUGETLB;}#endifvoid*mem=::mmap(nullptr,total_size,PROT_READ|PROT_WRITE,flags,-1,0);if(mem==MAP_FAILED){QUILL_THROW(QuillError{std::string{"mmap failed. errno: "}+std::to_string(errno)+" error: "+strerror(errno)});}// Calculate the aligned address after the metadatastd::byte*aligned_address=_align_pointer(static_cast<std::byte*>(mem)+metadata_size,alignment);// Calculate the offset from the original memory locationautoconstoffset=static_cast<size_t>(aligned_address-static_cast<std::byte*>(mem));// Store the size and offset information in the metadatastd::memcpy(aligned_address-sizeof(size_t),&total_size,sizeof(total_size));std::memcpy(aligned_address-(2u*sizeof(size_t)),&offset,sizeof(offset));returnaligned_address;}
QUILL_NODISCARDstaticstd::byte*_align_pointer(void*pointer,size_talignment)noexcept
{assert(is_power_of_two(alignment)&&"alignment must be a power of two");returnreinterpret_cast<std::byte*>((reinterpret_cast<uintptr_t>(pointer)+(alignment-1ul))&~(alignment-1ul));}
voidstatic_free_aligned(void*ptr)noexcept{// Retrieve the size and offset information from the metadatasize_toffset;std::memcpy(&offset,static_cast<std::byte*>(ptr)-(2u*sizeof(size_t)),sizeof(offset));size_ttotal_size;std::memcpy(&total_size,static_cast<std::byte*>(ptr)-sizeof(size_t),sizeof(total_size));// Calculate the original memory block addressvoid*mem=static_cast<std::byte*>(ptr)-offset;::munmap(mem,total_size);}
QUILL_NODISCARDQUILL_ATTRIBUTE_HOTboolempty()constnoexcept{// reader会一直等writer commit提交了以后,也就是改变了_atomic_writer_pos,empty返回false,才能读取内容if(_writer_pos_cache==_reader_pos){// if we think the queue is empty we also load the atomic variable to check further_writer_pos_cache=_atomic_writer_pos.load(std::memory_order_acquire);if(_writer_pos_cache==_reader_pos){returntrue;}}returnfalse;}
do{// 更新read_posfrontend_queue.finish_read(bytes_read);}while(xxx);// If we read something from the queue, we commit all the reads together at the end.// This strategy enhances cache coherence performance by updating the shared atomic flag// only once.// 多次读取修改read_pos,一次提交修改shared atomic_read_pos,增强缓存一致性frontend_queue.commit_read();
QUILL_NODISCARDQUILL_ATTRIBUTE_HOTstd::byte*prepare_write(integer_typen)noexcept{// 判断剩余内存容量是否小于nif((_capacity-static_cast<integer_type>(_writer_pos-_reader_pos_cache))<n){// not enough space, we need to load reader and re-check// 获取_atomic_reader_pos,二次确认reader是否读了并且commit了_reader_pos_cache=_atomic_reader_pos.load(std::memory_order_acquire);if((_capacity-static_cast<integer_type>(_writer_pos-_reader_pos_cache))<n){returnnullptr;}}// 如果内存容量充裕,返回writer_pos对应的指针return_storage+(_writer_pos&_mask);}
QUILL_ATTRIBUTE_HOTvoidfinish_write(integer_typen)noexcept{_writer_pos+=n;}QUILL_ATTRIBUTE_HOTvoidcommit_write()noexcept{// set the atomic flag so the reader can see write_atomic_writer_pos.store(_writer_pos,std::memory_order_release);#if defined(QUILL_X86ARCH)// flush writen cache lines_flush_cachelines(_last_flushed_writer_pos,_writer_pos);// prefetch a future cache line_mm_prefetch(reinterpret_cast<charconst*>(_storage+(_writer_pos&_mask)+(CACHE_LINE_SIZE*10)),_MM_HINT_T0);#endif}