From 27c0ab6cf3811b81b5df578f7cccae37577dd136 Mon Sep 17 00:00:00 2001 From: Felix Prillwitz Date: Sun, 24 Nov 2024 17:42:19 +0100 Subject: [PATCH] core/connect: prevent takeover from other clients, handle session-update --- connect/src/spirc.rs | 143 ++++++++++++++++++++++++++++-------- connect/src/state.rs | 18 ++++- core/src/dealer/mod.rs | 2 + core/src/dealer/protocol.rs | 46 ++++++++---- core/src/session.rs | 45 +++++++----- protocol/build.rs | 1 + 6 files changed, 188 insertions(+), 67 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index eb9039f10..68cea3584 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,25 +1,38 @@ -use crate::model::{ResolveContext, SpircPlayStatus}; -use crate::state::context::{ContextType, LoadNext, UpdateContext}; -use crate::state::provider::IsProvider; -use crate::state::{ConnectState, ConnectStateConfig}; +pub use crate::model::{PlayingTrack, SpircLoadCommand}; use crate::{ - core::{authentication::Credentials, session::UserAttributes, Error, Session, SpotifyId}, + core::{ + authentication::Credentials, + dealer::{ + manager::{Reply, RequestReply}, + protocol::{Message, RequestCommand}, + }, + session::UserAttributes, + Error, Session, SpotifyId, + }, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel}, }, protocol::{ - explicit_content_pubsub::UserAttributesUpdate, user_attributes::UserAttributesMutation, + autoplay_context_request::AutoplayContextRequest, + connect::{Cluster, ClusterUpdate, PutStateReason, SetVolumeCommand}, + explicit_content_pubsub::UserAttributesUpdate, + player::{Context, TransferState}, + playlist4_external::PlaylistModificationInfo, + social_connect_v2::{session::_host_active_device_id, SessionUpdate}, + user_attributes::UserAttributesMutation, + }, +}; +use crate::{ + model::{ResolveContext, SpircPlayStatus}, + state::{ + context::{ContextType, LoadNext, UpdateContext}, + provider::IsProvider, + {ConnectState, ConnectStateConfig}, }, }; use futures_util::{Stream, StreamExt}; -use librespot_core::dealer::manager::{Reply, RequestReply}; -use librespot_core::dealer::protocol::RequestCommand; -use librespot_protocol::autoplay_context_request::AutoplayContextRequest; -use librespot_protocol::connect::{Cluster, ClusterUpdate, PutStateReason, SetVolumeCommand}; -use librespot_protocol::player::{Context, TransferState}; -use librespot_protocol::playlist4_external::PlaylistModificationInfo; -use protobuf::{Message, MessageField}; +use protobuf::MessageField; use std::{ future::Future, pin::Pin, @@ -31,8 +44,6 @@ use thiserror::Error; use tokio::{sync::mpsc, time::sleep}; use tokio_stream::wrappers::UnboundedReceiverStream; -pub use crate::model::{PlayingTrack, SpircLoadCommand}; - #[derive(Debug, Error)] pub enum SpircError { #[error("response payload empty")] @@ -56,6 +67,7 @@ impl From for Error { } type BoxedStream = Pin + Send>>; +type BoxedStreamResult = BoxedStream>; struct SpircTask { player: Arc, @@ -66,13 +78,14 @@ struct SpircTask { play_request_id: Option, play_status: SpircPlayStatus, - connection_id_update: BoxedStream>, - connect_state_update: BoxedStream>, - connect_state_volume_update: BoxedStream>, - playlist_update: BoxedStream>, + connection_id_update: BoxedStreamResult, + connect_state_update: BoxedStreamResult, + connect_state_volume_update: BoxedStreamResult, + playlist_update: BoxedStreamResult, + session_update: BoxedStreamResult, connect_state_command: BoxedStream, - user_attributes_update: BoxedStream>, - user_attributes_mutation: BoxedStream>, + user_attributes_update: BoxedStreamResult, + user_attributes_mutation: BoxedStreamResult, commands: Option>, player_events: Option, @@ -153,21 +166,28 @@ impl Spirc { session .dealer() .listen_for("hm://connect-state/v1/cluster")? - .map(|msg| msg.payload.into_message()), + .map(Message::from_raw), ); let connect_state_volume_update = Box::pin( session .dealer() .listen_for("hm://connect-state/v1/connect/volume")? - .map(|msg| msg.payload.into_message()), + .map(Message::from_raw), ); let playlist_update = Box::pin( session .dealer() .listen_for("hm://playlist/v2/playlist/")? - .map(|msg| msg.payload.into_message()), + .map(Message::from_raw), + ); + + let session_update = Box::pin( + session + .dealer() + .listen_for("social-connect/v2/session_update")? + .map(Message::from_json), ); let connect_state_command = Box::pin( @@ -181,7 +201,7 @@ impl Spirc { session .dealer() .listen_for("spotify:user:attributes:update")? - .map(|msg| msg.payload.into_message()), + .map(Message::from_raw), ); // can be trigger by toggling autoplay in a desktop client @@ -189,7 +209,7 @@ impl Spirc { session .dealer() .listen_for("spotify:user:attributes:mutated")? - .map(|msg| msg.payload.into_message()), + .map(Message::from_raw), ); // pre-acquire client_token, preventing multiple request while running @@ -218,6 +238,7 @@ impl Spirc { connect_state_update, connect_state_volume_update, playlist_update, + session_update, connect_state_command, user_attributes_update, user_attributes_mutation, @@ -394,6 +415,16 @@ impl SpircTask { break; } }, + session_update = self.session_update.next() => match session_update { + Some(result) => match result { + Ok(session_update) => self.handle_session_update(session_update), + Err(e) => error!("could not parse session update: {}", e), + } + None => { + error!("session update selected, but none received"); + break; + } + }, cmd = async { commands?.recv().await }, if commands.is_some() => if let Some(cmd) = cmd { if let Err(e) = self.handle_command(cmd).await { debug!("could not dispatch command: {}", e); @@ -428,7 +459,7 @@ impl SpircTask { } } - if !self.shutdown { + if !self.shutdown && self.connect_state.active { if let Err(why) = self.notify().await { warn!("notify before unexpected shutdown couldn't be send: {why}") } @@ -781,16 +812,24 @@ impl SpircTask { self.session.device_id() ); - if !cluster.active_device_id.is_empty() { - info!("active device is <{}>", cluster.active_device_id); + if !cluster.active_device_id.is_empty() || !cluster.player_state.session_id.is_empty() { + info!( + "active device is <{}> with session <{}>", + cluster.active_device_id, cluster.player_state.session_id + ); return Ok(()); } else if cluster.transfer_data.is_empty() { debug!("got empty transfer state, do nothing"); return Ok(()); } else { - info!("trying to take over control automatically") + info!( + "trying to take over control automatically, session_id: {}", + cluster.player_state.session_id + ) } + use protobuf::Message; + // todo: handle received pages from transfer, important to not always shuffle the first 10 tracks // also important when the dealer is restarted, currently we just shuffle again, but at least // the 10 tracks provided should be used and after that the new shuffle context @@ -860,7 +899,7 @@ impl SpircTask { &mut self, mut cluster_update: ClusterUpdate, ) -> Result<(), Error> { - let reason = cluster_update.update_reason.enum_value().ok(); + let reason = cluster_update.update_reason.enum_value(); let device_ids = cluster_update.devices_that_changed.join(", "); debug!( @@ -1451,6 +1490,48 @@ impl SpircTask { Ok(()) } + fn handle_session_update(&mut self, mut session_update: SessionUpdate) { + let reason = session_update.reason.enum_value(); + + let mut session = match session_update.session.take() { + None => return, + Some(session) => session, + }; + + let active_device = session._host_active_device_id.take().map(|id| match id { + _host_active_device_id::HostActiveDeviceId(id) => id, + other => { + warn!("unexpected active device id {other:?}"); + String::new() + } + }); + + if matches!(active_device, Some(ref device) if device == self.session.device_id()) { + info!( + "session update: <{:?}> for self, current session_id {}, new session_id {}", + reason, + self.session.session_id(), + session.session_id + ); + + if self.session.session_id() != session.session_id { + self.session.set_session_id(session.session_id.clone()); + self.connect_state.set_session_id(session.session_id); + } + } else { + debug!("session update: <{reason:?}> from active session host: <{active_device:?}>"); + } + + // this seems to be used for jams or handling the current session_id + // + // handling this event was intended to keep the playback when other clients (primarily + // mobile) connects, otherwise they would steel the current playback when there was no + // session_id provided on the initial PutStateReason::NEW_DEVICE state update + // + // by generating an initial session_id from the get-go we prevent that behavior and + // currently don't need to handle this event, might still be useful for later "jam" support + } + fn position(&mut self) -> u32 { match self.play_status { SpircPlayStatus::Stopped => 0, diff --git a/connect/src/state.rs b/connect/src/state.rs index ff4b5f37e..86410f73b 100644 --- a/connect/src/state.rs +++ b/connect/src/state.rs @@ -68,6 +68,7 @@ impl From for Error { #[derive(Debug, Clone)] pub struct ConnectStateConfig { + pub session_id: String, pub initial_volume: u32, pub name: String, pub device_type: DeviceType, @@ -79,6 +80,7 @@ pub struct ConnectStateConfig { impl Default for ConnectStateConfig { fn default() -> Self { Self { + session_id: String::new(), initial_volume: u32::from(u16::MAX) / 2, name: "librespot".to_string(), device_type: DeviceType::Speaker, @@ -91,6 +93,7 @@ impl Default for ConnectStateConfig { #[derive(Default, Debug)] pub struct ConnectState { + pub session_id: String, pub active: bool, pub active_since: Option, @@ -131,6 +134,7 @@ pub struct ConnectState { impl ConnectState { pub fn new(cfg: ConnectStateConfig, session: &Session) -> Self { let mut state = Self { + session_id: cfg.session_id, device: DeviceInfo { can_play: true, volume: cfg.initial_volume, @@ -188,6 +192,7 @@ impl ConnectState { self.queue_count = 0; self.player = PlayerState { + session_id: self.session_id.clone(), is_system_initiated: true, playback_speed: 1., play_origin: MessageField::some(PlayOrigin::new()), @@ -211,6 +216,15 @@ impl ConnectState { } } + pub fn set_origin(&mut self, origin: PlayOrigin) { + self.player.play_origin = MessageField::some(origin) + } + + pub fn set_session_id(&mut self, session_id: String) { + self.session_id = session_id.clone(); + self.player.session_id = session_id; + } + pub(crate) fn set_status(&mut self, status: &SpircPlayStatus) { self.player.is_paused = matches!( status, @@ -378,10 +392,6 @@ impl ConnectState { self.player.timestamp = timestamp; } - pub fn set_origin(&mut self, origin: PlayOrigin) { - self.player.play_origin = MessageField::some(origin) - } - /// Updates the connect state for the connect session /// /// Prepares a [PutStateRequest] from the current connect state diff --git a/core/src/dealer/mod.rs b/core/src/dealer/mod.rs index abcba3e53..d5bff5b1e 100644 --- a/core/src/dealer/mod.rs +++ b/core/src/dealer/mod.rs @@ -168,6 +168,8 @@ fn split_uri(s: &str) -> Option> { ("hm", '/', rest) } else if let Some(rest) = s.strip_prefix("spotify:") { ("spotify", ':', rest) + } else if s.contains('/') { + ("", '/', s) } else { return None; }; diff --git a/core/src/dealer/protocol.rs b/core/src/dealer/protocol.rs index ef343c870..858a10e7c 100644 --- a/core/src/dealer/protocol.rs +++ b/core/src/dealer/protocol.rs @@ -14,6 +14,11 @@ use serde::Deserialize; use serde_json::Error as SerdeError; use thiserror::Error; +const IGNORE_UNKNOWN: protobuf_json_mapping::ParseOptions = protobuf_json_mapping::ParseOptions { + ignore_unknown_fields: true, + _future_options: (), +}; + type JsonValue = serde_json::Value; #[derive(Debug, Error)] @@ -84,11 +89,33 @@ pub(super) enum MessageOrRequest { pub enum PayloadValue { Empty, Raw(Vec), + Json(String), +} + +#[derive(Clone, Debug)] +pub struct Message { + pub headers: HashMap, + pub payload: PayloadValue, + pub uri: String, } -impl PayloadValue { - pub fn into_message(self) -> Result { - match self { +impl Message { + pub fn from_json(value: Self) -> Result { + use protobuf_json_mapping::*; + match value.payload { + PayloadValue::Json(json) => match parse_from_str::(&json) { + Ok(message) => Ok(message), + Err(_) => match parse_from_str_with_options(&json, &IGNORE_UNKNOWN) { + Ok(message) => Ok(message), + Err(why) => Err(Error::failed_precondition(why)), + }, + }, + other => Err(ProtocolError::UnexpectedData(other).into()), + } + } + + pub fn from_raw(value: Self) -> Result { + match value.payload { PayloadValue::Raw(bytes) => { M::parse_from_bytes(&bytes).map_err(Error::failed_precondition) } @@ -97,13 +124,6 @@ impl PayloadValue { } } -#[derive(Clone, Debug)] -pub struct Message { - pub headers: HashMap, - pub payload: PayloadValue, - pub uri: String, -} - impl WebsocketMessage { pub fn handle_payload(&mut self) -> Result { if self.payloads.is_empty() { @@ -118,11 +138,7 @@ impl WebsocketMessage { .decode(string) .map_err(ProtocolError::Base64)?, MessagePayloadValue::Bytes(bytes) => bytes, - MessagePayloadValue::Json(json) => { - return Err(Error::unimplemented(format!( - "Received unknown data from websocket message: {json:?}" - ))) - } + MessagePayloadValue::Json(json) => return Ok(PayloadValue::Json(json.to_string())), }; handle_transfer_encoding(&self.headers, bytes).map(PayloadValue::Raw) diff --git a/core/src/session.rs b/core/src/session.rs index 3ca3e9335..45d54bc6d 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -9,23 +9,6 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use byteorder::{BigEndian, ByteOrder}; -use bytes::Bytes; -use futures_core::TryStream; -use futures_util::StreamExt; -use librespot_protocol::authentication::AuthenticationType; -use num_traits::FromPrimitive; -use once_cell::sync::OnceCell; -use parking_lot::RwLock; -use pin_project_lite::pin_project; -use quick_xml::events::Event; -use thiserror::Error; -use tokio::{ - sync::mpsc, - time::{sleep, Duration as TokioDuration, Instant as TokioInstant, Sleep}, -}; -use tokio_stream::wrappers::UnboundedReceiverStream; - use crate::dealer::manager::DealerManager; use crate::{ apresolve::{ApResolver, SocketAddress}, @@ -44,6 +27,23 @@ use crate::{ token::TokenProvider, Error, }; +use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; +use futures_core::TryStream; +use futures_util::StreamExt; +use librespot_protocol::authentication::AuthenticationType; +use num_traits::FromPrimitive; +use once_cell::sync::OnceCell; +use parking_lot::RwLock; +use pin_project_lite::pin_project; +use quick_xml::events::Event; +use thiserror::Error; +use tokio::{ + sync::mpsc, + time::{sleep, Duration as TokioDuration, Instant as TokioInstant, Sleep}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use uuid::Uuid; #[derive(Debug, Error)] pub enum SessionError { @@ -79,6 +79,7 @@ pub struct UserData { #[derive(Debug, Clone, Default)] struct SessionData { + session_id: String, client_id: String, client_name: String, client_brand_name: String, @@ -130,6 +131,8 @@ impl Session { let session_data = SessionData { client_id: config.client_id.clone(), + // can be any guid, doesn't need to be simple + session_id: Uuid::new_v4().as_simple().to_string(), ..SessionData::default() }; @@ -382,6 +385,14 @@ impl Session { self.0.data.read().user_data.clone() } + pub fn session_id(&self) -> String { + self.0.data.read().session_id.clone() + } + + pub fn set_session_id(&self, session_id: String) { + self.0.data.write().session_id = session_id.to_owned(); + } + pub fn device_id(&self) -> &str { &self.config().device_id } diff --git a/protocol/build.rs b/protocol/build.rs index fb1e4c839..43971bc84 100644 --- a/protocol/build.rs +++ b/protocol/build.rs @@ -38,6 +38,7 @@ fn compile() { proto_dir.join("storage-resolve.proto"), proto_dir.join("user_attributes.proto"), proto_dir.join("autoplay_context_request.proto"), + proto_dir.join("social_connect_v2.proto"), // TODO: remove these legacy protobufs when we are on the new API completely proto_dir.join("authentication.proto"), proto_dir.join("canvaz.proto"),