diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 07aa4b3f..a1916969 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -229,6 +229,14 @@ struct broadcast_ctx_t { udp::socket video_sock { io }; udp::socket audio_sock { io }; + + // This is purely for adminitrative purposes. + // + // It's possible two instances of Moonlight are behind a NAT. + // From Sunshine's point of view, the ip addresses are identical + // We need some way to know what ports are already used for different streams + util::sync_t>> audio_video_connections; + control_server_t control_server; }; @@ -260,6 +268,11 @@ struct session_t { std::uint32_t avRiKeyId; std::uint32_t timestamp; udp::endpoint peer; + + util::buffer_t shards; + util::buffer_t shards_p; + + audio_fec_packet_t fec_packet; } audio; struct { @@ -800,7 +813,7 @@ void recvThread(broadcast_ctx_t &ctx) { auto it = peer_to_session.find(peer.address()); if(it != std::end(peer_to_session)) { - BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ":"sv << peer.port() << " :: " << type_str; + BOOST_LOG(debug) << "RAISE: "sv << peer.address().to_string() << ':' << peer.port() << " :: " << type_str; it->second->raise(peer.port(), std::string { buf[buf_elem].data(), bytes }); } }; @@ -878,14 +891,12 @@ void videoBroadcastThread(udp::socket &sock) { auto lastBlockIndex = 0; if(payload.size() > multi_fec_threshold) { - BOOST_LOG(debug) << "Generating multiple FEC blocks"sv; + BOOST_LOG(verbose) << "Generating multiple FEC blocks"sv; // Align individual fec blocks to blocksize auto unaligned_size = payload.size() / MAX_FEC_BLOCKS; auto aligned_size = ((unaligned_size + (blocksize - 1)) / blocksize) * blocksize; - BOOST_LOG(fatal) << blocksize << " :: "sv << payload.size() << " :: "sv << aligned_size; - // Break the data up into 3 blocks, each containing multiple complete video packets. fec_blocks[0] = payload.substr(0, aligned_size); fec_blocks[1] = payload.substr(aligned_size, aligned_size); @@ -901,10 +912,6 @@ void videoBroadcastThread(udp::socket &sock) { auto blockIndex = 0; std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view ¤t_payload) { - if(lastBlockIndex > 0) { - BOOST_LOG(fatal) << current_payload.size(); - } - auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets); // set FEC info now that we know for sure what our percentage will be for this frame @@ -961,15 +968,8 @@ void audioBroadcastThread(udp::socket &sock) { auto packets = mail::man->queue(mail::audio_packets); constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048); - util::buffer_t shards { RTPA_TOTAL_SHARDS * max_block_size }; - util::buffer_t shards_p { RTPA_TOTAL_SHARDS }; - - for(auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) { - shards_p[x] = (uint8_t *)&shards[x * max_block_size]; - } audio_packet_t audio_packet { (audio_packet_raw_t *)malloc(sizeof(audio_packet_raw_t) + max_block_size) }; - audio_fec_packet_t audio_fec_packet { (audio_fec_packet_raw_t *)malloc(sizeof(audio_fec_packet_raw_t) + max_block_size) }; fec::rs_t rs { reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS) }; // For unknown reasons, the RS parity matrix computed by our RS implementation @@ -985,14 +985,6 @@ void audioBroadcastThread(udp::socket &sock) { audio_packet->rtp.packetType = 97; audio_packet->rtp.ssrc = 0; - audio_fec_packet->rtp.header = 0x80; - audio_fec_packet->rtp.packetType = 127; - audio_fec_packet->rtp.timestamp = 0; - audio_fec_packet->rtp.ssrc = 0; - - audio_fec_packet->fecHeader.payloadType = audio_packet->rtp.packetType; - audio_fec_packet->fecHeader.ssrc = audio_packet->rtp.ssrc; - while(auto packet = packets->pop()) { if(shutdown_event->peek()) { break; @@ -1020,16 +1012,19 @@ void audioBroadcastThread(udp::socket &sock) { session->audio.sequenceNumber++; session->audio.timestamp += session->config.audio.packetDuration; + auto &shards_p = session->audio.shards_p; + std::copy_n(audio_packet->payload(), bytes, shards_p[sequenceNumber % RTPA_DATA_SHARDS]); sock.send_to(asio::buffer((char *)audio_packet.get(), sizeof(audio_packet_raw_t) + bytes), session->audio.peer); BOOST_LOG(verbose) << "Audio ["sv << sequenceNumber << "] :: send..."sv; + auto &fec_packet = session->audio.fec_packet; // initialize the FEC header at the beginning of the FEC block if(sequenceNumber % RTPA_DATA_SHARDS == 0) { - audio_fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); - audio_fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp); + fec_packet->fecHeader.baseSequenceNumber = util::endian::big(sequenceNumber); + fec_packet->fecHeader.baseTimestamp = util::endian::big(timestamp); } // generate parity shards at the end of the FEC block @@ -1037,10 +1032,10 @@ void audioBroadcastThread(udp::socket &sock) { reed_solomon_encode(rs.get(), shards_p.begin(), RTPA_TOTAL_SHARDS, bytes); for(auto x = 0; x < RTPA_FEC_SHARDS; ++x) { - audio_fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); - audio_fec_packet->fecHeader.fecShardIndex = x; - memcpy(audio_fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); - sock.send_to(asio::buffer((char *)audio_fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer); + fec_packet->rtp.sequenceNumber = util::endian::big(sequenceNumber + x + 1); + fec_packet->fecHeader.fecShardIndex = x; + memcpy(fec_packet->payload(), shards_p[RTPA_DATA_SHARDS + x], bytes); + sock.send_to(asio::buffer((char *)fec_packet.get(), sizeof(audio_fec_packet_raw_t) + bytes), session->audio.peer); BOOST_LOG(verbose) << "Audio FEC ["sv << (sequenceNumber & ~(RTPA_DATA_SHARDS - 1)) << ' ' << x << "] :: send..."sv; } } @@ -1134,76 +1129,105 @@ void end_broadcast(broadcast_ctx_t &ctx) { broadcast_shutdown_event->reset(); } -int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) { +int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, udp::endpoint &peer, std::chrono::milliseconds timeout) { auto constexpr ping = "PING"sv; auto messages = std::make_shared(30); - ref->message_queue_queue->raise(type, addr, messages); + ref->message_queue_queue->raise(type, peer.address(), messages); auto fg = util::fail_guard([&]() { + messages->stop(); + // remove message queue from session - ref->message_queue_queue->raise(type, addr, nullptr); + ref->message_queue_queue->raise(type, peer.address(), nullptr); }); - auto msg_opt = messages->pop(config::stream.ping_timeout); - messages->stop(); + auto start_time = std::chrono::steady_clock::now(); + auto current_time = start_time; - if(!msg_opt) { - BOOST_LOG(error) << "Initial Ping Timeout"sv; + while(current_time - start_time < config::stream.ping_timeout) { + auto delta_time = current_time - start_time; - return -1; - } + auto msg_opt = messages->pop(config::stream.ping_timeout - delta_time); + if(!msg_opt) { + break; + } - TUPLE_2D_REF(port, msg, *msg_opt); - if(msg != ping) { - BOOST_LOG(error) << "First message is not a PING"; - BOOST_LOG(debug) << "Received from "sv << addr << ':' << port << " ["sv << util::hex_vec(msg) << ']'; + TUPLE_2D_REF(port, msg, *msg_opt); + if(msg == ping) { + BOOST_LOG(debug) << "Received ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']'; - return -1; + // Update connection details. + { + auto addr_str = peer.address().to_string(); + + auto &connections = ref->audio_video_connections; + + auto lg = connections.lock(); + + std::remove_reference_t::iterator pos = std::end(*connections); + + for(auto it = std::begin(*connections); it != std::end(*connections); ++it) { + TUPLE_2D_REF(addr, port_ref, *it); + + if(!port_ref && addr_str == addr) { + pos = it; + } + else if(port_ref == port) { + break; + } + } + + if(pos == std::end(*connections)) { + continue; + } + + pos->second = port; + peer.port(port); + } + + return port; + } + + BOOST_LOG(debug) << "Received non-ping from "sv << peer.address() << ':' << port << " ["sv << util::hex_vec(msg) << ']'; + + current_time = std::chrono::steady_clock::now(); } - return port; + BOOST_LOG(error) << "Initial Ping Timeout"sv; + return -1; } -void videoThread(session_t *session, std::string addr_str) { +void videoThread(session_t *session) { auto fg = util::fail_guard([&]() { session::stop(*session); }); while_starting_do_nothing(session->state); - auto addr = asio::ip::make_address(addr_str); auto ref = broadcast.ref(); - auto port = recv_ping(ref, socket_e::video, addr, config::stream.ping_timeout); + auto port = recv_ping(ref, socket_e::video, session->video.peer, config::stream.ping_timeout); if(port < 0) { return; } - session->video.peer.address(addr); - session->video.peer.port(port); - BOOST_LOG(debug) << "Start capturing Video"sv; video::capture(session->mail, session->config.monitor, session); } -void audioThread(session_t *session, std::string addr_str) { +void audioThread(session_t *session) { auto fg = util::fail_guard([&]() { session::stop(*session); }); while_starting_do_nothing(session->state); - auto addr = asio::ip::make_address(addr_str); - auto ref = broadcast.ref(); - auto port = recv_ping(ref, socket_e::audio, addr, config::stream.ping_timeout); + auto port = recv_ping(ref, socket_e::audio, session->audio.peer, config::stream.ping_timeout); if(port < 0) { return; } - session->audio.peer.address(addr); - session->audio.peer.port(port); - BOOST_LOG(debug) << "Start capturing Audio"sv; audio::capture(session->mail, session->config.audio, session); } @@ -1234,6 +1258,42 @@ void join(session_t &session) { //Reset input on session stop to avoid stuck repeated keys BOOST_LOG(debug) << "Resetting Input..."sv; input::reset(session.input); + + BOOST_LOG(debug) << "Removing references to any connections..."sv; + { + auto video_addr = session.video.peer.address().to_string(); + auto audio_addr = session.audio.peer.address().to_string(); + + auto video_port = session.video.peer.port(); + auto audio_port = session.audio.peer.port(); + + auto &connections = session.broadcast_ref->audio_video_connections; + + auto lg = connections.lock(); + + auto validate_size = connections->size(); + for(auto it = std::begin(*connections); it != std::end(*connections);) { + TUPLE_2D_REF(addr, port, *it); + + if((video_port == port && video_addr == addr) || + (audio_port == port && audio_addr == addr)) { + it = connections->erase(it); + } + else { + ++it; + } + } + + auto new_size = connections->size(); + if(validate_size != new_size + 2) { + BOOST_LOG(warning) << "Couldn't remove reference to session connections: ending all broadcasts"sv; + + // A reference to the event object is still stored somewhere else. So no need to keep + // a reference to it. + mail::man->event(mail::broadcast_shutdown)->raise(true); + } + } + BOOST_LOG(debug) << "Session ended"sv; } @@ -1247,10 +1307,28 @@ int start(session_t &session, const std::string &addr_string) { session.broadcast_ref->control_server.emplace_addr_to_session(addr_string, session); + auto addr = boost::asio::ip::make_address(addr_string); + session.video.peer.address(addr); + session.video.peer.port(0); + + session.audio.peer.address(addr); + session.audio.peer.port(0); + + { + auto &connections = session.broadcast_ref->audio_video_connections; + + auto lg = connections.lock(); + + // allocate a location for connections + connections->emplace_back(addr_string, 0); + connections->emplace_back(addr_string, 0); + } + + session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - session.audioThread = std::thread { audioThread, &session, addr_string }; - session.videoThread = std::thread { videoThread, &session, addr_string }; + session.audioThread = std::thread { audioThread, &session }; + session.videoThread = std::thread { videoThread, &session }; session.state.store(state_e::RUNNING, std::memory_order_relaxed); @@ -1274,6 +1352,30 @@ std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypt session->video.idr_events = mail->event(mail::idr); session->video.lowseq = 0; + constexpr auto max_block_size = crypto::cipher::round_to_pkcs7_padded(2048); + + util::buffer_t shards { RTPA_TOTAL_SHARDS * max_block_size }; + util::buffer_t shards_p { RTPA_TOTAL_SHARDS }; + + for(auto x = 0; x < RTPA_TOTAL_SHARDS; ++x) { + shards_p[x] = (uint8_t *)&shards[x * max_block_size]; + } + + // Audio FEC spans multiple audio packets, + // therefore its session specific + session->audio.shards = std::move(shards); + session->audio.shards_p = std::move(shards_p); + + session->audio.fec_packet.reset((audio_fec_packet_raw_t *)malloc(sizeof(audio_fec_packet_raw_t) + max_block_size)); + + session->audio.fec_packet->rtp.header = 0x80; + session->audio.fec_packet->rtp.packetType = 127; + session->audio.fec_packet->rtp.timestamp = 0; + session->audio.fec_packet->rtp.ssrc = 0; + + session->audio.fec_packet->fecHeader.payloadType = 97; + session->audio.fec_packet->fecHeader.ssrc = 0; + session->audio.cipher = crypto::cipher::cbc_t { gcm_key, true };