@ -23,60 +23,76 @@ class SPSCQueue {
public :
bool TryPush ( T & & t ) {
const size_t write_index = m_write_index . load ( ) ;
/ / Check if we have free slots to write to .
if ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
return false ;
}
/ / Determine the position to write to .
const size_t pos = write_index % Capacity ;
/ / Push into the queue .
m_data [ pos ] = std : : move ( t ) ;
/ / Increment the write index .
+ + m_write_index ;
return Push < PushMode : : Try > ( std : : move ( t ) ) ;
}
/ / Notify the consumer that we have pushed into the queue .
std : : scoped_lock lock { cv_mutex } ;
cv . notify_one ( ) ;
template < typename . . . Args >
bool TryEmplace ( Args & & . . . args ) {
return Emplace < PushMode : : Try > ( std : : forward < Args > ( args ) . . . ) ;
}
return true ;
void PushWait ( T & & t ) {
Push < PushMode : : Wait > ( std : : move ( t ) ) ;
}
template < typename . . . Args >
bool TryPush ( Args & & . . . args ) {
const size_t write_index = m_write_index . load ( ) ;
void EmplaceWait ( Args & & . . . args ) {
Emplace < PushMode : : Wait > ( std : : forward < Args > ( args ) . . . ) ;
}
/ / Check if we have free slots to write to .
if ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
return false ;
}
bool TryPop ( T & t ) {
return Pop ( t ) ;
}
/ / Determine the position to write to .
const size_t pos = write_index % Capacity ;
void PopWait ( T & t , std : : stop_token stop_token ) {
Wait ( stop_token ) ;
Pop ( t ) ;
}
/ / Emplace into the queue .
std : : construct_at ( std : : addressof ( m_data [ pos ] ) , std : : forward < Args > ( args ) . . . ) ;
T PopWait ( std : : stop_token stop_token ) {
Wait ( stop_token ) ;
T t ;
Pop ( t ) ;
return t ;
}
/ / Increment the write index .
+ + m_write_index ;
void Clear ( ) {
while ( ! Empty ( ) ) {
Pop ( ) ;
}
}
/ / Notify the consumer that we have pushed into the queue .
std : : scoped_lock lock { cv_mutex } ;
cv . notify_one ( ) ;
bool Empty ( ) const {
return m_read_index . load ( ) = = m_write_index . load ( ) ;
}
return true ;
size_t Size ( ) const {
return m_write_index . load ( ) - m_read_index . load ( ) ;
}
void Push ( T & & t ) {
private :
enum class PushMode {
Try ,
Wait ,
Count ,
} ;
template < PushMode Mode >
bool Push ( T & & t ) {
const size_t write_index = m_write_index . load ( ) ;
/ / Wait until we have free slots to write to .
while ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
std : : this_thread : : yield ( ) ;
if constexpr ( Mode = = PushMode : : Try ) {
/ / Check if we have free slots to write to .
if ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
return false ;
}
} else if constexpr ( Mode = = PushMode : : Wait ) {
/ / Wait until we have free slots to write to .
while ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
std : : this_thread : : yield ( ) ;
}
} else {
static_assert ( Mode < PushMode : : Count , " Invalid PushMode. " ) ;
}
/ / Determine the position to write to .
@ -91,15 +107,26 @@ public:
/ / Notify the consumer that we have pushed into the queue .
std : : scoped_lock lock { cv_mutex } ;
cv . notify_one ( ) ;
return true ;
}
template < typename . . . Args >
void Push ( Args & & . . . args ) {
template < PushMode Mode , typename . . . Args >
bool Emplace ( Args & & . . . args ) {
const size_t write_index = m_write_index . load ( ) ;
/ / Wait until we have free slots to write to .
while ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
std : : this_thread : : yield ( ) ;
if constexpr ( Mode = = PushMode : : Try ) {
/ / Check if we have free slots to write to .
if ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
return false ;
}
} else if constexpr ( Mode = = PushMode : : Wait ) {
/ / Wait until we have free slots to write to .
while ( ( write_index - m_read_index . load ( ) ) = = Capacity ) {
std : : this_thread : : yield ( ) ;
}
} else {
static_assert ( Mode < PushMode : : Count , " Invalid PushMode. " ) ;
}
/ / Determine the position to write to .
@ -114,39 +141,10 @@ public:
/ / Notify the consumer that we have pushed into the queue .
std : : scoped_lock lock { cv_mutex } ;
cv . notify_one ( ) ;
}
bool TryPop ( T & t ) {
return Pop ( t ) ;
}
void PopWait ( T & t , std : : stop_token stop_token ) {
Wait ( stop_token ) ;
Pop ( t ) ;
}
T PopWait ( std : : stop_token stop_token ) {
Wait ( stop_token ) ;
T t ;
Pop ( t ) ;
return t ;
}
void Clear ( ) {
while ( ! Empty ( ) ) {
Pop ( ) ;
}
}
bool Empty ( ) const {
return m_read_index . load ( ) = = m_write_index . load ( ) ;
}
size_t Size ( ) const {
return m_write_index . load ( ) - m_read_index . load ( ) ;
return true ;
}
private :
void Pop ( ) {
const size_t read_index = m_read_index . load ( ) ;
@ -208,20 +206,20 @@ public:
}
template < typename . . . Args >
bool TryPush ( Args & & . . . args ) {
bool TryEmplace ( Args & & . . . args ) {
std : : scoped_lock lock { write_mutex } ;
return spsc_queue . TryPush ( std : : forward < Args > ( args ) . . . ) ;
return spsc_queue . TryEmplace ( std : : forward < Args > ( args ) . . . ) ;
}
void Push ( T & & t ) {
void PushWait ( T & & t ) {
std : : scoped_lock lock { write_mutex } ;
spsc_queue . Push ( std : : move ( t ) ) ;
spsc_queue . PushWait ( std : : move ( t ) ) ;
}
template < typename . . . Args >
void Push ( Args & & . . . args ) {
void EmplaceWait ( Args & & . . . args ) {
std : : scoped_lock lock { write_mutex } ;
spsc_queue . Push ( std : : forward < Args > ( args ) . . . ) ;
spsc_queue . EmplaceWait ( std : : forward < Args > ( args ) . . . ) ;
}
bool TryPop ( T & t ) {
@ -262,20 +260,20 @@ public:
}
template < typename . . . Args >
bool TryPush ( Args & & . . . args ) {
bool TryEmplace ( Args & & . . . args ) {
std : : scoped_lock lock { write_mutex } ;
return spsc_queue . TryPush ( std : : forward < Args > ( args ) . . . ) ;
return spsc_queue . TryEmplace ( std : : forward < Args > ( args ) . . . ) ;
}
void Push ( T & & t ) {
void PushWait ( T & & t ) {
std : : scoped_lock lock { write_mutex } ;
spsc_queue . Push ( std : : move ( t ) ) ;
spsc_queue . PushWait ( std : : move ( t ) ) ;
}
template < typename . . . Args >
void Push ( Args & & . . . args ) {
void EmplaceWait ( Args & & . . . args ) {
std : : scoped_lock lock { write_mutex } ;
spsc_queue . Push ( std : : forward < Args > ( args ) . . . ) ;
spsc_queue . EmplaceWait ( std : : forward < Args > ( args ) . . . ) ;
}
bool TryPop ( T & t ) {