Skip to content

Commit

Permalink
core/connect: prevent takeover from other clients, handle session-update
Browse files Browse the repository at this point in the history
  • Loading branch information
photovoltex committed Nov 24, 2024
1 parent b5530bb commit 27c0ab6
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 67 deletions.
143 changes: 112 additions & 31 deletions connect/src/spirc.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,38 @@
use crate::model::{ResolveContext, SpircPlayStatus};
use crate::state::context::{ContextType, LoadNext, UpdateContext};
use crate::state::provider::IsProvider;
use crate::state::{ConnectState, ConnectStateConfig};
pub use crate::model::{PlayingTrack, SpircLoadCommand};
use crate::{
core::{authentication::Credentials, session::UserAttributes, Error, Session, SpotifyId},
core::{
authentication::Credentials,
dealer::{
manager::{Reply, RequestReply},
protocol::{Message, RequestCommand},
},
session::UserAttributes,
Error, Session, SpotifyId,
},
playback::{
mixer::Mixer,
player::{Player, PlayerEvent, PlayerEventChannel},
},
protocol::{
explicit_content_pubsub::UserAttributesUpdate, user_attributes::UserAttributesMutation,
autoplay_context_request::AutoplayContextRequest,
connect::{Cluster, ClusterUpdate, PutStateReason, SetVolumeCommand},
explicit_content_pubsub::UserAttributesUpdate,
player::{Context, TransferState},
playlist4_external::PlaylistModificationInfo,
social_connect_v2::{session::_host_active_device_id, SessionUpdate},
user_attributes::UserAttributesMutation,
},
};
use crate::{
model::{ResolveContext, SpircPlayStatus},
state::{
context::{ContextType, LoadNext, UpdateContext},
provider::IsProvider,
{ConnectState, ConnectStateConfig},
},
};
use futures_util::{Stream, StreamExt};
use librespot_core::dealer::manager::{Reply, RequestReply};
use librespot_core::dealer::protocol::RequestCommand;
use librespot_protocol::autoplay_context_request::AutoplayContextRequest;
use librespot_protocol::connect::{Cluster, ClusterUpdate, PutStateReason, SetVolumeCommand};
use librespot_protocol::player::{Context, TransferState};
use librespot_protocol::playlist4_external::PlaylistModificationInfo;
use protobuf::{Message, MessageField};
use protobuf::MessageField;
use std::{
future::Future,
pin::Pin,
Expand All @@ -31,8 +44,6 @@ use thiserror::Error;
use tokio::{sync::mpsc, time::sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub use crate::model::{PlayingTrack, SpircLoadCommand};

#[derive(Debug, Error)]
pub enum SpircError {
#[error("response payload empty")]
Expand All @@ -56,6 +67,7 @@ impl From<SpircError> for Error {
}

type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
type BoxedStreamResult<T> = BoxedStream<Result<T, Error>>;

struct SpircTask {
player: Arc<Player>,
Expand All @@ -66,13 +78,14 @@ struct SpircTask {
play_request_id: Option<u64>,
play_status: SpircPlayStatus,

connection_id_update: BoxedStream<Result<String, Error>>,
connect_state_update: BoxedStream<Result<ClusterUpdate, Error>>,
connect_state_volume_update: BoxedStream<Result<SetVolumeCommand, Error>>,
playlist_update: BoxedStream<Result<PlaylistModificationInfo, Error>>,
connection_id_update: BoxedStreamResult<String>,
connect_state_update: BoxedStreamResult<ClusterUpdate>,
connect_state_volume_update: BoxedStreamResult<SetVolumeCommand>,
playlist_update: BoxedStreamResult<PlaylistModificationInfo>,
session_update: BoxedStreamResult<SessionUpdate>,
connect_state_command: BoxedStream<RequestReply>,
user_attributes_update: BoxedStream<Result<UserAttributesUpdate, Error>>,
user_attributes_mutation: BoxedStream<Result<UserAttributesMutation, Error>>,
user_attributes_update: BoxedStreamResult<UserAttributesUpdate>,
user_attributes_mutation: BoxedStreamResult<UserAttributesMutation>,

commands: Option<mpsc::UnboundedReceiver<SpircCommand>>,
player_events: Option<PlayerEventChannel>,
Expand Down Expand Up @@ -153,21 +166,28 @@ impl Spirc {
session
.dealer()
.listen_for("hm://connect-state/v1/cluster")?
.map(|msg| msg.payload.into_message()),
.map(Message::from_raw),
);

let connect_state_volume_update = Box::pin(
session
.dealer()
.listen_for("hm://connect-state/v1/connect/volume")?
.map(|msg| msg.payload.into_message()),
.map(Message::from_raw),
);

let playlist_update = Box::pin(
session
.dealer()
.listen_for("hm://playlist/v2/playlist/")?
.map(|msg| msg.payload.into_message()),
.map(Message::from_raw),
);

let session_update = Box::pin(
session
.dealer()
.listen_for("social-connect/v2/session_update")?
.map(Message::from_json),
);

let connect_state_command = Box::pin(
Expand All @@ -181,15 +201,15 @@ impl Spirc {
session
.dealer()
.listen_for("spotify:user:attributes:update")?
.map(|msg| msg.payload.into_message()),
.map(Message::from_raw),
);

// can be trigger by toggling autoplay in a desktop client
let user_attributes_mutation = Box::pin(
session
.dealer()
.listen_for("spotify:user:attributes:mutated")?
.map(|msg| msg.payload.into_message()),
.map(Message::from_raw),
);

// pre-acquire client_token, preventing multiple request while running
Expand Down Expand Up @@ -218,6 +238,7 @@ impl Spirc {
connect_state_update,
connect_state_volume_update,
playlist_update,
session_update,
connect_state_command,
user_attributes_update,
user_attributes_mutation,
Expand Down Expand Up @@ -394,6 +415,16 @@ impl SpircTask {
break;
}
},
session_update = self.session_update.next() => match session_update {
Some(result) => match result {
Ok(session_update) => self.handle_session_update(session_update),
Err(e) => error!("could not parse session update: {}", e),
}
None => {
error!("session update selected, but none received");
break;
}
},
cmd = async { commands?.recv().await }, if commands.is_some() => if let Some(cmd) = cmd {
if let Err(e) = self.handle_command(cmd).await {
debug!("could not dispatch command: {}", e);
Expand Down Expand Up @@ -428,7 +459,7 @@ impl SpircTask {
}
}

if !self.shutdown {
if !self.shutdown && self.connect_state.active {
if let Err(why) = self.notify().await {
warn!("notify before unexpected shutdown couldn't be send: {why}")
}
Expand Down Expand Up @@ -781,16 +812,24 @@ impl SpircTask {
self.session.device_id()
);

if !cluster.active_device_id.is_empty() {
info!("active device is <{}>", cluster.active_device_id);
if !cluster.active_device_id.is_empty() || !cluster.player_state.session_id.is_empty() {
info!(
"active device is <{}> with session <{}>",
cluster.active_device_id, cluster.player_state.session_id
);
return Ok(());
} else if cluster.transfer_data.is_empty() {
debug!("got empty transfer state, do nothing");
return Ok(());
} else {
info!("trying to take over control automatically")
info!(
"trying to take over control automatically, session_id: {}",
cluster.player_state.session_id
)
}

use protobuf::Message;

// todo: handle received pages from transfer, important to not always shuffle the first 10 tracks
// also important when the dealer is restarted, currently we just shuffle again, but at least
// the 10 tracks provided should be used and after that the new shuffle context
Expand Down Expand Up @@ -860,7 +899,7 @@ impl SpircTask {
&mut self,
mut cluster_update: ClusterUpdate,
) -> Result<(), Error> {
let reason = cluster_update.update_reason.enum_value().ok();
let reason = cluster_update.update_reason.enum_value();

let device_ids = cluster_update.devices_that_changed.join(", ");
debug!(
Expand Down Expand Up @@ -1451,6 +1490,48 @@ impl SpircTask {
Ok(())
}

fn handle_session_update(&mut self, mut session_update: SessionUpdate) {
let reason = session_update.reason.enum_value();

let mut session = match session_update.session.take() {
None => return,
Some(session) => session,
};

let active_device = session._host_active_device_id.take().map(|id| match id {
_host_active_device_id::HostActiveDeviceId(id) => id,
other => {
warn!("unexpected active device id {other:?}");
String::new()
}
});

if matches!(active_device, Some(ref device) if device == self.session.device_id()) {
info!(
"session update: <{:?}> for self, current session_id {}, new session_id {}",
reason,
self.session.session_id(),
session.session_id
);

if self.session.session_id() != session.session_id {
self.session.set_session_id(session.session_id.clone());
self.connect_state.set_session_id(session.session_id);
}
} else {
debug!("session update: <{reason:?}> from active session host: <{active_device:?}>");
}

// this seems to be used for jams or handling the current session_id
//
// handling this event was intended to keep the playback when other clients (primarily
// mobile) connects, otherwise they would steel the current playback when there was no
// session_id provided on the initial PutStateReason::NEW_DEVICE state update
//
// by generating an initial session_id from the get-go we prevent that behavior and
// currently don't need to handle this event, might still be useful for later "jam" support
}

fn position(&mut self) -> u32 {
match self.play_status {
SpircPlayStatus::Stopped => 0,
Expand Down
18 changes: 14 additions & 4 deletions connect/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl From<StateError> for Error {

#[derive(Debug, Clone)]
pub struct ConnectStateConfig {
pub session_id: String,
pub initial_volume: u32,
pub name: String,
pub device_type: DeviceType,
Expand All @@ -79,6 +80,7 @@ pub struct ConnectStateConfig {
impl Default for ConnectStateConfig {
fn default() -> Self {
Self {
session_id: String::new(),
initial_volume: u32::from(u16::MAX) / 2,
name: "librespot".to_string(),
device_type: DeviceType::Speaker,
Expand All @@ -91,6 +93,7 @@ impl Default for ConnectStateConfig {

#[derive(Default, Debug)]
pub struct ConnectState {
pub session_id: String,
pub active: bool,
pub active_since: Option<SystemTime>,

Expand Down Expand Up @@ -131,6 +134,7 @@ pub struct ConnectState {
impl ConnectState {
pub fn new(cfg: ConnectStateConfig, session: &Session) -> Self {
let mut state = Self {
session_id: cfg.session_id,
device: DeviceInfo {
can_play: true,
volume: cfg.initial_volume,
Expand Down Expand Up @@ -188,6 +192,7 @@ impl ConnectState {
self.queue_count = 0;

self.player = PlayerState {
session_id: self.session_id.clone(),
is_system_initiated: true,
playback_speed: 1.,
play_origin: MessageField::some(PlayOrigin::new()),
Expand All @@ -211,6 +216,15 @@ impl ConnectState {
}
}

pub fn set_origin(&mut self, origin: PlayOrigin) {
self.player.play_origin = MessageField::some(origin)
}

pub fn set_session_id(&mut self, session_id: String) {
self.session_id = session_id.clone();
self.player.session_id = session_id;
}

pub(crate) fn set_status(&mut self, status: &SpircPlayStatus) {
self.player.is_paused = matches!(
status,
Expand Down Expand Up @@ -378,10 +392,6 @@ impl ConnectState {
self.player.timestamp = timestamp;
}

pub fn set_origin(&mut self, origin: PlayOrigin) {
self.player.play_origin = MessageField::some(origin)
}

/// Updates the connect state for the connect session
///
/// Prepares a [PutStateRequest] from the current connect state
Expand Down
2 changes: 2 additions & 0 deletions core/src/dealer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ fn split_uri(s: &str) -> Option<impl Iterator<Item = &'_ str>> {
("hm", '/', rest)
} else if let Some(rest) = s.strip_prefix("spotify:") {
("spotify", ':', rest)
} else if s.contains('/') {
("", '/', s)
} else {
return None;
};
Expand Down
Loading

0 comments on commit 27c0ab6

Please sign in to comment.