Skip to content

Commit

Permalink
roc-streaminggh-675 Deliver niq_latency from receiver to sender
Browse files Browse the repository at this point in the history
we introduce non-standard XR block "Queue Metrics Block"
(BT=220) which holds Network Incoming Queue Delay.
  • Loading branch information
gavv committed Jan 26, 2024
1 parent ffe0e87 commit a426853
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ void ReceiverSession::generate_reports(const char* report_cname,
report.fract_loss = link_metrics.fract_loss;
report.cum_loss = link_metrics.cum_loss;
report.jitter = link_metrics.jitter;
report.niq_latency = latency_metrics.niq_latency;
report.e2e_latency = latency_metrics.e2e_latency;

reports++;
Expand Down
14 changes: 14 additions & 0 deletions src/internal_modules/roc_rtcp/builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,20 @@ void Builder::add_xr_delay_metrics(const header::XrDelayMetricsBlock& delay_metr
cur_xr_block_header_->set_len_bytes(sizeof(delay_metrics));
}

void Builder::add_xr_queue_metrics(const header::XrQueueMetricsBlock& queue_metrics) {
roc_panic_if_msg(state_ != XR_HEAD, "rtcp builder: wrong call order");

header::XrQueueMetricsBlock* p =
(header::XrQueueMetricsBlock*)add_block_(sizeof(queue_metrics));
if (!p) {
return;
}
memcpy(p, &queue_metrics, sizeof(queue_metrics));

cur_xr_block_header_ = &p->header();
cur_xr_block_header_->set_len_bytes(sizeof(queue_metrics));
}

void Builder::end_xr() {
roc_panic_if_msg(state_ != XR_HEAD, "rtcp builder: wrong call order");

Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_rtcp/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class Builder : public core::NonCopyable<> {
//! Add delay metrics block.to current XR packet.
void add_xr_delay_metrics(const header::XrDelayMetricsBlock& delay_metrics);

//! Add queue metrics block.to current XR packet.
void add_xr_queue_metrics(const header::XrQueueMetricsBlock& queue_metrics);

//! Finish current DLRR block.
void end_xr_dlrr();

Expand Down
12 changes: 12 additions & 0 deletions src/internal_modules/roc_rtcp/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ void Communicator::process_extended_report_(const XrTraverser& xr) {
reporter_.process_delay_metrics_block(xr.packet(), iter.get_delay_metrics());
} break;

case XrTraverser::Iterator::QUEUE_METRICS_BLOCK: {
// Queue Metrics is extended receiver report.
reporter_.process_queue_metrics_block(xr.packet(), iter.get_queue_metrics());
} break;

default:
break;
}
Expand Down Expand Up @@ -766,6 +771,7 @@ void Communicator::generate_extended_report_(Builder& bld) {
if (!next_recv_stream_(stream_index)) {
break;
}

header::XrMeasurementInfoBlock mi_blk;
reporter_.generate_measurement_info_block(dest_addr_index_, stream_index,
mi_blk);
Expand All @@ -777,6 +783,12 @@ void Communicator::generate_extended_report_(Builder& bld) {
dm_blk);

bld.add_xr_delay_metrics(dm_blk);

header::XrQueueMetricsBlock qm_blk;
reporter_.generate_queue_metrics_block(dest_addr_index_, stream_index,
qm_blk);

bld.add_xr_queue_metrics(qm_blk);
}
}

Expand Down
92 changes: 91 additions & 1 deletion src/internal_modules/roc_rtcp/headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,9 @@ enum XrBlockType {
// RFC 6776
XR_MEASUREMENT_INFO = 14, //!< Measurement Information Report Block.
// RFC 6843
XR_DELAY_METRICS = 16 //!< Delay Metrics Report Block.
XR_DELAY_METRICS = 16, //!< Delay Metrics Report Block.
// Non-standard
XR_QUEUE_METRICS = 220 //!< Queue Metrics Report Block.
};

//! XR Block Header.
Expand Down Expand Up @@ -1667,6 +1669,94 @@ ROC_ATTR_PACKED_BEGIN class XrDelayMetricsBlock {
}
} ROC_ATTR_PACKED_END;

//! XR Queue Metrics Block.
//!
//! Non-standard.
//!
//! @code
//! 0 1 2 3
//! 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
//! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//! | BT=220 | I | resv. | block length = 6 |
//! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//! | SSRC of Source |
//! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//! | Network Incoming Queue Delay |
//! +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//! @endcode
ROC_ATTR_PACKED_BEGIN class XrQueueMetricsBlock {
private:
enum {
MetricFlag_shift = 6,
MetricFlag_mask = 0x03,
};

XrBlockHeader header_;

uint32_t ssrc_;
NtpTimestamp32 niq_delay_;

public:
XrQueueMetricsBlock() {
reset();
}

//! Reset to initial state (all zeros).
void reset() {
header_.reset(XR_QUEUE_METRICS);
ssrc_ = 0;
niq_delay_.set_value(MetricUnavail_32);
}

//! Get common block header.
const XrBlockHeader& header() const {
return header_;
}

//! Get common block header.
XrBlockHeader& header() {
return header_;
}

//! Get Interval Metrics flag.
MetricFlag metric_flag() const {
return (MetricFlag)get_bit_field<uint8_t>(header_.type_specific(),
MetricFlag_shift, MetricFlag_mask);
}

//! Set Interval Metrics flag.
void set_metric_flag(const MetricFlag f) {
uint8_t t = header_.type_specific();
set_bit_field<uint8_t>(t, (uint8_t)f, MetricFlag_shift, MetricFlag_mask);
header_.set_type_specific(t);
}

//! Get SSRC of source being reported.
packet::stream_source_t ssrc() const {
return core::ntoh32u(ssrc_);
}

//! Set SSRC of source being reported.
void set_ssrc(const packet::stream_source_t ssrc) {
ssrc_ = core::hton32u(ssrc);
}

//! Check if Network Incoming Queue Delay is set.
bool has_niq_delay() const {
return niq_delay_.value() != MetricUnavail_32;
}

//! Get Network Incoming Queue Delay.
packet::ntp_timestamp_t niq_delay() const {
return niq_delay_.value();
}

//! Set Network Incoming Queue Delay.
void set_niq_delay(const packet::ntp_timestamp_t t) {
niq_delay_.set_value(std::min(t, MetricUnavail_32 - 1));
}
} ROC_ATTR_PACKED_END;

} // namespace header
} // namespace rtcp
} // namespace roc
Expand Down
42 changes: 31 additions & 11 deletions src/internal_modules/roc_rtcp/print_packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,30 @@ void print_xr_measurement_info(core::Printer& p,
(long long)packet::ntp_2_nanoseconds(blk.cum_duration()));
}

void print_xr_delay_metrics(core::Printer& p, const header::XrDelayMetricsBlock& blk) {
p.writef("|- delay:\n");

print_xr_block_header(p, blk.header());

p.writef("|-- block body:\n");
switch (blk.metric_flag()) {
void print_metric_flag(core::Printer& p, const header::MetricFlag flag) {
switch (flag) {
case header::MetricFlag_IntervalDuration:
p.writef("|--- flag: interval (%d)\n", blk.metric_flag());
p.writef("|--- flag: interval (%d)\n", flag);
break;
case header::MetricFlag_CumulativeDuration:
p.writef("|--- flag: cumulative (%d)\n", blk.metric_flag());
p.writef("|--- flag: cumulative (%d)\n", flag);
break;
case header::MetricFlag_SampledValue:
p.writef("|--- flag: sample (%d)\n", blk.metric_flag());
p.writef("|--- flag: sample (%d)\n", flag);
break;
default:
p.writef("|--- flag: unknown (%d)\n", blk.metric_flag());
p.writef("|--- flag: unknown (%d)\n", flag);
break;
}
}

void print_xr_delay_metrics(core::Printer& p, const header::XrDelayMetricsBlock& blk) {
p.writef("|- delay:\n");

print_xr_block_header(p, blk.header());

p.writef("|-- block body:\n");
print_metric_flag(p, blk.metric_flag());
p.writef("|--- ssrc: %lu\n", (unsigned long)blk.ssrc());
p.writef("|--- rtt_mean: %016llx (unix %lld)\n", (unsigned long long)blk.mean_rtt(),
(long long)packet::ntp_2_nanoseconds(blk.mean_rtt()));
Expand All @@ -182,6 +186,18 @@ void print_xr_delay_metrics(core::Printer& p, const header::XrDelayMetricsBlock&
(long long)packet::ntp_2_nanoseconds(blk.e2e_delay()));
}

void print_xr_queue_metrics(core::Printer& p, const header::XrQueueMetricsBlock& blk) {
p.writef("|- queue:\n");

print_xr_block_header(p, blk.header());

p.writef("|-- block body:\n");
print_metric_flag(p, blk.metric_flag());
p.writef("|--- ssrc: %lu\n", (unsigned long)blk.ssrc());
p.writef("|--- niq_delay: %016llx (unix %lld)\n", (unsigned long long)blk.niq_delay(),
(long long)packet::ntp_2_nanoseconds(blk.niq_delay()));
}

void print_xr(core::Printer& p, const XrTraverser& xr) {
p.writef("+ xr:\n");

Expand Down Expand Up @@ -214,6 +230,10 @@ void print_xr(core::Printer& p, const XrTraverser& xr) {
case XrTraverser::Iterator::DELAY_METRICS_BLOCK:
print_xr_delay_metrics(p, iter.get_delay_metrics());
break;

case XrTraverser::Iterator::QUEUE_METRICS_BLOCK:
print_xr_queue_metrics(p, iter.get_queue_metrics());
break;
}
}
}
Expand Down
59 changes: 59 additions & 0 deletions src/internal_modules/roc_rtcp/reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,44 @@ void Reporter::process_delay_metrics_block(const header::XrPacket& xr,
update_stream_(*stream);
}

// Process XR Queue Metrics data generated by remote receiver.
void Reporter::process_queue_metrics_block(const header::XrPacket& xr,
const header::XrQueueMetricsBlock& blk) {
roc_panic_if_msg(report_state_ != State_Processing,
"rtcp reporter: invalid call order");

// SSRC from XR is stream receiver (RTCP packet originator).
// SSRC from Queue Metrics block is stream sender (RTCP packet recipient).
const packet::stream_source_t recv_source_id = xr.ssrc();
const packet::stream_source_t send_source_id = blk.ssrc();

detect_collision_(recv_source_id);

if (send_source_id != local_source_id_) {
// This report is for different sender, not for us, so ignore it.
// Typical for multicast sessions.
return;
}

// Report to local sending stream from remote receiver.
core::SharedPtr<Stream> stream = find_stream_(recv_source_id, NoAutoCreate);
if (!stream || !stream->has_remote_recv_report) {
// Ignore Queue Metrics if there was no matching SR.
return;
}

roc_log(LogTrace,
"rtcp reporter: processing Queue Metrics block: send_ssrc=%lu recv_ssrc=%lu",
(unsigned long)send_source_id, (unsigned long)recv_source_id);

if (blk.has_niq_delay()) {
stream->remote_recv_report.niq_latency =
packet::ntp_2_nanoseconds(blk.niq_delay());
}

update_stream_(*stream);
}

// Process BYE message generated by sender.
void Reporter::process_goodbye(const packet::stream_source_t ssrc) {
roc_panic_if_msg(report_state_ != State_Processing,
Expand Down Expand Up @@ -787,6 +825,27 @@ void Reporter::generate_delay_metrics_block(size_t addr_index,
}
}

// Generate XR Queue Metrics block to deliver to remote sender.
void Reporter::generate_queue_metrics_block(size_t addr_index,
size_t stream_index,
header::XrQueueMetricsBlock& blk) {
roc_panic_if_msg(!is_receiving(),
"rtcp reporter: Queue Metrics can be generated only by receiver");

Stream* stream = address_index_[addr_index]->recv_stream_index[stream_index];
roc_panic_if(!stream);

blk.reset();

blk.set_ssrc(stream->source_id);
blk.set_metric_flag(header::MetricFlag_SampledValue);

if (stream->local_recv_report->niq_latency > 0) {
blk.set_niq_delay(
packet::nanoseconds_2_ntp(stream->local_recv_report->niq_latency));
}
}

bool Reporter::need_goodbye() const {
roc_panic_if_msg(report_state_ != State_Generating,
"rtcp reporter: invalid call order");
Expand Down
11 changes: 11 additions & 0 deletions src/internal_modules/roc_rtcp/reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class Reporter : public core::NonCopyable<> {
void process_delay_metrics_block(const header::XrPacket& xr,
const header::XrDelayMetricsBlock& blk);

//! Process XR Queue Metrics block (extended receiver report).
void process_queue_metrics_block(const header::XrPacket& xr,
const header::XrQueueMetricsBlock& blk);

//! Process BYE message.
void process_goodbye(packet::stream_source_t ssrc);

Expand Down Expand Up @@ -215,6 +219,13 @@ class Reporter : public core::NonCopyable<> {
size_t stream_index,
header::XrDelayMetricsBlock& blk);

//! Generate XR Queue Metrics block (extended receiver report).
//! @p addr_index should be in range [0; num_dest_addresses()-1].
//! @p stream_index should be in range [0; num_receiving_streams()-1].
void generate_queue_metrics_block(size_t addr_index,
size_t stream_index,
header::XrQueueMetricsBlock& blk);

//! Check if BYE message should be included.
bool need_goodbye() const;

Expand Down
9 changes: 7 additions & 2 deletions src/internal_modules/roc_rtcp/reports.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,13 @@ struct RecvReport {
//! interarrival time.
core::nanoseconds_t jitter;

//! Estimated network incoming queue latency.
//! An estimate of how much media is buffered in receiver packet queue.
core::nanoseconds_t niq_latency;

//! Estimated end-to-end latency.
//! An estimate of the time from recording a frame on sender to playing it on
//! receiver.
//! An estimate of the time from recording a frame on sender to playing it
//! on receiver.
core::nanoseconds_t e2e_latency;

//! Estimated offset of remote clock relative to local clock.
Expand All @@ -174,6 +178,7 @@ struct RecvReport {
, fract_loss(0)
, cum_loss(0)
, jitter(0)
, niq_latency(0)
, e2e_latency(0)
, clock_offset(0)
, rtt(0) {
Expand Down
Loading

0 comments on commit a426853

Please sign in to comment.