Skip to content

Commit

Permalink
[spo] roc-streaminggh-731: More granular control of lost packets in A…
Browse files Browse the repository at this point in the history
…PI tests

Sponsored-by: waspd
  • Loading branch information
gavv committed Jul 17, 2024
1 parent 4575698 commit 52652f1
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 35 deletions.
23 changes: 19 additions & 4 deletions src/tests/public_api/test_helpers/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "test_helpers/utils.h"

#include "roc_address/socket_addr.h"
#include "roc_core/atomic.h"
#include "roc_core/heap_arena.h"
#include "roc_netio/network_loop.h"
#include "roc_packet/fifo_queue.h"
Expand All @@ -31,12 +32,14 @@ class Proxy : private packet::IWriter {
Proxy(const roc_endpoint* receiver_source_endp,
const roc_endpoint* receiver_repair_endp,
size_t n_source_packets,
size_t n_repair_packets)
size_t n_repair_packets,
unsigned flags)
: packet_pool_("proxy_packet_pool", arena_)
, buffer_pool_("proxy_buffer_pool", arena_, 2000)
, net_loop_(packet_pool_, buffer_pool_, arena_)
, n_source_packets_(n_source_packets)
, n_repair_packets_(n_repair_packets)
, flags_(flags)
, pos_(0) {
LONGS_EQUAL(status::StatusOK, net_loop_.init_status());

Expand Down Expand Up @@ -125,6 +128,10 @@ class Proxy : private packet::IWriter {
return input_repair_endp_;
}

size_t n_dropped_packets() const {
return n_dropped_packets_;
}

private:
virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& pp) {
pp->udp()->src_addr = send_config_.bind_address;
Expand All @@ -141,11 +148,15 @@ class Proxy : private packet::IWriter {
const size_t block_pos = pos_ % (n_source_packets_ + n_repair_packets_);

if (block_pos < n_source_packets_) {
if (!send_packet_(source_queue_, block_pos == 1)) {
const bool drop_packet = (flags_ & FlagLoseSomePkts) && (block_pos == 1);

if (!send_packet_(source_queue_, drop_packet)) {
break;
}
} else {
if (!send_packet_(repair_queue_, false)) {
const bool drop_packet = (flags_ & FlagLoseAllRepairPkts);

if (!send_packet_(repair_queue_, drop_packet)) {
break;
}
}
Expand All @@ -164,7 +175,9 @@ class Proxy : private packet::IWriter {
}
CHECK(pp);
pos_++;
if (!drop) {
if (drop) {
n_dropped_packets_++;
} else {
LONGS_EQUAL(status::StatusOK, writer_->write(pp));
}
return true;
Expand Down Expand Up @@ -194,7 +207,9 @@ class Proxy : private packet::IWriter {

const size_t n_source_packets_;
const size_t n_repair_packets_;
core::Atomic<size_t> n_dropped_packets_;

const unsigned flags_;
size_t pos_;
};

Expand Down
3 changes: 3 additions & 0 deletions src/tests/public_api/test_helpers/receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,17 @@ class Receiver : public core::Thread {
}

const roc_endpoint* source_endpoint(roc_slot slot = ROC_SLOT_DEFAULT) const {
CHECK(source_endp_[slot]);
return source_endp_[slot];
}

const roc_endpoint* repair_endpoint(roc_slot slot = ROC_SLOT_DEFAULT) const {
CHECK(repair_endp_[slot]);
return repair_endp_[slot];
}

const roc_endpoint* control_endpoint(roc_slot slot = ROC_SLOT_DEFAULT) const {
CHECK(control_endp_[slot]);
return control_endp_[slot];
}

Expand Down
7 changes: 7 additions & 0 deletions src/tests/public_api/test_helpers/sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,29 @@ class Sender : public core::Thread {
const roc_endpoint* receiver_control_endp,
roc_slot slot = ROC_SLOT_DEFAULT) {
if ((flags_ & FlagRS8M) || (flags_ & FlagLDPC)) {
CHECK(receiver_source_endp);
CHECK(receiver_repair_endp);
CHECK(roc_sender_connect(sndr_, slot, ROC_INTERFACE_AUDIO_SOURCE,
receiver_source_endp)
== 0);
CHECK(roc_sender_connect(sndr_, slot, ROC_INTERFACE_AUDIO_REPAIR,
receiver_repair_endp)
== 0);
} else {
CHECK(receiver_source_endp);
CHECK(!receiver_repair_endp);
CHECK(roc_sender_connect(sndr_, slot, ROC_INTERFACE_AUDIO_SOURCE,
receiver_source_endp)
== 0);
}

if (flags_ & FlagRTCP) {
CHECK(receiver_control_endp);
CHECK(roc_sender_connect(sndr_, slot, ROC_INTERFACE_AUDIO_CONTROL,
receiver_control_endp)
== 0);
} else {
CHECK(!receiver_control_endp);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/tests/public_api/test_helpers/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ enum {
FlagMultitrack = (1 << 3),
FlagNonStrict = (1 << 4),
FlagInfinite = (1 << 5),
FlagLoseSomePkts = (1 << 6),
FlagLoseAllRepairPkts = (1 << 7),
};

inline float increment_sample_value(float sample_value, float sample_step) {
Expand Down
66 changes: 35 additions & 31 deletions src/tests/public_api/test_loopback_sender_2_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ TEST(loopback_sender_2_receiver, bare_rtp) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand All @@ -157,8 +157,7 @@ TEST(loopback_sender_2_receiver, rtp_rtcp) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(),
receiver.control_endpoint());
sender.connect(receiver.source_endpoint(), NULL, receiver.control_endpoint());

CHECK(sender.start());
receiver.receive();
Expand Down Expand Up @@ -198,7 +197,11 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) {
return;
}

enum { Flags = test::FlagRS8M, FrameChans = 2, PacketChans = 2 };
enum {
Flags = test::FlagRS8M | test::FlagLoseSomePkts,
FrameChans = 2,
PacketChans = 2
};

init_config(Flags, FrameChans, PacketChans);

Expand All @@ -210,7 +213,7 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) {
receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::SourcePackets, test::RepairPackets);
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);
Expand All @@ -221,6 +224,8 @@ TEST(loopback_sender_2_receiver, rs8m_with_losses) {
receiver.receive();
sender.stop();
sender.join();

CHECK(proxy.n_dropped_packets() > 0);
}

TEST(loopback_sender_2_receiver, ldpc_without_losses) {
Expand Down Expand Up @@ -255,7 +260,11 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) {
return;
}

enum { Flags = test::FlagLDPC, FrameChans = 2, PacketChans = 2 };
enum {
Flags = test::FlagLDPC | test::FlagLoseSomePkts,
FrameChans = 2,
PacketChans = 2
};

init_config(Flags, FrameChans, PacketChans);

Expand All @@ -267,7 +276,7 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) {
receiver.bind();

test::Proxy proxy(receiver.source_endpoint(), receiver.repair_endpoint(),
test::SourcePackets, test::RepairPackets);
test::SourcePackets, test::RepairPackets, Flags);

test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);
Expand All @@ -278,6 +287,8 @@ TEST(loopback_sender_2_receiver, ldpc_with_losses) {
receiver.receive();
sender.stop();
sender.join();

CHECK(proxy.n_dropped_packets() > 0);
}

TEST(loopback_sender_2_receiver, separate_context) {
Expand All @@ -295,7 +306,7 @@ TEST(loopback_sender_2_receiver, separate_context) {
test::Sender sender(send_context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand All @@ -318,7 +329,7 @@ TEST(loopback_sender_2_receiver, multiple_senders_one_receiver_sequential) {
test::Sender sender_1(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_1.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender_1.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender_1.start());
receiver.receive();
Expand All @@ -330,7 +341,7 @@ TEST(loopback_sender_2_receiver, multiple_senders_one_receiver_sequential) {
test::Sender sender_2(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_2.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender_2.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender_2.start());
receiver.receive();
Expand Down Expand Up @@ -358,10 +369,8 @@ TEST(loopback_sender_2_receiver, sender_slots) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver_1.source_endpoint(), receiver_1.repair_endpoint(), NULL,
Slot1);
sender.connect(receiver_2.source_endpoint(), receiver_2.repair_endpoint(), NULL,
Slot2);
sender.connect(receiver_1.source_endpoint(), NULL, NULL, Slot1);
sender.connect(receiver_2.source_endpoint(), NULL, NULL, Slot2);

CHECK(sender.start());

Expand Down Expand Up @@ -390,8 +399,7 @@ TEST(loopback_sender_2_receiver, receiver_slots_sequential) {
test::Sender sender_1(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_1.connect(receiver.source_endpoint(Slot1), receiver.repair_endpoint(Slot1),
NULL);
sender_1.connect(receiver.source_endpoint(Slot1), NULL, NULL);

CHECK(sender_1.start());
receiver.receive();
Expand All @@ -403,8 +411,7 @@ TEST(loopback_sender_2_receiver, receiver_slots_sequential) {
test::Sender sender_2(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_2.connect(receiver.source_endpoint(Slot2), receiver.repair_endpoint(Slot2),
NULL);
sender_2.connect(receiver.source_endpoint(Slot2), NULL, NULL);

CHECK(sender_2.start());
receiver.receive();
Expand All @@ -427,7 +434,7 @@ TEST(loopback_sender_2_receiver, mono) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand All @@ -450,7 +457,7 @@ TEST(loopback_sender_2_receiver, stereo_mono_stereo) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand All @@ -473,7 +480,7 @@ TEST(loopback_sender_2_receiver, mono_stereo_mono) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand Down Expand Up @@ -503,7 +510,7 @@ TEST(loopback_sender_2_receiver, multitrack) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand Down Expand Up @@ -534,7 +541,7 @@ TEST(loopback_sender_2_receiver, multitrack_separate_contexts) {
test::Sender sender(send_context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(), NULL);
sender.connect(receiver.source_endpoint(), NULL, NULL);

CHECK(sender.start());
receiver.receive();
Expand Down Expand Up @@ -562,8 +569,7 @@ TEST(loopback_sender_2_receiver, metrics_measurements) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(), receiver.repair_endpoint(),
receiver.control_endpoint());
sender.connect(receiver.source_endpoint(), NULL, receiver.control_endpoint());

{
receiver.query_metrics(MaxSess);
Expand Down Expand Up @@ -633,14 +639,12 @@ TEST(loopback_sender_2_receiver, metrics_connections) {
test::Sender sender_1(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_1.connect(receiver.source_endpoint(), receiver.repair_endpoint(),
receiver.control_endpoint());
sender_1.connect(receiver.source_endpoint(), NULL, receiver.control_endpoint());

test::Sender sender_2(context, sender_conf, sample_step, FrameChans,
test::FrameSamples, Flags);

sender_2.connect(receiver.source_endpoint(), receiver.repair_endpoint(),
receiver.control_endpoint());
sender_2.connect(receiver.source_endpoint(), NULL, receiver.control_endpoint());

{
receiver.query_metrics(MaxSess);
Expand Down Expand Up @@ -743,10 +747,10 @@ TEST(loopback_sender_2_receiver, metrics_slots) {
test::Sender sender(context, sender_conf, sample_step, FrameChans, test::FrameSamples,
Flags);

sender.connect(receiver.source_endpoint(Slot1), receiver.repair_endpoint(Slot1),
sender.connect(receiver.source_endpoint(Slot1), NULL,
receiver.control_endpoint(Slot1), Slot1);

sender.connect(receiver.source_endpoint(Slot2), receiver.repair_endpoint(Slot2),
sender.connect(receiver.source_endpoint(Slot2), NULL,
receiver.control_endpoint(Slot2), Slot2);

{
Expand Down

0 comments on commit 52652f1

Please sign in to comment.