@ -1,10 +1,7 @@
/ / SPDX - FileCopyrightText : Copyright ( c ) 2020 Erik Rigtorp < erik @ rigtorp . se >
/ / SPDX - FileCopyrightText : Copyright ( c ) 2020 Erik Rigtorp < erik @ rigtorp . se >
/ / SPDX - License - Identifier : MIT
/ / SPDX - License - Identifier : MIT
# pragma once
# pragma once
# ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable : 4324)
# endif
# include <atomic>
# include <atomic>
# include <bit>
# include <bit>
@ -12,105 +9,63 @@
# include <memory>
# include <memory>
# include <mutex>
# include <mutex>
# include <new>
# include <new>
# include <stdexcept>
# include <stop_token>
# include <stop_token>
# include <type_traits>
# include <type_traits>
# include <utility>
# include <utility>
namespace Common {
namespace Common {
namespace mpsc {
# if defined(__cpp_lib_hardware_interference_size)
# if defined(__cpp_lib_hardware_interference_size)
constexpr size_t hardware_interference_size = std : : hardware_destructive_interference_size ;
constexpr size_t hardware_interference_size = std : : hardware_destructive_interference_size ;
# else
# else
constexpr size_t hardware_interference_size = 64 ;
constexpr size_t hardware_interference_size = 64 ;
# endif
# endif
template < typename T >
using AlignedAllocator = std : : allocator < T > ;
template < typename T >
struct Slot {
~ Slot ( ) noexcept {
if ( turn . test ( ) ) {
destroy ( ) ;
}
}
template < typename . . . Args >
void construct ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
std : : construct_at ( reinterpret_cast < T * > ( & storage ) , std : : forward < Args > ( args ) . . . ) ;
}
void destroy ( ) noexcept {
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
std : : destroy_at ( reinterpret_cast < T * > ( & storage ) ) ;
}
T & & move ( ) noexcept {
return reinterpret_cast < T & & > ( storage ) ;
}
/ / Align to avoid false sharing between adjacent slots
alignas ( hardware_interference_size ) std : : atomic_flag turn { } ;
struct aligned_store {
struct type {
alignas ( T ) unsigned char data [ sizeof ( T ) ] ;
} ;
} ;
typename aligned_store : : type storage ;
} ;
# ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable : 4324)
# endif
template < typename T , typename Allocator = AlignedAllocator < Slot < T > > >
class Queue {
template < typename T , size_t capacity = 0x400 >
class MPSCQueue {
public :
public :
explicit Queue ( const size_t capacity , const Allocator & allocator = Allocator ( ) )
: allocator_ ( allocator ) {
if ( capacity < 1 ) {
throw std : : invalid_argument ( " capacity < 1 " ) ;
}
/ / Ensure that the queue length is an integer power of 2
/ / This is so that idx ( i ) can be a simple i & mask_ insted of i % capacity
/ / https : / / github . com / rigtorp / MPMCQueue / pull / 36
if ( ! std : : has_single_bit ( capacity ) ) {
throw std : : invalid_argument ( " capacity must be an integer power of 2 " ) ;
}
mask_ = capacity - 1 ;
explicit MPSCQueue ( ) : allocator { std : : allocator < Slot < T > > ( ) } {
/ / Allocate one extra slot to prevent false sharing on the last slot
/ / Allocate one extra slot to prevent false sharing on the last slot
slots_ = allocator_ . allocate ( mask_ + 2 ) ;
slots = allocator . allocate ( capacity + 1 ) ;
/ / Allocators are not required to honor alignment for over - aligned types
/ / Allocators are not required to honor alignment for over - aligned types
/ / ( see http : / / eel . is / c + + draft / allocator . requirements # 10 ) so we verify
/ / ( see http : / / eel . is / c + + draft / allocator . requirements # 10 ) so we verify
/ / alignment here
/ / alignment here
if ( reinterpret_cast < uintptr_t > ( slots_ ) % alignof ( Slot < T > ) ! = 0 ) {
allocator_ . deallocate ( slots_ , mask_ + 2 ) ;
if ( reinterpret_cast < uintptr_t > ( slots ) % alignof ( Slot < T > ) ! = 0 ) {
allocator . deallocate ( slots , capacity + 1 ) ;
throw std : : bad_alloc ( ) ;
throw std : : bad_alloc ( ) ;
}
}
for ( size_t i = 0 ; i < mask_ + 1 ; + + i ) {
std : : construct_at ( & slots_ [ i ] ) ;
for ( size_t i = 0 ; i < capacity ; + + i ) {
std : : construct_at ( & slots [ i ] ) ;
}
}
static_assert ( std : : has_single_bit ( capacity ) , " capacity must be an integer power of 2 " ) ;
static_assert ( alignof ( Slot < T > ) = = hardware_interference_size ,
static_assert ( alignof ( Slot < T > ) = = hardware_interference_size ,
" Slot must be aligned to cache line boundary to prevent false sharing " ) ;
" Slot must be aligned to cache line boundary to prevent false sharing " ) ;
static_assert ( sizeof ( Slot < T > ) % hardware_interference_size = = 0 ,
static_assert ( sizeof ( Slot < T > ) % hardware_interference_size = = 0 ,
" Slot size must be a multiple of cache line size to prevent "
" Slot size must be a multiple of cache line size to prevent "
" false sharing between adjacent slots " ) ;
" false sharing between adjacent slots " ) ;
static_assert ( sizeof ( Queue ) % hardware_interference_size = = 0 ,
static_assert ( sizeof ( MPSCQueue ) % hardware_interference_size = = 0 ,
" Queue size must be a multiple of cache line size to "
" Queue size must be a multiple of cache line size to "
" prevent false sharing between adjacent queues " ) ;
" prevent false sharing between adjacent queues " ) ;
}
}
~ Queue ( ) noexcept {
for ( size_t i = 0 ; i < mask_ + 1 ; + + i ) {
slots_ [ i ] . ~ Slot ( ) ;
~ MPSCQueue ( ) noexcept {
for ( size_t i = 0 ; i < capacity ; + + i ) {
std : : destroy_at ( & slots [ i ] ) ;
}
}
allocator_ . deallocate ( slots_ , mask_ + 2 ) ;
allocator . deallocate ( slots , capacity + 1 ) ;
}
}
/ / non - copyable and non - movable
Queue ( const Queue & ) = delete ;
Queue & operator = ( const Queue & ) = delete ;
/ / The queue must be both non - copyable and non - movable
MPSCQueue ( const MPSCQueue & ) = delete ;
MPSCQueue & operator = ( const MPSCQueue & ) = delete ;
MPSCQueue ( MPSCQueue & & ) = delete ;
MPSCQueue & operator = ( MPSCQueue & & ) = delete ;
void Push ( const T & v ) noexcept {
void Push ( const T & v ) noexcept {
static_assert ( std : : is_nothrow_copy_constructible_v < T > ,
static_assert ( std : : is_nothrow_copy_constructible_v < T > ,
@ -125,8 +80,8 @@ public:
void Pop ( T & v , std : : stop_token stop ) noexcept {
void Pop ( T & v , std : : stop_token stop ) noexcept {
auto const tail = tail_ . fetch_add ( 1 ) ;
auto const tail = tail_ . fetch_add ( 1 ) ;
auto & slot = slots_ [ idx ( tail ) ] ;
if ( false = = slot . turn . test ( ) ) {
auto & slot = slots [ idx ( tail ) ] ;
if ( ! slot . turn . test ( ) ) {
std : : unique_lock lock { cv_mutex } ;
std : : unique_lock lock { cv_mutex } ;
cv . wait ( lock , stop , [ & slot ] { return slot . turn . test ( ) ; } ) ;
cv . wait ( lock , stop , [ & slot ] { return slot . turn . test ( ) ; } ) ;
}
}
@ -137,12 +92,46 @@ public:
}
}
private :
private :
template < typename U = T >
struct Slot {
~ Slot ( ) noexcept {
if ( turn . test ( ) ) {
destroy ( ) ;
}
}
template < typename . . . Args >
void construct ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < U , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
std : : construct_at ( reinterpret_cast < U * > ( & storage ) , std : : forward < Args > ( args ) . . . ) ;
}
void destroy ( ) noexcept {
static_assert ( std : : is_nothrow_destructible_v < U > , " T must be nothrow destructible " ) ;
std : : destroy_at ( reinterpret_cast < U * > ( & storage ) ) ;
}
U & & move ( ) noexcept {
return reinterpret_cast < U & & > ( storage ) ;
}
/ / Align to avoid false sharing between adjacent slots
alignas ( hardware_interference_size ) std : : atomic_flag turn { } ;
struct aligned_store {
struct type {
alignas ( U ) unsigned char data [ sizeof ( U ) ] ;
} ;
} ;
typename aligned_store : : type storage ;
} ;
template < typename . . . Args >
template < typename . . . Args >
void emplace ( Args & & . . . args ) noexcept {
void emplace ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
" T must be nothrow constructible with Args&&... " ) ;
auto const head = head_ . fetch_add ( 1 ) ;
auto const head = head_ . fetch_add ( 1 ) ;
auto & slot = slots_ [ idx ( head ) ] ;
auto & slot = slots [ idx ( head ) ] ;
slot . turn . wait ( true ) ;
slot . turn . wait ( true ) ;
slot . construct ( std : : forward < Args > ( args ) . . . ) ;
slot . construct ( std : : forward < Args > ( args ) . . . ) ;
slot . turn . test_and_set ( ) ;
slot . turn . test_and_set ( ) ;
@ -150,31 +139,29 @@ private:
}
}
constexpr size_t idx ( size_t i ) const noexcept {
constexpr size_t idx ( size_t i ) const noexcept {
return i & mask_ ;
return i & mask ;
}
}
std : : conditional_t < true , std : : condition_variable_any , std : : condition_variable > cv ;
std : : mutex cv_mutex ;
size_t mask_ ;
Slot < T > * slots_ ;
[ [ no_unique_address ] ] Allocator allocator_ ;
static constexpr size_t mask = capacity - 1 ;
/ / Align to avoid false sharing between head_ and tail_
/ / Align to avoid false sharing between head_ and tail_
alignas ( hardware_interference_size ) std : : atomic < size_t > head_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > head_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > tail_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > tail_ { 0 } ;
std : : mutex cv_mutex ;
std : : condition_variable_any cv ;
Slot < T > * slots ;
[ [ no_unique_address ] ] std : : allocator < Slot < T > > allocator ;
static_assert ( std : : is_nothrow_copy_assignable_v < T > | | std : : is_nothrow_move_assignable_v < T > ,
static_assert ( std : : is_nothrow_copy_assignable_v < T > | | std : : is_nothrow_move_assignable_v < T > ,
" T must be nothrow copy or move assignable " ) ;
" T must be nothrow copy or move assignable " ) ;
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
} ;
} ;
} / / namespace mpsc
template < typename T , typename Allocator = mpsc : : AlignedAllocator < mpsc : : Slot < T > > >
using MPSCQueue = mpsc : : Queue < T , Allocator > ;
} / / namespace Common
# ifdef _MSC_VER
# ifdef _MSC_VER
# pragma warning(pop)
# pragma warning(pop)
# endif
# endif
} / / namespace Common