Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Dec 12, 2024
1 parent 8fc59cd commit c92d155
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 261 deletions.
9 changes: 4 additions & 5 deletions src/internal_modules/roc_pipeline/receiver_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ packet::IWriter& ReceiverEndpoint::inbound_writer() {
return *this;
}

status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_time) {
status::StatusCode ReceiverEndpoint::pull_packets() {
roc_panic_if(init_status_ != status::StatusOK);

roc_panic_if(!parser_);
Expand All @@ -209,7 +209,7 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
// queue were added in a very short time or are being added currently. It's
// acceptable to consider such packets late and pull them next time.
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
const status::StatusCode code = handle_packet_(packet, current_time);
const status::StatusCode code = handle_packet_(packet);
state_tracker_.unregister_packet();

if (code != status::StatusOK) {
Expand All @@ -220,14 +220,13 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti
return status::StatusOK;
}

status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet) {
if (!parser_->parse(*packet, packet->buffer())) {
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't parse");
return status::StatusOK;
}

const status::StatusCode code = session_group_.route_packet(packet, current_time);
const status::StatusCode code = session_group_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't route");
Expand Down
5 changes: 2 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
//! Packets are written to inbound_writer() from network thread.
//! They don't appear in pipeline immediately. Instead, pipeline thread
//! should periodically call pull_packets() to make them available.
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode pull_packets();

private:
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);

status::StatusCode handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode handle_packet_(const packet::PacketPtr& packet);

const address::Protocol proto_;

Expand Down
10 changes: 4 additions & 6 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ void ReceiverSessionGroup::reclock_sessions(core::nanoseconds_t playback_time) {
}
}

status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode ReceiverSessionGroup::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (packet->has_flags(packet::Packet::FlagControl)) {
return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

return route_transport_packet_(packet);
Expand Down Expand Up @@ -344,15 +343,14 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
}

status::StatusCode
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("session group: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us,
// in particular notify_recv_stream() and maybe halt_recv_stream().
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
void reclock_sessions(core::nanoseconds_t playback_time);

//! Route packet to session.
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get number of sessions in group.
size_t num_sessions() const;
Expand Down Expand Up @@ -130,8 +129,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
virtual void halt_recv_stream(packet::stream_source_t send_source_id);

status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

bool can_create_session_(const packet::PacketPtr& packet);

Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (control_endpoint_) {
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/internal_modules/roc_pipeline/sender_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ packet::IWriter* SenderEndpoint::inbound_writer() {
return this;
}

status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time) {
status::StatusCode SenderEndpoint::pull_packets() {
roc_panic_if(init_status_ != status::StatusOK);

if (!parser_) {
Expand All @@ -197,7 +197,7 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
// queue were added in a very short time or are being added currently. It's
// acceptable to consider such packets late and pull them next time.
while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) {
const status::StatusCode code = handle_packet_(packet, current_time);
const status::StatusCode code = handle_packet_(packet);
state_tracker_.unregister_packet();

if (code != status::StatusOK) {
Expand All @@ -208,14 +208,13 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time
return status::StatusOK;
}

status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet) {
if (!parser_->parse(*packet, packet->buffer())) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse");
return status::StatusOK;
}

const status::StatusCode code = sender_session_.route_packet(packet, current_time);
const status::StatusCode code = sender_session_.route_packet(packet);

if (code == status::StatusNoRoute) {
roc_log(LogDebug, "sender endpoint: dropping bad packet: can't route");
Expand Down
5 changes: 2 additions & 3 deletions src/internal_modules/roc_pipeline/sender_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter {
//! Packets are written to inbound_writer() from network thread.
//! They don't appear in pipeline immediately. Instead, pipeline thread
//! should periodically call pull_packets() to make them available.
ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode pull_packets();

private:
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet);

status::StatusCode handle_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode handle_packet_(const packet::PacketPtr& packet);

const address::Protocol proto_;

Expand Down
11 changes: 4 additions & 7 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time,
return status::StatusOK;
}

status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

if (fail_status_ != status::NoStatus) {
Expand All @@ -294,7 +293,7 @@ status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet,
roc_panic("sender session: unexpected non-control packet");
}

return route_control_packet_(packet, current_time);
return route_control_packet_(packet);
}

status::StatusCode SenderSession::write(audio::Frame& frame) {
Expand Down Expand Up @@ -439,15 +438,13 @@ void SenderSession::start_feedback_monitor_() {
feedback_monitor_->start();
}

status::StatusCode
SenderSession::route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode SenderSession::route_control_packet_(const packet::PacketPtr& packet) {
if (!rtcp_communicator_) {
roc_panic("sender session: rtcp communicator is null");
}

// This will invoke IParticipant methods implemented by us.
return rtcp_communicator_->process_packet(packet, current_time);
return rtcp_communicator_->process_packet(packet);
}

} // namespace pipeline
Expand Down
6 changes: 2 additions & 4 deletions src/internal_modules/roc_pipeline/sender_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ class SenderSession : public core::NonCopyable<>,
//! This way feedback packets from receiver reach sender pipeline.
//! Packets are stored inside internal pipeline queues, and then fetched
//! when frame are passed from frame_writer().
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet);

//! Get slot metrics.
//! @remarks
Expand Down Expand Up @@ -133,8 +132,7 @@ class SenderSession : public core::NonCopyable<>,

void start_feedback_monitor_();

status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
core::nanoseconds_t current_time);
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);

core::IArena& arena_;

Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/sender_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time,
status::StatusCode code = status::NoStatus;

if (source_endpoint_) {
if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = source_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (repair_endpoint_) {
if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}

if (control_endpoint_) {
if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) {
if ((code = control_endpoint_->pull_packets()) != status::StatusOK) {
return code;
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/internal_modules/roc_rtcp/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ size_t Communicator::total_streams() const {
return reporter_.total_streams();
}

status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
core::nanoseconds_t current_time) {
status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet) {
roc_panic_if(init_status_ != status::StatusOK);

roc_panic_if_msg(!packet, "rtcp communicator: null packet");
Expand Down
3 changes: 1 addition & 2 deletions src/internal_modules/roc_rtcp/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class Communicator : public core::NonCopyable<> {

//! Parse and process incoming packet.
//! Invokes IParticipant methods during processing.
ROC_ATTR_NODISCARD status::StatusCode
process_packet(const packet::PacketPtr& packet, core::nanoseconds_t current_time);
ROC_ATTR_NODISCARD status::StatusCode process_packet(const packet::PacketPtr& packet);

//! When we should generate packets next time.
//! Returns absolute time.
Expand Down
1 change: 1 addition & 0 deletions src/tests/roc_pipeline/test_helpers/control_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class ControlWriter : public core::NonCopyable<> {

pp->udp()->src_addr = src_addr_;
pp->udp()->dst_addr = dst_addr_;
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);

pp->set_buffer(buffer);

Expand Down
9 changes: 9 additions & 0 deletions src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class PacketProxy : core::NonCopyable<> {
continue;
}
print_packet_(pp);
set_ts_(pp);
CHECK(source_writer_);
LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp)));
n_source_++;
Expand All @@ -187,11 +188,13 @@ class PacketProxy : core::NonCopyable<> {
continue;
}
print_packet_(pp);
set_ts_(pp);
CHECK(repair_writer_);
LONGS_EQUAL(status::StatusOK, repair_writer_->write(copy_packet_(pp)));
n_repair_++;
} else if (pp->flags() & packet::Packet::FlagControl) {
print_packet_(pp);
set_ts_(pp);
CHECK(control_writer_);
LONGS_EQUAL(status::StatusOK, control_writer_->write(copy_packet_(pp)));
n_control_++;
Expand Down Expand Up @@ -227,6 +230,12 @@ class PacketProxy : core::NonCopyable<> {
}
}

void set_ts_(packet::PacketPtr& pp) {
if (pp->udp()) {
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
}
}

packet::PacketFactory& packet_factory_;

address::SocketAddr proxy_addr_;
Expand Down
Loading

0 comments on commit c92d155

Please sign in to comment.