From 9c89a7589c34ea6f40af5967a7a90434bad009a8 Mon Sep 17 00:00:00 2001 From: Mihir Singh Date: Sun, 28 Jul 2024 21:02:39 -0400 Subject: [PATCH] wait_state() first draft --- .../roc_pipeline/state_tracker.cpp | 75 +++++++++++++++++-- .../roc_pipeline/state_tracker.h | 7 ++ 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index a2e4a75b9..927eb8443 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -8,13 +8,50 @@ #include "roc_pipeline/state_tracker.h" #include "roc_core/panic.h" - namespace roc { namespace pipeline { StateTracker::StateTracker() - : active_sessions_(0) - , pending_packets_(0) { + : sem_(0) + , active_sessions_(0) + , pending_packets_(0) + , waiting_mask_(0) { +} + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned state_mask, core::nanoseconds_t deadline) { + waiting_mask_ = state_mask; + for (;;) { + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + + if (get_state() & state_mask) { + waiting_mask_ = 0; + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + waiting_mask_ = 0; + return false; + } + + if (deadline >= 0) { + if (!sem_.timed_wait(deadline)) { + waiting_mask_ = 0; + return false; + } + } else { + sem_.wait(); + } + } } sndio::DeviceState StateTracker::get_state() const { @@ -49,22 +86,46 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); + } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + if (waiting_mask_ != 0 && (get_state() & waiting_mask_)) { + sem_.post(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 80d8367fd..c97095116 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -14,7 +14,9 @@ #include "roc_core/atomic.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" #include "roc_sndio/device_state.h" namespace roc { @@ -32,6 +34,8 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -57,9 +61,12 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::Atomic is_broken_; core::Atomic active_sessions_; core::Atomic pending_packets_; + core::Atomic waiting_mask_; + void signal_state_change(); }; } // namespace pipeline