Skip to content

Commit

Permalink
:WIP rtp stream calculations
Browse files Browse the repository at this point in the history
  • Loading branch information
baranovmv committed Nov 26, 2023
1 parent d7c2bcd commit 28efa0d
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/internal_modules/roc_packet/rtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ RTP::RTP()
, duration(0)
, capture_timestamp(0)
, marker(false)
, payload_type(0) {
, payload_type(0)
, fec_recovered(false) {
}

int RTP::compare(const RTP& other) const {
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_packet/rtp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct RTP {
//! Packet payload type ("pt").
unsigned int payload_type;

//! Internal flag, valid for receiver side only.
//! Signals if this packet was recovered by FEC.
bool fec_recovered;

//! Packet header.
core::Slice<uint8_t> header;

Expand Down
6 changes: 6 additions & 0 deletions src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ struct UDP {

//! Sender request state.
uv_udp_send_t request;

//! When the packet was put into jitter-buffer queue.
//! It points to a moment when the packet was transferred to a sink-thread, that
//! "consumes" this packet. The reason to have it separate is that this allows
//! us to account additional jitter introduced by thread-switch time.
core::nanoseconds_t enqueue_ts;
};

} // namespace packet
Expand Down
5 changes: 3 additions & 2 deletions src/internal_modules/roc_pipeline/receiver_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ReceiverSession::ReceiverSession(
preader = validator_.get();

populator_.reset(new (populator_) rtp::Populator(*preader, *payload_decoder_,
encoding->sample_spec));
encoding->sample_spec), false);
if (!populator_) {
return;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ ReceiverSession::ReceiverSession(
preader = fec_validator_.get();

fec_populator_.reset(new (fec_populator_) rtp::Populator(
*preader, *payload_decoder_, encoding->sample_spec));
*preader, *payload_decoder_, encoding->sample_spec), true);
if (!fec_populator_) {
return;
}
Expand Down Expand Up @@ -221,6 +221,7 @@ status::StatusCode ReceiverSession::route(const packet::PacketPtr& packet) {
roc_panic_if(!is_valid());

packet::UDP* udp = packet->udp();
packet->udp()->enqueue_ts = core::timestamp(core::ClockUnix);
if (!udp) {
// TODO(gh-183): return StatusNoRoute
return status::StatusUnknown;
Expand Down
8 changes: 6 additions & 2 deletions src/internal_modules/roc_rtp/populator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ namespace rtp {

Populator::Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec)
const audio::SampleSpec& sample_spec,
const bool set_recovered)
: reader_(reader)
, decoder_(decoder)
, sample_spec_(sample_spec) {
, sample_spec_(sample_spec)
, set_recovered_(set_recovered) {
}

status::StatusCode Populator::read(packet::PacketPtr& packet) {
Expand All @@ -36,6 +38,8 @@ status::StatusCode Populator::read(packet::PacketPtr& packet) {
packet->rtp()->payload.data(), packet->rtp()->payload.size());
}

packet->rtp()->fec_recovered = set_recovered_;

return status::StatusOK;
}

Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_rtp/populator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
//! Initialize.
Populator(packet::IReader& reader,
audio::IFrameDecoder& decoder,
const audio::SampleSpec& sample_spec);
const audio::SampleSpec& sample_spec,
const bool set_recovered);

//! Read next packet.
virtual ROC_ATTR_NODISCARD status::StatusCode read(packet::PacketPtr&);
Expand All @@ -35,6 +36,7 @@ class Populator : public packet::IReader, public core::NonCopyable<> {
packet::IReader& reader_;
audio::IFrameDecoder& decoder_;
const audio::SampleSpec sample_spec_;
const bool set_recovered_;
};

} // namespace rtp
Expand Down
34 changes: 34 additions & 0 deletions src/internal_modules/roc_rtp/stream_stats_monitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "stream_stats_monitor.h"
#include "roc_status/status_code.h"

namespace roc {
namespace rtp {

StreamStatsMonitor::StreamStatsMonitor(packet::IReader& reader,
core::IArena& arena,
const audio::SampleSpec& sample_spec,
const StreamStatsConfig &config)
: reader_(reader)
, arena_(arena)
, config_(config)
, sample_spec_(sample_spec)
{}

status::StatusCode StreamStatsMonitor::read(PacketPtr& packet)
{
status::StatusCode result = reader_.read(packet);
if (result == status::Ok)

return result;
}

} // namespace packet
} // namespace roc
58 changes: 58 additions & 0 deletions src/internal_modules/roc_rtp/stream_stats_monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2023 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_rtp/stream_stats_monitor.h
//! @brief Calculates basic network stream statistics.


#ifndef ROC_PACKET_STREAMSTATSMONITOR_H_
#define ROC_PACKET_STREAMSTATSMONITOR_H_

#include "roc_core/noncopyable.h"
#include "roc_packet/ireader.h"
#include "roc_core/time.h"
#include "roc_audio/sample_spec.h"
#include "roc_status/status_code.h"

namespace roc {
namespace rtp {

struct StreamStatsConfig {
core::nanoseconds_t window_duration;
core::nanoseconds_t window_overlap;
};

struct StreamStats {
core::nanoseconds_t windowed_max_jitter;

size_t windowed_npackets;
size_t windowed_lost_packets;
size_t windowed_recovered_packets;

float packet_loss_rate;
};

class StreamStatsMonitor : public IReader, public core::NonCopyable<> {
public:
StreamStatsMonitor(packet::IReader& reader, core::IArena& arena,
const audio::SampleSpec& sample_spec,
const StreamStatsConfig &config);

virtual ROC_ATTR_NODISCARD status::StatusCode read(PacketPtr& packet);
private:
packet::IReader& reader_;
core::IArena& arena_;
const StreamStatsConfig config_;
const audio::SampleSpec sample_spec_;

};

} // namespace packet
} // namespace roc

#endif // ROC_PACKET_STREAMSTATSMONITOR_H_
28 changes: 25 additions & 3 deletions src/tests/roc_rtp/test_populator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ TEST(populator, failed_to_read_packet) {
for (unsigned n = 0; n < ROC_ARRAY_SIZE(codes); ++n) {
test::StatusReader reader(codes[n]);
audio::PcmDecoder decoder(PcmFmt, SampleSpec);
Populator populator(reader, decoder, SampleSpec);
Populator populator(reader, decoder, SampleSpec, false);

packet::PacketPtr pp;
LONGS_EQUAL(codes[n], populator.read(pp));
Expand All @@ -75,7 +75,7 @@ TEST(populator, failed_to_read_packet) {
TEST(populator, empty_duration) {
packet::Queue queue;
audio::PcmDecoder decoder(PcmFmt, SampleSpec);
Populator populator(queue, decoder, SampleSpec);
Populator populator(queue, decoder, SampleSpec, false);

const packet::stream_timestamp_t packet_duration = 0;
const packet::stream_timestamp_t expected_duration = 32;
Expand All @@ -89,12 +89,13 @@ TEST(populator, empty_duration) {
CHECK(wp == rp);

LONGS_EQUAL(expected_duration, rp->rtp()->duration);
CHECK_EQUAL(false, rp->rtp()->fec_recovered);
}

TEST(populator, non_empty_duration) {
packet::Queue queue;
audio::PcmDecoder decoder(PcmFmt, SampleSpec);
Populator populator(queue, decoder, SampleSpec);
Populator populator(queue, decoder, SampleSpec, false);

const packet::stream_timestamp_t duration = 100;

Expand All @@ -108,6 +109,27 @@ TEST(populator, non_empty_duration) {
CHECK(rp);
CHECK(wp == rp);
LONGS_EQUAL(duration, rp->rtp()->duration);
CHECK_EQUAL(false, rp->rtp()->fec_recovered);
}

TEST(populator, non_empty_duration_recovered) {
packet::Queue queue;
audio::PcmDecoder decoder(PcmFmt, SampleSpec);
Populator populator(queue, decoder, SampleSpec, true);

const packet::stream_timestamp_t duration = 100;

core::Slice<uint8_t> buffer = buffer_factory.new_buffer();
CHECK(buffer);
packet::PacketPtr wp = new_packet(duration);
queue.write(wp);

packet::PacketPtr rp;
LONGS_EQUAL(0, populator.read(rp));
CHECK(rp);
CHECK(wp == rp);
LONGS_EQUAL(duration, rp->rtp()->duration);
CHECK_EQUAL(true, rp->rtp()->fec_recovered);
}

} // namespace rtp
Expand Down

0 comments on commit 28efa0d

Please sign in to comment.