Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Dec 12, 2024
1 parent 8fc59cd commit 99c8903
Show file tree
Hide file tree
Showing 18 changed files with 190 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ NetworkLoop::NetworkLoop(core::IPool& packet_pool,
task_sem_.data = this;
task_sem_initialized_ = true;

enable_realtime();
if (!enable_realtime()) {
roc_log(LogInfo,
"network loop: can't set realtime priority of network thread. May need "
"to be root");
}
if (!(started_ = Thread::start())) {
init_status_ = status::StatusErrThread;
return;
Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_node/receiver_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
roc_panic_if(!bytes);
roc_panic_if(n_bytes == 0);

const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);

if (n_bytes > packet_factory_.packet_buffer_size()) {
roc_log(LogError,
"receiver decoder node:"
Expand All @@ -195,6 +197,7 @@ status::StatusCode ReceiverDecoder::write_packet(address::Interface iface,
}

packet->add_flags(packet::Packet::FlagUDP);
packet->udp()->receive_timestamp = capture_ts;
packet->set_buffer(buffer);

packet::IWriter* writer = endpoint_writers_[iface];
Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_node/sender_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
roc_panic_if(!bytes);
roc_panic_if(n_bytes == 0);

const core::nanoseconds_t capture_ts = core::timestamp(core::ClockUnix);

if (n_bytes > packet_factory_.packet_buffer_size()) {
roc_log(LogError,
"sender encoder node:"
Expand All @@ -252,6 +254,7 @@ SenderEncoder::write_packet(address::Interface iface, const void* bytes, size_t
}

packet->add_flags(packet::Packet::FlagUDP);
packet->udp()->receive_timestamp = capture_ts;
packet->set_buffer(buffer);

packet::IWriter* writer = endpoint_writers_[iface];
Expand Down
9 changes: 4 additions & 5 deletions src/internal_modules/roc_pipeline/receiver_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ packet::IWriter& ReceiverEndpoint::inbound_writer() {
return *this;
}

status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_time) {
status::StatusCode ReceiverEndpoint::pull_packets() {
roc_panic_if(init_status_ != status::StatusOK);

roc_panic_if(!parser_);
Expand All @@ -209,7 +209,7 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
// queue were added in a very short time or are being added currently. It's
// acceptable to consider such packets late and pull them next time.
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
const status::StatusCode code = handle_packet_(packet, current_time);
const status::StatusCode code = handle_packet_(packet);
state_tracker_.unregister_packet();

if (code != status::StatusOK) {
Expand All @@ -220,14 +220,13 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
return status::StatusOK;
}

status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet) {
if (!parser_->parse(*packet, packet->buffer())) {
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't parse");
return status::StatusOK;
}

const status::StatusCode code = session_group_.route_packet(packet, current_time);
const status::StatusCode code = session_group_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't route");
Expand Down
5 changes: 2 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
//! Packets are written to inbound_writer() from network thread.
//! They don't appear in pipeline immediately. Instead, pipeline thread
//! should periodically call pull_packets() to make them available.
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode pull_packets();

private:
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);

status::StatusCode handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode handle_packet_(const packet::PacketPtr& packet);

const address::Protocol proto_;

Expand Down
10 changes: 4 additions & 6 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
}
}

status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (packet->has_flags(packet::Packet::FlagControl)) {
return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

return route_transport_packet_(packet);
Expand Down Expand Up @@ -344,15 +343,14 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("session group: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us,
// in particular notify_recv_stream() and maybe halt_recv_stream().
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
void reclock_sessions(core::nanoseconds_t playback_time);

//! Route packet to session.
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get number of sessions in group.
size_t num_sessions() const;
Expand Down Expand Up @@ -130,8 +129,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
virtual void halt_recv_stream(packet::stream_source_t send_source_id);

status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (control_endpoint_) {
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/internal_modules/roc_pipeline/sender_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ packet::IWriter* SenderEndpoint::inbound_writer() {
return this;
}

status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time) {
status::StatusCode SenderEndpoint::pull_packets() {
roc_panic_if(init_status_ != status::StatusOK);

if (!parser_) {
Expand All @@ -197,7 +197,7 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
// queue were added in a very short time or are being added currently. It's
// acceptable to consider such packets late and pull them next time.
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
const status::StatusCode code = handle_packet_(packet, current_time);
const status::StatusCode code = handle_packet_(packet);
state_tracker_.unregister_packet();

if (code != status::StatusOK) {
Expand All @@ -208,14 +208,13 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
return status::StatusOK;
}

status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet) {
if (!parser_->parse(*packet, packet->buffer())) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse");
return status::StatusOK;
}

const status::StatusCode code = sender_session_.route_packet(packet, current_time);
const status::StatusCode code = sender_session_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't route");
Expand Down
5 changes: 2 additions & 3 deletions src/internal_modules/roc_pipeline/sender_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter {
//! Packets are written to inbound_writer() from network thread.
//! They don't appear in pipeline immediately. Instead, pipeline thread
//! should periodically call pull_packets() to make them available.
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode pull_packets();

private:
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);

status::StatusCode handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode handle_packet_(const packet::PacketPtr& packet);

const address::Protocol proto_;

Expand Down
11 changes: 4 additions & 7 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time,
return status::StatusOK;
}

status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (fail_status_ != status::NoStatus) {
Expand All @@ -294,7 +293,7 @@ status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
roc_panic("sender session: unexpected non-control packet");
}

return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

status::StatusCode SenderSession::write(audio::Frame& frame) {
Expand Down Expand Up @@ -439,15 +438,13 @@ void SenderSession::start_feedback_monitor_() {
feedback_monitor_->start();
}

status::StatusCode
SenderSession::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("sender session: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us.
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

} // namespace pipeline
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/sender_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class SenderSession : public core::NonCopyable<>,
//! This way feedback packets from receiver reach sender pipeline.
//! Packets are stored inside internal pipeline queues, and then fetched
//! when frame are passed from frame_writer().
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get slot metrics.
//! @remarks
Expand Down Expand Up @@ -133,8 +132,7 @@ class SenderSession : public core::NonCopyable<>,

void start_feedback_monitor_();

status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

core::IArena& arena_;

Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/sender_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (control_endpoint_) {
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/internal_modules/roc_rtcp/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ size_t Communicator::total_streams() const {
return reporter_.total_streams();
}

status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

roc_panic_if_msg(!packet, "rtcp communicator: null packet");
Expand Down
3 changes: 1 addition & 2 deletions src/internal_modules/roc_rtcp/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class Communicator : public core::NonCopyable<> {

//! Parse and process incoming packet.
//! Invokes IParticipant methods during processing.
ROC_ATTR_NODISCARD status::StatusCode
process_packet(const packet::PacketPtr& packet, core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode process_packet(const packet::PacketPtr& packet);

//! When we should generate packets next time.
//! Returns absolute time.
Expand Down
1 change: 1 addition & 0 deletions src/tests/roc_pipeline/test_helpers/control_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class ControlWriter : public core::NonCopyable<> {

pp->udp()->src_addr = src_addr_;
pp->udp()->dst_addr = dst_addr_;
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);

pp->set_buffer(buffer);

Expand Down
9 changes: 9 additions & 0 deletions src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class PacketProxy : core::NonCopyable<> {
continue;
}
print_packet_(pp);
set_ts_(pp);
CHECK(source_writer_);
LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp)));
n_source_++;
Expand All @@ -187,11 +188,13 @@ class PacketProxy : core::NonCopyable<> {
continue;
}
print_packet_(pp);
set_ts_(pp);
CHECK(repair_writer_);
LONGS_EQUAL(status::StatusOK, repair_writer_->write(copy_packet_(pp)));
n_repair_++;
} else if (pp->flags() & packet::Packet::FlagControl) {
print_packet_(pp);
set_ts_(pp);
CHECK(control_writer_);
LONGS_EQUAL(status::StatusOK, control_writer_->write(copy_packet_(pp)));
n_control_++;
Expand Down Expand Up @@ -227,6 +230,12 @@ class PacketProxy : core::NonCopyable<> {
}
}

void set_ts_(packet::PacketPtr& pp) {
if (pp->udp()) {
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
}
}

packet::PacketFactory& packet_factory_;

address::SocketAddr proxy_addr_;
Expand Down
Loading

0 comments on commit 99c8903

Please sign in to comment.