Skip to content

Commit

Permalink
draft-183: Refine status code handling in roc_pipeline
Browse files Browse the repository at this point in the history
roc_status:
- remove StatusConflict (use StatusBadArg)
- rename StatusEnd to StatusFinish
- allow StatusFinish for both read and write
- add StatusBadState

roc_audio:
- both mixer and fanout now handle StatusFinish

roc_sndio:
- new state DeviceState_Broken

roc_pipeline:
- sessions: when session is broken, all operations become no-op,
  refresh() reports error
- sink/source: when sink/source is broken, all operations return
  StatusBadState
- state tracker: set_broken, is_broken
  • Loading branch information
gavv committed Jul 28, 2024
1 parent 968677e commit 428a64b
Show file tree
Hide file tree
Showing 35 changed files with 458 additions and 233 deletions.
5 changes: 0 additions & 5 deletions src/internal_modules/roc_audio/depacketizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ struct DepacketizerMetrics {
class Depacketizer : public IFrameReader, public core::NonCopyable<> {
public:
//! Initialization.
//!
//! @b Parameters
//! - @p packet_reader is used to read packets
//! - @p payload_decoder is used to extract samples from packets
//! - @p sample_spec describes output frames
Depacketizer(packet::IReader& packet_reader,
IFrameDecoder& payload_decoder,
FrameFactory& frame_factory,
Expand Down
74 changes: 60 additions & 14 deletions src/internal_modules/roc_audio/fanout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
namespace roc {
namespace audio {

Fanout::Fanout(const SampleSpec& sample_spec)
: sample_spec_(sample_spec)
Fanout::Fanout(FrameFactory& frame_factory,
core::IArena& arena,
const SampleSpec& sample_spec)
: outputs_(arena)
, sample_spec_(sample_spec)
, init_status_(status::NoStatus) {
roc_panic_if_msg(!sample_spec_.is_valid(), "fanout: required valid sample spec: %s",
sample_spec_to_str(sample_spec_).c_str());
Expand All @@ -30,37 +33,80 @@ status::StatusCode Fanout::init_status() const {
bool Fanout::has_output(IFrameWriter& writer) {
roc_panic_if(init_status_ != status::StatusOK);

return frame_writers_.contains(writer);
for (size_t no = 0; no < outputs_.size(); no++) {
if (outputs_[no].writer == &writer) {
return true;
}
}

return false;
}

void Fanout::add_output(IFrameWriter& writer) {
status::StatusCode Fanout::add_output(IFrameWriter& writer) {
roc_panic_if(init_status_ != status::StatusOK);

frame_writers_.push_back(writer);
Output output;
output.writer = &writer;

if (!outputs_.push_back(output)) {
roc_log(LogError, "fanout: can't add output: allocation failed");
return status::StatusNoMem;
}

return status::StatusOK;
}

void Fanout::remove_output(IFrameWriter& writer) {
roc_panic_if(init_status_ != status::StatusOK);

frame_writers_.remove(writer);
size_t rm_idx = (size_t)-1;

for (size_t no = 0; no < outputs_.size(); no++) {
if (outputs_[no].writer == &writer) {
rm_idx = no;
break;
}
}

if (rm_idx == (size_t)-1) {
roc_panic("fanout: can't remove output: writer not found");
}

// Remove from array.
for (size_t no = rm_idx + 1; no < outputs_.size(); no++) {
outputs_[no - 1] = outputs_[no];
}

if (!outputs_.resize(outputs_.size() - 1)) {
roc_panic("fanout: can't remove output: resize failed");
}
}

status::StatusCode Fanout::write(Frame& in_frame) {
roc_panic_if(init_status_ != status::StatusOK);

sample_spec_.validate_frame(in_frame);

for (IFrameWriter* writer = frame_writers_.front(); writer != NULL;
writer = frame_writers_.nextof(*writer)) {
const status::StatusCode code = writer->write(in_frame);
for (size_t no = 0; no < outputs_.size(); no++) {
Output& output = outputs_[no];

if (output.is_finished) {
continue;
}

const status::StatusCode code = output.writer->write(in_frame);

if (code == status::StatusFinish) {
// From now on, skip this writer until it's removed.
output.is_finished = true;
continue;
}

if (code != status::StatusOK) {
// These codes can be returned only from read().
roc_panic_if_msg(
code == status::StatusPart || code == status::StatusDrain,
"fanout loop: unexpected status from write operation: status=%s",
status::code_to_str(code));

roc_panic_if_msg(code == status::StatusPart || code == status::StatusDrain,
"fanout: unexpected status from write operation: status=%s",
status::code_to_str(code));
return code;
}
}
Expand Down
35 changes: 26 additions & 9 deletions src/internal_modules/roc_audio/fanout.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

#include "roc_audio/frame_factory.h"
#include "roc_audio/iframe_writer.h"
#include "roc_audio/sample.h"
#include "roc_audio/sample_spec.h"
#include "roc_core/list.h"
#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/noncopyable.h"

namespace roc {
Expand All @@ -26,33 +26,50 @@ namespace audio {
//!
//! Duplicates audio stream to multiple output writers.
//!
//! Since StatusPart and StatusDrain are not allowed for write operations
//! Fanout does not need any special handling, unlike Mixer.
//! Features:
//! - Since StatusPart and StatusDrain are not allowed for write operations,
//! fanout does not need any special handling for them.
//!
//! - If pipeline element reports end-of-stream (StatusFinish), fanout skips this
//! element until it's removed.
class Fanout : public IFrameWriter, public core::NonCopyable<> {
public:
//! Initialize.
Fanout(const SampleSpec& sample_spec);
Fanout(FrameFactory& frame_factory,
core::IArena& arena,
const SampleSpec& sample_spec);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;

//! Check if writer is already added.
bool has_output(IFrameWriter&);
bool has_output(IFrameWriter& writer);

//! Add output writer.
void add_output(IFrameWriter&);
ROC_ATTR_NODISCARD status::StatusCode add_output(IFrameWriter& writer);

//! Remove output writer.
void remove_output(IFrameWriter&);
void remove_output(IFrameWriter& writer);

//! Write audio frame.
//! @remarks
//! Writes samples to every output writer.
virtual ROC_ATTR_NODISCARD status::StatusCode write(Frame& frame);

private:
core::List<IFrameWriter, core::NoOwnership> frame_writers_;
struct Output {
// to where to write samples, typically sender session
IFrameWriter* writer;
// if true, output returned StatusFinish and should not be used
bool is_finished;

Output()
: writer(NULL)
, is_finished(false) {
}
};

core::Array<Output, 8> outputs_;
const SampleSpec sample_spec_;

status::StatusCode init_status_;
Expand Down
26 changes: 19 additions & 7 deletions src/internal_modules/roc_audio/mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ status::StatusCode Mixer::init_status() const {
return init_status_;
}

bool Mixer::has_input(IFrameReader& reader) {
roc_panic_if(init_status_ != status::StatusOK);

for (size_t ni = 0; ni < inputs_.size(); ni++) {
if (inputs_[ni].reader == &reader) {
return true;
}
}

return false;
}

ROC_ATTR_NODISCARD status::StatusCode Mixer::add_input(IFrameReader& reader) {
roc_panic_if(init_status_ != status::StatusOK);

Expand Down Expand Up @@ -90,8 +102,8 @@ void Mixer::remove_input(IFrameReader& reader) {
}

// Remove from array.
for (size_t n = rm_idx + 1; n < inputs_.size(); n++) {
inputs_[n - 1] = inputs_[n];
for (size_t ni = rm_idx + 1; ni < inputs_.size(); ni++) {
inputs_[ni - 1] = inputs_[ni];
}

if (!inputs_.resize(inputs_.size() - 1)) {
Expand Down Expand Up @@ -306,8 +318,8 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode
roc_panic_if(input.n_mixed % sample_spec_.num_channels() != 0);
roc_panic_if(mix_size % sample_spec_.num_channels() != 0);

// If input returned StatusEnd, don't call it anymore.
if (input.ended && input.n_mixed < mix_size) {
// If input returned StatusFinish, don't call it anymore.
if (input.is_finished && input.n_mixed < mix_size) {
input.n_mixed = mix_size;
}

Expand All @@ -316,7 +328,7 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode
// We stop when one of the following happens:
// - we have fully filled requested buffer
// - we got StatusDrain, which means that soft read stopped early
// - we got StatusEnd, which means that reader is terminating
// - we got StatusFinish, which means that reader is terminating
// - we got an error (any other status), which means that the whole mixer fails
while (input.n_mixed < mix_size) {
const packet::stream_timestamp_t remained_duration = packet::stream_timestamp_t(
Expand All @@ -334,10 +346,10 @@ Mixer::mix_one_(Input& input, sample_t* mix_data, size_t mix_size, FrameReadMode
const status::StatusCode code =
input.reader->read(*in_frame_, capped_duration, mode);

if (code == status::StatusEnd) {
if (code == status::StatusFinish) {
// Stream ended and will be removed soon, pad it with zeros until that.
input.n_mixed = mix_size;
input.ended = true;
input.is_finished = true;
break;
}

Expand Down
20 changes: 12 additions & 8 deletions src/internal_modules/roc_audio/mixer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@

#include "roc_audio/frame_factory.h"
#include "roc_audio/iframe_reader.h"
#include "roc_audio/sample.h"
#include "roc_audio/sample_spec.h"
#include "roc_core/array.h"
#include "roc_core/iarena.h"
#include "roc_core/list.h"
#include "roc_core/noncopyable.h"
#include "roc_core/time.h"

namespace roc {
namespace audio {
Expand All @@ -41,6 +39,9 @@ namespace audio {
//! (In other words, StatusPart and StatusDrain never leave mixer. Mixer
//! always returns as much samples as requested).
//!
//! - If pipeline element reports end-of-stream (StatusFinish), mixer skips this
//! element until it's removed.
//!
//! - If timestamps are enabled, mixer computes capture timestamp of output
//! frame as the average capture timestamps of all mixed input frames.
//!
Expand All @@ -58,11 +59,14 @@ class Mixer : public IFrameReader, public core::NonCopyable<> {
//! Check if the object was successfully constructed.
status::StatusCode init_status() const;

//! Check if reader is already added.
bool has_input(IFrameReader& reader);

//! Add input reader.
ROC_ATTR_NODISCARD status::StatusCode add_input(IFrameReader&);
ROC_ATTR_NODISCARD status::StatusCode add_input(IFrameReader& reader);

//! Remove input reader.
void remove_input(IFrameReader&);
void remove_input(IFrameReader& reader);

//! Read audio frame.
//! @remarks
Expand All @@ -82,14 +86,14 @@ class Mixer : public IFrameReader, public core::NonCopyable<> {
size_t n_mixed;
// capture timestamp of first sample in mix_frame_
core::nanoseconds_t cts;
// if true, input returned StatusEnd and should not be used
bool ended;
// if true, input returned StatusFinish and should not be used
bool is_finished;

Input()
: reader(NULL)
, n_mixed(0)
, cts(0)
, ended(false) {
, is_finished(false) {
}
};

Expand Down
10 changes: 0 additions & 10 deletions src/internal_modules/roc_audio/packetizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,6 @@ struct PacketizerMetrics {
class Packetizer : public IFrameWriter, public core::NonCopyable<> {
public:
//! Initialization.
//!
//! @b Parameters
//! - @p writer is used to write generated packets
//! - @p composer is used to initialize new packets
//! - @p sequencer is used to put packets in sequence
//! - @p payload_encoder is used to write samples to packets
//! - @p packet_factory is used to allocate packets
//! - @p buffer_factory is used to allocate buffers for packets
//! - @p packet_length defines packet length in nanoseconds
//! - @p sample_spec describes input frames
Packetizer(packet::IWriter& writer,
packet::IComposer& composer,
packet::ISequencer& sequencer,
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_audio/processor_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ ProcessorMap::register_plc(int backend_id, void* backend_owner, PlcFunc ctor_fn)
"processor map: failed to register plc backend:"
" backend id %d already exists",
backend_id);
return status::StatusConflict;
return status::StatusBadArg;
}

if (!backend_owner) {
Expand Down
19 changes: 11 additions & 8 deletions src/internal_modules/roc_pipeline/receiver_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,27 @@ status::StatusCode ReceiverLoop::read(audio::Frame& frame,

core::Mutex::Lock lock(source_mutex_);

if (source_.state() == sndio::DeviceState_Broken) {
// Don't go to sleep if we're broke.
return status::StatusBadState;
}

if (ticker_) {
ticker_->wait(ticker_ts_);
}

// invokes process_subframe_imp() and process_task_imp()
// Invokes process_subframe_imp() and process_task_imp().
const status::StatusCode code = process_subframes_and_tasks(frame, duration, mode);

roc_panic_if_msg(code <= status::NoStatus || code >= status::MaxStatus,
"receiver loop: invalid status code %d", code);

if (code != status::StatusOK && code != status::StatusPart) {
return code;
}

ticker_ts_ += frame.duration();
if (code == status::StatusOK || code == status::StatusPart) {
ticker_ts_ += frame.duration();

if (auto_reclock_) {
source_.reclock(core::timestamp(core::ClockUnix));
if (auto_reclock_) {
source_.reclock(core::timestamp(core::ClockUnix));
}
}

return code;
Expand Down
Loading

0 comments on commit 428a64b

Please sign in to comment.