Skip to content

Commit

Permalink
Add changes based on PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ForeverASilver committed Oct 13, 2023
1 parent 8fcee5f commit cb7a586
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,

pp->udp()->src_addr = src_addr;
pp->udp()->dst_addr = self.config_.bind_address;
pp->udp()->recieve_timestamp = core::timestamp(core::ClockMonotonic);

pp->set_data(core::Slice<uint8_t>(*bp, 0, (size_t)nread));

Expand Down
3 changes: 3 additions & 0 deletions src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ struct UDP {

//! Sender request state.
uv_udp_send_t request;

//! Received Timestamp.
core::nanoseconds_t recieve_timestamp;
};

} // namespace packet
Expand Down
16 changes: 6 additions & 10 deletions src/internal_modules/roc_pipeline/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ struct ReceiverSessionConfig {
//! Target latency, nanoseconds.
core::nanoseconds_t target_latency;

//! Packet prebuffer length, nanoseconds.
core::nanoseconds_t prebuf_len;

//! Packet payload type.
unsigned int payload_type;

Expand All @@ -186,15 +189,12 @@ struct ReceiverSessionConfig {
//! Resampler profile.
audio::ResamplerProfile resampler_profile;

//! Max number of receiver sessions.
size_t max_sessions;

ReceiverSessionConfig()
: target_latency(DefaultLatency)
, prebuf_len(DefaultLatency)
, payload_type(0)
, resampler_backend(audio::ResamplerBackend_Default)
, resampler_profile(audio::ResamplerProfile_Medium)
, max_sessions(0) {
, resampler_profile(audio::ResamplerProfile_Medium) {
latency_monitor.deduce_min_latency(DefaultLatency);
latency_monitor.deduce_max_latency(DefaultLatency);
}
Expand Down Expand Up @@ -232,16 +232,12 @@ struct ReceiverCommonConfig {
//! Insert weird beeps instead of silence on packet loss.
bool enable_beeping;

//! Maximum number of packets per session.
size_t max_session_packets;

ReceiverCommonConfig()
: output_sample_spec(DefaultSampleSpec)
, enable_timing(false)
, enable_auto_reclock(false)
, enable_profiling(false)
, enable_beeping(false)
, max_session_packets(0) {
, enable_beeping(false) {
}
};

Expand Down
34 changes: 15 additions & 19 deletions src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "roc_address/socket_addr_to_str.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"
#include "roc_core/parse_duration.h"
#include <iostream>

namespace roc {
namespace pipeline {
Expand Down Expand Up @@ -165,6 +165,8 @@ void ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& pack

if (can_create_session_(packet)) {
create_session_(packet);
} else {
enqueue_prebuf_packet_(packet);
}
}

Expand All @@ -179,7 +181,6 @@ void ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet
}

if (!rtcp_session_->is_valid()) {
drop_packet_(packet);
return;
}

Expand All @@ -190,40 +191,35 @@ void ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet
bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) {
if (packet->flags() & packet::Packet::FlagRepair) {
roc_log(LogDebug, "session group: ignoring repair packet for unknown session");
drop_packet_(packet);
return false;
}

return true;
}

void ReceiverSessionGroup::drop_packet_(const packet::PacketPtr& packet_ptr) {
dropped_packet_buffer_.push_back(*packet_ptr);
void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
prebuf_packets_.push_back(*packet_ptr.get());

const size_t max_size = receiver_config_.common.max_session_packets
* receiver_config_.default_session.max_sessions;
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
core::nanoseconds_t received = prebuf_packets_.front()->udp()->recieve_timestamp;

if (max_size == 0) {
return;
}

if (dropped_packet_buffer_.size() > max_size) {
dropped_packet_buffer_.remove(*dropped_packet_buffer_.front());
if (now - received > receiver_config_.default_session.prebuf_len) {
prebuf_packets_.remove(*prebuf_packets_.front());
}
}

void ReceiverSessionGroup::handle_buffer_(ReceiverSession& sess) {
void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
packet::PacketPtr curr, next;

if (dropped_packet_buffer_.size() == 0) {
if (prebuf_packets_.size() == 0) {
return;
}

for (curr = dropped_packet_buffer_.front(); curr; curr = next) {
next = dropped_packet_buffer_.nextof(*curr);
for (curr = prebuf_packets_.front(); curr; curr = next) {
next = prebuf_packets_.nextof(*curr);

if (sess.handle(curr)) {
dropped_packet_buffer_.remove(*curr);
prebuf_packets_.remove(*curr);
}
}
}
Expand Down Expand Up @@ -270,7 +266,7 @@ void ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {

receiver_state_.add_sessions(+1);

handle_buffer_(*sess);
dequeue_prebuf_packets_(*sess);
}

void ReceiverSessionGroup::remove_session_(ReceiverSession& sess) {
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver

void route_transport_packet_(const packet::PacketPtr& packet);
void route_control_packet_(const packet::PacketPtr& packet);
void drop_packet_(const packet::PacketPtr& packet);
void handle_buffer_(ReceiverSession& sess);
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
void dequeue_prebuf_packets_(ReceiverSession& sess);

bool can_create_session_(const packet::PacketPtr& packet);

Expand All @@ -109,7 +109,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver
core::Optional<rtcp::Session> rtcp_session_;

core::List<ReceiverSession> sessions_;
core::List<packet::Packet> dropped_packet_buffer_;
core::List<packet::Packet> prebuf_packets_;
};

} // namespace pipeline
Expand Down
14 changes: 6 additions & 8 deletions src/public_api/include/roc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -782,15 +782,13 @@ typedef struct roc_receiver_config {
*/
long long choppy_playback_timeout;

/** Maximum number of receiver sessions.
* If zero, unlimited number of sessions allowed.
*/
unsigned int max_sessions;

/** Maximum number of packets in ring buffer per session.
* If zero, unlimited number of packets allowed.
/** Packet prebuffer length, in nanoseconds.
* Packets received for sessions that have not yet been created
* will be buffered. Any packets older than the prebuf_len
* will be discarded.
* If zero, default value is used.
*/
unsigned int max_session_packets;
unsigned long long prebuf_len;

} roc_receiver_config;

Expand Down
7 changes: 2 additions & 5 deletions src/tools/roc_recv/cmdline.ggo
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ section "Options"
option "no-play-timeout" - "No playback timeout, TIME units"
string optional

option "max-session-packets" - "Max number of buffered session packets"
int optional

option "max-sessions" - "Max number of receiver sessions"
int optional
option "prebuf-len" - "Length of packet prebuffer, TIME units"
string optional

option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
string optional
Expand Down
21 changes: 9 additions & 12 deletions src/tools/roc_recv/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,17 @@ int main(int argc, char** argv) {
}
}

if (args.max_session_packets_given) {
if (args.max_session_packets_arg <= 0) {
roc_log(LogError, "invalid --max-session-packets: should be > 0");
if (args.prebuf_len_given) {
core::nanoseconds_t prebuf_len = 0;
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
roc_log(LogError, "invalid --prebuf-len");
return 1;
}
receiver_config.common.max_session_packets = (size_t)args.max_session_packets_arg;
}

if (args.max_sessions_given) {
if (args.max_sessions_arg <= 0) {
roc_log(LogError, "invalid --max-sessions: should be > 0");
return 1;
}
receiver_config.default_session.max_sessions = (size_t)args.max_sessions_arg;
receiver_config.default_session.prebuf_len =
(core::nanoseconds_t)args.prebuf_len_arg;
} else {
receiver_config.default_session.prebuf_len =
receiver_config.default_session.target_latency;
}

if (args.choppy_play_timeout_given) {
Expand Down

0 comments on commit cb7a586

Please sign in to comment.