diff --git a/.cargo/config.toml b/.cargo/config.toml index 8cc5ad8..ce8c578 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,2 +1,2 @@ [target.x86_64-pc-windows-gnu] -rustflags = "-C link-args=-lssp" # Does does compile without this line \ No newline at end of file +rustflags = "-C link-args=-lssp" # Does not compile without this line \ No newline at end of file diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..07fc7c3 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +patreon: rodabafilms \ No newline at end of file diff --git a/src/bot/commands/music/join.rs b/src/bot/commands/music/join.rs index e057b5c..5605c4a 100644 --- a/src/bot/commands/music/join.rs +++ b/src/bot/commands/music/join.rs @@ -1,4 +1,4 @@ -use log::{error, trace}; +use log::error; use serenity::{ builder::CreateApplicationCommand, model::prelude::{interaction::application_command::ApplicationCommandInteraction, Channel}, @@ -167,13 +167,13 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu } let data = ctx.data.read().await; - let mut session_manager = data.get::().unwrap().clone(); + let session_manager = data.get::().unwrap().clone(); // Check if another session is already active in this server let mut session_opt = session_manager.get_session(guild.id).await; if let Some(session) = &session_opt { - if let Some(owner) = session.get_owner().await { + if let Some(owner) = session.owner().await { let msg = if owner == command.user.id { "You are already controlling the bot" } else { @@ -206,7 +206,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu .description( format!( "You are already playing music in another server ({}).\nStop playing in that server first before joining this one.", - ctx.cache.guild(session.get_guild_id()).unwrap().name + ctx.cache.guild(session.guild_id().await).unwrap().name )).status(Status::Error).build(), true, ) @@ -218,8 +218,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu defer_message(&ctx, &command, false).await; if let Some(session) = &session_opt { - trace!("{} != {}", session.get_channel_id(), channel_id); - if session.get_channel_id() != channel_id { + if session.channel_id().await != channel_id { session.disconnect().await; session_opt = None; @@ -228,7 +227,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu } } - if let Some(session) = &session_opt { + if let Some(session) = session_opt.as_mut() { if let Err(why) = session.update_owner(&ctx, command.user.id).await { // Need to link first if let SessionCreateError::NoSpotifyError = why { diff --git a/src/bot/commands/music/leave.rs b/src/bot/commands/music/leave.rs index 36802ab..f850402 100644 --- a/src/bot/commands/music/leave.rs +++ b/src/bot/commands/music/leave.rs @@ -36,7 +36,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu } }; - if let Some(owner) = session.get_owner().await { + if let Some(owner) = session.owner().await { if owner != command.user.id { // This message was generated by AI, and I love it. respond_message( diff --git a/src/bot/commands/music/playing.rs b/src/bot/commands/music/playing.rs index 1abd137..f7080f0 100644 --- a/src/bot/commands/music/playing.rs +++ b/src/bot/commands/music/playing.rs @@ -48,7 +48,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu } }; - let owner = match session.get_owner().await { + let owner = match session.owner().await { Some(owner) => owner, None => { not_playing.await; @@ -58,7 +58,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu }; // Get Playback Info from session - let pbi = match session.get_playback_info().await { + let pbi = match session.playback_info().await { Some(pbi) => pbi, None => { not_playing.await; diff --git a/src/ipc/packet.rs b/src/ipc/packet.rs index ca327d4..159b904 100644 --- a/src/ipc/packet.rs +++ b/src/ipc/packet.rs @@ -2,14 +2,22 @@ use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum IpcPacket { + /// Quit the player process Quit, + /// Connect to Spotify with the given token and device name Connect(String, String), + + /// Disconnect from Spotify (unused) Disconnect, + /// Unable to connect to Spotify ConnectError(String), + /// The audio sink has started writing StartPlayback, + + /// The audio sink has stopped writing StopPlayback, /// The current Spotify track was changed diff --git a/src/player.rs b/src/player.rs index ab80fc3..894b5ae 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{process::exit, time::Duration}; use ipc_channel::ipc::{IpcError, TryRecvError}; use librespot::{ @@ -95,8 +95,8 @@ impl SpoticordPlayer { let (spirc, spirc_task) = Spirc::new( ConnectConfig { name: device_name.into(), - // 75% - initial_volume: Some((65535 / 4) * 3), + // 50% + initial_volume: Some(65535 / 2), ..ConnectConfig::default() }, session.clone(), @@ -275,8 +275,7 @@ pub async fn main() { IpcPacket::Quit => { debug!("Received quit packet, exiting"); - player.stop(); - break; + exit(0); } _ => { diff --git a/src/session/manager.rs b/src/session/manager.rs index 2f71621..41ce67e 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -10,6 +10,9 @@ use super::SpoticordSession; #[derive(Debug, Error)] pub enum SessionCreateError { + #[error("This session has no owner assigned")] + NoOwnerError, + #[error("The user has not linked their Spotify account")] NoSpotifyError, @@ -24,20 +27,22 @@ pub enum SessionCreateError { } #[derive(Clone)] -pub struct SessionManager { - sessions: Arc>>>, - owner_map: Arc>>, -} +pub struct SessionManager(Arc>); impl TypeMapKey for SessionManager { type Value = SessionManager; } -impl SessionManager { - pub fn new() -> SessionManager { - SessionManager { - sessions: Arc::new(tokio::sync::RwLock::new(HashMap::new())), - owner_map: Arc::new(tokio::sync::RwLock::new(HashMap::new())), +pub struct InnerSessionManager { + sessions: HashMap, + owner_map: HashMap, +} + +impl InnerSessionManager { + pub fn new() -> Self { + Self { + sessions: HashMap::new(), + owner_map: HashMap::new(), } } @@ -54,75 +59,60 @@ impl SessionManager { let session = SpoticordSession::new(ctx, guild_id, channel_id, text_channel_id, owner_id).await?; - let mut sessions = self.sessions.write().await; - let mut owner_map = self.owner_map.write().await; - - sessions.insert(guild_id, Arc::new(session)); - owner_map.insert(owner_id, guild_id); + self.sessions.insert(guild_id, session); + self.owner_map.insert(owner_id, guild_id); Ok(()) } /// Remove a session pub async fn remove_session(&mut self, guild_id: GuildId) { - let mut sessions = self.sessions.write().await; - - if let Some(session) = sessions.get(&guild_id) { - if let Some(owner) = session.get_owner().await { - let mut owner_map = self.owner_map.write().await; - owner_map.remove(&owner); + if let Some(session) = self.sessions.get(&guild_id) { + if let Some(owner) = session.owner().await { + self.owner_map.remove(&owner); } } - sessions.remove(&guild_id); + self.sessions.remove(&guild_id); } /// Remove owner from owner map. /// Used whenever a user stops playing music without leaving the bot. - pub async fn remove_owner(&mut self, owner_id: UserId) { - let mut owner_map = self.owner_map.write().await; - owner_map.remove(&owner_id); + pub fn remove_owner(&mut self, owner_id: UserId) { + self.owner_map.remove(&owner_id); } /// Set the owner of a session /// Used when a user joins a session that is already active - pub async fn set_owner(&mut self, owner_id: UserId, guild_id: GuildId) { - let mut owner_map = self.owner_map.write().await; - owner_map.insert(owner_id, guild_id); + pub fn set_owner(&mut self, owner_id: UserId, guild_id: GuildId) { + self.owner_map.insert(owner_id, guild_id); } /// Get a session by its guild ID - pub async fn get_session(&self, guild_id: GuildId) -> Option> { - let sessions = self.sessions.read().await; - - sessions.get(&guild_id).cloned() + pub fn get_session(&self, guild_id: GuildId) -> Option { + self.sessions.get(&guild_id).cloned() } /// Find a Spoticord session by their current owner's ID - pub async fn find(&self, owner_id: UserId) -> Option> { - let sessions = self.sessions.read().await; - let owner_map = self.owner_map.read().await; + pub fn find(&self, owner_id: UserId) -> Option { + let guild_id = self.owner_map.get(&owner_id)?; - let guild_id = owner_map.get(&owner_id)?; - - sessions.get(&guild_id).cloned() + self.sessions.get(&guild_id).cloned() } /// Get the amount of sessions - pub async fn get_session_count(&self) -> usize { - let sessions = self.sessions.read().await; - - sessions.len() + pub fn get_session_count(&self) -> usize { + self.sessions.len() } /// Get the amount of sessions with an owner pub async fn get_active_session_count(&self) -> usize { - let sessions = self.sessions.read().await; - let mut count: usize = 0; - for session in sessions.values() { - if session.owner.read().await.is_some() { + for session in self.sessions.values() { + let session = session.0.read().await; + + if session.owner.is_some() { count += 1; } } @@ -130,3 +120,65 @@ impl SessionManager { count } } + +impl SessionManager { + pub fn new() -> Self { + Self(Arc::new(tokio::sync::RwLock::new( + InnerSessionManager::new(), + ))) + } + + /// Creates a new session for the given user in the given guild. + pub async fn create_session( + &self, + ctx: &Context, + guild_id: GuildId, + channel_id: ChannelId, + text_channel_id: ChannelId, + owner_id: UserId, + ) -> Result<(), SessionCreateError> { + self + .0 + .write() + .await + .create_session(ctx, guild_id, channel_id, text_channel_id, owner_id) + .await + } + + /// Remove a session + pub async fn remove_session(&self, guild_id: GuildId) { + self.0.write().await.remove_session(guild_id).await; + } + + /// Remove owner from owner map. + /// Used whenever a user stops playing music without leaving the bot. + pub async fn remove_owner(&self, owner_id: UserId) { + self.0.write().await.remove_owner(owner_id); + } + + /// Set the owner of a session + /// Used when a user joins a session that is already active + pub async fn set_owner(&self, owner_id: UserId, guild_id: GuildId) { + self.0.write().await.set_owner(owner_id, guild_id); + } + + /// Get a session by its guild ID + pub async fn get_session(&self, guild_id: GuildId) -> Option { + self.0.read().await.get_session(guild_id) + } + + /// Find a Spoticord session by their current owner's ID + pub async fn find(&self, owner_id: UserId) -> Option { + self.0.read().await.find(owner_id) + } + + /// Get the amount of sessions + pub async fn get_session_count(&self) -> usize { + self.0.read().await.get_session_count() + } + + /// Get the amount of sessions with an owner + pub async fn get_active_session_count(&self) -> usize { + self.0.read().await.get_active_session_count().await + } +} diff --git a/src/session/mod.rs b/src/session/mod.rs index 17fa28d..cc5ec32 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -34,8 +34,10 @@ pub mod manager; mod pbi; #[derive(Clone)] -pub struct SpoticordSession { - owner: Arc>>, +pub struct SpoticordSession(Arc>); + +struct InnerSpoticordSession { + owner: Option, guild_id: GuildId, channel_id: ChannelId, text_channel_id: ChannelId, @@ -45,13 +47,13 @@ pub struct SpoticordSession { session_manager: SessionManager, call: Arc>, - track: TrackHandle, + track: Option, - playback_info: Arc>>, + playback_info: Option, - disconnect_handle: Arc>>>, + disconnect_handle: Option>, - client: Client, + client: Option, } impl SpoticordSession { @@ -64,9 +66,87 @@ impl SpoticordSession { ) -> Result { // Get the Spotify token of the owner let data = ctx.data.read().await; - let database = data.get::().unwrap(); let session_manager = data.get::().unwrap().clone(); + // Join the voice channel + let songbird = songbird::get(ctx).await.unwrap().clone(); + + let (call, result) = songbird.join(guild_id, channel_id).await; + + if let Err(why) = result { + error!("Error joining voice channel: {:?}", why); + return Err(SessionCreateError::JoinError(channel_id, guild_id)); + } + + let inner = InnerSpoticordSession { + owner: Some(owner_id.clone()), + guild_id, + channel_id, + text_channel_id, + http: ctx.http.clone(), + session_manager: session_manager.clone(), + call: call.clone(), + track: None, + playback_info: None, + disconnect_handle: None, + client: None, + }; + + let mut instance = Self(Arc::new(RwLock::new(inner))); + + instance.create_player(ctx).await?; + + let mut call = call.lock().await; + + // Set up events + call.add_global_event( + songbird::Event::Core(songbird::CoreEvent::DriverDisconnect), + instance.clone(), + ); + + call.add_global_event( + songbird::Event::Core(songbird::CoreEvent::ClientDisconnect), + instance.clone(), + ); + + Ok(instance) + } + + pub async fn update_owner( + &mut self, + ctx: &Context, + owner_id: UserId, + ) -> Result<(), SessionCreateError> { + // Get the Spotify token of the owner + let data = ctx.data.read().await; + let session_manager = data.get::().unwrap().clone(); + + { + let mut inner = self.0.write().await; + inner.owner = Some(owner_id); + } + + { + let inner = self.0.clone(); + let inner = inner.read().await; + session_manager.set_owner(owner_id, inner.guild_id).await; + } + + // Create the player + self.create_player(ctx).await?; + + Ok(()) + } + + async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> { + let owner_id = match self.owner().await.clone() { + Some(owner_id) => owner_id, + None => return Err(SessionCreateError::NoOwnerError), + }; + + let data = ctx.data.read().await; + let database = data.get::().unwrap(); + let token = match database.get_access_token(owner_id.to_string()).await { Ok(token) => token, Err(why) => { @@ -97,21 +177,15 @@ impl SpoticordSession { } }; - // Join the voice channel - let songbird = songbird::get(ctx).await.unwrap().clone(); - - let (call, result) = songbird.join(guild_id, channel_id).await; - - if let Err(why) = result { - error!("Error joining voice channel: {:?}", why); - return Err(SessionCreateError::JoinError(channel_id, guild_id)); - } - - let mut call_mut = call.lock().await; - // Spawn player process let child = match Command::new(std::env::current_exe().unwrap()) - .args(["--player", &tx_name, &rx_name]) + .args([ + "--player", + &tx_name, + &rx_name, + "--debug-guild-id", + &self.guild_id().await.to_string(), + ]) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .spawn() @@ -141,252 +215,186 @@ impl SpoticordSession { create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None)); track.pause(); + let call = self.call().await; + let mut call = call.lock().await; + // Set call audio to track - call_mut.play_only(track); - - let instance = Self { - owner: Arc::new(RwLock::new(Some(owner_id.clone()))), - guild_id, - channel_id, - text_channel_id, - http: ctx.http.clone(), - session_manager: session_manager.clone(), - call: call.clone(), - track: track_handle.clone(), - playback_info: Arc::new(RwLock::new(None)), - disconnect_handle: Arc::new(Mutex::new(None)), - client: client.clone(), - }; - - // Clone variables for use in the IPC handler - let ipc_track = track_handle.clone(); - let ipc_client = client.clone(); - let ipc_context = ctx.clone(); - let mut ipc_instance = instance.clone(); + call.play_only(track); // Handle IPC packets // This will automatically quit once the IPC connection is closed - tokio::spawn(async move { - let check_result = |result| { - if let Err(why) = result { - error!("Failed to issue track command: {:?}", why); - } - }; + tokio::spawn({ + let track = track_handle.clone(); + let client = client.clone(); + let ctx = ctx.clone(); + let instance = self.clone(); + let inner = self.0.clone(); - loop { - // Required for IpcPacket::TrackChange to work - tokio::task::yield_now().await; - - let msg = match ipc_client.try_recv() { - Ok(msg) => msg, - Err(why) => { - if let TryRecvError::Empty = why { - // No message, wait a bit and try again - tokio::time::sleep(Duration::from_millis(25)).await; - - continue; - } else if let TryRecvError::IpcError(why) = &why { - if let IpcError::Disconnected = why { - trace!("IPC connection closed, exiting IPC handler"); - break; - } - } - - error!("Failed to receive IPC message: {:?}", why); - break; + async move { + let check_result = |result| { + if let Err(why) = result { + error!("Failed to issue track command: {:?}", why); } }; - trace!("Received IPC message: {:?}", msg); + loop { + // Required for IpcPacket::TrackChange to work + tokio::task::yield_now().await; - match msg { - // Session connect error - IpcPacket::ConnectError(why) => { - error!("Failed to connect to Spotify: {:?}", why); + let msg = match client.try_recv() { + Ok(msg) => msg, + Err(why) => { + if let TryRecvError::Empty = why { + // No message, wait a bit and try again + tokio::time::sleep(Duration::from_millis(25)).await; - // Notify the user in the text channel - if let Err(why) = ipc_instance - .text_channel_id - .send_message(&ipc_instance.http, |message| { - message.embed(|embed| { - embed.title("Failed to connect to Spotify"); - embed.description(why); - embed.color(Status::Error as u64); - - embed - }); - - message - }) - .await - { - error!("Failed to send error message: {:?}", why); - } - - // Clean up session - ipc_instance.player_stopped().await; - - break; - } - - // Sink requests playback to start/resume - IpcPacket::StartPlayback => { - check_result(ipc_track.play()); - } - - // Sink requests playback to pause - IpcPacket::StopPlayback => { - check_result(ipc_track.pause()); - } - - // A new track has been set by the player - IpcPacket::TrackChange(track) => { - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).unwrap(); - - let mut instance = ipc_instance.clone(); - let context = ipc_context.clone(); - - // Fetch track info - // This is done in a separate task to avoid blocking the IPC handler - tokio::spawn(async move { - if let Err(why) = instance.update_track(&context, &owner_id, track_id).await { - error!("Failed to update track: {:?}", why); - - instance.player_stopped().await; + continue; + } else if let TryRecvError::IpcError(why) = &why { + if let IpcError::Disconnected = why { + trace!("IPC connection closed, exiting IPC handler"); + break; + } } - }); - } - // The player has started playing a track - IpcPacket::Playing(track, position_ms, duration_ms) => { - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).unwrap(); + error!("Failed to receive IPC message: {:?}", why); + break; + } + }; - let was_none = ipc_instance - .update_playback(duration_ms, position_ms, true) - .await; + trace!("Received IPC message: {:?}", msg); - if was_none { - // Stop player if update track fails - if let Err(why) = ipc_instance - .update_track(&ipc_context, &owner_id, track_id) + match msg { + // Session connect error + IpcPacket::ConnectError(why) => { + error!("Failed to connect to Spotify: {:?}", why); + + // Notify the user in the text channel + if let Err(why) = instance + .text_channel_id() + .await + .send_message(&instance.http().await, |message| { + message.embed(|embed| { + embed.title("Failed to connect to Spotify"); + embed.description(why); + embed.color(Status::Error as u64); + + embed + }); + + message + }) .await { - error!("Failed to update track: {:?}", why); + error!("Failed to send error message: {:?}", why); + } - ipc_instance.player_stopped().await; - return; + // Clean up session + instance.player_stopped().await; + + break; + } + + // Sink requests playback to start/resume + IpcPacket::StartPlayback => { + check_result(track.play()); + } + + // Sink requests playback to pause + IpcPacket::StopPlayback => { + check_result(track.pause()); + } + + // A new track has been set by the player + IpcPacket::TrackChange(track) => { + // Convert to SpotifyId + let track_id = SpotifyId::from_uri(&track).unwrap(); + + let instance = instance.clone(); + let ctx = ctx.clone(); + + // Fetch track info + // This is done in a separate task to avoid blocking the IPC handler + tokio::spawn(async move { + if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + } + }); + } + + // The player has started playing a track + IpcPacket::Playing(track, position_ms, duration_ms) => { + // Convert to SpotifyId + let track_id = SpotifyId::from_uri(&track).unwrap(); + + let was_none = instance + .update_playback(duration_ms, position_ms, true) + .await; + + if was_none { + // Stop player if update track fails + if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + return; + } } } - } - IpcPacket::Paused(track, position_ms, duration_ms) => { - ipc_instance.start_disconnect_timer().await; + IpcPacket::Paused(track, position_ms, duration_ms) => { + instance.start_disconnect_timer().await; - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).unwrap(); + // Convert to SpotifyId + let track_id = SpotifyId::from_uri(&track).unwrap(); - let was_none = ipc_instance - .update_playback(duration_ms, position_ms, false) - .await; + let was_none = instance + .update_playback(duration_ms, position_ms, false) + .await; - if was_none { - // Stop player if update track fails - if let Err(why) = ipc_instance - .update_track(&ipc_context, &owner_id, track_id) - .await - { - error!("Failed to update track: {:?}", why); + if was_none { + // Stop player if update track fails - ipc_instance.player_stopped().await; - return; + if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + return; + } } } + + IpcPacket::Stopped => { + check_result(track.pause()); + + let mut inner = inner.write().await; + inner.playback_info.take(); + + instance.start_disconnect_timer().await; + } + + // Ignore other packets + _ => {} } - - IpcPacket::Stopped => { - check_result(ipc_track.pause()); - - ipc_instance.playback_info.write().await.take(); - ipc_instance.start_disconnect_timer().await; - } - - // Ignore other packets - _ => {} } } }); - // Set up events - call_mut.add_global_event( - songbird::Event::Core(songbird::CoreEvent::DriverDisconnect), - instance.clone(), - ); - - call_mut.add_global_event( - songbird::Event::Core(songbird::CoreEvent::ClientDisconnect), - instance.clone(), - ); - // Inform the player process to connect to Spotify if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) { error!("Failed to send IpcPacket::Connect packet: {:?}", why); } - Ok(instance) - } - - pub async fn update_owner( - &self, - ctx: &Context, - owner_id: UserId, - ) -> Result<(), SessionCreateError> { - // Get the Spotify token of the owner - let data = ctx.data.read().await; - let database = data.get::().unwrap(); - let mut session_manager = data.get::().unwrap().clone(); - - let token = match database.get_access_token(owner_id.to_string()).await { - Ok(token) => token, - Err(why) => { - if let DatabaseError::InvalidStatusCode(code) = why { - if code == 404 { - return Err(SessionCreateError::NoSpotifyError); - } - } - - return Err(SessionCreateError::DatabaseError); - } - }; - - let user = match database.get_user(owner_id.to_string()).await { - Ok(user) => user, - Err(why) => { - error!("Failed to get user: {:?}", why); - return Err(SessionCreateError::DatabaseError); - } - }; - - { - let mut owner = self.owner.write().await; - *owner = Some(owner_id); - } - - session_manager.set_owner(owner_id, self.guild_id).await; - - // Inform the player process to connect to Spotify - if let Err(why) = self - .client - .send(IpcPacket::Connect(token, user.device_name)) - { - error!("Failed to send IpcPacket::Connect packet: {:?}", why); - } + // Update inner client and track + let mut inner = self.0.write().await; + inner.track = Some(track_handle); + inner.client = Some(client); Ok(()) } - // Update current track + /// Update current track async fn update_track( &self, ctx: &Context, @@ -394,9 +402,9 @@ impl SpoticordSession { spotify_id: SpotifyId, ) -> Result<(), String> { let should_update = { - let pbi = self.playback_info.read().await; + let pbi = self.playback_info().await; - if let Some(pbi) = &*pbi { + if let Some(pbi) = pbi { pbi.spotify_id.is_none() || pbi.spotify_id.unwrap() != spotify_id } else { false @@ -447,9 +455,10 @@ impl SpoticordSession { episode = Some(episode_info); } - let mut pbi = self.playback_info.write().await; + // Update track/episode + let mut inner = self.0.write().await; - if let Some(pbi) = &mut *pbi { + if let Some(pbi) = inner.playback_info.as_mut() { pbi.update_track_episode(spotify_id, track, episode); } @@ -457,74 +466,176 @@ impl SpoticordSession { } /// Called when the player must stop, but not leave the call - async fn player_stopped(&mut self) { - // Disconnect from Spotify - if let Err(why) = self.client.send(IpcPacket::Disconnect) { - error!("Failed to send disconnect packet: {:?}", why); + async fn player_stopped(&self) { + let mut inner = self.0.write().await; + + if let Some(client) = inner.client.take() { + // Ask player to quit (will cause defunct process) + if let Err(why) = client.send(IpcPacket::Quit) { + error!("Failed to send quit packet: {:?}", why); + } } - /* So this is really annoying, but necessary. - * If we pause songbird too quickly, the player would still have - * some audio to write to stdout. Because songbird is paused, - * the audio is never read, and the player process will hang. - * - * So yeah we just blatanly wait a second, and hope that's enough. - * Most likely causes issues when the system is a bit slow. - */ - tokio::time::sleep(Duration::from_millis(1000)).await; - - if let Err(why) = self.track.pause() { - error!("Failed to pause track: {:?}", why); + if let Some(track) = inner.track.take() { + // Stop the playback, and freeing the child handle, removing the defunct process + if let Err(why) = track.stop() { + error!("Failed to stop track: {:?}", why); + } } // Clear owner - let mut owner = self.owner.write().await; - if let Some(owner_id) = owner.take() { - self.session_manager.remove_owner(owner_id).await; + if let Some(owner_id) = inner.owner.take() { + inner.session_manager.remove_owner(owner_id).await; } // Clear playback info - let mut playback_info = self.playback_info.write().await; - *playback_info = None; + inner.playback_info = None; + + // Unlock to prevent deadlock in start_disconnect_timer + drop(inner); // Disconnect automatically after some time self.start_disconnect_timer().await; } - /// Internal version of disconnect, which does not abort the disconnect timer - async fn disconnect_no_abort(&self) { - self - .session_manager - .clone() - .remove_session(self.guild_id) - .await; - - let mut call = self.call.lock().await; - - self.track.stop().unwrap_or(()); - call.remove_all_global_events(); - - if let Err(why) = call.leave().await { - error!("Failed to leave voice channel: {:?}", why); - } - } - // Disconnect from voice channel and remove session from manager pub async fn disconnect(&self) { - info!("Disconnecting from voice channel {}", self.channel_id); + info!( + "[{}] Disconnecting from voice channel {}", + self.guild_id().await, + self.channel_id().await + ); - self.disconnect_no_abort().await; + // We must run disconnect_no_abort within a read lock + // This is because `SessionManager::remove_session` will acquire a + // read lock to read the current owner. + // This would deadlock if we have an active write lock + { + let inner = self.0.read().await; + inner.disconnect_no_abort().await; + } // Stop the disconnect timer, if one is running - let mut dc_handle = self.disconnect_handle.lock().await; - - if let Some(handle) = dc_handle.take() { + let mut inner = self.0.write().await; + if let Some(handle) = inner.disconnect_handle.take() { handle.abort(); } } - /// Disconnect from voice channel with a message - pub async fn disconnect_with_message(&self, content: &str) { + // Update playback info (duration, position, playing state) + async fn update_playback(&self, duration_ms: u32, position_ms: u32, playing: bool) -> bool { + let is_none = { + let pbi = self.playback_info().await; + + pbi.is_none() + }; + + let mut inner = self.0.write().await; + + if is_none { + inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing)); + } else { + // Update position, duration and playback state + inner + .playback_info + .as_mut() + .unwrap() + .update_pos_dur(position_ms, duration_ms, playing); + }; + + is_none + } + + /// Start the disconnect timer, which will disconnect the bot from the voice channel after a + /// certain amount of time + async fn start_disconnect_timer(&self) { + let inner_arc = self.0.clone(); + let mut inner = inner_arc.write().await; + + // Abort the previous timer, if one is running + if let Some(handle) = inner.disconnect_handle.take() { + handle.abort(); + } + + inner.disconnect_handle = Some(tokio::spawn({ + let inner = inner_arc.clone(); + + async move { + let mut timer = tokio::time::interval(Duration::from_secs(DISCONNECT_TIME)); + + // Ignore first (immediate) tick + timer.tick().await; + timer.tick().await; + + // Make sure this task has not been aborted, if it has this will automatically stop execution. + tokio::task::yield_now().await; + + // Lock the inner mutex as late as possible to prevent any deadlocks + let mut inner = inner.write().await; + + let is_playing = { + if let Some(ref pbi) = inner.playback_info { + pbi.is_playing + } else { + false + } + }; + + if !is_playing { + info!("Player is not playing, disconnecting"); + inner + .disconnect_with_message( + "The player has been inactive for too long, and has been disconnected.", + ) + .await; + } + } + })); + } + + /* Inner getters */ + + /// Get the owner + pub async fn owner(&self) -> Option { + self.0.read().await.owner + } + + /// Get the session manager + pub async fn session_manager(&self) -> SessionManager { + self.0.read().await.session_manager.clone() + } + + /// Get the guild id + pub async fn guild_id(&self) -> GuildId { + self.0.read().await.guild_id + } + + /// Get the channel id + pub async fn channel_id(&self) -> ChannelId { + self.0.read().await.channel_id + } + + /// Get the channel id + pub async fn text_channel_id(&self) -> ChannelId { + self.0.read().await.text_channel_id + } + + /// Get the playback info + pub async fn playback_info(&self) -> Option { + self.0.read().await.playback_info.clone() + } + + pub async fn call(&self) -> Arc> { + self.0.read().await.call.clone() + } + + pub async fn http(&self) -> Arc { + self.0.read().await.http.clone() + } +} + +impl InnerSpoticordSession { + pub async fn disconnect_with_message(&mut self, content: &str) { self.disconnect_no_abort().await; if let Err(why) = self @@ -544,102 +655,32 @@ impl SpoticordSession { } // Stop the disconnect timer, if one is running - let mut dc_handle = self.disconnect_handle.lock().await; - - if let Some(handle) = dc_handle.take() { + if let Some(handle) = self.disconnect_handle.take() { handle.abort(); } } - // Update playback info (duration, position, playing state) - async fn update_playback(&self, duration_ms: u32, position_ms: u32, playing: bool) -> bool { - let is_none = { - let pbi = self.playback_info.read().await; + /// Internal version of disconnect, which does not abort the disconnect timer + async fn disconnect_no_abort(&self) { + self.session_manager.remove_session(self.guild_id).await; - pbi.is_none() - }; + let mut call = self.call.lock().await; - if is_none { - let mut pbi = self.playback_info.write().await; - *pbi = Some(PlaybackInfo::new(duration_ms, position_ms, playing)); - } else { - let mut pbi = self.playback_info.write().await; - - // Update position, duration and playback state - pbi - .as_mut() - .unwrap() - .update_pos_dur(position_ms, duration_ms, playing) - .await; - }; - - is_none - } - - /// Start the disconnect timer, which will disconnect the bot from the voice channel after a - /// certain amount of time - async fn start_disconnect_timer(&self) { - let pbi = self.playback_info.clone(); - let instance = self.clone(); - - let mut handle = self.disconnect_handle.lock().await; - - // Abort the previous timer, if one is running - if let Some(handle) = handle.take() { - handle.abort(); + if let Some(ref track) = self.track { + track.stop().unwrap_or(()); } - *handle = Some(tokio::spawn(async move { - let mut timer = tokio::time::interval(Duration::from_secs(DISCONNECT_TIME)); + call.remove_all_global_events(); - // Ignore first (immediate) tick - timer.tick().await; - timer.tick().await; - - // Make sure this task has not been aborted, if it has this will automatically stop execution. - tokio::task::yield_now().await; - - let is_playing = { - let pbi = pbi.read().await; - - if let Some(pbi) = &*pbi { - pbi.is_playing - } else { - false - } - }; - - if !is_playing { - info!("Player is not playing, disconnecting"); - instance - .disconnect_with_message( - "The player has been inactive for too long, and has been disconnected.", - ) - .await; - } - })); + if let Err(why) = call.leave().await { + error!("Failed to leave voice channel: {:?}", why); + } } +} - // Get the playback info for the current track - pub async fn get_playback_info(&self) -> Option { - self.playback_info.read().await.clone() - } - - // Get the current owner of this session - pub async fn get_owner(&self) -> Option { - let owner = self.owner.read().await; - - *owner - } - - // Get the server id this session is playing in - pub fn get_guild_id(&self) -> GuildId { - self.guild_id - } - - // Get the channel id this session is playing in - pub fn get_channel_id(&self) -> ChannelId { - self.channel_id +impl Drop for InnerSpoticordSession { + fn drop(&mut self) { + trace!("Dropping inner session"); } } @@ -654,11 +695,16 @@ impl EventHandler for SpoticordSession { EventContext::ClientDisconnect(who) => { trace!("Client disconnected, {}", who.user_id.to_string()); - if let Some(session) = self.session_manager.find(UserId(who.user_id.0)).await { - if session.get_guild_id() == self.guild_id && session.get_channel_id() == self.channel_id + if let Some(session) = self + .session_manager() + .await + .find(UserId(who.user_id.0)) + .await + { + if session.guild_id().await == self.guild_id().await + && session.channel_id().await == self.channel_id().await { - // Clone because haha immutable references - self.clone().player_stopped().await; + self.player_stopped().await; } } } diff --git a/src/session/pbi.rs b/src/session/pbi.rs index fea90d6..2443b1b 100644 --- a/src/session/pbi.rs +++ b/src/session/pbi.rs @@ -30,7 +30,7 @@ impl PlaybackInfo { } /// Update position, duration and playback state - pub async fn update_pos_dur(&mut self, position_ms: u32, duration_ms: u32, is_playing: bool) { + pub fn update_pos_dur(&mut self, position_ms: u32, duration_ms: u32, is_playing: bool) { self.position_ms = position_ms; self.duration_ms = duration_ms; self.is_playing = is_playing;