diff --git a/src/internal_modules/roc_audio/depacketizer.h b/src/internal_modules/roc_audio/depacketizer.h index a857a47ba..28fef14a5 100644 --- a/src/internal_modules/roc_audio/depacketizer.h +++ b/src/internal_modules/roc_audio/depacketizer.h @@ -75,11 +75,6 @@ struct DepacketizerMetrics { class Depacketizer : public IFrameReader, public core::NonCopyable<> { public: //! Initialization. - //! - //! @b Parameters - //! - @p packet_reader is used to read packets - //! - @p payload_decoder is used to extract samples from packets - //! - @p sample_spec describes output frames Depacketizer(packet::IReader& packet_reader, IFrameDecoder& payload_decoder, FrameFactory& frame_factory, diff --git a/src/internal_modules/roc_audio/fanout.cpp b/src/internal_modules/roc_audio/fanout.cpp index 2844c8dd0..39c2f0740 100644 --- a/src/internal_modules/roc_audio/fanout.cpp +++ b/src/internal_modules/roc_audio/fanout.cpp @@ -14,8 +14,11 @@ namespace roc { namespace audio { -Fanout::Fanout(const SampleSpec& sample_spec) - : sample_spec_(sample_spec) +Fanout::Fanout(FrameFactory& frame_factory, + core::IArena& arena, + const SampleSpec& sample_spec) + : outputs_(arena) + , sample_spec_(sample_spec) , init_status_(status::NoStatus) { roc_panic_if_msg(!sample_spec_.is_valid(), "fanout: required valid sample spec: %s", sample_spec_to_str(sample_spec_).c_str()); @@ -30,19 +33,53 @@ status::StatusCode Fanout::init_status() const { bool Fanout::has_output(IFrameWriter& writer) { roc_panic_if(init_status_ != status::StatusOK); - return frame_writers_.contains(writer); + for (size_t no = 0; no < outputs_.size(); no++) { + if (outputs_[no].writer == &writer) { + return true; + } + } + + return false; } -void Fanout::add_output(IFrameWriter& writer) { +status::StatusCode Fanout::add_output(IFrameWriter& writer) { roc_panic_if(init_status_ != status::StatusOK); - frame_writers_.push_back(writer); + Output output; + output.writer = &writer; + + if (!outputs_.push_back(output)) { + roc_log(LogError, "fanout: can't add output: allocation failed"); + return status::StatusNoMem; + } + + return status::StatusOK; } void Fanout::remove_output(IFrameWriter& writer) { roc_panic_if(init_status_ != status::StatusOK); - frame_writers_.remove(writer); + size_t rm_idx = (size_t)-1; + + for (size_t no = 0; no < outputs_.size(); no++) { + if (outputs_[no].writer == &writer) { + rm_idx = no; + break; + } + } + + if (rm_idx == (size_t)-1) { + roc_panic("fanout: can't remove output: writer not found"); + } + + // Remove from array. + for (size_t no = rm_idx + 1; no < outputs_.size(); no++) { + outputs_[no - 1] = outputs_[no]; + } + + if (!outputs_.resize(outputs_.size() - 1)) { + roc_panic("fanout: can't remove output: resize failed"); + } } status::StatusCode Fanout::write(Frame& in_frame) { @@ -50,17 +87,26 @@ status::StatusCode Fanout::write(Frame& in_frame) { sample_spec_.validate_frame(in_frame); - for (IFrameWriter* writer = frame_writers_.front(); writer != NULL; - writer = frame_writers_.nextof(*writer)) { - const status::StatusCode code = writer->write(in_frame); + for (size_t no = 0; no < outputs_.size(); no++) { + Output& output = outputs_[no]; + + if (output.is_finished) { + continue; + } + + const status::StatusCode code = output.writer->write(in_frame); + + if (code == status::StatusFinish) { + // From now on, skip this writer until it's removed. + output.is_finished = true; + continue; + } if (code != status::StatusOK) { // These codes can be returned only from read(). - roc_panic_if_msg( - code == status::StatusPart || code == status::StatusDrain, - "fanout loop: unexpected status from write operation: status=%s", - status::code_to_str(code)); - + roc_panic_if_msg(code == status::StatusPart || code == status::StatusDrain, + "fanout: unexpected status from write operation: status=%s", + status::code_to_str(code)); return code; } } diff --git a/src/internal_modules/roc_audio/fanout.h b/src/internal_modules/roc_audio/fanout.h index fcf18856b..71c341c60 100644 --- a/src/internal_modules/roc_audio/fanout.h +++ b/src/internal_modules/roc_audio/fanout.h @@ -14,9 +14,9 @@ #include "roc_audio/frame_factory.h" #include "roc_audio/iframe_writer.h" -#include "roc_audio/sample.h" #include "roc_audio/sample_spec.h" -#include "roc_core/list.h" +#include "roc_core/array.h" +#include "roc_core/iarena.h" #include "roc_core/noncopyable.h" namespace roc { @@ -26,24 +26,30 @@ namespace audio { //! //! Duplicates audio stream to multiple output writers. //! -//! Since StatusPart and StatusDrain are not allowed for write operations -//! Fanout does not need any special handling, unlike Mixer. +//! Features: +//! - Since StatusPart and StatusDrain are not allowed for write operations, +//! fanout does not need any special handling for them. +//! +//! - If pipeline element reports end-of-stream (StatusFinish), fanout skips this +//! element until it's removed. class Fanout : public IFrameWriter, public core::NonCopyable<> { public: //! Initialize. - Fanout(const SampleSpec& sample_spec); + Fanout(FrameFactory& frame_factory, + core::IArena& arena, + const SampleSpec& sample_spec); //! Check if the object was successfully constructed. status::StatusCode init_status() const; //! Check if writer is already added. - bool has_output(IFrameWriter&); + bool has_output(IFrameWriter& writer); //! Add output writer. - void add_output(IFrameWriter&); + ROC_ATTR_NODISCARD status::StatusCode add_output(IFrameWriter& writer); //! Remove output writer. - void remove_output(IFrameWriter&); + void remove_output(IFrameWriter& writer); //! Write audio frame. //! @remarks @@ -51,8 +57,19 @@ class Fanout : public IFrameWriter, public core::NonCopyable<> { virtual ROC_ATTR_NODISCARD status::StatusCode write(Frame& frame); private: - core::List frame_writers_; + struct Output { + // to where to write samples, typically sender session + IFrameWriter* writer; + // if true, output returned StatusFinish and should not be used + bool is_finished; + + Output() + : writer(NULL) + , is_finished(false) { + } + }; + core::Array outputs_; const SampleSpec sample_spec_; status::StatusCode init_status_; diff --git a/src/internal_modules/roc_audio/mixer.cpp b/src/internal_modules/roc_audio/mixer.cpp index 10d0b3fd5..c9a846447 100644 --- a/src/internal_modules/roc_audio/mixer.cpp +++ b/src/internal_modules/roc_audio/mixer.cpp @@ -48,6 +48,18 @@ status::StatusCode Mixer::init_status() const { return init_status_; } +bool Mixer::has_input(IFrameReader& reader) { + roc_panic_if(init_status_ != status::StatusOK); + + for (size_t ni = 0; ni < inputs_.size(); ni++) { + if (inputs_[ni].reader == &reader) { + return true; + } + } + + return false; +} + ROC_ATTR_NODISCARD status::StatusCode Mixer::add_input(IFrameReader& reader) { roc_panic_if(init_status_ != status::StatusOK); @@ -90,8 +102,8 @@ void Mixer::remove_input(IFrameReader& reader) { } // Remove from array. - for (size_t n = rm_idx + 1; n < inputs_.size(); n++) { - inputs_[n - 1] = inputs_[n]; + for (size_t ni = rm_idx + 1; ni < inputs_.size(); ni++) { + inputs_[ni - 1] = inputs_[ni]; } if (!inputs_.resize(inputs_.size() - 1)) { @@ -306,8 +318,8 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode roc_panic_if(input.n_mixed % sample_spec_.num_channels() != 0); roc_panic_if(mix_size % sample_spec_.num_channels() != 0); - // If input returned StatusEnd, don't call it anymore. - if (input.ended && input.n_mixed < mix_size) { + // If input returned StatusFinish, don't call it anymore. + if (input.is_finished && input.n_mixed < mix_size) { input.n_mixed = mix_size; } @@ -316,7 +328,7 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode // We stop when one of the following happens: // - we have fully filled requested buffer // - we got StatusDrain, which means that soft read stopped early - // - we got StatusEnd, which means that reader is terminating + // - we got StatusFinish, which means that reader is terminating // - we got an error (any other status), which means that the whole mixer fails while (input.n_mixed < mix_size) { const packet::stream_timestamp_t remained_duration = packet::stream_timestamp_t( @@ -334,10 +346,10 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode const status::StatusCode code = input.reader->read(*in_frame_, capped_duration, mode); - if (code == status::StatusEnd) { + if (code == status::StatusFinish) { // Stream ended and will be removed soon, pad it with zeros until that. input.n_mixed = mix_size; - input.ended = true; + input.is_finished = true; break; } diff --git a/src/internal_modules/roc_audio/mixer.h b/src/internal_modules/roc_audio/mixer.h index d63adf59c..d78033420 100644 --- a/src/internal_modules/roc_audio/mixer.h +++ b/src/internal_modules/roc_audio/mixer.h @@ -14,12 +14,10 @@ #include "roc_audio/frame_factory.h" #include "roc_audio/iframe_reader.h" -#include "roc_audio/sample.h" #include "roc_audio/sample_spec.h" +#include "roc_core/array.h" #include "roc_core/iarena.h" -#include "roc_core/list.h" #include "roc_core/noncopyable.h" -#include "roc_core/time.h" namespace roc { namespace audio { @@ -41,6 +39,9 @@ namespace audio { //! (In other words, StatusPart and StatusDrain never leave mixer. Mixer //! always returns as much samples as requested). //! +//! - If pipeline element reports end-of-stream (StatusFinish), mixer skips this +//! element until it's removed. +//! //! - If timestamps are enabled, mixer computes capture timestamp of output //! frame as the average capture timestamps of all mixed input frames. //! @@ -58,11 +59,14 @@ class Mixer : public IFrameReader, public core::NonCopyable<> { //! Check if the object was successfully constructed. status::StatusCode init_status() const; + //! Check if reader is already added. + bool has_input(IFrameReader& reader); + //! Add input reader. - ROC_ATTR_NODISCARD status::StatusCode add_input(IFrameReader&); + ROC_ATTR_NODISCARD status::StatusCode add_input(IFrameReader& reader); //! Remove input reader. - void remove_input(IFrameReader&); + void remove_input(IFrameReader& reader); //! Read audio frame. //! @remarks @@ -82,14 +86,14 @@ class Mixer : public IFrameReader, public core::NonCopyable<> { size_t n_mixed; // capture timestamp of first sample in mix_frame_ core::nanoseconds_t cts; - // if true, input returned StatusEnd and should not be used - bool ended; + // if true, input returned StatusFinish and should not be used + bool is_finished; Input() : reader(NULL) , n_mixed(0) , cts(0) - , ended(false) { + , is_finished(false) { } }; diff --git a/src/internal_modules/roc_audio/packetizer.h b/src/internal_modules/roc_audio/packetizer.h index aa2d5b039..e718c2a79 100644 --- a/src/internal_modules/roc_audio/packetizer.h +++ b/src/internal_modules/roc_audio/packetizer.h @@ -50,16 +50,6 @@ struct PacketizerMetrics { class Packetizer : public IFrameWriter, public core::NonCopyable<> { public: //! Initialization. - //! - //! @b Parameters - //! - @p writer is used to write generated packets - //! - @p composer is used to initialize new packets - //! - @p sequencer is used to put packets in sequence - //! - @p payload_encoder is used to write samples to packets - //! - @p packet_factory is used to allocate packets - //! - @p buffer_factory is used to allocate buffers for packets - //! - @p packet_length defines packet length in nanoseconds - //! - @p sample_spec describes input frames Packetizer(packet::IWriter& writer, packet::IComposer& composer, packet::ISequencer& sequencer, diff --git a/src/internal_modules/roc_audio/processor_map.cpp b/src/internal_modules/roc_audio/processor_map.cpp index b61b00d18..9f62295dd 100644 --- a/src/internal_modules/roc_audio/processor_map.cpp +++ b/src/internal_modules/roc_audio/processor_map.cpp @@ -149,7 +149,7 @@ ProcessorMap::register_plc(int backend_id, void* backend_owner, PlcFunc ctor_fn) "processor map: failed to register plc backend:" " backend id %d already exists", backend_id); - return status::StatusConflict; + return status::StatusBadArg; } if (!backend_owner) { diff --git a/src/internal_modules/roc_pipeline/receiver_loop.cpp b/src/internal_modules/roc_pipeline/receiver_loop.cpp index c78e0d99c..5706bde6a 100644 --- a/src/internal_modules/roc_pipeline/receiver_loop.cpp +++ b/src/internal_modules/roc_pipeline/receiver_loop.cpp @@ -219,24 +219,27 @@ status::StatusCode ReceiverLoop::read(audio::Frame& frame, core::Mutex::Lock lock(source_mutex_); + if (source_.state() == sndio::DeviceState_Broken) { + // Don't go to sleep if we're broke. + return status::StatusBadState; + } + if (ticker_) { ticker_->wait(ticker_ts_); } - // invokes process_subframe_imp() and process_task_imp() + // Invokes process_subframe_imp() and process_task_imp(). const status::StatusCode code = process_subframes_and_tasks(frame, duration, mode); roc_panic_if_msg(code <= status::NoStatus || code >= status::MaxStatus, "receiver loop: invalid status code %d", code); - if (code != status::StatusOK && code != status::StatusPart) { - return code; - } - - ticker_ts_ += frame.duration(); + if (code == status::StatusOK || code == status::StatusPart) { + ticker_ts_ += frame.duration(); - if (auto_reclock_) { - source_.reclock(core::timestamp(core::ClockUnix)); + if (auto_reclock_) { + source_.reclock(core::timestamp(core::ClockUnix)); + } } return code; diff --git a/src/internal_modules/roc_pipeline/receiver_session.cpp b/src/internal_modules/roc_pipeline/receiver_session.cpp index 1d7feef84..122deabe6 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session.cpp @@ -288,17 +288,12 @@ audio::IFrameReader& ReceiverSession::frame_reader() { return *this; } -status::StatusCode ReceiverSession::route_packet(const packet::PacketPtr& packet) { - roc_panic_if(init_status_ != status::StatusOK); - - return packet_router_->write(packet); -} - status::StatusCode ReceiverSession::refresh(core::nanoseconds_t current_time, core::nanoseconds_t& next_deadline) { roc_panic_if(init_status_ != status::StatusOK); if (fail_status_ != status::NoStatus) { + // Report remembered error code. return fail_status_; } @@ -308,9 +303,48 @@ status::StatusCode ReceiverSession::refresh(core::nanoseconds_t current_time, void ReceiverSession::reclock(core::nanoseconds_t playback_time) { roc_panic_if(init_status_ != status::StatusOK); + if (fail_status_ != status::NoStatus) { + // Session broken. + return; + } + latency_monitor_->reclock(playback_time); } +status::StatusCode ReceiverSession::route_packet(const packet::PacketPtr& packet) { + roc_panic_if(init_status_ != status::StatusOK); + + if (fail_status_ != status::NoStatus) { + // Session broken. + return status::StatusNoRoute; + } + + return packet_router_->write(packet); +} + +status::StatusCode ReceiverSession::read(audio::Frame& frame, + packet::stream_timestamp_t duration, + audio::FrameReadMode mode) { + roc_panic_if(init_status_ != status::StatusOK); + + if (fail_status_ != status::NoStatus) { + // Session broken. + return status::StatusFinish; + } + + const status::StatusCode code = frame_reader_->read(frame, duration, mode); + + // On failure, mark session broken and return StatusFinish to be excluded from mixer. + // Error will be reported later from refresh(). + if (code != status::StatusOK && code != status::StatusPart + && code != status::StatusDrain) { + fail_status_ = code; + return status::StatusFinish; + } + + return code; +} + size_t ReceiverSession::num_reports() const { roc_panic_if(init_status_ != status::StatusOK); @@ -409,34 +443,5 @@ ReceiverParticipantMetrics ReceiverSession::get_metrics() const { return metrics; } -status::StatusCode ReceiverSession::read(audio::Frame& frame, - packet::stream_timestamp_t duration, - audio::FrameReadMode mode) { - roc_panic_if(init_status_ != status::StatusOK); - - if (fail_status_ != status::NoStatus) { - // Failure happened, and session will be removed soon. Until that, - // always return StatusEnd to be excluded from mixing. - return status::StatusEnd; - } - - const status::StatusCode code = frame_reader_->read(frame, duration, mode); - - roc_panic_if_msg(code <= status::NoStatus || code >= status::MaxStatus, - "receiver session: invalid status code %d", code); - - // Failure happened. Remember error to return it from next refresh() call. - // Return StatusEnd to be excluded from mixing. - // We don't return error from read() because we don't want the whole - // receiver to fail, we just need to remove one session. - if (code != status::StatusOK && code != status::StatusPart - && code != status::StatusDrain) { - fail_status_ = code; - return status::StatusEnd; - } - - return code; -} - } // namespace pipeline } // namespace roc diff --git a/src/internal_modules/roc_pipeline/receiver_session.h b/src/internal_modules/roc_pipeline/receiver_session.h index 22d34ca42..5441b82fd 100644 --- a/src/internal_modules/roc_pipeline/receiver_session.h +++ b/src/internal_modules/roc_pipeline/receiver_session.h @@ -83,13 +83,6 @@ class ReceiverSession : public core::RefCountedhas_flags(packet::Packet::FlagControl)) { - return route_control_packet_(packet, current_time); - } - - return route_transport_packet_(packet); -} - status::StatusCode ReceiverSessionGroup::refresh_sessions(core::nanoseconds_t current_time, core::nanoseconds_t& next_deadline) { @@ -123,7 +112,7 @@ ReceiverSessionGroup::refresh_sessions(core::nanoseconds_t current_time, const status::StatusCode code = curr_sess->refresh(current_time, sess_deadline); // These errors break only session, but not the whole receiver. - if (code == status::StatusEnd || code == status::StatusAbort) { + if (code == status::StatusFinish || code == status::StatusAbort) { remove_session_(curr_sess, code); continue; } @@ -153,6 +142,17 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) { } } +status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet, + core::nanoseconds_t current_time) { + roc_panic_if(init_status_ != status::StatusOK); + + if (packet->has_flags(packet::Packet::FlagControl)) { + return route_control_packet_(packet, current_time); + } + + return route_transport_packet_(packet); +} + size_t ReceiverSessionGroup::num_sessions() const { roc_panic_if(init_status_ != status::StatusOK); diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index fa33c3ab5..b0479c357 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -75,10 +75,6 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip ROC_ATTR_NODISCARD status::StatusCode create_control_pipeline(ReceiverEndpoint* control_endpoint); - //! Route packet to session. - ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); - //! Refresh pipeline according to current time. //! @remarks //! Should be invoked before reading each frame. @@ -94,6 +90,10 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip //! retrieved from pipeline will be actually played on sink void reclock_sessions(core::nanoseconds_t playback_time); + //! Route packet to session. + ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, + core::nanoseconds_t current_time); + //! Get number of sessions in group. size_t num_sessions() const; diff --git a/src/internal_modules/roc_pipeline/receiver_source.cpp b/src/internal_modules/roc_pipeline/receiver_source.cpp index a0e3fe369..03597e4c0 100644 --- a/src/internal_modules/roc_pipeline/receiver_source.cpp +++ b/src/internal_modules/roc_pipeline/receiver_source.cpp @@ -98,6 +98,11 @@ status::StatusCode ReceiverSource::init_status() const { ReceiverSlot* ReceiverSource::create_slot(const ReceiverSlotConfig& slot_config) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // TODO(gh-183): return StatusBadState (control ops) + return NULL; + } + roc_log(LogInfo, "receiver source: adding slot"); core::SharedPtr slot = new (arena_) ReceiverSlot( @@ -125,6 +130,11 @@ ReceiverSlot* ReceiverSource::create_slot(const ReceiverSlotConfig& slot_config) void ReceiverSource::delete_slot(ReceiverSlot* slot) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // TODO(gh-183): return StatusBadState (control ops) + return; + } + roc_log(LogInfo, "receiver source: removing slot"); slots_.remove(*slot); @@ -138,6 +148,11 @@ status::StatusCode ReceiverSource::refresh(core::nanoseconds_t current_time, core::nanoseconds_t* next_deadline) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // Receiver broken. + return status::StatusBadState; + } + roc_panic_if_msg(current_time <= 0, "receiver source: invalid timestamp:" " expected positive value, got %lld", @@ -151,6 +166,7 @@ status::StatusCode ReceiverSource::refresh(core::nanoseconds_t current_time, if (code != status::StatusOK) { roc_log(LogError, "receiver source: failed to refresh slot: status=%s", status::code_to_str(code)); + state_tracker_.set_broken(); return code; } @@ -189,10 +205,20 @@ sndio::DeviceState ReceiverSource::state() const { } status::StatusCode ReceiverSource::pause() { + if (state_tracker_.is_broken()) { + // Receiver broken. + return status::StatusBadState; + } + return status::StatusOK; } status::StatusCode ReceiverSource::resume() { + if (state_tracker_.is_broken()) { + // Receiver broken. + return status::StatusBadState; + } + return status::StatusOK; } @@ -231,12 +257,18 @@ status::StatusCode ReceiverSource::read(audio::Frame& frame, audio::FrameReadMode mode) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // Receiver broken. + return status::StatusBadState; + } + const status::StatusCode code = frame_reader_->read(frame, duration, mode); if (code != status::StatusOK && code != status::StatusPart && code != status::StatusDrain) { roc_log(LogError, "receiver source: failed to read frame: status=%s", status::code_to_str(code)); + state_tracker_.set_broken(); } return code; diff --git a/src/internal_modules/roc_pipeline/sender_loop.cpp b/src/internal_modules/roc_pipeline/sender_loop.cpp index 34bef9128..ba701c6b9 100644 --- a/src/internal_modules/roc_pipeline/sender_loop.cpp +++ b/src/internal_modules/roc_pipeline/sender_loop.cpp @@ -206,12 +206,17 @@ status::StatusCode SenderLoop::write(audio::Frame& frame) { core::Mutex::Lock lock(sink_mutex_); + if (sink_.state() == sndio::DeviceState_Broken) { + // Don't go to sleep if we're broke. + return status::StatusBadState; + } + if (ticker_) { ticker_->wait(ticker_ts_); ticker_ts_ += frame.duration(); } - // invokes process_subframe_imp() and process_task_imp() + // Invokes process_subframe_imp() and process_task_imp(). const status::StatusCode code = process_subframes_and_tasks(frame, frame.duration(), audio::ModeHard); diff --git a/src/internal_modules/roc_pipeline/sender_session.cpp b/src/internal_modules/roc_pipeline/sender_session.cpp index 4392f0a6b..2b86688ee 100644 --- a/src/internal_modules/roc_pipeline/sender_session.cpp +++ b/src/internal_modules/roc_pipeline/sender_session.cpp @@ -260,22 +260,12 @@ audio::IFrameWriter* SenderSession::frame_writer() { return this; } -status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { - roc_panic_if(init_status_ != status::StatusOK); - - if (packet->has_flags(packet::Packet::FlagControl)) { - return route_control_packet_(packet, current_time); - } - - roc_panic("sender session: unexpected non-control packet"); -} - status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time, core::nanoseconds_t& next_deadline) { roc_panic_if(init_status_ != status::StatusOK); if (fail_status_ != status::NoStatus) { + // Report remembered error code. return fail_status_; } @@ -293,6 +283,46 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time, return status::StatusOK; } +status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet, + core::nanoseconds_t current_time) { + roc_panic_if(init_status_ != status::StatusOK); + + if (fail_status_ != status::NoStatus) { + // Session broken. + return status::StatusNoRoute; + } + + if (!packet->has_flags(packet::Packet::FlagControl)) { + roc_panic("sender session: unexpected non-control packet"); + } + + return route_control_packet_(packet, current_time); +} + +status::StatusCode SenderSession::write(audio::Frame& frame) { + roc_panic_if(init_status_ != status::StatusOK); + + if (fail_status_ != status::NoStatus) { + // Session broken. + return status::StatusFinish; + } + + const status::StatusCode code = frame_writer_->write(frame); + + // On failure, mark session broken and return StatusFinish to be excluded from fanout. + // Error will be reported later from refresh(). + if (code != status::StatusOK) { + // These codes can't be returned from write(). + roc_panic_if_msg(code == status::StatusPart || code == status::StatusDrain, + "sender session: unexpected status code %s", + status::code_to_str(code)); + fail_status_ = code; + return status::StatusFinish; + } + + return code; +} + void SenderSession::get_slot_metrics(SenderSlotMetrics& slot_metrics) const { roc_panic_if(init_status_ != status::StatusOK); @@ -422,29 +452,5 @@ SenderSession::route_control_packet_(const packet::PacketPtr& packet, return rtcp_communicator_->process_packet(packet, current_time); } -status::StatusCode SenderSession::write(audio::Frame& frame) { - roc_panic_if(init_status_ != status::StatusOK); - - if (fail_status_ != status::NoStatus) { - // Failure happened, and session will be removed soon. Until that, - // always return StatusOK and do nothing. - return status::StatusOK; - } - - const status::StatusCode code = frame_writer_->write(frame); - - roc_panic_if_msg(code <= status::NoStatus || code >= status::MaxStatus, - "sender session: invalid status code %d", code); - - // If error happens, save it to return later from refresh(), which allows - // SenderSlot to handle it. - if (code != status::StatusOK) { - fail_status_ = code; - return status::StatusOK; - } - - return code; -} - } // namespace pipeline } // namespace roc diff --git a/src/internal_modules/roc_pipeline/sender_session.h b/src/internal_modules/roc_pipeline/sender_session.h index 8de48888b..5b9be7efe 100644 --- a/src/internal_modules/roc_pipeline/sender_session.h +++ b/src/internal_modules/roc_pipeline/sender_session.h @@ -82,14 +82,6 @@ class SenderSession : public core::NonCopyable<>, //! etc, happens during the write operation. audio::IFrameWriter* frame_writer(); - //! Route a packet to the session. - //! @remarks - //! This way feedback packets from receiver reach sender pipeline. - //! Packets are stored inside internal pipeline queues, and then fetched - //! when frame are passed from frame_writer(). - ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); - //! Refresh pipeline according to current time. //! @remarks //! Should be invoked before reading each frame. @@ -98,6 +90,14 @@ class SenderSession : public core::NonCopyable<>, ROC_ATTR_NODISCARD status::StatusCode refresh(core::nanoseconds_t current_time, core::nanoseconds_t& next_deadline); + //! Route a packet to the session. + //! @remarks + //! This way feedback packets from receiver reach sender pipeline. + //! Packets are stored inside internal pipeline queues, and then fetched + //! when frame are passed from frame_writer(). + ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, + core::nanoseconds_t current_time); + //! Get slot metrics. //! @remarks //! These metrics are for the whole slot. diff --git a/src/internal_modules/roc_pipeline/sender_sink.cpp b/src/internal_modules/roc_pipeline/sender_sink.cpp index cc4477c90..d313ce163 100644 --- a/src/internal_modules/roc_pipeline/sender_sink.cpp +++ b/src/internal_modules/roc_pipeline/sender_sink.cpp @@ -40,7 +40,7 @@ SenderSink::SenderSink(const SenderSinkConfig& sink_config, audio::Sample_RawFormat, sink_config_.input_sample_spec.channel_set()); - fanout_.reset(new (fanout_) audio::Fanout(inout_spec)); + fanout_.reset(new (fanout_) audio::Fanout(frame_factory_, arena_, inout_spec)); if ((init_status_ = fanout_->init_status()) != status::StatusOK) { return; } @@ -87,6 +87,11 @@ status::StatusCode SenderSink::init_status() const { SenderSlot* SenderSink::create_slot(const SenderSlotConfig& slot_config) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // TODO(gh-183): return StatusBadState (control ops) + return NULL; + } + roc_log(LogInfo, "sender sink: adding slot"); core::SharedPtr slot = new (arena_) SenderSlot( @@ -115,6 +120,11 @@ SenderSlot* SenderSink::create_slot(const SenderSlotConfig& slot_config) { void SenderSink::delete_slot(SenderSlot* slot) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // TODO(gh-183): return StatusBadState (control ops) + return; + } + roc_log(LogInfo, "sender sink: removing slot"); slots_.remove(*slot); @@ -130,6 +140,11 @@ status::StatusCode SenderSink::refresh(core::nanoseconds_t current_time, core::nanoseconds_t* next_deadline) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // Sender broken. + return status::StatusBadState; + } + roc_panic_if_msg(current_time <= 0, "sender sink: invalid timestamp:" " expected positive value, got %lld", @@ -143,6 +158,7 @@ status::StatusCode SenderSink::refresh(core::nanoseconds_t current_time, if (code != status::StatusOK) { roc_log(LogError, "sender sink: failed to refresh slot: status=%s", status::code_to_str(code)); + state_tracker_.set_broken(); return code; } @@ -181,10 +197,20 @@ sndio::DeviceState SenderSink::state() const { } status::StatusCode SenderSink::pause() { + if (state_tracker_.is_broken()) { + // Sender broken. + return status::StatusBadState; + } + return status::StatusOK; } status::StatusCode SenderSink::resume() { + if (state_tracker_.is_broken()) { + // Sender broken. + return status::StatusBadState; + } + return status::StatusOK; } @@ -203,11 +229,17 @@ bool SenderSink::has_clock() const { status::StatusCode SenderSink::write(audio::Frame& frame) { roc_panic_if(init_status_ != status::StatusOK); + if (state_tracker_.is_broken()) { + // Sender broken. + return status::StatusBadState; + } + const status::StatusCode code = frame_writer_->write(frame); if (code != status::StatusOK) { roc_log(LogError, "sender sink: failed to write frame: status=%s", status::code_to_str(code)); + state_tracker_.set_broken(); } return code; diff --git a/src/internal_modules/roc_pipeline/sender_slot.cpp b/src/internal_modules/roc_pipeline/sender_slot.cpp index 9ac3e0e6b..21ce20c15 100644 --- a/src/internal_modules/roc_pipeline/sender_slot.cpp +++ b/src/internal_modules/roc_pipeline/sender_slot.cpp @@ -109,7 +109,10 @@ SenderEndpoint* SenderSlot::add_endpoint(address::Interface iface, } if (session_.frame_writer()) { if (!fanout_.has_output(*session_.frame_writer())) { - fanout_.add_output(*session_.frame_writer()); + if (fanout_.add_output(*session_.frame_writer()) != status::StatusOK) { + // TODO(gh-183): forward status (control ops) + return NULL; + } state_tracker_.register_session(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index 50e649aee..a2e4a75b9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -18,6 +18,10 @@ StateTracker::StateTracker() } sndio::DeviceState StateTracker::get_state() const { + if (is_broken_) { + return sndio::DeviceState_Broken; + } + if (active_sessions_ != 0) { // We have sessions and they're producing some sound. return sndio::DeviceState_Active; @@ -32,6 +36,14 @@ sndio::DeviceState StateTracker::get_state() const { return sndio::DeviceState_Idle; } +bool StateTracker::is_broken() const { + return is_broken_; +} + +void StateTracker::set_broken() { + is_broken_ = true; +} + size_t StateTracker::num_sessions() const { return (size_t)active_sessions_; } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 5bb4d938c..80d8367fd 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -35,6 +35,12 @@ class StateTracker : public core::NonCopyable<> { //! Compute current state. sndio::DeviceState get_state() const; + //! Check if sender/receiver was marked as broken. + bool is_broken() const; + + //! Mark sender/receiver as permanently broken. + void set_broken(); + //! Get active sessions counter. size_t num_sessions() const; @@ -51,6 +57,7 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Atomic is_broken_; core::Atomic active_sessions_; core::Atomic pending_packets_; }; diff --git a/src/internal_modules/roc_rtp/encoding_map.cpp b/src/internal_modules/roc_rtp/encoding_map.cpp index b95df07c4..1e63d8bf2 100644 --- a/src/internal_modules/roc_rtp/encoding_map.cpp +++ b/src/internal_modules/roc_rtp/encoding_map.cpp @@ -97,7 +97,7 @@ status::StatusCode EncodingMap::register_encoding(Encoding enc) { "encoding map: failed to register encoding:" " encoding id %u already exists", enc.payload_type); - return status::StatusConflict; + return status::StatusBadArg; } core::SharedPtr node = new (node_pool_) Node(node_pool_, enc); diff --git a/src/internal_modules/roc_sndio/device_state.cpp b/src/internal_modules/roc_sndio/device_state.cpp index 0c825c11b..6611ce7be 100644 --- a/src/internal_modules/roc_sndio/device_state.cpp +++ b/src/internal_modules/roc_sndio/device_state.cpp @@ -22,6 +22,9 @@ const char* device_state_to_str(DeviceState state) { case DeviceState_Paused: return "paused"; + case DeviceState_Broken: + return "broken"; + default: break; } diff --git a/src/internal_modules/roc_sndio/device_state.h b/src/internal_modules/roc_sndio/device_state.h index a98547d37..06904341c 100644 --- a/src/internal_modules/roc_sndio/device_state.h +++ b/src/internal_modules/roc_sndio/device_state.h @@ -27,7 +27,11 @@ enum DeviceState { //! Device is paused. //! It's not producing anything. - DeviceState_Paused = (1 << 2) + DeviceState_Paused = (1 << 2), + + //! Device is broken. + //! The only thing can be done is to close device. + DeviceState_Broken = (1 << 3) }; //! Convert device state to string. diff --git a/src/internal_modules/roc_sndio/pump.cpp b/src/internal_modules/roc_sndio/pump.cpp index ca1b855f6..8275916c3 100644 --- a/src/internal_modules/roc_sndio/pump.cpp +++ b/src/internal_modules/roc_sndio/pump.cpp @@ -68,7 +68,7 @@ status::StatusCode Pump::run() { roc_panic_if_msg(code <= status::NoStatus || code >= status::MaxStatus, "pump: invalid status code %d", code); - if (code == status::StatusEnd) { + if (code == status::StatusFinish) { code = status::StatusOK; // EOF is fine } @@ -97,7 +97,7 @@ status::StatusCode Pump::next_() { if (mode_ == ModeOneshot && was_active_) { roc_log(LogInfo, "pump: main source became inactive in oneshot mode, exiting"); - return status::StatusEnd; + return status::StatusFinish; } // User specified --backup, when main source becomes inactive, we @@ -129,7 +129,7 @@ status::StatusCode Pump::next_() { // Transfer one frame. code = transfer_frame_(*current_source_, sink_); - if (code == status::StatusEnd) { + if (code == status::StatusFinish) { // EOF from main source causes exit. if (current_source_ == &main_source_) { roc_log(LogInfo, "pump: got eof from main source, exiting"); diff --git a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_source.cpp b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_source.cpp index a099af82a..fddef4d55 100644 --- a/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_source.cpp +++ b/src/internal_modules/roc_sndio/target_sndfile/roc_sndio/sndfile_source.cpp @@ -141,7 +141,7 @@ status::StatusCode SndfileSource::read(audio::Frame& frame, } if (n_samples == 0) { - return status::StatusEnd; + return status::StatusFinish; } frame.set_num_raw_samples((size_t)n_samples); diff --git a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_source.cpp b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_source.cpp index 211464c0e..8a8e9374a 100644 --- a/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_source.cpp +++ b/src/internal_modules/roc_sndio/target_sox/roc_sndio/sox_source.cpp @@ -226,7 +226,7 @@ status::StatusCode SoxSource::read(audio::Frame& frame, } if (paused_ || eof_) { - return status::StatusEnd; + return status::StatusFinish; } if (!frame_factory_.reallocate_frame( @@ -270,7 +270,7 @@ status::StatusCode SoxSource::read(audio::Frame& frame, } if (frame_size == 0) { - return status::StatusEnd; + return status::StatusFinish; } frame.set_num_raw_samples(frame_size); diff --git a/src/internal_modules/roc_sndio/wav_source.cpp b/src/internal_modules/roc_sndio/wav_source.cpp index eb390b76c..2a4fcaabd 100644 --- a/src/internal_modules/roc_sndio/wav_source.cpp +++ b/src/internal_modules/roc_sndio/wav_source.cpp @@ -108,7 +108,7 @@ status::StatusCode WavSource::read(audio::Frame& frame, } if (eof_) { - return status::StatusEnd; + return status::StatusFinish; } if (!frame_factory_.reallocate_frame( @@ -141,7 +141,7 @@ status::StatusCode WavSource::read(audio::Frame& frame, } if (frame_size == 0) { - return status::StatusEnd; + return status::StatusFinish; } frame.set_num_raw_samples(frame_size); diff --git a/src/internal_modules/roc_status/code_to_str.cpp b/src/internal_modules/roc_status/code_to_str.cpp index 8f61ce5b4..da490582e 100644 --- a/src/internal_modules/roc_status/code_to_str.cpp +++ b/src/internal_modules/roc_status/code_to_str.cpp @@ -26,10 +26,8 @@ const char* code_to_str(StatusCode code) { return "Drain"; case StatusAbort: return "Abort"; - case StatusEnd: - return "End"; - case StatusConflict: - return "Conflict"; + case StatusFinish: + return "Finish"; case StatusNoMem: return "NoMem"; case StatusNoRoute: @@ -64,6 +62,8 @@ const char* code_to_str(StatusCode code) { return "BadArg"; case StatusBadOperation: return "BadOperation"; + case StatusBadState: + return "BadState"; } // Most likely someone forgot to initialize status to a proper diff --git a/src/internal_modules/roc_status/status_code.h b/src/internal_modules/roc_status/status_code.h index 2c109f8e6..33c3d27b3 100644 --- a/src/internal_modules/roc_status/status_code.h +++ b/src/internal_modules/roc_status/status_code.h @@ -58,22 +58,14 @@ enum StatusCode { //! Example: session terminated because of no_playback timeout. StatusAbort, - //! Stream is fully read. + //! Stream is fully read or written. //! @remarks - //! Indicates that we've successfully read everything and there is no - //! more data ever. + //! Indicates that we've successfully read or write everything and there is + //! no more data expected. //! @note //! Example: we've got end of file when reading from file, or end of //! stream when reading from network. - StatusEnd, - - //! Conflicting identifier. - //! @remarks - //! Operation can't be completed because of conflicting name or id. - //! @note - //! Example: encoding can't be registered because another one already - //! exists with the same numeric id. - StatusConflict, + StatusFinish, //! Insufficient memory. //! @remarks @@ -191,7 +183,7 @@ enum StatusCode { //! size is not multiple of sample size. StatusBadBuffer, - //! Bad argument. + //! Illegal argument. //! @remark //! One of the provided function arguments has invalid value. //! @note @@ -199,7 +191,7 @@ enum StatusCode { //! invalid enum value. StatusBadArg, - //! Bad operation. + //! Illegal operation. //! @remark //! Operation is not allowed or supported in this context. //! @note @@ -207,6 +199,13 @@ enum StatusCode { //! it, trying to connect using a protocol that doesn't support it. StatusBadOperation, + //! Illegal object state. + //! @remark + //! Object state is invalid and object can't be used anymore. + //! @note + //! Example: trying to write frame after previous write failed. + StatusBadState, + //! Maximum enum value. MaxStatus, }; diff --git a/src/tests/roc_audio/test_fanout.cpp b/src/tests/roc_audio/test_fanout.cpp index 251d14d43..19724f4d3 100644 --- a/src/tests/roc_audio/test_fanout.cpp +++ b/src/tests/roc_audio/test_fanout.cpp @@ -62,7 +62,8 @@ void expect_written(test::MockWriter& mock_writer, size_t sz, sample_t value) { TEST_GROUP(fanout) {}; TEST(fanout, no_writers) { - Fanout fanout(sample_spec); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); write_frame(fanout, BufSz, 0.11f); } @@ -70,8 +71,10 @@ TEST(fanout, no_writers) { TEST(fanout, one_output) { test::MockWriter writer; - Fanout fanout(sample_spec); - fanout.add_output(writer); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); + + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer)); write_frame(fanout, BufSz, 0.11f); @@ -85,9 +88,11 @@ TEST(fanout, two_outputs) { test::MockWriter writer1; test::MockWriter writer2; - Fanout fanout(sample_spec); - fanout.add_output(writer1); - fanout.add_output(writer2); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); + + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer1)); + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer2)); write_frame(fanout, BufSz, 0.11f); @@ -106,10 +111,12 @@ TEST(fanout, remove_output) { test::MockWriter writer2; test::MockWriter writer3; - Fanout fanout(sample_spec); - fanout.add_output(writer1); - fanout.add_output(writer2); - fanout.add_output(writer3); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); + + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer1)); + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer2)); + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer3)); write_frame(fanout, BufSz, 0.11f); @@ -127,25 +134,41 @@ TEST(fanout, remove_output) { } TEST(fanout, has_output) { - test::MockWriter writer; - Fanout fanout(sample_spec); + test::MockWriter writer1; + test::MockWriter writer2; - CHECK(!fanout.has_output(writer)); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); - fanout.add_output(writer); - CHECK(fanout.has_output(writer)); + CHECK(!fanout.has_output(writer1)); + CHECK(!fanout.has_output(writer2)); - fanout.remove_output(writer); - CHECK(!fanout.has_output(writer)); + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer1)); + CHECK(fanout.has_output(writer1)); + CHECK(!fanout.has_output(writer2)); + + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer2)); + CHECK(fanout.has_output(writer1)); + CHECK(fanout.has_output(writer2)); + + fanout.remove_output(writer1); + CHECK(!fanout.has_output(writer1)); + CHECK(fanout.has_output(writer2)); + + fanout.remove_output(writer2); + CHECK(!fanout.has_output(writer1)); + CHECK(!fanout.has_output(writer2)); } TEST(fanout, forward_error) { test::MockWriter writer1; test::MockWriter writer2; - Fanout fanout(sample_spec); - fanout.add_output(writer1); - fanout.add_output(writer2); + Fanout fanout(frame_factory, arena, sample_spec); + LONGS_EQUAL(status::StatusOK, fanout.init_status()); + + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer1)); + LONGS_EQUAL(status::StatusOK, fanout.add_output(writer2)); FramePtr frame = new_frame(BufSz); CHECK(frame); diff --git a/src/tests/roc_audio/test_mixer.cpp b/src/tests/roc_audio/test_mixer.cpp index 9859276a0..1c6ca6f65 100644 --- a/src/tests/roc_audio/test_mixer.cpp +++ b/src/tests/roc_audio/test_mixer.cpp @@ -88,7 +88,7 @@ TEST(mixer, no_readers) { expect_output(status::StatusOK, mixer, BufSz, BufSz, 0); } -TEST(mixer, one_reader) { +TEST(mixer, one_input) { test::MockReader reader(frame_factory, sample_spec); Mixer mixer(frame_factory, arena, sample_spec, true); @@ -103,7 +103,7 @@ TEST(mixer, one_reader) { LONGS_EQUAL(1, reader.total_reads()); } -TEST(mixer, one_reader_big_frame) { +TEST(mixer, one_input_big_frame) { enum { Factor = 3 }; test::MockReader reader(frame_factory, sample_spec); @@ -120,7 +120,7 @@ TEST(mixer, one_reader_big_frame) { LONGS_EQUAL(Factor, reader.total_reads()); } -TEST(mixer, two_readers) { +TEST(mixer, two_inputs) { test::MockReader reader1(frame_factory, sample_spec); test::MockReader reader2(frame_factory, sample_spec); @@ -142,7 +142,7 @@ TEST(mixer, two_readers) { LONGS_EQUAL(1, reader2.total_reads()); } -TEST(mixer, remove_reader) { +TEST(mixer, remove_input) { test::MockReader reader1(frame_factory, sample_spec); test::MockReader reader2(frame_factory, sample_spec); @@ -172,8 +172,35 @@ TEST(mixer, remove_reader) { LONGS_EQUAL(BufSz * 2, reader2.num_unread()); } -// If reader returns StreamEnd, mixer skips it. -TEST(mixer, end) { +TEST(mixer, has_input) { + test::MockReader reader1(frame_factory, sample_spec); + test::MockReader reader2(frame_factory, sample_spec); + + Mixer mixer(frame_factory, arena, sample_spec, true); + LONGS_EQUAL(status::StatusOK, mixer.init_status()); + + CHECK(!mixer.has_input(reader1)); + CHECK(!mixer.has_input(reader2)); + + LONGS_EQUAL(status::StatusOK, mixer.add_input(reader1)); + CHECK(mixer.has_input(reader1)); + CHECK(!mixer.has_input(reader2)); + + LONGS_EQUAL(status::StatusOK, mixer.add_input(reader2)); + CHECK(mixer.has_input(reader1)); + CHECK(mixer.has_input(reader2)); + + mixer.remove_input(reader1); + CHECK(!mixer.has_input(reader1)); + CHECK(mixer.has_input(reader2)); + + mixer.remove_input(reader2); + CHECK(!mixer.has_input(reader1)); + CHECK(!mixer.has_input(reader2)); +} + +// If reader returns StatusFinish, mixer skips it. +TEST(mixer, finish) { test::MockReader reader1(frame_factory, sample_spec); test::MockReader reader2(frame_factory, sample_spec); @@ -187,13 +214,13 @@ TEST(mixer, end) { reader2.add_samples(BufSz, 0.22f); expect_output(status::StatusOK, mixer, BufSz, BufSz, 0.33f); - reader2.set_status(status::StatusEnd); + reader2.set_status(status::StatusFinish); reader1.add_samples(BufSz, 0.44f); reader2.add_samples(BufSz, 0.55f); expect_output(status::StatusOK, mixer, BufSz, BufSz, 0.44f); - reader1.set_status(status::StatusEnd); + reader1.set_status(status::StatusFinish); reader1.add_samples(BufSz, 0.77f); reader2.add_samples(BufSz, 0.88f); @@ -243,7 +270,7 @@ TEST(mixer, partial) { LONGS_EQUAL(Factor1 + Factor2, reader2.total_reads()); } -// Reader returns StreamEnd in the middle of repeating partial. +// Reader returns StatusFinish in the middle of repeating partial. TEST(mixer, partial_end) { test::MockReader reader1(frame_factory, sample_spec); test::MockReader reader2(frame_factory, sample_spec); @@ -261,7 +288,7 @@ TEST(mixer, partial_end) { reader1.set_limit(BufSz); reader2.set_limit(BufSz); - reader1.set_no_samples_status(status::StatusEnd); + reader1.set_no_samples_status(status::StatusFinish); expect_output(status::StatusOK, mixer, BufSz * 4, BufSz * 4, 0.33f, 0, 0); @@ -271,7 +298,7 @@ TEST(mixer, partial_end) { LONGS_EQUAL(3, reader1.total_reads()); LONGS_EQUAL(4, reader2.total_reads()); - LONGS_EQUAL(status::StatusEnd, reader1.last_status()); + LONGS_EQUAL(status::StatusFinish, reader1.last_status()); LONGS_EQUAL(status::StatusOK, reader2.last_status()); } @@ -671,7 +698,7 @@ TEST(mixer, soft_read_partial_drain_two_readers) { LONGS_EQUAL(status::StatusOK, reader2.last_status()); } -// One reader returns StreamEnd during soft read. +// One reader returns StatusFinish during soft read. TEST(mixer, soft_read_partial_end_two_readers) { test::MockReader reader1(frame_factory, sample_spec); test::MockReader reader2(frame_factory, sample_spec); @@ -683,12 +710,12 @@ TEST(mixer, soft_read_partial_end_two_readers) { LONGS_EQUAL(status::StatusOK, mixer.add_input(reader2)); // reader1 returns StatusOK - // reader2 returns StatusPart, then StatusEnd + // reader2 returns StatusPart, then StatusFinish reader1.add_samples(BufSz, 0.11f); reader2.add_samples(BufSz, 0.22f); reader2.add_samples(BufSz, 0.33f); - reader1.set_no_samples_status(status::StatusEnd); + reader1.set_no_samples_status(status::StatusFinish); reader1.set_limit(BufSz / 2); reader2.set_limit(BufSz / 2); @@ -701,7 +728,7 @@ TEST(mixer, soft_read_partial_end_two_readers) { LONGS_EQUAL(3, reader1.total_reads()); LONGS_EQUAL(4, reader2.total_reads()); - LONGS_EQUAL(status::StatusEnd, reader1.last_status()); + LONGS_EQUAL(status::StatusFinish, reader1.last_status()); LONGS_EQUAL(status::StatusOK, reader2.last_status()); } diff --git a/src/tests/roc_audio/test_pcm_mapper_reader.cpp b/src/tests/roc_audio/test_pcm_mapper_reader.cpp index 67bfb707d..977da14fe 100644 --- a/src/tests/roc_audio/test_pcm_mapper_reader.cpp +++ b/src/tests/roc_audio/test_pcm_mapper_reader.cpp @@ -129,7 +129,7 @@ template struct CountReader : IFrameReader { } if (duration == 0) { - return (last_status = status::StatusEnd); + return (last_status = status::StatusFinish); } CHECK(frame_factory.reallocate_frame( diff --git a/src/tests/roc_pipeline/test_helpers/mock_source.h b/src/tests/roc_pipeline/test_helpers/mock_source.h index e0ef443cf..c688a6ece 100644 --- a/src/tests/roc_pipeline/test_helpers/mock_source.h +++ b/src/tests/roc_pipeline/test_helpers/mock_source.h @@ -94,7 +94,7 @@ class MockSource : public sndio::ISource { LONGS_EQUAL(audio::ModeHard, mode); if (pos_ == size_) { - return status::StatusEnd; + return status::StatusFinish; } CHECK(frame_factory_.reallocate_frame( diff --git a/src/tests/roc_pipeline/test_transcoder_source.cpp b/src/tests/roc_pipeline/test_transcoder_source.cpp index 841f10481..75f6a477a 100644 --- a/src/tests/roc_pipeline/test_transcoder_source.cpp +++ b/src/tests/roc_pipeline/test_transcoder_source.cpp @@ -183,7 +183,7 @@ TEST(transcoder_source, eof) { mock_source.add(SamplesPerFrame, input_sample_spec); read_frame(status::StatusOK, transcoder, SamplesPerFrame); - read_frame(status::StatusEnd, transcoder, SamplesPerFrame); + read_frame(status::StatusFinish, transcoder, SamplesPerFrame); } TEST(transcoder_source, frame_size_small) { diff --git a/src/tests/roc_sndio/test_backend_source.cpp b/src/tests/roc_sndio/test_backend_source.cpp index 197c373ed..4e7a8561c 100644 --- a/src/tests/roc_sndio/test_backend_source.cpp +++ b/src/tests/roc_sndio/test_backend_source.cpp @@ -215,7 +215,7 @@ TEST(backend_source, rewind_after_eof) { for (int i = 0; i < 10; i++) { expect_read(status::StatusOK, *backend_source, *frame, FrameSize); expect_read(status::StatusOK, *backend_source, *frame, FrameSize); - expect_read(status::StatusEnd, *backend_source, *frame, FrameSize); + expect_read(status::StatusFinish, *backend_source, *frame, FrameSize); // rewind LONGS_EQUAL(status::StatusOK, backend_source->rewind());