Skip to content

Commit

Permalink
Preparation for E2E latency tuning
Browse files Browse the repository at this point in the history
In order to be able to tune receiver's latency
relying on timestamp mapping that we get from
RTCP feedback, and UDP::Receive_timestamp,
adding these features:

* roc-streaminggh-674: Use receive timestamp (RTS) as report time
  when processing RTCP report;

* RTT dumping for debugging (csvplotter ts_offset branch);

* SCHED_RR for network io thread (run with root privs).
  • Loading branch information
baranovmv committed Dec 4, 2024
1 parent d90cf31 commit 4fd3012
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ bool Thread::enable_realtime() {
errno_to_str(err).c_str());
return false;
}
roc_log(LogDebug, "thread: set realtime priority");

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ NetworkLoop::NetworkLoop(core::IPool& packet_pool,
task_sem_.data = this;
task_sem_initialized_ = true;

enable_realtime();
if (!(started_ = Thread::start())) {
init_status_ = status::StatusErrThread;
return;
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ ReceiverSessionGroup::create_control_pipeline(ReceiverEndpoint* control_endpoint
// We pass this as implementation of rtcp::IParticipant.
// rtcp::Communicator will call our methods right now (in constructor)
// and later when we call generate_packets() or process_packets().
rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
source_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
*control_endpoint->outbound_composer(), packet_factory_, arena_));
rtcp_communicator_.reset(new(rtcp_communicator_) rtcp::Communicator(
source_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
*control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));

const status::StatusCode code = rtcp_communicator_->init_status();
if (code != status::StatusOK) {
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ SenderSession::create_control_pipeline(SenderEndpoint* control_endpoint) {

rtcp_outbound_addr_ = control_endpoint->outbound_address();

rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
sink_config_.rtcp, *this, control_endpoint->outbound_writer(),
control_endpoint->outbound_composer(), packet_factory_, arena_));
rtcp_communicator_.reset(new(rtcp_communicator_) rtcp::Communicator(
sink_config_.rtcp, *this, control_endpoint->outbound_writer(),
control_endpoint->outbound_composer(), packet_factory_, arena_, dumper_));

const status::StatusCode code = rtcp_communicator_->init_status();
if (code != status::StatusOK) {
Expand Down
18 changes: 8 additions & 10 deletions src/internal_modules/roc_rtcp/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/time.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/ntp.h"
#include "roc_packet/units.h"
#include "roc_rtcp/headers.h"
Expand All @@ -25,17 +26,14 @@ const core::nanoseconds_t LogInterval = core::Second * 30;

} // namespace

Communicator::Communicator(const Config& config,
IParticipant& participant,
packet::IWriter& packet_writer,
packet::IComposer& packet_composer,
packet::PacketFactory& packet_factory,
core::IArena& arena)
Communicator::Communicator(const Config &config, IParticipant &participant, packet::IWriter &packet_writer,
packet::IComposer &packet_composer, packet::PacketFactory &packet_factory,
core::IArena &arena, dbgio::CsvDumper* dumper)
: packet_factory_(packet_factory)
, packet_writer_(packet_writer)
, packet_composer_(packet_composer)
, config_(config)
, reporter_(config, participant, arena)
, reporter_(config, participant, arena, dumper)
, next_deadline_(0)
, dest_addr_count_(0)
, dest_addr_index_(0)
Expand All @@ -50,7 +48,8 @@ Communicator::Communicator(const Config& config,
, processed_packet_count_(0)
, generated_packet_count_(0)
, log_limiter_(LogInterval)
, init_status_(status::NoStatus) {
, init_status_(status::NoStatus)
, dumper_(dumper) {
if ((init_status_ = reporter_.init_status()) != status::StatusOK) {
return;
}
Expand All @@ -76,7 +75,6 @@ status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
roc_panic_if_msg(!packet, "rtcp communicator: null packet");
roc_panic_if_msg(!packet->udp(), "rtcp communicator: non-udp packet");
roc_panic_if_msg(!packet->rtcp(), "rtcp communicator: non-rtcp packet");
roc_panic_if_msg(current_time <= 0, "rtcp communicator: invalid timestamp");

roc_log(LogTrace, "rtcp communicator: processing incoming packet");

Expand All @@ -90,7 +88,7 @@ status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet,
}

status::StatusCode status =
reporter_.begin_processing(packet->udp()->src_addr, current_time);
reporter_.begin_processing(packet->udp()->src_addr, packet->udp()->receive_timestamp);
roc_log(LogTrace, "rtcp communicator: begin_processing(): status=%s",
status::code_to_str(status));

Expand Down
12 changes: 6 additions & 6 deletions src/internal_modules/roc_rtcp/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "roc_core/rate_limiter.h"
#include "roc_core/stddefs.h"
#include "roc_core/time.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_packet/icomposer.h"
#include "roc_packet/iwriter.h"
#include "roc_packet/packet.h"
Expand Down Expand Up @@ -56,12 +57,9 @@ namespace rtcp {
class Communicator : public core::NonCopyable<> {
public:
//! Initialize.
Communicator(const Config& config,
IParticipant& participant,
packet::IWriter& packet_writer,
packet::IComposer& packet_composer,
packet::PacketFactory& packet_factory,
core::IArena& arena);
Communicator(const Config &config, IParticipant &participant, packet::IWriter &packet_writer,
packet::IComposer &packet_composer, packet::PacketFactory &packet_factory,
core::IArena &arena, dbgio::CsvDumper* dumper);

//! Check if the object was successfully constructed.
status::StatusCode init_status() const;
Expand Down Expand Up @@ -169,6 +167,8 @@ class Communicator : public core::NonCopyable<> {
core::RateLimiter log_limiter_;

status::StatusCode init_status_;

dbgio::CsvDumper* dumper_;
};

} // namespace rtcp
Expand Down
7 changes: 4 additions & 3 deletions src/internal_modules/roc_rtcp/reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace roc {
namespace rtcp {

Reporter::Reporter(const Config& config, IParticipant& participant, core::IArena& arena)
Reporter::Reporter(const Config &config, IParticipant &participant, core::IArena &arena, dbgio::CsvDumper *dumper)
: arena_(arena)
, participant_(participant)
, local_source_id_(0)
Expand All @@ -43,7 +43,8 @@ Reporter::Reporter(const Config& config, IParticipant& participant, core::IArena
, report_time_(0)
, config_(config)
, max_delay_(packet::ntp_2_nanoseconds(header::MaxDelay))
, init_status_(status::NoStatus) {
, init_status_(status::NoStatus)
, dumper_(dumper) {
memset(local_cname_, 0, sizeof(local_cname_));

const ParticipantInfo part_info = participant_.participant_info();
Expand Down Expand Up @@ -1400,7 +1401,7 @@ Reporter::find_stream_(packet::stream_source_t source_id, CreateMode mode) {
(unsigned long)source_id);

stream =
new (stream_pool_) Stream(stream_pool_, source_id, report_time_, config_.rtt);
new(stream_pool_) Stream(arena_, stream_pool_, source_id, report_time_, config_.rtt, dumper_);
if (!stream) {
report_error_ = status::StatusNoMem;
return NULL;
Expand Down
14 changes: 8 additions & 6 deletions src/internal_modules/roc_rtcp/reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "roc_rtcp/rtt_estimator.h"
#include "roc_rtcp/sdes.h"
#include "roc_status/status_code.h"
#include "roc_dbgio/csv_dumper.h"

namespace roc {
namespace rtcp {
Expand Down Expand Up @@ -90,7 +91,7 @@ namespace rtcp {
class Reporter : public core::NonCopyable<> {
public:
//! Initialize.
Reporter(const Config& config, IParticipant& participant, core::IArena& arena);
Reporter(const Config &config, IParticipant &participant, core::IArena &arena, dbgio::CsvDumper *dumper);
~Reporter();

//! Check if the object was successfully constructed.
Expand Down Expand Up @@ -261,16 +262,15 @@ class Reporter : public core::NonCopyable<> {
struct Stream : core::RefCounted<Stream, core::PoolAllocation>,
core::HashmapNode<>,
core::ListNode<> {
Stream(core::IPool& pool,
packet::stream_source_t source_id,
Stream(core::IArena &arena, core::IPool &pool, packet::stream_source_t source_id,
core::nanoseconds_t report_time,
const RttConfig& rtt_config)
const RttConfig &rtt_config, dbgio::CsvDumper *dumper)
: core::RefCounted<Stream, core::PoolAllocation>(pool)
, source_id(source_id)
, has_remote_recv_report(false)
, remote_recv_rtt(rtt_config)
, remote_recv_rtt(arena, rtt_config, dumper)
, has_remote_send_report(false)
, remote_send_rtt(rtt_config)
, remote_send_rtt(arena, rtt_config, dumper)
, local_recv_report(NULL)
, last_update(report_time)
, last_local_sr(0)
Expand Down Expand Up @@ -483,6 +483,8 @@ class Reporter : public core::NonCopyable<> {
const core::nanoseconds_t max_delay_;

status::StatusCode init_status_;

dbgio::CsvDumper* dumper_;
};

} // namespace rtcp
Expand Down
37 changes: 33 additions & 4 deletions src/internal_modules/roc_rtcp/rtt_estimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
namespace roc {
namespace rtcp {

RttEstimator::RttEstimator(const RttConfig& config)
RttEstimator::RttEstimator(core::IArena &arena, const RttConfig &config, dbgio::CsvDumper *dumper)
: config_(config)
, metrics_()
, has_metrics_(false)
, first_report_ts_(0)
, last_report_ts_(0) {
, last_report_ts_(0)
, dumper_(dumper)
, rtt_stats_(arena, 100){
}

bool RttEstimator::has_metrics() const {
Expand Down Expand Up @@ -85,10 +87,37 @@ void RttEstimator::update(core::nanoseconds_t local_report_ts,
}
last_report_ts_ = local_report_ts;

metrics_.clock_offset = clock_offset;
metrics_.rtt = rtt;
RttOffsetPair p;
p.rtt = rtt;
p.offset = clock_offset;
rtt_stats_.add(p);
RttOffsetPair min = rtt_stats_.mov_min();

// metrics_.clock_offset = clock_offset;
// metrics_.rtt = rtt;
metrics_.clock_offset = min.offset;
metrics_.rtt = min.rtt;

has_metrics_ = true;

if (dumper_) {
dump_(local_report_ts, remote_report_ts, remote_reply_ts, local_reply_ts);
}
}

void RttEstimator::dump_(core::nanoseconds_t local_report_ts, core::nanoseconds_t remote_report_ts,
core::nanoseconds_t remote_reply_ts, core::nanoseconds_t local_reply_ts) {
dbgio::CsvEntry e;
e.type = 'r';
e.n_fields = 7;
e.fields[0] = core::timestamp(core::ClockUnix);
e.fields[1] = metrics_.rtt;
e.fields[2] = metrics_.clock_offset;
e.fields[3] = local_report_ts;
e.fields[4] = remote_report_ts;
e.fields[5] = remote_reply_ts;
e.fields[6] = local_reply_ts;
dumper_->write(e);
}

} // namespace rtcp
Expand Down
30 changes: 29 additions & 1 deletion src/internal_modules/roc_rtcp/rtt_estimator.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "roc_core/time.h"
#include "roc_packet/units.h"
#include "roc_dbgio/csv_dumper.h"
#include "roc_stat/mov_aggregate.h"

namespace roc {
namespace rtcp {
Expand Down Expand Up @@ -51,7 +53,7 @@ struct RttMetrics {
class RttEstimator {
public:
//! Initialize.
RttEstimator(const RttConfig& config);
RttEstimator(core::IArena &arena, const RttConfig &config, dbgio::CsvDumper *dumper);

//! Check whether metrics are already available.
bool has_metrics() const;
Expand All @@ -71,12 +73,38 @@ class RttEstimator {
core::nanoseconds_t local_reply_ts);

private:
void dump_(core::nanoseconds_t local_report_ts, core::nanoseconds_t remote_report_ts,
core::nanoseconds_t remote_reply_ts, core::nanoseconds_t local_reply_ts);

const RttConfig config_;
RttMetrics metrics_;
bool has_metrics_;

core::nanoseconds_t first_report_ts_;
core::nanoseconds_t last_report_ts_;

dbgio::CsvDumper *dumper_;

struct RttOffsetPair {
core::nanoseconds_t rtt;
core::nanoseconds_t offset;

RttOffsetPair() {
rtt = 10e9;
}

RttOffsetPair(const int64_t &r) {
rtt = r;
offset = 0;
}

operator int64_t() const {
return rtt;
}
};

stat::MovAggregate<RttOffsetPair> rtt_stats_;

};

} // namespace rtcp
Expand Down
Loading

0 comments on commit 4fd3012

Please sign in to comment.