Skip to content

Commit

Permalink
GH-303 Add support for status codes in packet writer
Browse files Browse the repository at this point in the history
  • Loading branch information
dshil committed Oct 13, 2023
1 parent 98a77a2 commit 1e3d12b
Show file tree
Hide file tree
Showing 63 changed files with 1,555 additions and 407 deletions.
9 changes: 8 additions & 1 deletion src/internal_modules/roc_audio/packetizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/fast_random.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace audio {
Expand Down Expand Up @@ -125,7 +126,13 @@ void Packetizer::end_packet_() {
pad_packet_();
}

writer_.write(packet_);
const status::StatusCode code = writer_.write(packet_);
if (code != status::StatusOK) {
// TODO: handle returned status code (gh-183).
roc_panic_if_msg(code != status::StatusNoData,
"packetizer: failed to write packet: status=%s",
status::code_to_str(code));
}

seqnum_++;
stream_ts_ += (packet::stream_timestamp_t)packet_pos_;
Expand Down
10 changes: 8 additions & 2 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ status::StatusCode Reader::fetch_packets_(packet::IReader& reader,
for (;;) {
packet::PacketPtr pp;

const status::StatusCode code = reader.read(pp);
status::StatusCode code = reader.read(pp);
if (code != status::StatusOK) {
if (code == status::StatusNoData) {
break;
Expand All @@ -300,7 +300,13 @@ status::StatusCode Reader::fetch_packets_(packet::IReader& reader,
break;
}

writer.write(pp);
code = writer.write(pp);
if (code != status::StatusOK) {
// TODO: properly handle returned status (gh-183)
roc_panic_if_msg(code != status::StatusNoData,
"fec reader: failed to write fetched packets: status=%s",
status::code_to_str(code));
}
}

return status::StatusOK;
Expand Down
41 changes: 31 additions & 10 deletions src/internal_modules/roc_fec/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_packet/fec_scheme_to_str.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace fec {
Expand Down Expand Up @@ -92,12 +93,13 @@ bool Writer::resize(size_t sblen, size_t rblen) {
return true;
}

void Writer::write(const packet::PacketPtr& pp) {
status::StatusCode Writer::write(const packet::PacketPtr& pp) {
roc_panic_if_not(is_valid());
roc_panic_if_not(pp);

if (!alive_) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}

validate_fec_packet_(pp);
Expand All @@ -108,22 +110,32 @@ void Writer::write(const packet::PacketPtr& pp) {

if (cur_packet_ == 0) {
if (!begin_block_(pp)) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}
}

if (!validate_source_packet_(pp)) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}

write_source_packet_(pp);
const status::StatusCode code = write_source_packet_(pp);
if (code != status::StatusOK) {
// TODO: handle returned status code (gh-183).
roc_panic_if_msg(code != status::StatusNoData,
"fec writer: failed to write source packet: status=%s",
status::code_to_str(code));
}

cur_packet_++;

if (cur_packet_ == cur_sblen_) {
end_block_();
next_block_();
}

return status::StatusOK;
}

bool Writer::begin_block_(const packet::PacketPtr& pp) {
Expand Down Expand Up @@ -185,7 +197,7 @@ bool Writer::apply_sizes_(size_t sblen, size_t rblen, size_t payload_size) {
return true;
}

void Writer::write_source_packet_(const packet::PacketPtr& pp) {
status::StatusCode Writer::write_source_packet_(const packet::PacketPtr& pp) {
encoder_.set(cur_packet_, pp->fec()->payload);

pp->add_flags(packet::Packet::FlagComposed);
Expand All @@ -195,7 +207,7 @@ void Writer::write_source_packet_(const packet::PacketPtr& pp) {
roc_panic("fec writer: can't compose source packet");
}

writer_.write(pp);
return writer_.write(pp);
}

void Writer::make_repair_packets_() {
Expand Down Expand Up @@ -272,10 +284,19 @@ void Writer::compose_repair_packets_() {
void Writer::write_repair_packets_() {
for (size_t i = 0; i < cur_rblen_; i++) {
packet::PacketPtr rp = repair_block_[i];
if (rp) {
writer_.write(repair_block_[i]);
repair_block_[i] = NULL;
if (!rp) {
continue;
}

const status::StatusCode code = writer_.write(repair_block_[i]);
if (code != status::StatusOK) {
// TODO: handle returned status code (gh-183).
roc_panic_if_msg(code != status::StatusNoData,
"fec writer: failed to write repair packet: status=%s",
status::code_to_str(code));
}

repair_block_[i] = NULL;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_fec/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
//! @remarks
//! - writes the given source packet to the output writer
//! - generates repair packets and also writes them to the output writer
virtual void write(const packet::PacketPtr&);
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&);

private:
bool begin_block_(const packet::PacketPtr& pp);
Expand All @@ -86,7 +86,7 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {

bool apply_sizes_(size_t sblen, size_t rblen, size_t payload_size);

void write_source_packet_(const packet::PacketPtr&);
status::StatusCode write_source_packet_(const packet::PacketPtr&);
void make_repair_packets_();
packet::PacketPtr make_repair_packet_(packet::seqnum_t n);
void encode_repair_packets_();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "roc_core/panic.h"
#include "roc_core/shared_ptr.h"
#include "roc_core/string_builder.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace netio {
Expand Down Expand Up @@ -289,7 +290,13 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,

pp->set_data(core::Slice<uint8_t>(*bp, 0, (size_t)nread));

self.writer_.write(pp);
const status::StatusCode code = self.writer_.write(pp);
if (code != status::StatusOK) {
// TODO: handle status code (gh-183)
roc_panic_if_msg(code != status::StatusNoData,
"udp receiver: failed to write packet: status=%s",
status::code_to_str(code));
}
}

bool UdpReceiverPort::join_multicast_group_() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ AsyncOperationStatus UdpSenderPort::async_close(ICloseHandler& handler,
return AsyncOp_Started;
}

void UdpSenderPort::write(const packet::PacketPtr& pp) {
status::StatusCode UdpSenderPort::write(const packet::PacketPtr& pp) {
if (!pp) {
roc_panic("udp sender: %s: unexpected null packet", descriptor());
}
Expand All @@ -174,6 +174,8 @@ void UdpSenderPort::write(const packet::PacketPtr& pp) {
write_(pp);

report_stats_();

return status::StatusOK;
}

void UdpSenderPort::write_(const packet::PacketPtr& pp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class UdpSenderPort : public BasicPort, public packet::IWriter {
//! Write packet.
//! @remarks
//! May be called from any thread.
virtual void write(const packet::PacketPtr&);
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr&);

protected:
//! Format descriptor.
Expand Down
10 changes: 6 additions & 4 deletions src/internal_modules/roc_node/receiver_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_address/interface.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace node {
Expand Down Expand Up @@ -134,7 +135,8 @@ bool ReceiverDecoder::get_metrics(pipeline::ReceiverSlotMetrics& slot_metrics,
return true;
}

bool ReceiverDecoder::write(address::Interface iface, const packet::PacketPtr& packet) {
status::StatusCode ReceiverDecoder::write(address::Interface iface,
const packet::PacketPtr& packet) {
roc_panic_if_not(is_valid());

roc_panic_if(iface < 0);
Expand All @@ -146,11 +148,11 @@ bool ReceiverDecoder::write(address::Interface iface, const packet::PacketPtr& p
"receiver decoder node:"
" can't write to %s interface: interface not activated",
address::interface_to_str(iface));
return false;
// TODO: return StatusNotFound (gh-183)
return status::StatusNoData;
}

writer->write(packet);
return true;
return writer->write(packet);
}

sndio::ISource& ReceiverDecoder::source() {
Expand Down
5 changes: 4 additions & 1 deletion src/internal_modules/roc_node/receiver_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include "roc_address/interface.h"
#include "roc_address/protocol.h"
#include "roc_core/attributes.h"
#include "roc_core/mutex.h"
#include "roc_node/context.h"
#include "roc_node/node.h"
#include "roc_pipeline/ipipeline_task_scheduler.h"
#include "roc_pipeline/receiver_loop.h"
#include "roc_status/status_code.h"

namespace roc {
namespace node {
Expand Down Expand Up @@ -54,7 +56,8 @@ class ReceiverDecoder : public Node, private pipeline::IPipelineTaskScheduler {
void* sess_metrics_arg);

//! Write packet for decoding.
bool write(address::Interface iface, const packet::PacketPtr& packet);
ROC_ATTR_NODISCARD status::StatusCode write(address::Interface iface,
const packet::PacketPtr& packet);

//! Source for reading decoded frames.
sndio::ISource& source();
Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_packet/concurrent_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ status::StatusCode ConcurrentQueue::read(PacketPtr& ptr) {
return status::StatusOK;
}

void ConcurrentQueue::write(const PacketPtr& packet) {
status::StatusCode ConcurrentQueue::write(const PacketPtr& packet) {
if (!packet) {
roc_panic("concurrent queue: packet is null");
}
Expand All @@ -44,6 +44,8 @@ void ConcurrentQueue::write(const PacketPtr& packet) {
if (write_sem_) {
write_sem_->post();
}

return status::StatusOK;
}

} // namespace packet
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_packet/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ConcurrentQueue : public IReader, public IWriter, public core::NonCopyable

//! Add packet to the queue.
//! Wait-free operation.
virtual void write(const PacketPtr& packet);
virtual ROC_ATTR_NODISCARD status::StatusCode write(const PacketPtr& packet);

private:
core::Optional<core::Semaphore> write_sem_;
Expand Down
11 changes: 9 additions & 2 deletions src/internal_modules/roc_packet/delayed_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "roc_packet/delayed_reader.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_status/code_to_str.h"
#include "roc_status/status_code.h"

namespace roc {
Expand Down Expand Up @@ -44,15 +45,21 @@ status::StatusCode DelayedReader::read(PacketPtr& ptr) {
status::StatusCode DelayedReader::fetch_packets_() {
PacketPtr pp;
for (;;) {
const status::StatusCode code = reader_.read(pp);
status::StatusCode code = reader_.read(pp);
if (code != status::StatusOK) {
if (code == status::StatusNoData) {
break;
}
return code;
}

queue_.write(pp);
code = queue_.write(pp);
if (code != status::StatusOK) {
// TODO: properly handle returned status (gh-183)
roc_panic_if_msg(code != status::StatusNoData,
"delayed reader: failed to write fetched packets: status=%s",
status::code_to_str(code));
}
}

const stream_timestamp_t qs = queue_size_();
Expand Down
23 changes: 18 additions & 5 deletions src/internal_modules/roc_packet/interleaver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "roc_packet/interleaver.h"
#include "roc_core/fast_random.h"
#include "roc_core/log.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace packet {
Expand Down Expand Up @@ -46,27 +47,39 @@ bool Interleaver::is_valid() const {
return valid_;
}

void Interleaver::write(const PacketPtr& p) {
status::StatusCode Interleaver::write(const PacketPtr& p) {
roc_panic_if_not(is_valid());

packets_[next_2_put_] = p;
next_2_put_ = (next_2_put_ + 1) % block_size_;

while (packets_[send_seq_[next_2_send_]]) {
writer_.write(packets_[send_seq_[next_2_send_]]);
const status::StatusCode code = writer_.write(packets_[send_seq_[next_2_send_]]);
if (code != status::StatusOK) {
return code;
}

packets_[send_seq_[next_2_send_]] = NULL;
next_2_send_ = (next_2_send_ + 1) % block_size_;
}

return status::StatusOK;
}

void Interleaver::flush() {
roc_panic_if_not(is_valid());

for (size_t i = 0; i < block_size_; ++i) {
if (packets_[i]) {
writer_.write(packets_[i]);
packets_[i] = NULL;
if (!packets_[i]) {
continue;
}

const status::StatusCode code = writer_.write(packets_[i]);
roc_panic_if_msg(code != status::StatusOK,
"interleaver: failed to write packet: status=%s",
status::code_to_str(code));

packets_[i] = NULL;
}

next_2_put_ = next_2_send_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_packet/interleaver.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Interleaver : public IWriter, public core::NonCopyable<> {
//! @remarks
//! Packets are written to internal buffer. Buffered packets are
//! then reordered and sent to output writer.
virtual void write(const PacketPtr& packet);
virtual ROC_ATTR_NODISCARD status::StatusCode write(const PacketPtr& packet);

//! Send all buffered packets to output writer.
void flush();
Expand Down
Loading

0 comments on commit 1e3d12b

Please sign in to comment.