Skip to content

Commit

Permalink
roc-streaminggh-674 Fix RTCP multicast support
Browse files Browse the repository at this point in the history
If receiver RTCP endpoint is bound to multicast address, it
now uses Report_ToAddress instead of Report_Back mode.
  • Loading branch information
gavv committed Jan 31, 2024
1 parent 508e83d commit 5936fee
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/internal_modules/roc_node/receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ bool Receiver::bind(slot_index_t slot_index,
}

pipeline::ReceiverLoop::Tasks::AddEndpoint endpoint_task(
slot->handle, iface, uri.proto(), outbound_writer);
slot->handle, iface, uri.proto(), port.config.bind_address, outbound_writer);
if (!pipeline_.schedule_and_wait(endpoint_task)) {
roc_log(LogError,
"receiver node:"
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_node/receiver_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bool ReceiverDecoder::activate(address::Interface iface, address::Protocol proto
packet::ConcurrentQueue::NonBlocking));

pipeline::ReceiverLoop::Tasks::AddEndpoint endpoint_task(
slot_, iface, proto, endpoint_queues_[iface].get());
slot_, iface, proto, bind_address_, endpoint_queues_[iface].get());
if (!pipeline_.schedule_and_wait(endpoint_task)) {
roc_log(LogError,
"receiver decoder node:"
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_node/receiver_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ReceiverDecoder : public Node, private pipeline::IPipelineTaskScheduler {

core::Mutex mutex_;

address::SocketAddr dest_address_;
address::SocketAddr bind_address_;

core::Optional<packet::ConcurrentQueue> endpoint_queues_[address::Iface_Max];
core::Atomic<packet::IWriter*> endpoint_writers_[address::Iface_Max];
Expand Down
8 changes: 8 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ReceiverEndpoint::ReceiverEndpoint(address::Protocol proto,
StateTracker& state_tracker,
ReceiverSessionGroup& session_group,
const rtp::EncodingMap& encoding_map,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer,
core::IArena& arena)
: core::RefCounted<ReceiverEndpoint, core::ArenaAllocation>(arena)
Expand All @@ -30,6 +31,7 @@ ReceiverEndpoint::ReceiverEndpoint(address::Protocol proto,
, session_group_(session_group)
, composer_(NULL)
, parser_(NULL)
, inbound_address_(inbound_address)
, valid_(false) {
packet::IComposer* composer = NULL;
packet::IParser* parser = NULL;
Expand Down Expand Up @@ -174,6 +176,12 @@ packet::IWriter* ReceiverEndpoint::outbound_writer() {
return shipper_.get();
}

const address::SocketAddr& ReceiverEndpoint::inbound_address() const {
roc_panic_if(!is_valid());

return inbound_address_;
}

packet::IWriter& ReceiverEndpoint::inbound_writer() {
roc_panic_if(!is_valid());

Expand Down
6 changes: 5 additions & 1 deletion src/internal_modules/roc_pipeline/receiver_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
private packet::IWriter {
public:
//! Initialize.
//! - @p writer to handle packets received on netio thread.
ReceiverEndpoint(address::Protocol proto,
StateTracker& state_tracker,
ReceiverSessionGroup& session_group,
const rtp::EncodingMap& encoding_map,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer,
core::IArena& arena);

Expand Down Expand Up @@ -77,6 +77,9 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
//! not supported, the method returns NULL.
packet::IWriter* outbound_writer();

//! Get bind address for inbound packets.
const address::SocketAddr& inbound_address() const;

//! Get writer for inbound packets.
//! This way packets from network reach receiver pipeline.
//! @remarks
Expand Down Expand Up @@ -112,6 +115,7 @@ class ReceiverEndpoint : public core::RefCounted<ReceiverEndpoint, core::ArenaAl
core::Optional<rtp::Parser> rtp_parser_;
core::ScopedPtr<packet::IParser> fec_parser_;
core::Optional<rtcp::Parser> rtcp_parser_;
address::SocketAddr inbound_address_;
core::MpscQueue<packet::Packet> inbound_queue_;

bool valid_;
Expand Down
9 changes: 6 additions & 3 deletions src/internal_modules/roc_pipeline/receiver_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ ReceiverLoop::Task::Task()
, slot_(NULL)
, iface_(address::Iface_Invalid)
, proto_(address::Proto_None)
, outbound_writer_(NULL)
, inbound_address_()
, inbound_writer_(NULL)
, outbound_writer_(NULL)
, slot_metrics_(NULL)
, sess_metrics_(NULL)
, sess_metrics_size_(NULL) {
Expand Down Expand Up @@ -63,6 +64,7 @@ ReceiverLoop::Tasks::QuerySlot::QuerySlot(SlotHandle slot,
ReceiverLoop::Tasks::AddEndpoint::AddEndpoint(SlotHandle slot,
address::Interface iface,
address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer) {
func_ = &ReceiverLoop::task_add_endpoint_;
if (!slot) {
Expand All @@ -71,6 +73,7 @@ ReceiverLoop::Tasks::AddEndpoint::AddEndpoint(SlotHandle slot,
slot_ = (ReceiverSlot*)slot;
iface_ = iface;
proto_ = proto;
inbound_address_ = inbound_address;
outbound_writer_ = outbound_writer;
}

Expand Down Expand Up @@ -290,8 +293,8 @@ bool ReceiverLoop::task_query_slot_(Task& task) {
bool ReceiverLoop::task_add_endpoint_(Task& task) {
roc_panic_if(!task.slot_);

ReceiverEndpoint* endpoint =
task.slot_->add_endpoint(task.iface_, task.proto_, task.outbound_writer_);
ReceiverEndpoint* endpoint = task.slot_->add_endpoint(
task.iface_, task.proto_, task.inbound_address_, task.outbound_writer_);
if (!endpoint) {
return false;
}
Expand Down
4 changes: 3 additions & 1 deletion src/internal_modules/roc_pipeline/receiver_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class ReceiverLoop : public PipelineLoop, private sndio::ISource {
ReceiverSlot* slot_; //!< Slot.
address::Interface iface_; //!< Interface.
address::Protocol proto_; //!< Protocol.
packet::IWriter* outbound_writer_; //!< Outbound packet writer.
address::SocketAddr inbound_address_; //!< Inbound packet address.
packet::IWriter* inbound_writer_; //!< Inbound packet writer.
packet::IWriter* outbound_writer_; //!< Outbound packet writer.
ReceiverSlotMetrics* slot_metrics_; //!< Output for slot metrics.
ReceiverSessionMetrics* sess_metrics_; //!< Output for session metrics.
size_t* sess_metrics_size_; //!< Input/output session metrics size.
Expand Down Expand Up @@ -110,6 +111,7 @@ class ReceiverLoop : public PipelineLoop, private sndio::ISource {
AddEndpoint(SlotHandle slot,
address::Interface iface,
address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);

//! Get packet writer for inbound packets for the endpoint.
Expand Down
10 changes: 9 additions & 1 deletion src/internal_modules/roc_pipeline/receiver_session_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ bool ReceiverSessionGroup::create_control_pipeline(ReceiverEndpoint* control_end
|| !control_endpoint->outbound_writer());
roc_panic_if(rtcp_communicator_);

rtcp_inbound_addr_ = control_endpoint->inbound_address();

rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
receiver_config_.common.rtcp, *this, *control_endpoint->outbound_writer(),
*control_endpoint->outbound_composer(), packet_factory_, byte_buffer_factory_,
Expand Down Expand Up @@ -171,7 +173,13 @@ rtcp::ParticipantInfo ReceiverSessionGroup::participant_info() {

part_info.cname = identity_->cname();
part_info.source_id = identity_->ssrc();
part_info.report_mode = rtcp::Report_Back;

if (rtcp_inbound_addr_.multicast()) {
part_info.report_mode = rtcp::Report_ToAddress;
part_info.report_address = rtcp_inbound_addr_;
} else {
part_info.report_mode = rtcp::Report_Back;
}

return part_info;
}
Expand Down
1 change: 1 addition & 0 deletions src/internal_modules/roc_pipeline/receiver_session_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
core::Optional<rtp::Identity> identity_;

core::Optional<rtcp::Communicator> rtcp_communicator_;
address::SocketAddr rtcp_inbound_addr_;

core::List<ReceiverSession> sessions_;
ReceiverSessionRouter session_router_;
Expand Down
19 changes: 13 additions & 6 deletions src/internal_modules/roc_pipeline/receiver_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ bool ReceiverSlot::is_valid() const {

ReceiverEndpoint* ReceiverSlot::add_endpoint(address::Interface iface,
address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer) {
roc_panic_if(!is_valid());

Expand All @@ -56,13 +57,13 @@ ReceiverEndpoint* ReceiverSlot::add_endpoint(address::Interface iface,

switch (iface) {
case address::Iface_AudioSource:
return create_source_endpoint_(proto, outbound_writer);
return create_source_endpoint_(proto, inbound_address, outbound_writer);

case address::Iface_AudioRepair:
return create_repair_endpoint_(proto, outbound_writer);
return create_repair_endpoint_(proto, inbound_address, outbound_writer);

case address::Iface_AudioControl:
return create_control_endpoint_(proto, outbound_writer);
return create_control_endpoint_(proto, inbound_address, outbound_writer);

default:
break;
Expand Down Expand Up @@ -123,6 +124,7 @@ void ReceiverSlot::get_metrics(ReceiverSlotMetrics& slot_metrics,

ReceiverEndpoint*
ReceiverSlot::create_source_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer) {
if (source_endpoint_) {
roc_log(LogError, "receiver slot: audio source endpoint is already set");
Expand All @@ -140,7 +142,8 @@ ReceiverSlot::create_source_endpoint_(address::Protocol proto,
}

source_endpoint_.reset(new (source_endpoint_) ReceiverEndpoint(
proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena()));
proto, state_tracker_, session_group_, encoding_map_, inbound_address,
outbound_writer, arena()));

if (!source_endpoint_ || !source_endpoint_->is_valid()) {
roc_log(LogError, "receiver slot: can't create source endpoint");
Expand All @@ -153,6 +156,7 @@ ReceiverSlot::create_source_endpoint_(address::Protocol proto,

ReceiverEndpoint*
ReceiverSlot::create_repair_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer) {
if (repair_endpoint_) {
roc_log(LogError, "receiver slot: audio repair endpoint is already set");
Expand All @@ -170,7 +174,8 @@ ReceiverSlot::create_repair_endpoint_(address::Protocol proto,
}

repair_endpoint_.reset(new (repair_endpoint_) ReceiverEndpoint(
proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena()));
proto, state_tracker_, session_group_, encoding_map_, inbound_address,
outbound_writer, arena()));

if (!repair_endpoint_ || !repair_endpoint_->is_valid()) {
roc_log(LogError, "receiver slot: can't create repair endpoint");
Expand All @@ -183,6 +188,7 @@ ReceiverSlot::create_repair_endpoint_(address::Protocol proto,

ReceiverEndpoint*
ReceiverSlot::create_control_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer) {
if (control_endpoint_) {
roc_log(LogError, "receiver slot: audio control endpoint is already set");
Expand All @@ -194,7 +200,8 @@ ReceiverSlot::create_control_endpoint_(address::Protocol proto,
}

control_endpoint_.reset(new (control_endpoint_) ReceiverEndpoint(
proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena()));
proto, state_tracker_, session_group_, encoding_map_, inbound_address,
outbound_writer, arena()));

if (!control_endpoint_ || !control_endpoint_->is_valid()) {
roc_log(LogError, "receiver slot: can't create control endpoint");
Expand Down
4 changes: 4 additions & 0 deletions src/internal_modules/roc_pipeline/receiver_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ReceiverSlot : public core::RefCounted<ReceiverSlot, core::ArenaAllocation
//! Add endpoint.
ReceiverEndpoint* add_endpoint(address::Interface iface,
address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);

//! Pull packets and refresh sessions according to current time.
Expand All @@ -76,10 +77,13 @@ class ReceiverSlot : public core::RefCounted<ReceiverSlot, core::ArenaAllocation

private:
ReceiverEndpoint* create_source_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);
ReceiverEndpoint* create_repair_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);
ReceiverEndpoint* create_control_endpoint_(address::Protocol proto,
const address::SocketAddr& inbound_address,
packet::IWriter* outbound_writer);

const rtp::EncodingMap& encoding_map_;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_pipeline/sender_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ address::Protocol SenderEndpoint::proto() const {
return proto_;
}

const address::SocketAddr& SenderEndpoint::outbound_address() {
const address::SocketAddr& SenderEndpoint::outbound_address() const {
roc_panic_if(!is_valid());

return shipper_->outbound_address();
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_pipeline/sender_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter {
address::Protocol proto() const;

//! Get destination address for outbound packets.
const address::SocketAddr& outbound_address();
const address::SocketAddr& outbound_address() const;

//! Get composer for outbound packets.
//! @remarks
Expand Down
6 changes: 3 additions & 3 deletions src/internal_modules/roc_pipeline/sender_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ bool SenderSession::create_control_pipeline(SenderEndpoint* control_endpoint) {
roc_panic_if(!control_endpoint);
roc_panic_if(rtcp_communicator_);

rtcp_address_ = control_endpoint->outbound_address();
rtcp_outbound_addr_ = control_endpoint->outbound_address();

rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator(
config_.rtcp, *this, control_endpoint->outbound_writer(),
Expand Down Expand Up @@ -257,7 +257,7 @@ rtcp::ParticipantInfo SenderSession::participant_info() {
part_info.cname = identity_->cname();
part_info.source_id = identity_->ssrc();
part_info.report_mode = rtcp::Report_ToAddress;
part_info.report_address = rtcp_address_;
part_info.report_address = rtcp_outbound_addr_;

return part_info;
}
Expand Down Expand Up @@ -316,7 +316,7 @@ void SenderSession::start_feedback_monitor_() {
return;
}

if (rtcp_address_.multicast()) {
if (rtcp_outbound_addr_.multicast()) {
// Control endpoint uses multicast, so there are multiple receivers for
// a sender session. We don't support feedback monitoring in this mode.
return;
Expand Down
2 changes: 1 addition & 1 deletion src/internal_modules/roc_pipeline/sender_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class SenderSession : public core::NonCopyable<>, private rtcp::IParticipant {
core::Optional<audio::FeedbackMonitor> feedback_monitor_;

core::Optional<rtcp::Communicator> rtcp_communicator_;
address::SocketAddr rtcp_address_;
address::SocketAddr rtcp_outbound_addr_;

audio::IFrameWriter* frame_writer_;

Expand Down
13 changes: 7 additions & 6 deletions src/tests/roc_pipeline/test_loopback_sink_2_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,21 +305,22 @@ void send_receive(int flags,
packet::IWriter* receiver_repair_endpoint_writer = NULL;
packet::IWriter* receiver_control_endpoint_writer = NULL;

receiver_source_endpoint =
receiver_slot->add_endpoint(address::Iface_AudioSource, source_proto, NULL);
receiver_source_endpoint = receiver_slot->add_endpoint(
address::Iface_AudioSource, source_proto, receiver_source_addr, NULL);
CHECK(receiver_source_endpoint);
receiver_source_endpoint_writer = &receiver_source_endpoint->inbound_writer();

if (repair_proto != address::Proto_None) {
receiver_repair_endpoint =
receiver_slot->add_endpoint(address::Iface_AudioRepair, repair_proto, NULL);
receiver_repair_endpoint = receiver_slot->add_endpoint(
address::Iface_AudioRepair, repair_proto, receiver_repair_addr, NULL);
CHECK(receiver_repair_endpoint);
receiver_repair_endpoint_writer = &receiver_repair_endpoint->inbound_writer();
}

if (control_proto != address::Proto_None) {
receiver_control_endpoint = receiver_slot->add_endpoint(
address::Iface_AudioControl, control_proto, &receiver_outbound_queue);
receiver_control_endpoint =
receiver_slot->add_endpoint(address::Iface_AudioControl, control_proto,
receiver_control_addr, &receiver_outbound_queue);
CHECK(receiver_control_endpoint);
receiver_control_endpoint_writer = &receiver_control_endpoint->inbound_writer();
}
Expand Down
6 changes: 3 additions & 3 deletions src/tests/roc_pipeline/test_receiver_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TEST(receiver_endpoint, valid) {
sample_buffer_factory, arena);

ReceiverEndpoint endpoint(address::Proto_RTP, state_tracker, session_group,
encoding_map, NULL, arena);
encoding_map, address::SocketAddr(), NULL, arena);
CHECK(endpoint.is_valid());
}

Expand All @@ -68,7 +68,7 @@ TEST(receiver_endpoint, invalid_proto) {
sample_buffer_factory, arena);

ReceiverEndpoint endpoint(address::Proto_None, state_tracker, session_group,
encoding_map, NULL, arena);
encoding_map, address::SocketAddr(), NULL, arena);
CHECK(!endpoint.is_valid());
}

Expand All @@ -92,7 +92,7 @@ TEST(receiver_endpoint, no_memory) {
byte_buffer_factory, sample_buffer_factory, nomem_arena);

ReceiverEndpoint endpoint(protos[n], state_tracker, session_group, encoding_map,
NULL, nomem_arena);
address::SocketAddr(), NULL, nomem_arena);

CHECK(!endpoint.is_valid());
}
Expand Down
Loading

0 comments on commit 5936fee

Please sign in to comment.