Skip to content

Commit

Permalink
GH-303 Add support for status codes in packet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
dshil committed Oct 6, 2023
1 parent 1f4884f commit c9b26be
Show file tree
Hide file tree
Showing 49 changed files with 1,588 additions and 608 deletions.
2 changes: 1 addition & 1 deletion SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ env['ROC_SOVER'] = '.'.join(env['ROC_VERSION'].split('.')[:2])
# internal modules
env['ROC_MODULES'] = [
'roc_core',
'roc_error',
'roc_status',
'roc_address',
'roc_packet',
'roc_audio',
Expand Down
12 changes: 10 additions & 2 deletions src/internal_modules/roc_audio/depacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/stddefs.h"
#include "roc_status/status_code_to_str.h"

namespace roc {
namespace audio {
Expand Down Expand Up @@ -260,8 +261,15 @@ void Depacketizer::update_packet_(FrameInfo& info) {
}

packet::PacketPtr Depacketizer::read_packet_() {
packet::PacketPtr pp = reader_.read();
if (!pp) {
packet::PacketPtr pp;
const status::StatusCode code = reader_.read(pp);
if (code != status::StatusOK) {
if (code != status::StatusNoData) {
// TODO: forward status (gh-302)
roc_log(LogError, "depacketizer: failed to read packet: %s",
status::code_to_str(code));
}

return NULL;
}

Expand Down
26 changes: 0 additions & 26 deletions src/internal_modules/roc_error/error_code.h

This file was deleted.

23 changes: 0 additions & 23 deletions src/internal_modules/roc_error/error_code_to_str.cpp

This file was deleted.

26 changes: 0 additions & 26 deletions src/internal_modules/roc_error/error_code_to_str.h

This file was deleted.

126 changes: 82 additions & 44 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_packet/fec_scheme_to_str.h"
#include "roc_status/status_code_to_str.h"

namespace roc {
namespace fec {
Expand Down Expand Up @@ -59,43 +60,49 @@ bool Reader::is_alive() const {
return alive_;
}

packet::PacketPtr Reader::read() {
status::StatusCode Reader::read(packet::PacketPtr& pp) {
roc_panic_if_not(is_valid());

if (!alive_) {
return NULL;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}
packet::PacketPtr pp = read_();
if (pp) {

status::StatusCode code = read_(pp);
if (code == status::StatusOK) {
n_packets_++;
}
// check if alive_ has changed
return (alive_ ? pp : NULL);
if (!alive_) {
pp = NULL;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}

return code;
}

packet::PacketPtr Reader::read_() {
fetch_packets_();
status::StatusCode Reader::read_(packet::PacketPtr& ptr) {
const status::StatusCode code = fetch_all_packets_();
if (code != status::StatusOK) {
return code;
}

if (!started_) {
packet::PacketPtr pp = get_first_packet_();
if (!pp || pp->fec()->encoding_symbol_id > 0) {
return source_queue_.read();
}

roc_log(LogDebug,
"fec reader: got first packet in a block, start decoding:"
" n_packets_before=%u sbn=%lu",
n_packets_, (unsigned long)cur_sbn_);
started_ = try_start_();
}

started_ = true;
if (!started_) {
// until started, just forward all source packets
return source_queue_.read(ptr);
}

return get_next_packet_();
return get_next_packet_(ptr);
}

packet::PacketPtr Reader::get_first_packet_() {
bool Reader::try_start_() {
packet::PacketPtr pp = source_queue_.head();
if (!pp) {
return NULL;
return false;
}

const packet::FEC& fec = *pp->fec();
Expand All @@ -107,16 +114,28 @@ packet::PacketPtr Reader::get_first_packet_() {
(unsigned long)fec.encoding_symbol_id,
(unsigned long)fec.source_block_length, (unsigned long)fec.block_length,
(unsigned long)fec.payload.size());
return NULL;
return false;
}

cur_sbn_ = fec.source_block_number;
drop_repair_packets_from_prev_blocks_();

return pp;
if (pp->fec()->encoding_symbol_id > 0) {
// Wait until we receive first packet in block (ESI=0), see also gh-186.
return false;
}

roc_log(LogDebug,
"fec reader: got first packet in a block, start decoding:"
" n_packets_before=%u sbn=%lu",
n_packets_, (unsigned long)cur_sbn_);

started_ = true;

return true;
}

packet::PacketPtr Reader::get_next_packet_() {
status::StatusCode Reader::get_next_packet_(packet::PacketPtr& ptr) {
fill_block_();

packet::PacketPtr pp = source_block_[next_packet_];
Expand All @@ -138,7 +157,7 @@ packet::PacketPtr Reader::get_next_packet_() {

if (pos == source_block_.size()) {
if (source_queue_.size() == 0) {
return NULL;
return status::StatusNoData;
}
} else {
pp = source_block_[pos++];
Expand All @@ -154,7 +173,9 @@ packet::PacketPtr Reader::get_next_packet_() {
}
} while (!pp);

return pp;
ptr = pp;

return status::StatusOK;
}

void Reader::next_block_() {
Expand Down Expand Up @@ -253,28 +274,36 @@ packet::PacketPtr Reader::parse_repaired_packet_(const core::Slice<uint8_t>& buf
return pp;
}

void Reader::fetch_packets_() {
for (;;) {
if (packet::PacketPtr pp = source_reader_.read()) {
if (!validate_fec_packet_(pp)) {
return;
}
source_queue_.write(pp);
} else {
break;
}
status::StatusCode Reader::fetch_all_packets_() {
status::StatusCode code = fetch_packets_(source_reader_, source_queue_);
if (code == status::StatusOK) {
code = fetch_packets_(repair_reader_, repair_queue_);
}

return code;
}

status::StatusCode Reader::fetch_packets_(packet::IReader& reader,
packet::IWriter& writer) {
for (;;) {
if (packet::PacketPtr pp = repair_reader_.read()) {
if (!validate_fec_packet_(pp)) {
return;
packet::PacketPtr pp;

const status::StatusCode code = reader.read(pp);
if (code != status::StatusOK) {
if (code == status::StatusNoData) {
break;
}
repair_queue_.write(pp);
} else {
return code;
}

if (!validate_fec_packet_(pp)) {
break;
}

writer.write(pp);
}

return status::StatusOK;
}

void Reader::fill_block_() {
Expand All @@ -301,7 +330,10 @@ void Reader::fill_source_block_() {
break;
}

(void)source_queue_.read();
packet::PacketPtr p;
const status::StatusCode code = source_queue_.read(p);
roc_panic_if_msg(code != status::StatusOK, "failed to read source packet: %s",
status::code_to_str(code));
n_fetched++;

if (packet::blknum_lt(fec.source_block_number, cur_sbn_)) {
Expand Down Expand Up @@ -366,7 +398,10 @@ void Reader::fill_repair_block_() {
break;
}

(void)repair_queue_.read();
packet::PacketPtr p;
const status::StatusCode code = repair_queue_.read(p);
roc_panic_if_msg(code != status::StatusOK, "failed to read repair packet: %s",
status::code_to_str(code));
n_fetched++;

if (packet::blknum_lt(fec.source_block_number, cur_sbn_)) {
Expand Down Expand Up @@ -737,7 +772,10 @@ void Reader::drop_repair_packets_from_prev_blocks_() {
" decoding not started: cur_sbn=%lu pkt_sbn=%lu",
(unsigned long)cur_sbn_, (unsigned long)fec.source_block_number);

(void)repair_queue_.read();
packet::PacketPtr p;
const status::StatusCode code = repair_queue_.read(p);
roc_panic_if_msg(code != status::StatusOK, "failed to read repair packet: %s",
status::code_to_str(code));
n_dropped++;
}

Expand Down
11 changes: 6 additions & 5 deletions src/internal_modules/roc_fec/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,21 @@ class Reader : public packet::IReader, public core::NonCopyable<> {
//! Read packet.
//! @remarks
//! When a packet loss is detected, try to restore it from repair packets.
virtual packet::PacketPtr read();
virtual status::StatusCode read(packet::PacketPtr&);

private:
packet::PacketPtr read_();
status::StatusCode read_(packet::PacketPtr&);

packet::PacketPtr get_first_packet_();
packet::PacketPtr get_next_packet_();
bool try_start_();
status::StatusCode get_next_packet_(packet::PacketPtr&);

void next_block_();
void try_repair_();

packet::PacketPtr parse_repaired_packet_(const core::Slice<uint8_t>& buffer);

void fetch_packets_();
status::StatusCode fetch_all_packets_();
status::StatusCode fetch_packets_(packet::IReader&, packet::IWriter&);

void fill_block_();
void fill_source_block_();
Expand Down
10 changes: 6 additions & 4 deletions src/internal_modules/roc_node/sender_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_pipeline/metrics.h"
#include "roc_status/status_code_to_str.h"

namespace roc {
namespace node {
Expand Down Expand Up @@ -137,7 +138,8 @@ bool SenderEncoder::is_complete() {
return slot_metrics.is_complete;
}

bool SenderEncoder::read(address::Interface iface, packet::PacketPtr& packet) {
status::StatusCode SenderEncoder::read(address::Interface iface,
packet::PacketPtr& packet) {
roc_panic_if_not(is_valid());

roc_panic_if(iface < 0);
Expand All @@ -149,11 +151,11 @@ bool SenderEncoder::read(address::Interface iface, packet::PacketPtr& packet) {
"sender encoder node:"
" can't read from %s interface: interface not activated",
address::interface_to_str(iface));
return false;
// TODO: return StatusInvalidArg (gh-183)
return status::StatusNoData;
}

packet = reader->read();
return true;
return reader->read(packet);
}

sndio::ISink& SenderEncoder::sink() {
Expand Down
Loading

0 comments on commit c9b26be

Please sign in to comment.