From 5936feef9b05db4b55943f41f7b908ae42e29396 Mon Sep 17 00:00:00 2001 From: Victor Gaydov Date: Wed, 31 Jan 2024 13:02:37 +0400 Subject: [PATCH] gh-674 Fix RTCP multicast support If receiver RTCP endpoint is bound to multicast address, it now uses Report_ToAddress instead of Report_Back mode. --- src/internal_modules/roc_node/receiver.cpp | 2 +- .../roc_node/receiver_decoder.cpp | 2 +- .../roc_node/receiver_decoder.h | 2 +- .../roc_pipeline/receiver_endpoint.cpp | 8 ++++++++ .../roc_pipeline/receiver_endpoint.h | 6 +++++- .../roc_pipeline/receiver_loop.cpp | 9 ++++++--- .../roc_pipeline/receiver_loop.h | 4 +++- .../roc_pipeline/receiver_session_group.cpp | 10 +++++++++- .../roc_pipeline/receiver_session_group.h | 1 + .../roc_pipeline/receiver_slot.cpp | 19 +++++++++++++------ .../roc_pipeline/receiver_slot.h | 4 ++++ .../roc_pipeline/sender_endpoint.cpp | 2 +- .../roc_pipeline/sender_endpoint.h | 2 +- .../roc_pipeline/sender_session.cpp | 6 +++--- .../roc_pipeline/sender_session.h | 2 +- .../test_loopback_sink_2_source.cpp | 13 +++++++------ .../roc_pipeline/test_receiver_endpoint.cpp | 6 +++--- src/tests/roc_pipeline/test_receiver_loop.cpp | 6 ++++-- .../roc_pipeline/test_receiver_source.cpp | 6 ++++-- 19 files changed, 76 insertions(+), 34 deletions(-) diff --git a/src/internal_modules/roc_node/receiver.cpp b/src/internal_modules/roc_node/receiver.cpp index 2d318d425..03c98eeb0 100644 --- a/src/internal_modules/roc_node/receiver.cpp +++ b/src/internal_modules/roc_node/receiver.cpp @@ -207,7 +207,7 @@ bool Receiver::bind(slot_index_t slot_index, } pipeline::ReceiverLoop::Tasks::AddEndpoint endpoint_task( - slot->handle, iface, uri.proto(), outbound_writer); + slot->handle, iface, uri.proto(), port.config.bind_address, outbound_writer); if (!pipeline_.schedule_and_wait(endpoint_task)) { roc_log(LogError, "receiver node:" diff --git a/src/internal_modules/roc_node/receiver_decoder.cpp b/src/internal_modules/roc_node/receiver_decoder.cpp index 8fd00be43..8ae63ae85 100644 --- a/src/internal_modules/roc_node/receiver_decoder.cpp +++ b/src/internal_modules/roc_node/receiver_decoder.cpp @@ -94,7 +94,7 @@ bool ReceiverDecoder::activate(address::Interface iface, address::Protocol proto packet::ConcurrentQueue::NonBlocking)); pipeline::ReceiverLoop::Tasks::AddEndpoint endpoint_task( - slot_, iface, proto, endpoint_queues_[iface].get()); + slot_, iface, proto, bind_address_, endpoint_queues_[iface].get()); if (!pipeline_.schedule_and_wait(endpoint_task)) { roc_log(LogError, "receiver decoder node:" diff --git a/src/internal_modules/roc_node/receiver_decoder.h b/src/internal_modules/roc_node/receiver_decoder.h index 95c8306a7..c35bce161 100644 --- a/src/internal_modules/roc_node/receiver_decoder.h +++ b/src/internal_modules/roc_node/receiver_decoder.h @@ -70,7 +70,7 @@ class ReceiverDecoder : public Node, private pipeline::IPipelineTaskScheduler { core::Mutex mutex_; - address::SocketAddr dest_address_; + address::SocketAddr bind_address_; core::Optional endpoint_queues_[address::Iface_Max]; core::Atomic endpoint_writers_[address::Iface_Max]; diff --git a/src/internal_modules/roc_pipeline/receiver_endpoint.cpp b/src/internal_modules/roc_pipeline/receiver_endpoint.cpp index 6fc2b6913..6314610b0 100644 --- a/src/internal_modules/roc_pipeline/receiver_endpoint.cpp +++ b/src/internal_modules/roc_pipeline/receiver_endpoint.cpp @@ -22,6 +22,7 @@ ReceiverEndpoint::ReceiverEndpoint(address::Protocol proto, StateTracker& state_tracker, ReceiverSessionGroup& session_group, const rtp::EncodingMap& encoding_map, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer, core::IArena& arena) : core::RefCounted(arena) @@ -30,6 +31,7 @@ ReceiverEndpoint::ReceiverEndpoint(address::Protocol proto, , session_group_(session_group) , composer_(NULL) , parser_(NULL) + , inbound_address_(inbound_address) , valid_(false) { packet::IComposer* composer = NULL; packet::IParser* parser = NULL; @@ -174,6 +176,12 @@ packet::IWriter* ReceiverEndpoint::outbound_writer() { return shipper_.get(); } +const address::SocketAddr& ReceiverEndpoint::inbound_address() const { + roc_panic_if(!is_valid()); + + return inbound_address_; +} + packet::IWriter& ReceiverEndpoint::inbound_writer() { roc_panic_if(!is_valid()); diff --git a/src/internal_modules/roc_pipeline/receiver_endpoint.h b/src/internal_modules/roc_pipeline/receiver_endpoint.h index b7e25fb4e..a7dee78cc 100644 --- a/src/internal_modules/roc_pipeline/receiver_endpoint.h +++ b/src/internal_modules/roc_pipeline/receiver_endpoint.h @@ -44,11 +44,11 @@ class ReceiverEndpoint : public core::RefCounted rtp_parser_; core::ScopedPtr fec_parser_; core::Optional rtcp_parser_; + address::SocketAddr inbound_address_; core::MpscQueue inbound_queue_; bool valid_; diff --git a/src/internal_modules/roc_pipeline/receiver_loop.cpp b/src/internal_modules/roc_pipeline/receiver_loop.cpp index f0d86bd0d..56cc8651c 100644 --- a/src/internal_modules/roc_pipeline/receiver_loop.cpp +++ b/src/internal_modules/roc_pipeline/receiver_loop.cpp @@ -19,8 +19,9 @@ ReceiverLoop::Task::Task() , slot_(NULL) , iface_(address::Iface_Invalid) , proto_(address::Proto_None) - , outbound_writer_(NULL) + , inbound_address_() , inbound_writer_(NULL) + , outbound_writer_(NULL) , slot_metrics_(NULL) , sess_metrics_(NULL) , sess_metrics_size_(NULL) { @@ -63,6 +64,7 @@ ReceiverLoop::Tasks::QuerySlot::QuerySlot(SlotHandle slot, ReceiverLoop::Tasks::AddEndpoint::AddEndpoint(SlotHandle slot, address::Interface iface, address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer) { func_ = &ReceiverLoop::task_add_endpoint_; if (!slot) { @@ -71,6 +73,7 @@ ReceiverLoop::Tasks::AddEndpoint::AddEndpoint(SlotHandle slot, slot_ = (ReceiverSlot*)slot; iface_ = iface; proto_ = proto; + inbound_address_ = inbound_address; outbound_writer_ = outbound_writer; } @@ -290,8 +293,8 @@ bool ReceiverLoop::task_query_slot_(Task& task) { bool ReceiverLoop::task_add_endpoint_(Task& task) { roc_panic_if(!task.slot_); - ReceiverEndpoint* endpoint = - task.slot_->add_endpoint(task.iface_, task.proto_, task.outbound_writer_); + ReceiverEndpoint* endpoint = task.slot_->add_endpoint( + task.iface_, task.proto_, task.inbound_address_, task.outbound_writer_); if (!endpoint) { return false; } diff --git a/src/internal_modules/roc_pipeline/receiver_loop.h b/src/internal_modules/roc_pipeline/receiver_loop.h index d06e868e1..a9c644a04 100644 --- a/src/internal_modules/roc_pipeline/receiver_loop.h +++ b/src/internal_modules/roc_pipeline/receiver_loop.h @@ -61,8 +61,9 @@ class ReceiverLoop : public PipelineLoop, private sndio::ISource { ReceiverSlot* slot_; //!< Slot. address::Interface iface_; //!< Interface. address::Protocol proto_; //!< Protocol. - packet::IWriter* outbound_writer_; //!< Outbound packet writer. + address::SocketAddr inbound_address_; //!< Inbound packet address. packet::IWriter* inbound_writer_; //!< Inbound packet writer. + packet::IWriter* outbound_writer_; //!< Outbound packet writer. ReceiverSlotMetrics* slot_metrics_; //!< Output for slot metrics. ReceiverSessionMetrics* sess_metrics_; //!< Output for session metrics. size_t* sess_metrics_size_; //!< Input/output session metrics size. @@ -110,6 +111,7 @@ class ReceiverLoop : public PipelineLoop, private sndio::ISource { AddEndpoint(SlotHandle slot, address::Interface iface, address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer); //! Get packet writer for inbound packets for the endpoint. diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.cpp b/src/internal_modules/roc_pipeline/receiver_session_group.cpp index aabb971bb..b871979c4 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.cpp +++ b/src/internal_modules/roc_pipeline/receiver_session_group.cpp @@ -59,6 +59,8 @@ bool ReceiverSessionGroup::create_control_pipeline(ReceiverEndpoint* control_end || !control_endpoint->outbound_writer()); roc_panic_if(rtcp_communicator_); + rtcp_inbound_addr_ = control_endpoint->inbound_address(); + rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator( receiver_config_.common.rtcp, *this, *control_endpoint->outbound_writer(), *control_endpoint->outbound_composer(), packet_factory_, byte_buffer_factory_, @@ -171,7 +173,13 @@ rtcp::ParticipantInfo ReceiverSessionGroup::participant_info() { part_info.cname = identity_->cname(); part_info.source_id = identity_->ssrc(); - part_info.report_mode = rtcp::Report_Back; + + if (rtcp_inbound_addr_.multicast()) { + part_info.report_mode = rtcp::Report_ToAddress; + part_info.report_address = rtcp_inbound_addr_; + } else { + part_info.report_mode = rtcp::Report_Back; + } return part_info; } diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index bea78d67f..761a5e1d2 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -136,6 +136,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip core::Optional identity_; core::Optional rtcp_communicator_; + address::SocketAddr rtcp_inbound_addr_; core::List sessions_; ReceiverSessionRouter session_router_; diff --git a/src/internal_modules/roc_pipeline/receiver_slot.cpp b/src/internal_modules/roc_pipeline/receiver_slot.cpp index be747e488..b102d85a9 100644 --- a/src/internal_modules/roc_pipeline/receiver_slot.cpp +++ b/src/internal_modules/roc_pipeline/receiver_slot.cpp @@ -48,6 +48,7 @@ bool ReceiverSlot::is_valid() const { ReceiverEndpoint* ReceiverSlot::add_endpoint(address::Interface iface, address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer) { roc_panic_if(!is_valid()); @@ -56,13 +57,13 @@ ReceiverEndpoint* ReceiverSlot::add_endpoint(address::Interface iface, switch (iface) { case address::Iface_AudioSource: - return create_source_endpoint_(proto, outbound_writer); + return create_source_endpoint_(proto, inbound_address, outbound_writer); case address::Iface_AudioRepair: - return create_repair_endpoint_(proto, outbound_writer); + return create_repair_endpoint_(proto, inbound_address, outbound_writer); case address::Iface_AudioControl: - return create_control_endpoint_(proto, outbound_writer); + return create_control_endpoint_(proto, inbound_address, outbound_writer); default: break; @@ -123,6 +124,7 @@ void ReceiverSlot::get_metrics(ReceiverSlotMetrics& slot_metrics, ReceiverEndpoint* ReceiverSlot::create_source_endpoint_(address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer) { if (source_endpoint_) { roc_log(LogError, "receiver slot: audio source endpoint is already set"); @@ -140,7 +142,8 @@ ReceiverSlot::create_source_endpoint_(address::Protocol proto, } source_endpoint_.reset(new (source_endpoint_) ReceiverEndpoint( - proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena())); + proto, state_tracker_, session_group_, encoding_map_, inbound_address, + outbound_writer, arena())); if (!source_endpoint_ || !source_endpoint_->is_valid()) { roc_log(LogError, "receiver slot: can't create source endpoint"); @@ -153,6 +156,7 @@ ReceiverSlot::create_source_endpoint_(address::Protocol proto, ReceiverEndpoint* ReceiverSlot::create_repair_endpoint_(address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer) { if (repair_endpoint_) { roc_log(LogError, "receiver slot: audio repair endpoint is already set"); @@ -170,7 +174,8 @@ ReceiverSlot::create_repair_endpoint_(address::Protocol proto, } repair_endpoint_.reset(new (repair_endpoint_) ReceiverEndpoint( - proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena())); + proto, state_tracker_, session_group_, encoding_map_, inbound_address, + outbound_writer, arena())); if (!repair_endpoint_ || !repair_endpoint_->is_valid()) { roc_log(LogError, "receiver slot: can't create repair endpoint"); @@ -183,6 +188,7 @@ ReceiverSlot::create_repair_endpoint_(address::Protocol proto, ReceiverEndpoint* ReceiverSlot::create_control_endpoint_(address::Protocol proto, + const address::SocketAddr& inbound_address, packet::IWriter* outbound_writer) { if (control_endpoint_) { roc_log(LogError, "receiver slot: audio control endpoint is already set"); @@ -194,7 +200,8 @@ ReceiverSlot::create_control_endpoint_(address::Protocol proto, } control_endpoint_.reset(new (control_endpoint_) ReceiverEndpoint( - proto, state_tracker_, session_group_, encoding_map_, outbound_writer, arena())); + proto, state_tracker_, session_group_, encoding_map_, inbound_address, + outbound_writer, arena())); if (!control_endpoint_ || !control_endpoint_->is_valid()) { roc_log(LogError, "receiver slot: can't create control endpoint"); diff --git a/src/internal_modules/roc_pipeline/receiver_slot.h b/src/internal_modules/roc_pipeline/receiver_slot.h index 1e07d4018..2315946fa 100644 --- a/src/internal_modules/roc_pipeline/receiver_slot.h +++ b/src/internal_modules/roc_pipeline/receiver_slot.h @@ -52,6 +52,7 @@ class ReceiverSlot : public core::RefCountedoutbound_address(); diff --git a/src/internal_modules/roc_pipeline/sender_endpoint.h b/src/internal_modules/roc_pipeline/sender_endpoint.h index 78631e067..9034dc703 100644 --- a/src/internal_modules/roc_pipeline/sender_endpoint.h +++ b/src/internal_modules/roc_pipeline/sender_endpoint.h @@ -58,7 +58,7 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter { address::Protocol proto() const; //! Get destination address for outbound packets. - const address::SocketAddr& outbound_address(); + const address::SocketAddr& outbound_address() const; //! Get composer for outbound packets. //! @remarks diff --git a/src/internal_modules/roc_pipeline/sender_session.cpp b/src/internal_modules/roc_pipeline/sender_session.cpp index 92f4b89ca..3de4c8422 100644 --- a/src/internal_modules/roc_pipeline/sender_session.cpp +++ b/src/internal_modules/roc_pipeline/sender_session.cpp @@ -187,7 +187,7 @@ bool SenderSession::create_control_pipeline(SenderEndpoint* control_endpoint) { roc_panic_if(!control_endpoint); roc_panic_if(rtcp_communicator_); - rtcp_address_ = control_endpoint->outbound_address(); + rtcp_outbound_addr_ = control_endpoint->outbound_address(); rtcp_communicator_.reset(new (rtcp_communicator_) rtcp::Communicator( config_.rtcp, *this, control_endpoint->outbound_writer(), @@ -257,7 +257,7 @@ rtcp::ParticipantInfo SenderSession::participant_info() { part_info.cname = identity_->cname(); part_info.source_id = identity_->ssrc(); part_info.report_mode = rtcp::Report_ToAddress; - part_info.report_address = rtcp_address_; + part_info.report_address = rtcp_outbound_addr_; return part_info; } @@ -316,7 +316,7 @@ void SenderSession::start_feedback_monitor_() { return; } - if (rtcp_address_.multicast()) { + if (rtcp_outbound_addr_.multicast()) { // Control endpoint uses multicast, so there are multiple receivers for // a sender session. We don't support feedback monitoring in this mode. return; diff --git a/src/internal_modules/roc_pipeline/sender_session.h b/src/internal_modules/roc_pipeline/sender_session.h index 72d7fa0fe..1fceb2cbb 100644 --- a/src/internal_modules/roc_pipeline/sender_session.h +++ b/src/internal_modules/roc_pipeline/sender_session.h @@ -141,7 +141,7 @@ class SenderSession : public core::NonCopyable<>, private rtcp::IParticipant { core::Optional feedback_monitor_; core::Optional rtcp_communicator_; - address::SocketAddr rtcp_address_; + address::SocketAddr rtcp_outbound_addr_; audio::IFrameWriter* frame_writer_; diff --git a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp index 43259bc43..7c103ca43 100644 --- a/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp +++ b/src/tests/roc_pipeline/test_loopback_sink_2_source.cpp @@ -305,21 +305,22 @@ void send_receive(int flags, packet::IWriter* receiver_repair_endpoint_writer = NULL; packet::IWriter* receiver_control_endpoint_writer = NULL; - receiver_source_endpoint = - receiver_slot->add_endpoint(address::Iface_AudioSource, source_proto, NULL); + receiver_source_endpoint = receiver_slot->add_endpoint( + address::Iface_AudioSource, source_proto, receiver_source_addr, NULL); CHECK(receiver_source_endpoint); receiver_source_endpoint_writer = &receiver_source_endpoint->inbound_writer(); if (repair_proto != address::Proto_None) { - receiver_repair_endpoint = - receiver_slot->add_endpoint(address::Iface_AudioRepair, repair_proto, NULL); + receiver_repair_endpoint = receiver_slot->add_endpoint( + address::Iface_AudioRepair, repair_proto, receiver_repair_addr, NULL); CHECK(receiver_repair_endpoint); receiver_repair_endpoint_writer = &receiver_repair_endpoint->inbound_writer(); } if (control_proto != address::Proto_None) { - receiver_control_endpoint = receiver_slot->add_endpoint( - address::Iface_AudioControl, control_proto, &receiver_outbound_queue); + receiver_control_endpoint = + receiver_slot->add_endpoint(address::Iface_AudioControl, control_proto, + receiver_control_addr, &receiver_outbound_queue); CHECK(receiver_control_endpoint); receiver_control_endpoint_writer = &receiver_control_endpoint->inbound_writer(); } diff --git a/src/tests/roc_pipeline/test_receiver_endpoint.cpp b/src/tests/roc_pipeline/test_receiver_endpoint.cpp index fae886923..7ed1c22ff 100644 --- a/src/tests/roc_pipeline/test_receiver_endpoint.cpp +++ b/src/tests/roc_pipeline/test_receiver_endpoint.cpp @@ -54,7 +54,7 @@ TEST(receiver_endpoint, valid) { sample_buffer_factory, arena); ReceiverEndpoint endpoint(address::Proto_RTP, state_tracker, session_group, - encoding_map, NULL, arena); + encoding_map, address::SocketAddr(), NULL, arena); CHECK(endpoint.is_valid()); } @@ -68,7 +68,7 @@ TEST(receiver_endpoint, invalid_proto) { sample_buffer_factory, arena); ReceiverEndpoint endpoint(address::Proto_None, state_tracker, session_group, - encoding_map, NULL, arena); + encoding_map, address::SocketAddr(), NULL, arena); CHECK(!endpoint.is_valid()); } @@ -92,7 +92,7 @@ TEST(receiver_endpoint, no_memory) { byte_buffer_factory, sample_buffer_factory, nomem_arena); ReceiverEndpoint endpoint(protos[n], state_tracker, session_group, encoding_map, - NULL, nomem_arena); + address::SocketAddr(), NULL, nomem_arena); CHECK(!endpoint.is_valid()); } diff --git a/src/tests/roc_pipeline/test_receiver_loop.cpp b/src/tests/roc_pipeline/test_receiver_loop.cpp index 873139e7b..048e757b8 100644 --- a/src/tests/roc_pipeline/test_receiver_loop.cpp +++ b/src/tests/roc_pipeline/test_receiver_loop.cpp @@ -66,7 +66,8 @@ class TaskIssuer : public IPipelineTaskCompleter { slot_ = task_create_slot_->get_handle(); roc_panic_if_not(slot_); task_add_endpoint_ = new ReceiverLoop::Tasks::AddEndpoint( - slot_, address::Iface_AudioSource, address::Proto_RTP, NULL); + slot_, address::Iface_AudioSource, address::Proto_RTP, + address::SocketAddr(), NULL); pipeline_.schedule(*task_add_endpoint_, *this); return; } @@ -129,7 +130,8 @@ TEST(receiver_loop, endpoints_sync) { { ReceiverLoop::Tasks::AddEndpoint task(slot, address::Iface_AudioSource, - address::Proto_RTP, NULL); + address::Proto_RTP, address::SocketAddr(), + NULL); CHECK(receiver.schedule_and_wait(task)); CHECK(task.success()); CHECK(task.get_inbound_writer()); diff --git a/src/tests/roc_pipeline/test_receiver_source.cpp b/src/tests/roc_pipeline/test_receiver_source.cpp index aacb21751..6706bec04 100644 --- a/src/tests/roc_pipeline/test_receiver_source.cpp +++ b/src/tests/roc_pipeline/test_receiver_source.cpp @@ -84,7 +84,8 @@ packet::IWriter* create_transport_endpoint(ReceiverSlot* slot, address::Interface iface, address::Protocol proto) { CHECK(slot); - ReceiverEndpoint* endpoint = slot->add_endpoint(iface, proto, NULL); + ReceiverEndpoint* endpoint = + slot->add_endpoint(iface, proto, address::SocketAddr(), NULL); CHECK(endpoint); return &endpoint->inbound_writer(); } @@ -94,7 +95,8 @@ packet::IWriter* create_control_endpoint(ReceiverSlot* slot, address::Protocol proto, packet::IWriter& outbound_writer) { CHECK(slot); - ReceiverEndpoint* endpoint = slot->add_endpoint(iface, proto, &outbound_writer); + ReceiverEndpoint* endpoint = + slot->add_endpoint(iface, proto, address::SocketAddr(), &outbound_writer); CHECK(endpoint); return &endpoint->inbound_writer(); }