diff --git a/src/audio_core/sink/sink_stream.cpp b/src/audio_core/sink/sink_stream.cpp index 22abeb60b7..f94217ebeb 100644 --- a/src/audio_core/sink/sink_stream.cpp +++ b/src/audio_core/sink/sink_stream.cpp @@ -1,5 +1,6 @@ // SPDX-FileCopyrightText: Copyright 2025 Eden Emulator Project // SPDX-License-Identifier: GPL-3.0-or-later + // SPDX-FileCopyrightText: Copyright 2018 yuzu Emulator Project // SPDX-License-Identifier: GPL-2.0-or-later @@ -27,19 +28,20 @@ void SinkStream::AppendBuffer(SinkBuffer& buffer, std::span samples) { constexpr s32 min = (std::numeric_limits::min)(); constexpr s32 max = (std::numeric_limits::max)(); + auto yuzu_volume = Settings::Volume(); if (yuzu_volume > 1.0f) yuzu_volume = 0.6f + 20.0f * std::log10(yuzu_volume); + yuzu_volume = std::max(yuzu_volume, 0.001f); auto const volume = system_volume * device_volume * yuzu_volume; + if (system_channels > device_channels) { - // "Topological" coefficients, basically makes back sounds be less noisy :) - // Front = 1.0; Center = 0.596; LFE = 0.354; Back = 0.707 static constexpr std::array tcoeff{1.0f, 0.596f, 0.354f, 0.707f}; - // We're given 6 channels, but our device only outputs 2, so downmix. for (u32 r_offs = 0, w_offs = 0; r_offs < samples.size(); r_offs += system_channels, w_offs += device_channels) { std::array ccoeff{0.f}; for (u32 i = 0; i < system_channels; ++i) ccoeff[i] = f32(samples[r_offs + i]); + std::array rcoeff{ ccoeff[u32(Channels::FrontLeft)], ccoeff[u32(Channels::BackLeft)], @@ -48,31 +50,34 @@ void SinkStream::AppendBuffer(SinkBuffer& buffer, std::span samples) { ccoeff[u32(Channels::BackRight)], ccoeff[u32(Channels::FrontRight)], }; - std::array scoeff{ - rcoeff[0] * tcoeff[0] + rcoeff[2] * tcoeff[1] + rcoeff[3] * tcoeff[2] + rcoeff[1] * tcoeff[3], - rcoeff[5] * tcoeff[0] + rcoeff[2] * tcoeff[1] + rcoeff[3] * tcoeff[2] + rcoeff[4] * tcoeff[3], - rcoeff[4] * tcoeff[0] + rcoeff[3] * tcoeff[1] + rcoeff[2] * tcoeff[2] + rcoeff[2] * tcoeff[3], - rcoeff[3] * tcoeff[0] + rcoeff[3] * tcoeff[1] + rcoeff[2] * tcoeff[2] + rcoeff[3] * tcoeff[3], - rcoeff[2] * tcoeff[0] + rcoeff[4] * tcoeff[1] + rcoeff[1] * tcoeff[2] + rcoeff[0] * tcoeff[3], - rcoeff[1] * tcoeff[0] + rcoeff[4] * tcoeff[1] + rcoeff[1] * tcoeff[2] + rcoeff[5] * tcoeff[3] - }; - for (u32 i = 0; i < system_channels; ++i) - samples[w_offs + i] = s16(std::clamp(s32(scoeff[i] * volume), min, max)); + + const f32 left = rcoeff[0] * tcoeff[0] + rcoeff[2] * tcoeff[1] + rcoeff[3] * tcoeff[2] + rcoeff[1] * tcoeff[3]; + const f32 right = rcoeff[5] * tcoeff[0] + rcoeff[2] * tcoeff[1] + rcoeff[3] * tcoeff[2] + rcoeff[4] * tcoeff[3]; + + samples[w_offs + 0] = s16(std::clamp(s32(left * volume), min, max)); + samples[w_offs + 1] = s16(std::clamp(s32(right * volume), min, max)); } + + queue.EmplaceWait(buffer); samples_buffer.Push(samples.subspan(0, samples.size() / system_channels * device_channels)); } else if (system_channels < device_channels) { - // We need moar samples! Not all games will provide 6 channel audio. std::vector new_samples(samples.size() / system_channels * device_channels); for (u32 r_offs = 0, w_offs = 0; r_offs < samples.size(); r_offs += system_channels, w_offs += device_channels) for (u32 channel = 0; channel < system_channels; ++channel) new_samples[w_offs + channel] = s16(std::clamp(s32(f32(samples[r_offs + channel]) * volume), min, max)); + + queue.EmplaceWait(buffer); samples_buffer.Push(new_samples); } else { - for (u32 i = 0; i < samples.size() && volume != 1.0f; ++i) - samples[i] = s16(std::clamp(s32(f32(samples[i]) * volume), min, max)); + if (volume != 1.0f) { + for (u32 i = 0; i < samples.size(); ++i) + samples[i] = s16(std::clamp(s32(f32(samples[i]) * volume), min, max)); + } + + queue.EmplaceWait(buffer); samples_buffer.Push(samples); } - queue.EmplaceWait(buffer); + ++queued_buffers; } @@ -96,10 +101,12 @@ std::vector SinkStream::ReleaseBuffer(u64 num_samples) { } void SinkStream::ClearQueue() { + std::scoped_lock lk{release_mutex}; + samples_buffer.Pop(); SinkBuffer tmp; - while (queue.TryPop(tmp)) - ; + while (queue.TryPop(tmp)); + queued_buffers = 0; playing_buffer = {}; playing_buffer.consumed = true; @@ -122,8 +129,7 @@ void SinkStream::ProcessAudioIn(std::span input_buffer, std::size_t n if (!queue.TryPop(playing_buffer)) { // If no buffer was available we've underrun, just push the samples and // continue. - samples_buffer.Push(&input_buffer[frames_written * frame_size], - (num_frames - frames_written) * frame_size); + samples_buffer.Push(&input_buffer[frames_written * frame_size], (num_frames - frames_written) * frame_size); frames_written = num_frames; continue; } @@ -133,11 +139,9 @@ void SinkStream::ProcessAudioIn(std::span input_buffer, std::size_t n // Get the minimum frames available between the currently playing buffer, and the // amount we have left to fill - size_t frames_available{std::min(playing_buffer.frames - playing_buffer.frames_played, - num_frames - frames_written)}; + size_t frames_available{std::min(playing_buffer.frames - playing_buffer.frames_played, num_frames - frames_written)}; - samples_buffer.Push(&input_buffer[frames_written * frame_size], - frames_available * frame_size); + samples_buffer.Push(&input_buffer[frames_written * frame_size], frames_available * frame_size); frames_written += frames_available; playing_buffer.frames_played += frames_available; @@ -158,63 +162,49 @@ void SinkStream::ProcessAudioOutAndRender(std::span output_buffer, std::siz size_t frames_written{0}; size_t actual_frames_written{0}; - // If we're paused or going to shut down, we don't want to consume buffers as coretiming is - // paused and we'll desync, so just play silence. if (system.IsPaused() || system.IsShuttingDown()) { if (system.IsShuttingDown()) { - { - std::scoped_lock lk{release_mutex}; - queued_buffers.store(0); - } + std::scoped_lock lk{release_mutex}; + queued_buffers.store(0); release_cv.notify_one(); } static constexpr std::array silence{}; - for (size_t i = frames_written; i < num_frames; i++) - std::memcpy(&output_buffer[i * frame_size], &silence[0], frame_size_bytes); + for (size_t i = 0; i < num_frames; i++) + std::memcpy(&output_buffer[i * frame_size], silence.data(), frame_size_bytes); return; } while (frames_written < num_frames) { - // If the playing buffer has been consumed or has no frames, we need a new one if (playing_buffer.consumed || playing_buffer.frames == 0) { + std::unique_lock lk{release_mutex}; + if (!queue.TryPop(playing_buffer)) { - // If no buffer was available we've underrun, fill the remaining buffer with - // the last written frame and continue. + lk.unlock(); for (size_t i = frames_written; i < num_frames; i++) - std::memcpy(&output_buffer[i * frame_size], &last_frame[0], frame_size_bytes); + std::memcpy(&output_buffer[i * frame_size], last_frame.data(), frame_size_bytes); frames_written = num_frames; continue; } - // Successfully dequeued a new buffer. - { - std::unique_lock lk{release_mutex}; - queued_buffers--; - } + + --queued_buffers; + lk.unlock(); release_cv.notify_one(); } - // Get the minimum frames available between the currently playing buffer, and the - // amount we have left to fill - size_t frames_available{std::min(playing_buffer.frames - playing_buffer.frames_played, - num_frames - frames_written)}; + const size_t frames_available = std::min(playing_buffer.frames - playing_buffer.frames_played, num_frames - frames_written); - samples_buffer.Pop(&output_buffer[frames_written * frame_size], - frames_available * frame_size); + samples_buffer.Pop(&output_buffer[frames_written * frame_size], frames_available * frame_size); frames_written += frames_available; actual_frames_written += frames_available; playing_buffer.frames_played += frames_available; - // If that's all the frames in the current buffer, add its samples and mark it as - // consumed - if (playing_buffer.frames_played >= playing_buffer.frames) { + if (playing_buffer.frames_played >= playing_buffer.frames) playing_buffer.consumed = true; - } } - std::memcpy(&last_frame[0], &output_buffer[(frames_written - 1) * frame_size], - frame_size_bytes); + std::memcpy(last_frame.data(), &output_buffer[(frames_written - 1) * frame_size], frame_size_bytes); { std::scoped_lock lk{sample_count_lock}; @@ -228,8 +218,7 @@ u64 SinkStream::GetExpectedPlayedSampleCount() { std::scoped_lock lk{sample_count_lock}; auto cur_time{system.CoreTiming().GetGlobalTimeNs()}; auto time_delta{cur_time - last_sample_count_update_time}; - auto exp_played_sample_count{min_played_sample_count + - (TargetSampleRate * time_delta) / std::chrono::seconds{1}}; + auto exp_played_sample_count{min_played_sample_count + (TargetSampleRate * time_delta) / std::chrono::seconds{1}}; // Add 15ms of latency in sample reporting to allow for some leeway in scheduler timings return std::min(exp_played_sample_count, max_played_sample_count) + TargetSampleCount * 3; @@ -238,14 +227,14 @@ u64 SinkStream::GetExpectedPlayedSampleCount() { void SinkStream::WaitFreeSpace(std::stop_token stop_token) { std::unique_lock lk{release_mutex}; - const auto has_space = [this]() { - const u32 current_size = queued_buffers.load(std::memory_order_relaxed); - return paused || max_queue_size == 0 || current_size < max_queue_size; + auto can_continue = [this]() { + return paused || queued_buffers < max_queue_size; }; - if (!has_space()) { - // Wait until the queue falls below the configured limit or the stream is paused/stopped. - release_cv.wait(lk, stop_token, has_space); + release_cv.wait_for(lk, std::chrono::milliseconds(10), can_continue); + + if (queued_buffers > max_queue_size + 10) { + release_cv.wait(lk, stop_token, can_continue); } } diff --git a/src/audio_core/sink/sink_stream.h b/src/audio_core/sink/sink_stream.h index b1fc91b649..acb7537dda 100644 --- a/src/audio_core/sink/sink_stream.h +++ b/src/audio_core/sink/sink_stream.h @@ -15,10 +15,10 @@ #include #include "audio_core/common/common.h" +#include "common/bounded_threadsafe_queue.h" #include "common/common_types.h" #include "common/polyfill_thread.h" #include "common/ring_buffer.h" -#include "common/bounded_threadsafe_queue.h" #include "common/thread.h" namespace Core { diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 2c2eeb189c..17d6a42f37 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -28,7 +28,6 @@ add_library( announce_multiplayer_room.h assert.cpp assert.h - atomic_helpers.h atomic_ops.h bit_field.h bit_util.h diff --git a/src/common/atomic_helpers.h b/src/common/atomic_helpers.h deleted file mode 100644 index d997f10bac..0000000000 --- a/src/common/atomic_helpers.h +++ /dev/null @@ -1,776 +0,0 @@ -// SPDX-FileCopyrightText: 2013-2016 Cameron Desrochers -// SPDX-FileCopyrightText: 2015 Jeff Preshing -// SPDX-License-Identifier: BSD-2-Clause AND Zlib - -// Distributed under the simplified BSD license (see the license file that -// should have come with this header). -// Uses Jeff Preshing's semaphore implementation (under the terms of its -// separate zlib license, embedded below). - -#pragma once - -// Provides portable (VC++2010+, Intel ICC 13, GCC 4.7+, and anything C++11 compliant) -// implementation of low-level memory barriers, plus a few semi-portable utility macros (for -// inlining and alignment). Also has a basic atomic type (limited to hardware-supported atomics with -// no memory ordering guarantees). Uses the AE_* prefix for macros (historical reasons), and the -// "moodycamel" namespace for symbols. - -#include -#include -#include -#include -#include - -// Platform detection -#if defined(__INTEL_COMPILER) -#define AE_ICC -#elif defined(_MSC_VER) -#define AE_VCPP -#elif defined(__GNUC__) -#define AE_GCC -#endif - -#if defined(_M_IA64) || defined(__ia64__) -#define AE_ARCH_IA64 -#elif defined(_WIN64) || defined(__amd64__) || defined(_M_X64) || defined(__x86_64__) -#define AE_ARCH_X64 -#elif defined(_M_IX86) || defined(__i386__) -#define AE_ARCH_X86 -#elif defined(_M_PPC) || defined(__powerpc__) -#define AE_ARCH_PPC -#else -#define AE_ARCH_UNKNOWN -#endif - -// AE_UNUSED -#define AE_UNUSED(x) ((void)x) - -// AE_NO_TSAN/AE_TSAN_ANNOTATE_* -#if defined(__has_feature) -#if __has_feature(thread_sanitizer) -#if __cplusplus >= 201703L // inline variables require C++17 -namespace Common { -inline int ae_tsan_global; -} -#define AE_TSAN_ANNOTATE_RELEASE() \ - AnnotateHappensBefore(__FILE__, __LINE__, (void*)(&::moodycamel::ae_tsan_global)) -#define AE_TSAN_ANNOTATE_ACQUIRE() \ - AnnotateHappensAfter(__FILE__, __LINE__, (void*)(&::moodycamel::ae_tsan_global)) -extern "C" void AnnotateHappensBefore(const char*, int, void*); -extern "C" void AnnotateHappensAfter(const char*, int, void*); -#else // when we can't work with tsan, attempt to disable its warnings -#define AE_NO_TSAN __attribute__((no_sanitize("thread"))) -#endif -#endif -#endif -#ifndef AE_NO_TSAN -#define AE_NO_TSAN -#endif -#ifndef AE_TSAN_ANNOTATE_RELEASE -#define AE_TSAN_ANNOTATE_RELEASE() -#define AE_TSAN_ANNOTATE_ACQUIRE() -#endif - -// AE_FORCEINLINE -#if defined(AE_VCPP) || defined(AE_ICC) -#define AE_FORCEINLINE __forceinline -#elif defined(AE_GCC) -// #define AE_FORCEINLINE __attribute__((always_inline)) -#define AE_FORCEINLINE inline -#else -#define AE_FORCEINLINE inline -#endif - -// AE_ALIGN -#if defined(AE_VCPP) || defined(AE_ICC) -#define AE_ALIGN(x) __declspec(align(x)) -#elif defined(AE_GCC) -#define AE_ALIGN(x) __attribute__((aligned(x))) -#else -// Assume GCC compliant syntax... -#define AE_ALIGN(x) __attribute__((aligned(x))) -#endif - -// Portable atomic fences implemented below: - -namespace Common { - -enum memory_order { - memory_order_relaxed, - memory_order_acquire, - memory_order_release, - memory_order_acq_rel, - memory_order_seq_cst, - - // memory_order_sync: Forces a full sync: - // #LoadLoad, #LoadStore, #StoreStore, and most significantly, #StoreLoad - memory_order_sync = memory_order_seq_cst -}; - -} // namespace Common - -#if (defined(AE_VCPP) && (_MSC_VER < 1700 || defined(__cplusplus_cli))) || \ - (defined(AE_ICC) && __INTEL_COMPILER < 1600) -// VS2010 and ICC13 don't support std::atomic_*_fence, implement our own fences - -#include - -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) -#define AeFullSync _mm_mfence -#define AeLiteSync _mm_mfence -#elif defined(AE_ARCH_IA64) -#define AeFullSync __mf -#define AeLiteSync __mf -#elif defined(AE_ARCH_PPC) -#include -#define AeFullSync __sync -#define AeLiteSync __lwsync -#endif - -#ifdef AE_VCPP -#pragma warning(push) -#pragma warning(disable : 4365) // Disable erroneous 'conversion from long to unsigned int, - // signed/unsigned mismatch' error when using `assert` -#ifdef __cplusplus_cli -#pragma managed(push, off) -#endif -#endif - -namespace Common { - -AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN { - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - _ReadBarrier(); - break; - case memory_order_release: - _WriteBarrier(); - break; - case memory_order_acq_rel: - _ReadWriteBarrier(); - break; - case memory_order_seq_cst: - _ReadWriteBarrier(); - break; - default: - assert(false); - break; - } -} - -// x86/x64 have a strong memory model -- all loads and stores have -// acquire and release semantics automatically (so only need compiler -// barriers for those). -#if defined(AE_ARCH_X86) || defined(AE_ARCH_X64) -AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN { - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - _ReadBarrier(); - break; - case memory_order_release: - _WriteBarrier(); - break; - case memory_order_acq_rel: - _ReadWriteBarrier(); - break; - case memory_order_seq_cst: - _ReadWriteBarrier(); - AeFullSync(); - _ReadWriteBarrier(); - break; - default: - assert(false); - } -} -#else -AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN { - // Non-specialized arch, use heavier memory barriers everywhere just in case :-( - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - _ReadBarrier(); - AeLiteSync(); - _ReadBarrier(); - break; - case memory_order_release: - _WriteBarrier(); - AeLiteSync(); - _WriteBarrier(); - break; - case memory_order_acq_rel: - _ReadWriteBarrier(); - AeLiteSync(); - _ReadWriteBarrier(); - break; - case memory_order_seq_cst: - _ReadWriteBarrier(); - AeFullSync(); - _ReadWriteBarrier(); - break; - default: - assert(false); - } -} -#endif -} // namespace Common -#else -// Use standard library of atomics -#include - -namespace Common { - -AE_FORCEINLINE void compiler_fence(memory_order order) AE_NO_TSAN { - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - std::atomic_signal_fence(std::memory_order_acquire); - break; - case memory_order_release: - std::atomic_signal_fence(std::memory_order_release); - break; - case memory_order_acq_rel: - std::atomic_signal_fence(std::memory_order_acq_rel); - break; - case memory_order_seq_cst: - std::atomic_signal_fence(std::memory_order_seq_cst); - break; - default: - assert(false); - } -} - -AE_FORCEINLINE void fence(memory_order order) AE_NO_TSAN { - switch (order) { - case memory_order_relaxed: - break; - case memory_order_acquire: - AE_TSAN_ANNOTATE_ACQUIRE(); - std::atomic_thread_fence(std::memory_order_acquire); - break; - case memory_order_release: - AE_TSAN_ANNOTATE_RELEASE(); - std::atomic_thread_fence(std::memory_order_release); - break; - case memory_order_acq_rel: - AE_TSAN_ANNOTATE_ACQUIRE(); - AE_TSAN_ANNOTATE_RELEASE(); - std::atomic_thread_fence(std::memory_order_acq_rel); - break; - case memory_order_seq_cst: - AE_TSAN_ANNOTATE_ACQUIRE(); - AE_TSAN_ANNOTATE_RELEASE(); - std::atomic_thread_fence(std::memory_order_seq_cst); - break; - default: - assert(false); - } -} - -} // namespace Common - -#endif - -#if !defined(AE_VCPP) || (_MSC_VER >= 1700 && !defined(__cplusplus_cli)) -#define AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC -#endif - -#ifdef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC -#include -#endif -#include - -// WARNING: *NOT* A REPLACEMENT FOR std::atomic. READ CAREFULLY: -// Provides basic support for atomic variables -- no memory ordering guarantees are provided. -// The guarantee of atomicity is only made for types that already have atomic load and store -// guarantees at the hardware level -- on most platforms this generally means aligned pointers and -// integers (only). -namespace Common { -template -class weak_atomic { -public: - AE_NO_TSAN weak_atomic() : value() {} -#ifdef AE_VCPP -#pragma warning(push) -#pragma warning(disable : 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning -#endif - template - AE_NO_TSAN weak_atomic(U&& x) : value(std::forward(x)) {} -#ifdef __cplusplus_cli - // Work around bug with universal reference/nullptr combination that only appears when /clr is - // on - AE_NO_TSAN weak_atomic(nullptr_t) : value(nullptr) {} -#endif - AE_NO_TSAN weak_atomic(weak_atomic const& other) : value(other.load()) {} - AE_NO_TSAN weak_atomic(weak_atomic&& other) : value(std::move(other.load())) {} -#ifdef AE_VCPP -#pragma warning(pop) -#endif - - AE_FORCEINLINE operator T() const AE_NO_TSAN { - return load(); - } - -#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC - template - AE_FORCEINLINE weak_atomic const& operator=(U&& x) AE_NO_TSAN { - value = std::forward(x); - return *this; - } - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) AE_NO_TSAN { - value = other.value; - return *this; - } - - AE_FORCEINLINE T load() const AE_NO_TSAN { - return value; - } - - AE_FORCEINLINE T fetch_add_acquire(T increment) AE_NO_TSAN { -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) - if (sizeof(T) == 4) - return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); -#if defined(_M_AMD64) - else if (sizeof(T) == 8) - return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); -#endif -#else -#error Unsupported platform -#endif - assert(false && "T must be either a 32 or 64 bit type"); - return value; - } - - AE_FORCEINLINE T fetch_add_release(T increment) AE_NO_TSAN { -#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) - if (sizeof(T) == 4) - return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); -#if defined(_M_AMD64) - else if (sizeof(T) == 8) - return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); -#endif -#else -#error Unsupported platform -#endif - assert(false && "T must be either a 32 or 64 bit type"); - return value; - } -#else - template - AE_FORCEINLINE weak_atomic const& operator=(U&& x) AE_NO_TSAN { - value.store(std::forward(x), std::memory_order_relaxed); - return *this; - } - - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) AE_NO_TSAN { - value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed); - return *this; - } - - AE_FORCEINLINE T load() const AE_NO_TSAN { - return value.load(std::memory_order_relaxed); - } - - AE_FORCEINLINE T fetch_add_acquire(T increment) AE_NO_TSAN { - return value.fetch_add(increment, std::memory_order_acquire); - } - - AE_FORCEINLINE T fetch_add_release(T increment) AE_NO_TSAN { - return value.fetch_add(increment, std::memory_order_release); - } -#endif - -private: -#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC - // No std::atomic support, but still need to circumvent compiler optimizations. - // `volatile` will make memory access slow, but is guaranteed to be reliable. - volatile T value; -#else - std::atomic value; -#endif -}; - -} // namespace Common - -// Portable single-producer, single-consumer semaphore below: - -#if defined(_WIN32) -// Avoid including windows.h in a header; we only need a handful of -// items, so we'll redeclare them here (this is relatively safe since -// the API generally has to remain stable between Windows versions). -// I know this is an ugly hack but it still beats polluting the global -// namespace with thousands of generic names or adding a .cpp for nothing. -extern "C" { -struct _SECURITY_ATTRIBUTES; -__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, - long lInitialCount, long lMaximumCount, - const wchar_t* lpName); -__declspec(dllimport) int __stdcall CloseHandle(void* hObject); -__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, - unsigned long dwMilliseconds); -__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, - long* lpPreviousCount); -} -#elif defined(__MACH__) -#include -#elif defined(__unix__) -#include -#elif defined(FREERTOS) -#include -#include -#include -#endif - -namespace Common { -// Code in the spsc_sema namespace below is an adaptation of Jeff Preshing's -// portable + lightweight semaphore implementations, originally from -// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h -// LICENSE: -// Copyright (c) 2015 Jeff Preshing -// -// This software is provided 'as-is', without any express or implied -// warranty. In no event will the authors be held liable for any damages -// arising from the use of this software. -// -// Permission is granted to anyone to use this software for any purpose, -// including commercial applications, and to alter it and redistribute it -// freely, subject to the following restrictions: -// -// 1. The origin of this software must not be misrepresented; you must not -// claim that you wrote the original software. If you use this software -// in a product, an acknowledgement in the product documentation would be -// appreciated but is not required. -// 2. Altered source versions must be plainly marked as such, and must not be -// misrepresented as being the original software. -// 3. This notice may not be removed or altered from any source distribution. -namespace spsc_sema { -#if defined(_WIN32) -class Semaphore { -private: - void* m_hSema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - -public: - AE_NO_TSAN Semaphore(int initialCount = 0) : m_hSema() { - assert(initialCount >= 0); - const long maxLong = 0x7fffffff; - m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr); - assert(m_hSema); - } - - AE_NO_TSAN ~Semaphore() { - CloseHandle(m_hSema); - } - - bool wait() AE_NO_TSAN { - const unsigned long infinite = 0xffffffff; - return WaitForSingleObject(m_hSema, infinite) == 0; - } - - bool try_wait() AE_NO_TSAN { - return WaitForSingleObject(m_hSema, 0) == 0; - } - - bool timed_wait(std::uint64_t usecs) AE_NO_TSAN { - return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0; - } - - void signal(int count = 1) AE_NO_TSAN { - while (!ReleaseSemaphore(m_hSema, count, nullptr)) - ; - } -}; -#elif defined(__MACH__) -//--------------------------------------------------------- -// Semaphore (Apple iOS and OSX) -// Can't use POSIX semaphores due to -// http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html -//--------------------------------------------------------- -class Semaphore { -private: - semaphore_t m_sema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - -public: - AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema() { - assert(initialCount >= 0); - kern_return_t rc = - semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount); - assert(rc == KERN_SUCCESS); - AE_UNUSED(rc); - } - - AE_NO_TSAN ~Semaphore() { - semaphore_destroy(mach_task_self(), m_sema); - } - - bool wait() AE_NO_TSAN { - return semaphore_wait(m_sema) == KERN_SUCCESS; - } - - bool try_wait() AE_NO_TSAN { - return timed_wait(0); - } - - bool timed_wait(std::uint64_t timeout_usecs) AE_NO_TSAN { - mach_timespec_t ts; - ts.tv_sec = static_cast(timeout_usecs / 1000000); - ts.tv_nsec = static_cast((timeout_usecs % 1000000) * 1000); - - // added in OSX 10.10: - // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html - kern_return_t rc = semaphore_timedwait(m_sema, ts); - return rc == KERN_SUCCESS; - } - - void signal() AE_NO_TSAN { - while (semaphore_signal(m_sema) != KERN_SUCCESS) - ; - } - - void signal(int count) AE_NO_TSAN { - while (count-- > 0) { - while (semaphore_signal(m_sema) != KERN_SUCCESS) - ; - } - } -}; -#elif defined(__unix__) -//--------------------------------------------------------- -// Semaphore (POSIX, Linux) -//--------------------------------------------------------- -class Semaphore { -private: - sem_t m_sema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - -public: - AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema() { - assert(initialCount >= 0); - int rc = sem_init(&m_sema, 0, static_cast(initialCount)); - assert(rc == 0); - AE_UNUSED(rc); - } - - AE_NO_TSAN ~Semaphore() { - sem_destroy(&m_sema); - } - - bool wait() AE_NO_TSAN { - // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error - int rc; - do { - rc = sem_wait(&m_sema); - } while (rc == -1 && errno == EINTR); - return rc == 0; - } - - bool try_wait() AE_NO_TSAN { - int rc; - do { - rc = sem_trywait(&m_sema); - } while (rc == -1 && errno == EINTR); - return rc == 0; - } - - bool timed_wait(std::uint64_t usecs) AE_NO_TSAN { - struct timespec ts; - const int usecs_in_1_sec = 1000000; - const int nsecs_in_1_sec = 1000000000; - clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += static_cast(usecs / usecs_in_1_sec); - ts.tv_nsec += static_cast(usecs % usecs_in_1_sec) * 1000; - // sem_timedwait bombs if you have more than 1e9 in tv_nsec - // so we have to clean things up before passing it in - if (ts.tv_nsec >= nsecs_in_1_sec) { - ts.tv_nsec -= nsecs_in_1_sec; - ++ts.tv_sec; - } - - int rc; - do { - rc = sem_timedwait(&m_sema, &ts); - } while (rc == -1 && errno == EINTR); - return rc == 0; - } - - void signal() AE_NO_TSAN { - while (sem_post(&m_sema) == -1) - ; - } - - void signal(int count) AE_NO_TSAN { - while (count-- > 0) { - while (sem_post(&m_sema) == -1) - ; - } - } -}; -#elif defined(FREERTOS) -//--------------------------------------------------------- -// Semaphore (FreeRTOS) -//--------------------------------------------------------- -class Semaphore { -private: - SemaphoreHandle_t m_sema; - - Semaphore(const Semaphore& other); - Semaphore& operator=(const Semaphore& other); - -public: - AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema() { - assert(initialCount >= 0); - m_sema = xSemaphoreCreateCounting(static_cast(~0ull), - static_cast(initialCount)); - assert(m_sema); - } - - AE_NO_TSAN ~Semaphore() { - vSemaphoreDelete(m_sema); - } - - bool wait() AE_NO_TSAN { - return xSemaphoreTake(m_sema, portMAX_DELAY) == pdTRUE; - } - - bool try_wait() AE_NO_TSAN { - // Note: In an ISR context, if this causes a task to unblock, - // the caller won't know about it - if (xPortIsInsideInterrupt()) - return xSemaphoreTakeFromISR(m_sema, NULL) == pdTRUE; - return xSemaphoreTake(m_sema, 0) == pdTRUE; - } - - bool timed_wait(std::uint64_t usecs) AE_NO_TSAN { - std::uint64_t msecs = usecs / 1000; - TickType_t ticks = static_cast(msecs / portTICK_PERIOD_MS); - if (ticks == 0) - return try_wait(); - return xSemaphoreTake(m_sema, ticks) == pdTRUE; - } - - void signal() AE_NO_TSAN { - // Note: In an ISR context, if this causes a task to unblock, - // the caller won't know about it - BaseType_t rc; - if (xPortIsInsideInterrupt()) - rc = xSemaphoreGiveFromISR(m_sema, NULL); - else - rc = xSemaphoreGive(m_sema); - assert(rc == pdTRUE); - AE_UNUSED(rc); - } - - void signal(int count) AE_NO_TSAN { - while (count-- > 0) - signal(); - } -}; -#else -#error Unsupported platform! (No semaphore wrapper available) -#endif - -//--------------------------------------------------------- -// LightweightSemaphore -//--------------------------------------------------------- -class LightweightSemaphore { -public: - typedef std::make_signed::type ssize_t; - -private: - weak_atomic m_count; - Semaphore m_sema; - - bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) AE_NO_TSAN { - ssize_t oldCount; - // Is there a better way to set the initial spin count? - // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, - // as threads start hitting the kernel semaphore. - int spin = 1024; - while (--spin >= 0) { - if (m_count.load() > 0) { - m_count.fetch_add_acquire(-1); - return true; - } - compiler_fence(memory_order_acquire); // Prevent the compiler from collapsing the loop. - } - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0) - return true; - if (timeout_usecs < 0) { - if (m_sema.wait()) - return true; - } - if (timeout_usecs > 0 && m_sema.timed_wait(static_cast(timeout_usecs))) - return true; - // At this point, we've timed out waiting for the semaphore, but the - // count is still decremented indicating we may still be waiting on - // it. So we have to re-adjust the count, but only if the semaphore - // wasn't signaled enough times for us too since then. If it was, we - // need to release the semaphore too. - while (true) { - oldCount = m_count.fetch_add_release(1); - if (oldCount < 0) - return false; // successfully restored things to the way they were - // Oh, the producer thread just signaled the semaphore after all. Try again: - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0 && m_sema.try_wait()) - return true; - } - } - -public: - AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount), m_sema() { - assert(initialCount >= 0); - } - - bool tryWait() AE_NO_TSAN { - if (m_count.load() > 0) { - m_count.fetch_add_acquire(-1); - return true; - } - return false; - } - - bool wait() AE_NO_TSAN { - return tryWait() || waitWithPartialSpinning(); - } - - bool wait(std::int64_t timeout_usecs) AE_NO_TSAN { - return tryWait() || waitWithPartialSpinning(timeout_usecs); - } - - void signal(ssize_t count = 1) AE_NO_TSAN { - assert(count >= 0); - ssize_t oldCount = m_count.fetch_add_release(count); - assert(oldCount >= -1); - if (oldCount < 0) { - m_sema.signal(1); - } - } - - std::size_t availableApprox() const AE_NO_TSAN { - ssize_t count = m_count.load(); - return count > 0 ? static_cast(count) : 0; - } -}; -} // namespace spsc_sema -} // namespace Common - -#if defined(AE_VCPP) && (_MSC_VER < 1700 || defined(__cplusplus_cli)) -#pragma warning(pop) -#ifdef __cplusplus_cli -#pragma managed(pop) -#endif -#endif