From 2e273cdcded24d76ba21d9b5c2c9029a4856d11a Mon Sep 17 00:00:00 2001 From: DaXcess Date: Mon, 18 Sep 2023 22:39:11 +0200 Subject: [PATCH] Almost ready --- .github/FUNDING.yml | 1 - Cargo.lock | 27 +++- Cargo.toml | 23 +-- src/audio/mod.rs | 12 +- src/{player => audio}/stream.rs | 22 ++- src/bot/commands/core/version.rs | 2 +- src/bot/commands/music/join.rs | 2 +- src/main.rs | 2 + src/player/mod.rs | 22 ++- src/session/manager.rs | 13 ++ src/session/mod.rs | 244 ++++++++++++------------------- 11 files changed, 189 insertions(+), 181 deletions(-) delete mode 100644 .github/FUNDING.yml rename src/{player => audio}/stream.rs (70%) diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index 07fc7c3..0000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1 +0,0 @@ -patreon: rodabafilms \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index e94299f..0c836ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1249,7 +1249,7 @@ dependencies = [ "shell-words", "thiserror", "tokio", - "zerocopy", + "zerocopy 0.6.4", ] [[package]] @@ -2377,7 +2377,7 @@ dependencies = [ "thiserror", "time", "tokio", - "zerocopy", + "zerocopy 0.7.5", ] [[package]] @@ -3137,7 +3137,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20707b61725734c595e840fb3704378a0cd2b9c74cc9e6e20724838fc6a1e2f9" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.6.4", +] + +[[package]] +name = "zerocopy" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "870cdd4b8b867698aea998d95bcc06c1d75fe566267781ee6f5ae8c9c45a3930" +dependencies = [ + "byteorder", + "zerocopy-derive 0.7.5", ] [[package]] @@ -3151,6 +3161,17 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "zerocopy-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9c6f95fa5657518b36c6784ba7cdd89e8bdf9a16e58266085248bfb950860c5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "zeroize" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index 0d2b83c..5f7231f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,14 +13,19 @@ dotenv = "0.15.0" env_logger = "0.10.0" lazy_static = { version = "1.4.0", optional = true } librespot = { version = "0.4.2", default-features = false } -log = "0.4.17" -reqwest = "0.11.18" +log = "0.4.20" +reqwest = "0.11.20" samplerate = "0.2.4" -serde = "1.0.163" -serde_json = "1.0.96" -serenity = { version = "0.11.5", features = ["framework", "cache", "standard_framework"], default-features = false } +serde = "1.0.188" +serde_json = "1.0.107" +serenity = { version = "0.11.6", features = ["framework", "cache", "standard_framework"], default-features = false } songbird = "0.3.2" -thiserror = "1.0.40" -time = "0.3.21" -tokio = { version = "1.28.1", features = ["rt", "full"] } -zerocopy = "0.6.1" +thiserror = "1.0.48" +time = "0.3.28" +tokio = { version = "1.32.0", features = ["rt", "full"] } +zerocopy = "0.7.5" + +[profile.release] +opt-level = 3 +lto = true +debug = true \ No newline at end of file diff --git a/src/audio/mod.rs b/src/audio/mod.rs index dd06c1c..d0cdd40 100644 --- a/src/audio/mod.rs +++ b/src/audio/mod.rs @@ -1,3 +1,7 @@ +pub mod stream; + +use self::stream::Stream; + use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult}; use librespot::playback::convert::Converter; use librespot::playback::decoder::AudioPacket; @@ -5,8 +9,6 @@ use log::error; use std::io::Write; use tokio::sync::mpsc::UnboundedSender; -use crate::player::stream::Stream; - pub enum SinkEvent { Start, Stop, @@ -26,6 +28,8 @@ impl StreamSink { impl Sink for StreamSink { fn start(&mut self) -> SinkResult<()> { if let Err(why) = self.sender.send(SinkEvent::Start) { + // WARNING: Returning an error causes librespot-playback to exit the process with status 1 + error!("Failed to send start playback event: {why}"); return Err(SinkError::ConnectionRefused(why.to_string())); } @@ -35,10 +39,14 @@ impl Sink for StreamSink { fn stop(&mut self) -> SinkResult<()> { if let Err(why) = self.sender.send(SinkEvent::Stop) { + // WARNING: Returning an error causes librespot-playback to exit the process with status 1 + error!("Failed to send start playback event: {why}"); return Err(SinkError::ConnectionRefused(why.to_string())); } + self.stream.flush().ok(); + Ok(()) } diff --git a/src/player/stream.rs b/src/audio/stream.rs similarity index 70% rename from src/player/stream.rs rename to src/audio/stream.rs index 8b3972d..1f38215 100644 --- a/src/player/stream.rs +++ b/src/audio/stream.rs @@ -5,8 +5,10 @@ use std::{ use songbird::input::reader::MediaSource; -// TODO: Find optimal value -const MAX_SIZE: usize = 1024 * 1024; +/// The lower the value, the less latency +/// +/// Too low of a value results in unpredictable audio +const MAX_SIZE: usize = 32 * 1024; #[derive(Clone)] pub struct Stream { @@ -26,15 +28,19 @@ impl Read for Stream { let (mutex, condvar) = &*self.inner; let mut buffer = mutex.lock().expect("Mutex was poisoned"); - log::trace!("Read!"); + // Prevent Discord jitter by filling buffer with zeroes if we don't have any audio + // (i.e. when you skip too far ahead in a song which hasn't been downloaded yet) + if buffer.is_empty() { + buf.fill(0); + condvar.notify_all(); - while buffer.is_empty() { - buffer = condvar.wait(buffer).expect("Mutex was poisoned"); + return Ok(buf.len()); } let max_read = usize::min(buf.len(), buffer.len()); buf[0..max_read].copy_from_slice(&buffer[0..max_read]); buffer.drain(0..max_read); + condvar.notify_all(); Ok(max_read) } @@ -56,6 +62,12 @@ impl Write for Stream { } fn flush(&mut self) -> std::io::Result<()> { + let (mutex, condvar) = &*self.inner; + let mut buffer = mutex.lock().expect("Mutex was poisoned"); + + buffer.clear(); + condvar.notify_all(); + Ok(()) } } diff --git a/src/bot/commands/core/version.rs b/src/bot/commands/core/version.rs index 141d89e..8292870 100644 --- a/src/bot/commands/core/version.rs +++ b/src/bot/commands/core/version.rs @@ -25,7 +25,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO author .name("Maintained by: RoDaBaFilms") .url("https://rodabafilms.com/") - .icon_url("https://rodabafilms.com/logo_2021_nobg.png") + .icon_url("https://cdn.discordapp.com/avatars/389786424142200835/6bfe3840b0aa6a1baf432bb251b70c9f.webp?size=128") }) .description(format!("Current version: {}\n\nSpoticord is open source, check out [our GitHub](https://github.com/SpoticordMusic)", VERSION)) .color(Status::Info as u64) diff --git a/src/bot/commands/music/join.rs b/src/bot/commands/music/join.rs index 65aa10f..2816779 100644 --- a/src/bot/commands/music/join.rs +++ b/src/bot/commands/music/join.rs @@ -338,7 +338,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO .title("Connected to voice channel") .icon_url("https://spoticord.com/speaker.png") .description(format!("Come listen along in <#{}>", channel_id)) - .footer("Spotify will automatically start playing on Spoticord") + .footer("You must manually go to Spotify and select your device") .status(Status::Info) .build(), ) diff --git a/src/main.rs b/src/main.rs index 0b4e320..330ab66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -91,6 +91,7 @@ async fn main() { _ = tokio::signal::ctrl_c() => { info!("Received interrupt signal, shutting down..."); + session_manager.shutdown().await; shard_manager.lock().await.shutdown_all().await; break; @@ -110,6 +111,7 @@ async fn main() { }, if term.is_some() => { info!("Received terminate signal, shutting down..."); + session_manager.shutdown().await; shard_manager.lock().await.shutdown_all().await; break; diff --git a/src/player/mod.rs b/src/player/mod.rs index d78dcb5..faa5539 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -1,8 +1,9 @@ -pub mod stream; - use librespot::{ connect::spirc::Spirc, - core::{config::ConnectConfig, session::Session}, + core::{ + config::{ConnectConfig, SessionConfig}, + session::Session, + }, discovery::Credentials, playback::{ config::{Bitrate, PlayerConfig, VolumeCtrl}, @@ -13,13 +14,11 @@ use librespot::{ use tokio::sync::mpsc::UnboundedReceiver; use crate::{ - audio::{SinkEvent, StreamSink}, + audio::{stream::Stream, SinkEvent, StreamSink}, librespot_ext::discovery::CredentialsExt, utils, }; -use self::stream::Stream; - pub struct Player { stream: Stream, session: Option, @@ -59,7 +58,16 @@ impl Player { } // Connect the session - let (session, _) = Session::connect(Default::default(), credentials, None, false).await?; + let (session, _) = Session::connect( + SessionConfig { + ap_port: Some(9999), // Force the use of ap.spotify.com, which has the lowest latency + ..Default::default() + }, + credentials, + None, + false, + ) + .await?; self.session = Some(session.clone()); let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig { diff --git a/src/session/manager.rs b/src/session/manager.rs index acaef69..4639bc3 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -115,6 +115,10 @@ impl InnerSessionManager { count } + + pub fn sessions(&self) -> Vec { + self.sessions.values().cloned().collect() + } } impl SessionManager { @@ -183,4 +187,13 @@ impl SessionManager { pub async fn get_active_session_count(&self) -> usize { self.0.read().await.get_active_session_count().await } + + /// Tell all sessions to instantly shut down + pub async fn shutdown(&self) { + let sessions = self.0.read().await.sessions(); + + for session in sessions { + session.disconnect().await; + } + } } diff --git a/src/session/mod.rs b/src/session/mod.rs index 7dbe91c..92a2b6b 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -30,8 +30,13 @@ use songbird::{ tracks::TrackHandle, Call, Event, EventContext, EventHandler, }; -use std::{sync::Arc, time::Duration}; -use tokio::sync::Mutex; +use std::{ + io::Write, + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; +use tokio::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard}; #[derive(Clone)] pub struct SpoticordSession(Arc>); @@ -55,6 +60,8 @@ struct InnerSpoticordSession { spirc: Option, + player: Option, + /// Whether the session has been disconnected /// If this is true then this instance should no longer be used and dropped disconnected: bool, @@ -97,6 +104,7 @@ impl SpoticordSession { playback_info: None, disconnect_handle: None, spirc: None, + player: None, disconnected: false, }; @@ -132,14 +140,13 @@ impl SpoticordSession { .clone(); { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; inner.owner = Some(owner_id); } { - let inner = self.0.clone(); - let inner = inner.read().await; - session_manager.set_owner(owner_id, inner.guild_id).await; + let guild_id = self.acquire_read().await.guild_id; + session_manager.set_owner(owner_id, guild_id).await; } // Create the player @@ -150,28 +157,28 @@ impl SpoticordSession { /// Advance to the next track pub async fn next(&mut self) { - if let Some(ref spirc) = self.0.read().await.spirc { + if let Some(ref spirc) = self.acquire_read().await.spirc { spirc.next(); } } /// Rewind to the previous track pub async fn previous(&mut self) { - if let Some(ref spirc) = self.0.read().await.spirc { + if let Some(ref spirc) = self.acquire_read().await.spirc { spirc.prev(); } } /// Pause the current track pub async fn pause(&mut self) { - if let Some(ref spirc) = self.0.read().await.spirc { + if let Some(ref spirc) = self.acquire_read().await.spirc { spirc.pause(); } } /// Resume the current track pub async fn resume(&mut self) { - if let Some(ref spirc) = self.0.read().await.spirc { + if let Some(ref spirc) = self.acquire_read().await.spirc { spirc.play(); } } @@ -237,8 +244,7 @@ impl SpoticordSession { } }; - // Handle IPC packets - // This will automatically quit once the IPC connection is closed + // Handle events tokio::spawn({ let track = track_handle.clone(); let ctx = ctx.clone(); @@ -367,126 +373,16 @@ impl SpoticordSession { } SinkEvent::Stop => { - check_result(track.pause()); + // EXPERIMENT: It may be beneficial to *NOT* pause songbird here + // We already have a fallback if no audio is present in the buffer (write all zeroes aka silence) + // So commenting this out may help prevent a substantial portion of jitter + // This comes at a cost of more bandwidth, though opus should compress it down to almost nothing + + // check_result(track.pause()); } } } }; - - // match event { - // // Session connect error - // IpcPacket::ConnectError(why) => { - // error!("Failed to connect to Spotify: {:?}", why); - - // // Notify the user in the text channel - // if let Err(why) = instance - // .text_channel_id() - // .await - // .send_message(&instance.http().await, |message| { - // message.embed(|embed| { - // embed.title("Failed to connect to Spotify"); - // embed.description(why); - // embed.footer(|footer| footer.text("Please try again")); - // embed.color(Status::Error as u64); - - // embed - // }); - - // message - // }) - // .await - // { - // error!("Failed to send error message: {:?}", why); - // } - - // break; - // } - - // // Sink requests playback to start/resume - // IpcPacket::StartPlayback => { - // check_result(track.play()); - // } - - // // Sink requests playback to pause - // IpcPacket::StopPlayback => { - // check_result(track.pause()); - // } - - // // A new track has been set by the player - // IpcPacket::TrackChange(track) => { - // // Convert to SpotifyId - // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - - // let instance = instance.clone(); - // let ctx = ctx.clone(); - - // // Fetch track info - // // This is done in a separate task to avoid blocking the IPC handler - // tokio::spawn(async move { - // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - // error!("Failed to update track: {:?}", why); - - // instance.player_stopped().await; - // } - // }); - // } - - // // The player has started playing a track - // IpcPacket::Playing(track, position_ms, duration_ms) => { - // // Convert to SpotifyId - // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - - // let was_none = instance - // .update_playback(duration_ms, position_ms, true) - // .await; - - // if was_none { - // // Stop player if update track fails - // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - // error!("Failed to update track: {:?}", why); - - // instance.player_stopped().await; - // return; - // } - // } - // } - - // IpcPacket::Paused(track, position_ms, duration_ms) => { - // instance.start_disconnect_timer().await; - - // // Convert to SpotifyId - // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - - // let was_none = instance - // .update_playback(duration_ms, position_ms, false) - // .await; - - // if was_none { - // // Stop player if update track fails - - // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - // error!("Failed to update track: {:?}", why); - - // instance.player_stopped().await; - // return; - // } - // } - // } - - // IpcPacket::Stopped => { - // check_result(track.pause()); - - // { - // let mut inner = inner.write().await; - // inner.playback_info.take(); - // } - - // instance.start_disconnect_timer().await; - // } - - // // Ignore other packets - // _ => {} - // } } // Clean up session @@ -497,9 +393,10 @@ impl SpoticordSession { }); // Update inner client and track - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; inner.track = Some(track_handle); inner.spirc = Some(spirc); + inner.player = Some(player); Ok(()) } @@ -566,7 +463,7 @@ impl SpoticordSession { } // Update track/episode - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; if let Some(pbi) = inner.playback_info.as_mut() { pbi.update_track_episode(spotify_id, track, episode); @@ -577,7 +474,7 @@ impl SpoticordSession { /// Called when the player must stop, but not leave the call async fn player_stopped(&self) { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; if let Some(spirc) = inner.spirc.take() { spirc.shutdown(); @@ -617,7 +514,7 @@ impl SpoticordSession { // read lock to read the current owner. // This would deadlock if we have an active write lock { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; inner.disconnect_no_abort().await; } @@ -633,7 +530,7 @@ impl SpoticordSession { }; { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; if is_none { inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing)); @@ -659,8 +556,8 @@ impl SpoticordSession { async fn start_disconnect_timer(&self) { self.stop_disconnect_timer().await; - let inner_arc = self.0.clone(); - let mut inner = inner_arc.write().await; + let arc_handle = self.0.clone(); + let mut inner = self.acquire_write().await; // Check if we are already disconnected if inner.disconnected { @@ -668,7 +565,7 @@ impl SpoticordSession { } inner.disconnect_handle = Some(tokio::spawn({ - let inner = inner_arc.clone(); + let inner = arc_handle.clone(); let instance = self.clone(); async move { @@ -705,7 +602,7 @@ impl SpoticordSession { /// Stop the disconnect timer (if one is running) async fn stop_disconnect_timer(&self) { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; if let Some(handle) = inner.disconnect_handle.take() { handle.abort(); } @@ -714,7 +611,7 @@ impl SpoticordSession { /// Disconnect from the VC and send a message to the text channel pub async fn disconnect_with_message(&self, content: &str) { { - let mut inner = self.0.write().await; + let mut inner = self.acquire_write().await; // Firstly we disconnect inner.disconnect_no_abort().await; @@ -745,48 +642,97 @@ impl SpoticordSession { /// Get the owner pub async fn owner(&self) -> Option { - self.0.read().await.owner + self.acquire_read().await.owner } /// Get the session manager pub async fn session_manager(&self) -> SessionManager { - self.0.read().await.session_manager.clone() + self.acquire_read().await.session_manager.clone() } /// Get the guild id pub async fn guild_id(&self) -> GuildId { - self.0.read().await.guild_id + self.acquire_read().await.guild_id } /// Get the channel id pub async fn channel_id(&self) -> ChannelId { - self.0.read().await.channel_id + self.acquire_read().await.channel_id } /// Get the channel id #[allow(dead_code)] pub async fn text_channel_id(&self) -> ChannelId { - self.0.read().await.text_channel_id + self.acquire_read().await.text_channel_id } /// Get the playback info pub async fn playback_info(&self) -> Option { - self.0.read().await.playback_info.clone() + self.acquire_read().await.playback_info.clone() } pub async fn call(&self) -> Arc> { - self.0.read().await.call.clone() + self.acquire_read().await.call.clone() } #[allow(dead_code)] pub async fn http(&self) -> Arc { - self.0.read().await.http.clone() + self.acquire_read().await.http.clone() + } + + async fn acquire_read(&self) -> ReadLock { + ReadLock(self.0.read().await) + } + + async fn acquire_write(&self) -> WriteLock { + WriteLock(self.0.write().await) + } +} + +struct ReadLock<'a>(RwLockReadGuard<'a, InnerSpoticordSession>); + +impl<'a> Deref for ReadLock<'a> { + type Target = RwLockReadGuard<'a, InnerSpoticordSession>; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a> DerefMut for ReadLock<'a> { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +struct WriteLock<'a>(RwLockWriteGuard<'a, InnerSpoticordSession>); + +impl<'a> Deref for WriteLock<'a> { + type Target = RwLockWriteGuard<'a, InnerSpoticordSession>; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a> DerefMut for WriteLock<'a> { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } impl InnerSpoticordSession { /// Internal version of disconnect, which does not abort the disconnect timer async fn disconnect_no_abort(&mut self) { + // Flush stream so that it is not permanently blocking the thread + if let Some(player) = self.player.take() { + player.get_stream().flush().ok(); + } + self.disconnected = true; self .session_manager @@ -813,12 +759,6 @@ impl InnerSpoticordSession { } } -impl Drop for InnerSpoticordSession { - fn drop(&mut self) { - trace!("Dropping inner session"); - } -} - #[async_trait] impl EventHandler for SpoticordSession { async fn act(&self, ctx: &EventContext<'_>) -> Option {