diff --git a/src/internal_modules/roc_packet/router.cpp b/src/internal_modules/roc_packet/router.cpp index 19650915b..9b6f85bbc 100644 --- a/src/internal_modules/roc_packet/router.cpp +++ b/src/internal_modules/roc_packet/router.cpp @@ -59,7 +59,8 @@ status::StatusCode Router::write(const PacketPtr& packet) { if (Route* route = find_route_(packet->flags())) { if (allow_route_(*route, *packet)) { - if (packet->udp()) { + if (packet->has_flags(Packet::FlagUDP) + && packet->udp()->queue_timestamp == 0) { packet->udp()->queue_timestamp = core::timestamp(core::ClockUnix); } diff --git a/src/tests/roc_pipeline/test_helpers/packet_writer.h b/src/tests/roc_pipeline/test_helpers/packet_writer.h index dd326d9b8..efcb902ec 100644 --- a/src/tests/roc_pipeline/test_helpers/packet_writer.h +++ b/src/tests/roc_pipeline/test_helpers/packet_writer.h @@ -14,6 +14,7 @@ #include "test_helpers/utils.h" #include "roc_audio/iframe_encoder.h" +#include "roc_core/fast_random.h" #include "roc_core/noncopyable.h" #include "roc_core/scoped_ptr.h" #include "roc_fec/block_writer.h" @@ -56,6 +57,9 @@ class PacketWriter : public core::NonCopyable<> { , timestamp_(0) , pt_(pt) , sample_offset_(0) + , qts_(core::Minute * 10000) + , qts_jitter_lo_(0) + , qts_jitter_hi_(0) , corrupt_flag_(false) { construct_(arena, packet_factory, encoding_map, pt, packet::FEC_None, fec::BlockWriterConfig()); @@ -85,6 +89,9 @@ class PacketWriter : public core::NonCopyable<> { , timestamp_(0) , pt_(pt) , sample_offset_(0) + , qts_(core::Minute * 10000) + , qts_jitter_lo_(0) + , qts_jitter_hi_(0) , corrupt_flag_(false) { construct_(arena, packet_factory, encoding_map, pt, fec_scheme, fec_config); } @@ -97,7 +104,7 @@ class PacketWriter : public core::NonCopyable<> { for (size_t np = 0; np < num_packets; np++) { packet::PacketPtr pp = create_packet_(samples_per_packet, sample_spec); CHECK(pp); - deliver_packet_(pp); + deliver_packet_(pp, sample_spec); } } @@ -112,7 +119,7 @@ class PacketWriter : public core::NonCopyable<> { } } - void shift_to(size_t num_packets, size_t samples_per_packet) { + void jump_to(size_t num_packets, size_t samples_per_packet) { seqnum_ = packet::seqnum_t(num_packets); timestamp_ = packet::stream_timestamp_t(num_packets * samples_per_packet); sample_offset_ = uint8_t(num_packets * samples_per_packet); @@ -150,6 +157,11 @@ class PacketWriter : public core::NonCopyable<> { timestamp_ = timestamp; } + void set_jitter(core::nanoseconds_t jitter_lo, core::nanoseconds_t jitter_hi) { + qts_jitter_lo_ = jitter_lo; + qts_jitter_hi_ = jitter_hi; + } + void corrupt_packets(bool corrupt) { corrupt_flag_ = corrupt; } @@ -246,6 +258,7 @@ class PacketWriter : public core::NonCopyable<> { pp->rtp()->seqnum = seqnum_; pp->rtp()->stream_timestamp = timestamp_; pp->rtp()->payload_type = pt_; + pp->rtp()->duration = samples_per_packet; seqnum_++; timestamp_ += samples_per_packet; @@ -271,7 +284,8 @@ class PacketWriter : public core::NonCopyable<> { return pp; } - void deliver_packet_(const packet::PacketPtr& pp) { + void deliver_packet_(const packet::PacketPtr& pp, + const audio::SampleSpec& sample_spec) { if (fec_writer_) { // fec_writer will produce source and repair packets and store in fec_queue // note that we're calling copy_packet_() only after fec_writer, because @@ -284,18 +298,21 @@ class PacketWriter : public core::NonCopyable<> { while (fec_queue_.read(fp, packet::ModeFetch) == status::StatusOK) { if (fp->has_flags(packet::Packet::FlagAudio)) { CHECK(source_composer_->compose(*fp)); - LONGS_EQUAL(status::StatusOK, - source_writer_->write(copy_packet_(fp))); + LONGS_EQUAL( + status::StatusOK, + source_writer_->write(prepare_for_delivery_(fp, sample_spec))); } else { CHECK(repair_composer_->compose(*fp)); - LONGS_EQUAL(status::StatusOK, - repair_writer_->write(copy_packet_(fp))); + LONGS_EQUAL( + status::StatusOK, + repair_writer_->write(prepare_for_delivery_(fp, sample_spec))); } } } else { // compose and "deliver" packet CHECK(source_composer_->compose(*pp)); - LONGS_EQUAL(status::StatusOK, source_writer_->write(copy_packet_(pp))); + LONGS_EQUAL(status::StatusOK, + source_writer_->write(prepare_for_delivery_(pp, sample_spec))); } } @@ -303,7 +320,8 @@ class PacketWriter : public core::NonCopyable<> { // like flags, parsed fields, etc; this way we simulate that packet was "delivered" // over network - packets enters receiver's pipeline without any meta-information, // and receiver fills that meta-information using packet parsers - packet::PacketPtr copy_packet_(const packet::PacketPtr& pa) { + packet::PacketPtr prepare_for_delivery_(const packet::PacketPtr& pa, + const audio::SampleSpec& sample_spec) { packet::PacketPtr pb = packet_factory_.new_packet(); CHECK(pb); @@ -311,6 +329,16 @@ class PacketWriter : public core::NonCopyable<> { pb->udp()->src_addr = src_addr_; pb->udp()->dst_addr = source_dst_addr_; + // timestamp when the packet was "received" + pb->udp()->queue_timestamp = qts_; + if (pa->duration() > 0) { + qts_ += sample_spec.stream_timestamp_2_ns(pa->duration()); + if (qts_jitter_hi_ > 0) { + qts_ += (core::nanoseconds_t)core::fast_random_range( + (uint64_t)qts_jitter_lo_, (uint64_t)qts_jitter_hi_); + } + } + pb->set_buffer(pa->buffer()); if (corrupt_flag_) { @@ -344,9 +372,12 @@ class PacketWriter : public core::NonCopyable<> { packet::stream_timestamp_t timestamp_; rtp::PayloadType pt_; - uint8_t sample_offset_; + core::nanoseconds_t qts_; + core::nanoseconds_t qts_jitter_lo_; + core::nanoseconds_t qts_jitter_hi_; + bool corrupt_flag_; }; diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index 1cd99c630..5475ec21a 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -20,6 +20,7 @@ #include "roc_core/time.h" #include "roc_pipeline/receiver_source.h" #include "roc_rtp/encoding_map.h" +#include "roc_stat/mov_aggregate.h" // This file contains tests for ReceiverSource. ReceiverSource can be seen as a big // composite processor (consisting of chained smaller processors) that transforms @@ -78,6 +79,8 @@ enum { ManyPackets = Latency / SamplesPerPacket * 10, ManyReports = 20, + LinkMeterWindow = ManyPackets * 10, + MaxSnJump = ManyPackets * 5, MaxTsJump = ManyPackets * 7 * SamplesPerPacket }; @@ -189,6 +192,19 @@ void write_packet(packet::IWriter& writer, const packet::PacketPtr& pp) { LONGS_EQUAL(status::StatusOK, writer.write(pp)); } +core::nanoseconds_t get_niq_latency(ReceiverSlot& receiver_slot) { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + size_t party_metrics_size = 1; + + receiver_slot.get_metrics(slot_metrics, &party_metrics, &party_metrics_size); + + CHECK(slot_metrics.source_id != 0); + UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); + + return party_metrics.latency.niq_latency; +} + } // namespace TEST_GROUP(receiver_source) { @@ -241,6 +257,8 @@ TEST_GROUP(receiver_source) { config.session_defaults.plc.backend = plc_backend; + config.session_defaults.link_meter.sliding_window_length = LinkMeterWindow; + config.common.rtcp.report_interval = ReportInterval * core::Second / SampleRate; config.common.rtcp.inactivity_timeout = ReportTimeout * core::Second / SampleRate; @@ -255,6 +273,42 @@ TEST_GROUP(receiver_source) { return make_custom_config(Latency, LatencyTolerance, Timeout, Warmup); } + ReceiverSourceConfig make_adaptive_config( + core::nanoseconds_t start_latency, core::nanoseconds_t min_target_latency, + core::nanoseconds_t max_target_latency, core::nanoseconds_t latency_tolerance, + core::nanoseconds_t reaction) { + ReceiverSourceConfig config = + make_custom_config(Latency, LatencyTolerance, Timeout, Warmup); + + if (processor_map.has_resampler_backend(audio::ResamplerBackend_SpeexDec)) { + config.session_defaults.resampler.backend = audio::ResamplerBackend_SpeexDec; + } else { + config.session_defaults.resampler.backend = audio::ResamplerBackend_Auto; + } + config.session_defaults.resampler.profile = audio::ResamplerProfile_Low; + + config.session_defaults.latency.tuner_backend = audio::LatencyTunerBackend_Niq; + config.session_defaults.latency.tuner_profile = + audio::LatencyTunerProfile_Gradual; + + config.session_defaults.latency.target_latency = 0; + config.session_defaults.latency.latency_tolerance = latency_tolerance; + + config.session_defaults.latency.start_target_latency = start_latency; + config.session_defaults.latency.min_target_latency = min_target_latency; + config.session_defaults.latency.max_target_latency = max_target_latency; + + config.session_defaults.latency.starting_timeout = reaction; + config.session_defaults.latency.cooldown_dec_timeout = reaction; + config.session_defaults.latency.cooldown_inc_timeout = reaction; + + config.session_defaults.freq_est.stability_duration_criteria = reaction; + config.session_defaults.freq_est.P = 1e-6 * 1.5; + config.session_defaults.freq_est.I = 5e-9 * 1.5; + + return config; + } + void init_with_specs(int output_sample_rate, audio::ChannelMask output_channels, audio::PcmFormat output_format, int packet_sample_rate, audio::ChannelMask packet_channels, @@ -593,7 +647,7 @@ TEST(receiver_source, no_playback_timeout_smaller_than_latency) { UNSIGNED_LONGS_EQUAL(0, receiver.num_sessions()); } -// Latency goes below minimum during playback. +// Latency goes below `Target-Tolerance` during playback. TEST(receiver_source, latency_lower_bound) { enum { SmallTolerance = Latency / 2, @@ -649,7 +703,7 @@ TEST(receiver_source, latency_lower_bound) { UNSIGNED_LONGS_EQUAL(0, receiver.num_sessions()); } -// Latency goes above maximum during playback. +// Latency goes above `Target+Tolerance` during playback. TEST(receiver_source, latency_upper_bound) { enum { SmallTolerance = Latency * 3 / 2, @@ -1134,7 +1188,7 @@ TEST(receiver_source, seqnum_reorder) { } for (ssize_t np = ReorderWindow - 1; np >= 0; np--) { - packet_writer.shift_to(pos + size_t(np), SamplesPerPacket); + packet_writer.jump_to(pos + size_t(np), SamplesPerPacket); packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); } @@ -1164,7 +1218,7 @@ TEST(receiver_source, seqnum_late) { packet_writer.write_packets(Latency / SamplesPerPacket, SamplesPerPacket, packet_sample_spec); - packet_writer.shift_to(Latency / SamplesPerPacket + DelayedPackets, SamplesPerPacket); + packet_writer.jump_to(Latency / SamplesPerPacket + DelayedPackets, SamplesPerPacket); for (size_t np = 0; np < Latency / SamplesPerPacket; np++) { for (size_t nf = 0; nf < FramesPerPacket; nf++) { @@ -1189,7 +1243,7 @@ TEST(receiver_source, seqnum_late) { packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); } - packet_writer.shift_to(Latency / SamplesPerPacket, SamplesPerPacket); + packet_writer.jump_to(Latency / SamplesPerPacket, SamplesPerPacket); packet_writer.write_packets(DelayedPackets, SamplesPerPacket, packet_sample_spec); for (size_t np = 0; np < Latency / SamplesPerPacket; np++) { @@ -1945,11 +1999,11 @@ TEST(receiver_source, delayed_reordered_packets) { } // deliver P1 - packet_writer.shift_to(P1, SamplesPerPacket); + packet_writer.jump_to(P1, SamplesPerPacket); packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); // deliver P4 - packet_writer.shift_to(P4, SamplesPerPacket); + packet_writer.jump_to(P4, SamplesPerPacket); packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); // read P1 @@ -1961,11 +2015,11 @@ TEST(receiver_source, delayed_reordered_packets) { frame_reader.read_samples(SamplesPerPacket, 0, output_sample_spec); // deliver P2 - packet_writer.shift_to(P2, SamplesPerPacket); + packet_writer.jump_to(P2, SamplesPerPacket); packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); // deliver P3 - packet_writer.shift_to(P3, SamplesPerPacket); + packet_writer.jump_to(P3, SamplesPerPacket); packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); // read P3 @@ -3346,6 +3400,278 @@ TEST(receiver_source, timestamp_mapping_remixing) { CHECK(first_ts); } +// Set high jitter, wait until latency increases and stabilizes. +TEST(receiver_source, adaptive_latency_increase) { + const size_t stabilization_window = LinkMeterWindow * 5; + + const core::nanoseconds_t tolerance = core::Millisecond * 5; + const core::nanoseconds_t reaction = core::Second; + + const core::nanoseconds_t min_target_latency = core::Millisecond * 10; + const core::nanoseconds_t max_target_latency = core::Millisecond * 500; + + const core::nanoseconds_t start_latency = core::Millisecond * 50; + const core::nanoseconds_t jitter = core::Millisecond * 30; + + const core::nanoseconds_t expected_min = jitter * 3; + const core::nanoseconds_t expected_max = jitter * 6; + + init_with_defaults(); + + ReceiverSource receiver(make_adaptive_config(start_latency, min_target_latency, + max_target_latency, tolerance, reaction), + processor_map, encoding_map, packet_pool, packet_buffer_pool, + frame_pool, frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + + // set jitter higher than start latency + packet_writer.set_jitter(jitter - tolerance, jitter + tolerance); + + // wait until we reach stable latency + stat::MovAggregate latency_hist(arena, stabilization_window); + CHECK(latency_hist.is_valid()); + + for (;;) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + if (cur_latency > 0) { + latency_hist.add(cur_latency); + } + if (latency_hist.is_full() && latency_hist.mov_min() > expected_min + && latency_hist.mov_max() < expected_max + && std::abs(latency_hist.mov_max() - latency_hist.mov_min()) < tolerance) { + break; + } + } + + const core::nanoseconds_t stable_latency = latency_hist.mov_max(); + + roc_log(LogNote, "reached stable latency: %.3fms", + (double)stable_latency / core::Millisecond); + + CHECK(stable_latency > expected_min); + CHECK(stable_latency < expected_max); + + // ensure we've stabilized + for (size_t np = 0; np < stabilization_window; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + CHECK(std::abs(cur_latency - stable_latency) < tolerance); + } +} + +// Set low jitter, wait until latency decreases and stabilizes. +TEST(receiver_source, adaptive_latency_decrease) { + const size_t stabilization_window = LinkMeterWindow * 5; + + const core::nanoseconds_t tolerance = core::Millisecond * 5; + const core::nanoseconds_t reaction = core::Second; + + const core::nanoseconds_t min_target_latency = core::Millisecond * 10; + const core::nanoseconds_t max_target_latency = core::Millisecond * 500; + + const core::nanoseconds_t start_latency = core::Millisecond * 120; + const core::nanoseconds_t jitter = core::Millisecond * 20; + + const core::nanoseconds_t expected_min = jitter * 3; + const core::nanoseconds_t expected_max = jitter * 6; + + init_with_defaults(); + + ReceiverSource receiver(make_adaptive_config(start_latency, min_target_latency, + max_target_latency, tolerance, reaction), + processor_map, encoding_map, packet_pool, packet_buffer_pool, + frame_pool, frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + + // set jitter lower than start latency + packet_writer.set_jitter(jitter - tolerance, jitter + tolerance); + + // wait until we reach stable latency + stat::MovAggregate latency_hist(arena, stabilization_window); + CHECK(latency_hist.is_valid()); + + for (;;) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + if (cur_latency > 0) { + latency_hist.add(cur_latency); + } + if (latency_hist.is_full() && latency_hist.mov_min() > expected_min + && latency_hist.mov_max() < expected_max + && std::abs(latency_hist.mov_max() - latency_hist.mov_min()) < tolerance) { + break; + } + } + + const core::nanoseconds_t stable_latency = latency_hist.mov_min(); + + roc_log(LogNote, "reached stable latency: %.3fms", + (double)stable_latency / core::Millisecond); + + CHECK(stable_latency > expected_min); + CHECK(stable_latency < expected_max); + + // ensure we've stabilized + for (size_t np = 0; np < stabilization_window; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + CHECK(std::abs(cur_latency - stable_latency) < tolerance); + } +} + +// Adaptive latency should be bounded by max_target_latency +TEST(receiver_source, adaptive_latency_upper_bound) { + const size_t stabilization_window = LinkMeterWindow * 5; + + const core::nanoseconds_t tolerance = core::Millisecond * 5; + const core::nanoseconds_t reaction = core::Second; + + const core::nanoseconds_t min_target_latency = core::Millisecond * 100; + const core::nanoseconds_t max_target_latency = core::Millisecond * 140; + const core::nanoseconds_t start_latency = core::Millisecond * 120; + + init_with_defaults(); + + ReceiverSource receiver(make_adaptive_config(start_latency, min_target_latency, + max_target_latency, tolerance, reaction), + processor_map, encoding_map, packet_pool, packet_buffer_pool, + frame_pool, frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + + // set jitter higher than max latency + packet_writer.set_jitter(max_target_latency * 2 - tolerance, + max_target_latency * 2 + tolerance); + + // wait until we reach maximum latency + for (;;) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + if (cur_latency >= max_target_latency - tolerance / 2) { + break; + } + } + + // ensure we've stabilized + for (size_t np = 0; np < stabilization_window; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + CHECK(std::abs(cur_latency - max_target_latency) < tolerance); + } +} + +// Adaptive latency should be bounded by min_target_latency +TEST(receiver_source, adaptive_latency_lower_bound) { + const size_t stabilization_window = LinkMeterWindow * 5; + + const core::nanoseconds_t tolerance = core::Millisecond * 5; + const core::nanoseconds_t reaction = core::Second; + + const core::nanoseconds_t min_target_latency = core::Millisecond * 100; + const core::nanoseconds_t max_target_latency = core::Millisecond * 140; + const core::nanoseconds_t start_latency = core::Millisecond * 120; + + init_with_defaults(); + + ReceiverSource receiver(make_adaptive_config(start_latency, min_target_latency, + max_target_latency, tolerance, reaction), + processor_map, encoding_map, packet_pool, packet_buffer_pool, + frame_pool, frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + + // set jitter higher than max latency + packet_writer.set_jitter(min_target_latency / 10 - tolerance, + min_target_latency / 10 + tolerance); + + // wait until we reach minimum latency + for (;;) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + if (cur_latency > 0 && cur_latency <= min_target_latency + tolerance / 2) { + break; + } + } + + // ensure we've stabilized + for (size_t np = 0; np < stabilization_window; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_nonzero_samples(SamplesPerPacket, output_sample_spec); + + const core::nanoseconds_t cur_latency = get_niq_latency(*slot); + CHECK(std::abs(cur_latency - min_target_latency) < tolerance); + } +} + // Check receiver metrics for multiple remote participants (senders). TEST(receiver_source, metrics_participants) { enum { MaxParties = 10 }; @@ -3658,8 +3984,79 @@ TEST(receiver_source, metrics_packet_counters) { } // Check how receiver computes jitter metric. -IGNORE_TEST(receiver_source, metrics_jitter) { - // TODO(gh-688): implement test +TEST(receiver_source, metrics_jitter) { + const core::nanoseconds_t jitter1 = core::Millisecond * 40; + const core::nanoseconds_t jitter2 = core::Millisecond * 80; + const core::nanoseconds_t precision = core::Millisecond * 5; + + init_with_defaults(); + + ReceiverSource receiver(make_default_config(), processor_map, encoding_map, + packet_pool, packet_buffer_pool, frame_pool, + frame_buffer_pool, arena); + LONGS_EQUAL(status::StatusOK, receiver.init_status()); + + ReceiverSlot* slot = create_slot(receiver); + packet::IWriter* endpoint_writer = + create_transport_endpoint(slot, address::Iface_AudioSource, proto1, dst_addr1); + + test::FrameReader frame_reader(receiver, frame_factory); + + test::PacketWriter packet_writer(arena, *endpoint_writer, encoding_map, + packet_factory, src_id1, src_addr1, dst_addr1, + PayloadType_Ch2); + + // jitter 1 + packet_writer.set_jitter(jitter1 - precision, jitter1 + precision); + + for (size_t np = 0; np < LinkMeterWindow * 2; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + size_t party_metrics_size = 1; + + slot->get_metrics(slot_metrics, &party_metrics, &party_metrics_size); + + CHECK(slot_metrics.source_id != 0); + UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(1, party_metrics_size); + + if (np > Latency / SamplesPerPacket) { + DOUBLES_EQUAL(jitter1, party_metrics.link.jitter, precision); + } + } + } + + // jitter 2 + packet_writer.set_jitter(jitter2 - precision, jitter2 + precision); + + for (size_t np = 0; np < LinkMeterWindow * 2; np++) { + packet_writer.write_packets(1, SamplesPerPacket, packet_sample_spec); + + refresh_source(receiver, frame_reader.refresh_ts()); + frame_reader.read_any_samples(SamplesPerPacket, output_sample_spec); + + { + ReceiverSlotMetrics slot_metrics; + ReceiverParticipantMetrics party_metrics; + size_t party_metrics_size = 1; + + slot->get_metrics(slot_metrics, &party_metrics, &party_metrics_size); + + CHECK(slot_metrics.source_id != 0); + UNSIGNED_LONGS_EQUAL(1, slot_metrics.num_participants); + UNSIGNED_LONGS_EQUAL(1, party_metrics_size); + + if (np > LinkMeterWindow) { + DOUBLES_EQUAL(jitter2, party_metrics.link.jitter, precision); + } + } + } } // Check how receiver computes niq_latency metric (network incoming queue size).