Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dshil committed Oct 11, 2023
1 parent 98a77a2 commit 72ff091
Show file tree
Hide file tree
Showing 41 changed files with 637 additions and 186 deletions.
6 changes: 5 additions & 1 deletion src/internal_modules/roc_audio/packetizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/fast_random.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace audio {
Expand Down Expand Up @@ -125,7 +126,10 @@ void Packetizer::end_packet_() {
pad_packet_();
}

writer_.write(packet_);
const status::StatusCode code = writer_.write(packet_);
roc_panic_if_msg(code != status::StatusOK,
"packetizer: failed to write packet: status=%s",
status::code_to_str(code));

seqnum_++;
stream_ts_ += (packet::stream_timestamp_t)packet_pos_;
Expand Down
7 changes: 5 additions & 2 deletions src/internal_modules/roc_fec/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ status::StatusCode Reader::fetch_packets_(packet::IReader& reader,
for (;;) {
packet::PacketPtr pp;

const status::StatusCode code = reader.read(pp);
status::StatusCode code = reader.read(pp);
if (code != status::StatusOK) {
if (code == status::StatusNoData) {
break;
Expand All @@ -300,7 +300,10 @@ status::StatusCode Reader::fetch_packets_(packet::IReader& reader,
break;
}

writer.write(pp);
code = writer.write(pp);
if (code != status::StatusOK) {
return code;
}
}

return status::StatusOK;
Expand Down
36 changes: 26 additions & 10 deletions src/internal_modules/roc_fec/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_packet/fec_scheme_to_str.h"
#include "roc_status/code_to_str.h"
#include "roc_status/status_code.h"

namespace roc {
namespace fec {
Expand Down Expand Up @@ -92,12 +94,13 @@ bool Writer::resize(size_t sblen, size_t rblen) {
return true;
}

void Writer::write(const packet::PacketPtr& pp) {
status::StatusCode Writer::write(const packet::PacketPtr& pp) {
roc_panic_if_not(is_valid());
roc_panic_if_not(pp);

if (!alive_) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}

validate_fec_packet_(pp);
Expand All @@ -108,22 +111,29 @@ void Writer::write(const packet::PacketPtr& pp) {

if (cur_packet_ == 0) {
if (!begin_block_(pp)) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}
}

if (!validate_source_packet_(pp)) {
return;
// TODO: return StatusDead (gh-183)
return status::StatusNoData;
}

write_source_packet_(pp);
const status::StatusCode code = write_source_packet_(pp);
roc_panic_if_msg(code != status::StatusOK,
"fec writer: failed to write source packet: status=%s",
status::code_to_str(code));

cur_packet_++;

if (cur_packet_ == cur_sblen_) {
end_block_();
next_block_();
}

return status::StatusOK;
}

bool Writer::begin_block_(const packet::PacketPtr& pp) {
Expand Down Expand Up @@ -185,7 +195,7 @@ bool Writer::apply_sizes_(size_t sblen, size_t rblen, size_t payload_size) {
return true;
}

void Writer::write_source_packet_(const packet::PacketPtr& pp) {
status::StatusCode Writer::write_source_packet_(const packet::PacketPtr& pp) {
encoder_.set(cur_packet_, pp->fec()->payload);

pp->add_flags(packet::Packet::FlagComposed);
Expand All @@ -195,7 +205,7 @@ void Writer::write_source_packet_(const packet::PacketPtr& pp) {
roc_panic("fec writer: can't compose source packet");
}

writer_.write(pp);
return writer_.write(pp);
}

void Writer::make_repair_packets_() {
Expand Down Expand Up @@ -272,10 +282,16 @@ void Writer::compose_repair_packets_() {
void Writer::write_repair_packets_() {
for (size_t i = 0; i < cur_rblen_; i++) {
packet::PacketPtr rp = repair_block_[i];
if (rp) {
writer_.write(repair_block_[i]);
repair_block_[i] = NULL;
if (!rp) {
continue;
}

const status::StatusCode code = writer_.write(repair_block_[i]);
roc_panic_if_msg(code != status::StatusOK,
"fec writer: failed to write repair packet: status=%s",
status::code_to_str(code));

repair_block_[i] = NULL;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/internal_modules/roc_fec/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {
//! @remarks
//! - writes the given source packet to the output writer
//! - generates repair packets and also writes them to the output writer
virtual void write(const packet::PacketPtr&);
virtual status::StatusCode write(const packet::PacketPtr&);

private:
bool begin_block_(const packet::PacketPtr& pp);
Expand All @@ -86,7 +86,7 @@ class Writer : public packet::IWriter, public core::NonCopyable<> {

bool apply_sizes_(size_t sblen, size_t rblen, size_t payload_size);

void write_source_packet_(const packet::PacketPtr&);
status::StatusCode write_source_packet_(const packet::PacketPtr&);
void make_repair_packets_();
packet::PacketPtr make_repair_packet_(packet::seqnum_t n);
void encode_repair_packets_();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "roc_core/panic.h"
#include "roc_core/shared_ptr.h"
#include "roc_core/string_builder.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace netio {
Expand Down Expand Up @@ -289,7 +290,10 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,

pp->set_data(core::Slice<uint8_t>(*bp, 0, (size_t)nread));

self.writer_.write(pp);
const status::StatusCode code = self.writer_.write(pp);
roc_panic_if_msg(code != status::StatusOK,
"udp receiver: failed to write packet: status=%s",
status::code_to_str(code));
}

bool UdpReceiverPort::join_multicast_group_() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ AsyncOperationStatus UdpSenderPort::async_close(ICloseHandler& handler,
return AsyncOp_Started;
}

void UdpSenderPort::write(const packet::PacketPtr& pp) {
status::StatusCode UdpSenderPort::write(const packet::PacketPtr& pp) {
if (!pp) {
roc_panic("udp sender: %s: unexpected null packet", descriptor());
}
Expand All @@ -174,6 +174,8 @@ void UdpSenderPort::write(const packet::PacketPtr& pp) {
write_(pp);

report_stats_();

return status::StatusOK;
}

void UdpSenderPort::write_(const packet::PacketPtr& pp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class UdpSenderPort : public BasicPort, public packet::IWriter {
//! Write packet.
//! @remarks
//! May be called from any thread.
virtual void write(const packet::PacketPtr&);
virtual status::StatusCode write(const packet::PacketPtr&);

protected:
//! Format descriptor.
Expand Down
10 changes: 6 additions & 4 deletions src/internal_modules/roc_node/receiver_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_address/interface.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace node {
Expand Down Expand Up @@ -134,7 +135,8 @@ bool ReceiverDecoder::get_metrics(pipeline::ReceiverSlotMetrics& slot_metrics,
return true;
}

bool ReceiverDecoder::write(address::Interface iface, const packet::PacketPtr& packet) {
status::StatusCode ReceiverDecoder::write(address::Interface iface,
const packet::PacketPtr& packet) {
roc_panic_if_not(is_valid());

roc_panic_if(iface < 0);
Expand All @@ -146,11 +148,11 @@ bool ReceiverDecoder::write(address::Interface iface, const packet::PacketPtr& p
"receiver decoder node:"
" can't write to %s interface: interface not activated",
address::interface_to_str(iface));
return false;
// TODO: return StatusInvalidArg (gh-183)
return status::StatusNoData;
}

writer->write(packet);
return true;
return writer->write(packet);
}

sndio::ISource& ReceiverDecoder::source() {
Expand Down
5 changes: 4 additions & 1 deletion src/internal_modules/roc_node/receiver_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include "roc_address/interface.h"
#include "roc_address/protocol.h"
#include "roc_core/attributes.h"
#include "roc_core/mutex.h"
#include "roc_node/context.h"
#include "roc_node/node.h"
#include "roc_pipeline/ipipeline_task_scheduler.h"
#include "roc_pipeline/receiver_loop.h"
#include "roc_status/status_code.h"

namespace roc {
namespace node {
Expand Down Expand Up @@ -54,7 +56,8 @@ class ReceiverDecoder : public Node, private pipeline::IPipelineTaskScheduler {
void* sess_metrics_arg);

//! Write packet for decoding.
bool write(address::Interface iface, const packet::PacketPtr& packet);
ROC_ATTR_NODISCARD status::StatusCode write(address::Interface iface,
const packet::PacketPtr& packet);

//! Source for reading decoded frames.
sndio::ISource& source();
Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_packet/concurrent_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ status::StatusCode ConcurrentQueue::read(PacketPtr& ptr) {
return status::StatusOK;
}

void ConcurrentQueue::write(const PacketPtr& packet) {
status::StatusCode ConcurrentQueue::write(const PacketPtr& packet) {
if (!packet) {
roc_panic("concurrent queue: packet is null");
}
Expand All @@ -44,6 +44,8 @@ void ConcurrentQueue::write(const PacketPtr& packet) {
if (write_sem_) {
write_sem_->post();
}

return status::StatusOK;
}

} // namespace packet
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_packet/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ConcurrentQueue : public IReader, public IWriter, public core::NonCopyable

//! Add packet to the queue.
//! Wait-free operation.
virtual void write(const PacketPtr& packet);
virtual status::StatusCode write(const PacketPtr& packet);

private:
core::Optional<core::Semaphore> write_sem_;
Expand Down
23 changes: 18 additions & 5 deletions src/internal_modules/roc_packet/interleaver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "roc_packet/interleaver.h"
#include "roc_core/fast_random.h"
#include "roc_core/log.h"
#include "roc_status/code_to_str.h"

namespace roc {
namespace packet {
Expand Down Expand Up @@ -46,27 +47,39 @@ bool Interleaver::is_valid() const {
return valid_;
}

void Interleaver::write(const PacketPtr& p) {
status::StatusCode Interleaver::write(const PacketPtr& p) {
roc_panic_if_not(is_valid());

packets_[next_2_put_] = p;
next_2_put_ = (next_2_put_ + 1) % block_size_;

while (packets_[send_seq_[next_2_send_]]) {
writer_.write(packets_[send_seq_[next_2_send_]]);
const status::StatusCode code = writer_.write(packets_[send_seq_[next_2_send_]]);
if (code != status::StatusOK) {
return code;
}

packets_[send_seq_[next_2_send_]] = NULL;
next_2_send_ = (next_2_send_ + 1) % block_size_;
}

return status::StatusOK;
}

void Interleaver::flush() {
roc_panic_if_not(is_valid());

for (size_t i = 0; i < block_size_; ++i) {
if (packets_[i]) {
writer_.write(packets_[i]);
packets_[i] = NULL;
if (!packets_[i]) {
continue;
}

const status::StatusCode code = writer_.write(packets_[i]);
roc_panic_if_msg(code != status::StatusOK,
"interleaver: failed to write packet: status=%s",
status::code_to_str(code));

packets_[i] = NULL;
}

next_2_put_ = next_2_send_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_packet/interleaver.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class Interleaver : public IWriter, public core::NonCopyable<> {
//! @remarks
//! Packets are written to internal buffer. Buffered packets are
//! then reordered and sent to output writer.
virtual void write(const PacketPtr& packet);
virtual status::StatusCode write(const PacketPtr& packet);

//! Send all buffered packets to output writer.
void flush();
Expand Down
10 changes: 9 additions & 1 deletion src/internal_modules/roc_packet/iwriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
#ifndef ROC_PACKET_IWRITER_H_
#define ROC_PACKET_IWRITER_H_

#include "roc_core/attributes.h"
#include "roc_packet/packet.h"
#include "roc_status/status_code.h"

namespace roc {
namespace packet {
Expand All @@ -23,7 +25,13 @@ class IWriter {
virtual ~IWriter();

//! Write packet.
virtual void write(const PacketPtr&) = 0;
//!
//! @returns
//! - If a returned code is not status::StatusOK, a packet is never written;
//! - If a packet is written, a returned code is always status::StatusOK.
//!
//! @see status::StatusCode.
virtual ROC_ATTR_NODISCARD status::StatusCode write(const PacketPtr&) = 0;
};

} // namespace packet
Expand Down
3 changes: 2 additions & 1 deletion src/internal_modules/roc_packet/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ status::StatusCode Queue::read(PacketPtr& packet) {
return status::StatusOK;
}

void Queue::write(const PacketPtr& packet) {
status::StatusCode Queue::write(const PacketPtr& packet) {
if (!packet) {
roc_panic("queue: null packet");
}
list_.push_back(*packet);
return status::StatusOK;
}

size_t Queue::size() const {
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_packet/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Queue : public IReader, public IWriter, public core::NonCopyable<> {
//! Add packet to the queue.
//! @remarks
//! Adds packet to the end of the queue.
virtual void write(const PacketPtr& packet);
virtual status::StatusCode write(const PacketPtr& packet);

//! Get number of packets in queue.
size_t size() const;
Expand Down
Loading

0 comments on commit 72ff091

Please sign in to comment.