From 1ffb07300febfe53d33b3c503cc62edc46de1c3d Mon Sep 17 00:00:00 2001 From: DaXcess Date: Mon, 18 Sep 2023 14:02:59 +0200 Subject: [PATCH] Hello there I'm back --- .github/workflows/build-push.yml | 2 +- COMPILING.md | 7 - Cargo.lock | 2 +- Cargo.toml | 6 +- Dockerfile.metrics | 23 -- README.md | 1 - src/audio/backend.rs | 88 ----- src/audio/mod.rs | 161 +++++++++- src/bot/commands/music/playing.rs | 21 +- src/bot/events.rs | 9 - src/main.rs | 56 ---- src/metrics.rs | 114 ------- src/player.rs | 328 ------------------- src/player/mod.rs | 99 ++++++ src/player/stream.rs | 76 +++++ src/session/manager.rs | 4 +- src/session/mod.rs | 516 ++++++++++++++++-------------- 17 files changed, 614 insertions(+), 899 deletions(-) delete mode 100644 Dockerfile.metrics delete mode 100644 src/audio/backend.rs delete mode 100644 src/metrics.rs delete mode 100644 src/player.rs create mode 100644 src/player/mod.rs create mode 100644 src/player/stream.rs diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 507765c..b04fb13 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -28,7 +28,7 @@ jobs: uses: docker/build-push-action@v2 with: context: . - file: ./Dockerfile.metrics + file: ./Dockerfile tags: | ${{ secrets.REGISTRY_URL }}/spoticord/spoticord:latest push: ${{ github.ref == 'refs/heads/main' }} diff --git a/COMPILING.md b/COMPILING.md index cde8a47..db657bd 100644 --- a/COMPILING.md +++ b/COMPILING.md @@ -71,13 +71,6 @@ If you are actively developing Spoticord, you can use the following command to b cargo run ``` -# Features -As of now, Spoticord has one optional feature: `metrics`. This feature enables pushing metrics about the bot, like how many servers it is in, which tracks are being played and which commands are being executed. The metrics are designed to be pushed to a [Prometheus Pushgateway](https://prometheus.io/docs/instrumenting/pushing/). If you want to enable this feature, you can do so by running the following command: - -```sh -cargo build --release --features metrics -``` - # MSRV The current minimum supported rust version is `1.65.0`. diff --git a/Cargo.lock b/Cargo.lock index 2d6dab9..aff5e5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2603,7 +2603,7 @@ dependencies = [ [[package]] name = "spoticord" -version = "2.0.0" +version = "2.1.0" dependencies = [ "dotenv", "env_logger 0.10.0", diff --git a/Cargo.toml b/Cargo.toml index ce04333..deb901b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spoticord" -version = "2.0.0" +version = "2.1.0" edition = "2021" rust-version = "1.65.0" @@ -8,10 +8,6 @@ rust-version = "1.65.0" name = "spoticord" path = "src/main.rs" -[features] -default = [] -metrics = ["lazy_static", "prometheus"] - [dependencies] dotenv = "0.15.0" env_logger = "0.10.0" diff --git a/Dockerfile.metrics b/Dockerfile.metrics deleted file mode 100644 index 9d7dc42..0000000 --- a/Dockerfile.metrics +++ /dev/null @@ -1,23 +0,0 @@ -# Builder -FROM rust:1.65-buster as builder - -WORKDIR /app - -# Add extra build dependencies here -RUN apt-get update && apt-get install -y cmake - -COPY . . -RUN cargo install --path . --features metrics - -# Runtime -FROM debian:buster-slim - -WORKDIR /app - -# Add extra runtime dependencies here -RUN apt-get update && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* - -# Copy spoticord binary from builder -COPY --from=builder /usr/local/cargo/bin/spoticord ./spoticord - -CMD ["./spoticord"] \ No newline at end of file diff --git a/README.md b/README.md index 5965bc1..673c2a5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,6 @@ Spoticord uses environment variables to configure itself. The following variable Additionally you can configure the following variables: - `GUILD_ID`: The ID of the Discord server where this bot will create commands for. This is used during testing to prevent the bot from creating slash commands in other servers, as well as getting the commands quicker. This variable is optional, and if not set, the bot will create commands in all servers it is in (this may take up to 15 minutes). -- `METRICS_URL`: The connection URL of a Prometheus Push Gateway server used for pushing metrics. This variable is required when compiling with the `metrics` feature. #### Providing environment variables You can provide environment variables in a `.env` file at the root of the working directory of Spoticord. diff --git a/src/audio/backend.rs b/src/audio/backend.rs deleted file mode 100644 index 3241ae7..0000000 --- a/src/audio/backend.rs +++ /dev/null @@ -1,88 +0,0 @@ -use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult}; -use librespot::playback::convert::Converter; -use librespot::playback::decoder::AudioPacket; -use log::error; -use std::io::{Stdout, Write}; - -use crate::ipc; -use crate::ipc::packet::IpcPacket; - -pub struct StdoutSink { - client: ipc::Client, - output: Option>, -} - -impl StdoutSink { - pub fn new(client: ipc::Client) -> Self { - StdoutSink { - client, - output: None, - } - } -} - -impl Sink for StdoutSink { - fn start(&mut self) -> SinkResult<()> { - if let Err(why) = self.client.send(IpcPacket::StartPlayback) { - error!("Failed to send start playback packet: {}", why); - return Err(SinkError::ConnectionRefused(why.to_string())); - } - - self.output.get_or_insert(Box::new(std::io::stdout())); - - Ok(()) - } - - fn stop(&mut self) -> SinkResult<()> { - if let Err(why) = self.client.send(IpcPacket::StopPlayback) { - error!("Failed to send stop playback packet: {}", why); - return Err(SinkError::ConnectionRefused(why.to_string())); - } - - self - .output - .take() - .ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))? - .flush() - .map_err(|why| SinkError::OnWrite(why.to_string()))?; - - Ok(()) - } - - fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> { - use zerocopy::AsBytes; - - if let AudioPacket::Samples(samples) = packet { - let samples_f32: &[f32] = &converter.f64_to_f32(&samples); - - let resampled = samplerate::convert( - 44100, - 48000, - 2, - samplerate::ConverterType::Linear, - samples_f32, - ) - .expect("to succeed"); - - let samples_i16 = - &converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::>()); - - self.write_bytes(samples_i16.as_bytes())?; - } - - Ok(()) - } -} - -impl SinkAsBytes for StdoutSink { - fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { - self - .output - .as_deref_mut() - .ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))? - .write_all(data) - .map_err(|why| SinkError::OnWrite(why.to_string()))?; - - Ok(()) - } -} diff --git a/src/audio/mod.rs b/src/audio/mod.rs index fceb141..0286273 100644 --- a/src/audio/mod.rs +++ b/src/audio/mod.rs @@ -1 +1,160 @@ -pub mod backend; +use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult}; +use librespot::playback::convert::Converter; +use librespot::playback::decoder::AudioPacket; +use log::error; +use std::io::{Stdout, Write}; +use tokio::sync::mpsc::UnboundedSender; + +use crate::ipc; +use crate::ipc::packet::IpcPacket; +use crate::player::stream::Stream; + +pub struct StdoutSink { + client: ipc::Client, + output: Option>, +} + +impl StdoutSink { + pub fn new(client: ipc::Client) -> Self { + StdoutSink { + client, + output: None, + } + } +} + +impl Sink for StdoutSink { + fn start(&mut self) -> SinkResult<()> { + if let Err(why) = self.client.send(IpcPacket::StartPlayback) { + error!("Failed to send start playback packet: {}", why); + return Err(SinkError::ConnectionRefused(why.to_string())); + } + + self.output.get_or_insert(Box::new(std::io::stdout())); + + Ok(()) + } + + fn stop(&mut self) -> SinkResult<()> { + if let Err(why) = self.client.send(IpcPacket::StopPlayback) { + error!("Failed to send stop playback packet: {}", why); + return Err(SinkError::ConnectionRefused(why.to_string())); + } + + self + .output + .take() + .ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))? + .flush() + .map_err(|why| SinkError::OnWrite(why.to_string()))?; + + Ok(()) + } + + fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> { + use zerocopy::AsBytes; + + if let AudioPacket::Samples(samples) = packet { + let samples_f32: &[f32] = &converter.f64_to_f32(&samples); + + let resampled = samplerate::convert( + 44100, + 48000, + 2, + samplerate::ConverterType::Linear, + samples_f32, + ) + .expect("to succeed"); + + let samples_i16 = + &converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::>()); + + self.write_bytes(samples_i16.as_bytes())?; + } + + Ok(()) + } +} + +impl SinkAsBytes for StdoutSink { + fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { + self + .output + .as_deref_mut() + .ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))? + .write_all(data) + .map_err(|why| SinkError::OnWrite(why.to_string()))?; + + Ok(()) + } +} + +pub enum SinkEvent { + Start, + Stop, +} + +pub struct StreamSink { + stream: Stream, + sender: UnboundedSender, +} + +impl StreamSink { + pub fn new(stream: Stream, sender: UnboundedSender) -> Self { + Self { stream, sender } + } +} + +impl Sink for StreamSink { + fn start(&mut self) -> SinkResult<()> { + if let Err(why) = self.sender.send(SinkEvent::Start) { + error!("Failed to send start playback event: {why}"); + return Err(SinkError::ConnectionRefused(why.to_string())); + } + + Ok(()) + } + + fn stop(&mut self) -> SinkResult<()> { + if let Err(why) = self.sender.send(SinkEvent::Stop) { + error!("Failed to send start playback event: {why}"); + return Err(SinkError::ConnectionRefused(why.to_string())); + } + + Ok(()) + } + + fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> { + use zerocopy::AsBytes; + + let AudioPacket::Samples(samples) = packet else { return Ok(()); }; + let samples_f32: &[f32] = &converter.f64_to_f32(&samples); + + let resampled = samplerate::convert( + 44100, + 48000, + 2, + samplerate::ConverterType::Linear, + samples_f32, + ) + .expect("to succeed"); + + let samples_i16 = + &converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::>()); + + self.write_bytes(samples_i16.as_bytes())?; + + Ok(()) + } +} + +impl SinkAsBytes for StreamSink { + fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { + self + .stream + .write_all(data) + .map_err(|why| SinkError::OnWrite(why.to_string()))?; + + Ok(()) + } +} diff --git a/src/bot/commands/music/playing.rs b/src/bot/commands/music/playing.rs index 27d6d26..f022575 100644 --- a/src/bot/commands/music/playing.rs +++ b/src/bot/commands/music/playing.rs @@ -280,35 +280,24 @@ pub fn component(ctx: Context, mut interaction: MessageComponentInteraction) -> }; // Send the desired command to the session - let success = match interaction.data.custom_id.as_str() { + match interaction.data.custom_id.as_str() { "playing::btn_pause_play" => { if pbi.is_playing { - session.pause().await.is_ok() + session.pause().await } else { - session.resume().await.is_ok() + session.resume().await } } - "playing::btn_previous_track" => session.previous().await.is_ok(), + "playing::btn_previous_track" => session.previous().await, - "playing::btn_next_track" => session.next().await.is_ok(), + "playing::btn_next_track" => session.next().await, _ => { error!("Unknown custom_id: {}", interaction.data.custom_id); - false } }; - if !success { - error_message( - "Cannot change playback state", - "An error occurred while trying to change the playback state", - ) - .await; - - return; - } - interaction.defer(&ctx.http).await.ok(); tokio::time::sleep(Duration::from_millis( if interaction.data.custom_id == "playing::btn_pause_play" { diff --git a/src/bot/events.rs b/src/bot/events.rs index 0fd1a77..3668150 100644 --- a/src/bot/events.rs +++ b/src/bot/events.rs @@ -15,9 +15,6 @@ use serenity::{ prelude::{Context, EventHandler}, }; -#[cfg(feature = "metrics")] -use crate::metrics::MetricsManager; - // If the GUILD_ID environment variable is set, only allow commands from that guild macro_rules! enforce_guild { ($interaction:ident) => { @@ -99,12 +96,6 @@ impl Handler { let data = ctx.data.read().await; let command_manager = data.get::().expect("to contain a value"); - #[cfg(feature = "metrics")] - { - let metrics = data.get::().expect("to contain a value"); - metrics.command_exec(&command.data.name); - } - command_manager.execute_command(&ctx, command).await; } diff --git a/src/main.rs b/src/main.rs index 1d1f17e..90ff3dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,15 +6,9 @@ use serenity::{framework::StandardFramework, prelude::GatewayIntents, Client}; use songbird::SerenityInit; use std::{any::Any, env, process::exit}; -#[cfg(feature = "metrics")] -use metrics::MetricsManager; - #[cfg(unix)] use tokio::signal::unix::SignalKind; -#[cfg(feature = "metrics")] -mod metrics; - mod audio; mod bot; mod consts; @@ -41,20 +35,6 @@ async fn main() { env_logger::init(); - let args: Vec = env::args().collect(); - - if args.len() > 2 && &args[1] == "--player" { - // Woah! We're running in player mode! - - debug!("Starting Spoticord player"); - - player::main().await; - - debug!("Player exited, shutting down"); - - return; - } - info!("It's a good day"); info!(" - Spoticord {}", time::OffsetDateTime::now_utc().year()); @@ -72,12 +52,6 @@ async fn main() { let token = env::var("DISCORD_TOKEN").expect("a token in the environment"); let db_url = env::var("DATABASE_URL").expect("a database URL in the environment"); - #[cfg(feature = "metrics")] - let metrics_manager = { - let metrics_url = env::var("METRICS_URL").expect("a prometheus pusher URL in the environment"); - MetricsManager::new(metrics_url) - }; - let session_manager = SessionManager::new(); // Create client @@ -97,9 +71,6 @@ async fn main() { data.insert::(Database::new(db_url, None)); data.insert::(CommandManager::new()); data.insert::(session_manager.clone()); - - #[cfg(feature = "metrics")] - data.insert::(metrics_manager.clone()); } let shard_manager = client.shard_manager.clone(); @@ -118,30 +89,6 @@ async fn main() { tokio::spawn(async move { loop { tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => { - #[cfg(feature = "metrics")] - { - let guild_count = _cache.guilds().len(); - let active_count = session_manager.get_active_session_count().await; - let total_count = session_manager.get_session_count().await; - - metrics_manager.set_server_count(guild_count); - metrics_manager.set_active_sessions(active_count); - metrics_manager.set_total_sessions(total_count); - - // Yes, I like to handle my s's when I'm working with amounts - debug!( - "Updated metrics: {} guild{}, {} active session{}, {} total session{}", - guild_count, - if guild_count == 1 { "" } else { "s" }, - active_count, - if active_count == 1 { "" } else { "s" }, - total_count, - if total_count == 1 { "" } else { "s" } - ); - } - } - _ = tokio::signal::ctrl_c() => { info!("Received interrupt signal, shutting down..."); @@ -166,9 +113,6 @@ async fn main() { shard_manager.lock().await.shutdown_all().await; - #[cfg(feature = "metrics")] - metrics_manager.stop(); - break; } } diff --git a/src/metrics.rs b/src/metrics.rs deleted file mode 100644 index 21a3e0b..0000000 --- a/src/metrics.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::{ - collections::hash_map::RandomState, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread, - time::Duration, -}; - -use lazy_static::lazy_static; -use prometheus::{ - opts, push_metrics, register_int_counter_vec, register_int_gauge, IntCounterVec, IntGauge, -}; -use serenity::prelude::TypeMapKey; - -use crate::session::pbi::PlaybackInfo; - -lazy_static! { - static ref TOTAL_SERVERS: IntGauge = - register_int_gauge!("total_servers", "Total number of servers Spoticord is in").unwrap(); - static ref ACTIVE_SESSIONS: IntGauge = register_int_gauge!( - "active_sessions", - "Total number of servers with an active Spoticord session" - ) - .unwrap(); - static ref TOTAL_SESSIONS: IntGauge = register_int_gauge!( - "total_sessions", - "Total number of servers with Spoticord in a voice channel" - ) - .unwrap(); - static ref TRACKS_PLAYED: IntCounterVec = - register_int_counter_vec!(opts!("tracks_played", "Tracks Played"), &["type"]).unwrap(); - static ref COMMANDS_EXECUTED: IntCounterVec = register_int_counter_vec!( - opts!("commands_executed", "Commands Executed"), - &["command"] - ) - .unwrap(); -} - -#[derive(Clone)] -pub struct MetricsManager { - should_stop: Arc, -} - -impl MetricsManager { - pub fn new(pusher_url: impl Into) -> Self { - let instance = Self { - should_stop: Arc::new(AtomicBool::new(false)), - }; - - thread::spawn({ - let instance = instance.clone(); - let pusher_url = pusher_url.into(); - - move || loop { - thread::sleep(Duration::from_secs(5)); - - if instance.should_stop() { - break; - } - - if let Err(why) = push_metrics::( - "spoticord_metrics", - Default::default(), - &pusher_url, - prometheus::gather(), - None, - ) { - log::error!("Failed to push metrics: {}", why); - } - } - }); - - instance - } - - pub fn should_stop(&self) -> bool { - self.should_stop.load(Ordering::Relaxed) - } - - pub fn stop(&self) { - self.should_stop.store(true, Ordering::Relaxed); - } - - pub fn set_server_count(&self, count: usize) { - TOTAL_SERVERS.set(count as i64); - } - - pub fn set_total_sessions(&self, count: usize) { - TOTAL_SESSIONS.set(count as i64); - } - - pub fn set_active_sessions(&self, count: usize) { - ACTIVE_SESSIONS.set(count as i64); - } - - pub fn track_play(&self, track: &PlaybackInfo) { - let track_type = match track.get_type() { - Some(track_type) => track_type, - None => return, - }; - - TRACKS_PLAYED.with_label_values(&[&track_type]).inc(); - } - - pub fn command_exec(&self, command: &str) { - COMMANDS_EXECUTED.with_label_values(&[command]).inc(); - } -} - -impl TypeMapKey for MetricsManager { - type Value = MetricsManager; -} diff --git a/src/player.rs b/src/player.rs deleted file mode 100644 index 045d55e..0000000 --- a/src/player.rs +++ /dev/null @@ -1,328 +0,0 @@ -use std::{process::exit, time::Duration}; - -use ipc_channel::ipc::{IpcError, TryRecvError}; -use librespot::{ - connect::spirc::Spirc, - core::{ - config::{ConnectConfig, SessionConfig}, - session::Session, - }, - discovery::Credentials, - playback::{ - config::{Bitrate, PlayerConfig}, - mixer::{self, MixerConfig}, - player::{Player, PlayerEvent}, - }, -}; -use log::{debug, error, warn}; -use serde_json::json; - -use crate::{ - audio::backend::StdoutSink, - ipc::{self, packet::IpcPacket}, - librespot_ext::discovery::CredentialsExt, - utils, -}; - -pub struct SpoticordPlayer { - client: ipc::Client, - session: Option, - spirc: Option, -} - -impl SpoticordPlayer { - pub fn new(client: ipc::Client) -> Self { - Self { - client, - session: None, - spirc: None, - } - } - - pub async fn start(&mut self, token: impl Into, device_name: impl Into) { - let token = token.into(); - - // Get the username (required for librespot) - let username = utils::spotify::get_username(&token) - .await - .expect("to get the username"); - - let session_config = SessionConfig::default(); - let player_config = PlayerConfig { - bitrate: Bitrate::Bitrate96, - ..PlayerConfig::default() - }; - - // Log in using the token - let credentials = Credentials::with_token(username, &token); - - // Shutdown old session (cannot be done in the stop function) - if let Some(session) = self.session.take() { - session.shutdown(); - } - - // Connect the session - let (session, _) = match Session::connect(session_config, credentials, None, false).await { - Ok((session, credentials)) => (session, credentials), - Err(why) => { - error!("Failed to create Spotify session: {}", why); - - self - .client - .send(IpcPacket::ConnectError(why.to_string())) - .ok(); - return; - } - }; - - // Store session for later use - self.session = Some(session.clone()); - - // Volume mixer - let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig { - volume_ctrl: librespot::playback::config::VolumeCtrl::Linear, - ..MixerConfig::default() - }); - - let client = self.client.clone(); - - // Create the player - let (player, mut receiver) = Player::new( - player_config, - session.clone(), - mixer.get_soft_volume(), - move || Box::new(StdoutSink::new(client)), - ); - - let (spirc, spirc_task) = Spirc::new( - ConnectConfig { - name: device_name.into(), - // 50% - initial_volume: Some(65535 / 2), - ..ConnectConfig::default() - }, - session.clone(), - player, - mixer, - ); - - let device_id = session.device_id().to_owned(); - let ipc = self.client.clone(); - - // IPC Handler - tokio::spawn(async move { - let client = reqwest::Client::new(); - - let mut retries = 10; - - // Try to switch to the device - loop { - match client - .put("https://api.spotify.com/v1/me/player") - .bearer_auth(token.clone()) - .json(&json!({ - "device_ids": [device_id], - })) - .send() - .await - { - Ok(resp) => { - if resp.status() == 202 { - debug!("Successfully switched to device"); - break; - } - - retries -= 1; - - if retries == 0 { - error!("Failed to switch to device"); - ipc - .send(IpcPacket::ConnectError( - "Switch to Spoticord device timed out".to_string(), - )) - .ok(); - break; - } - } - Err(why) => { - error!("Failed to set device: {}", why); - ipc.send(IpcPacket::ConnectError(why.to_string())).ok(); - break; - } - } - - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - - // Do IPC stuff with these events - loop { - let event = match receiver.recv().await { - Some(event) => event, - None => break, - }; - - match event { - PlayerEvent::Playing { - play_request_id: _, - track_id, - position_ms, - duration_ms, - } => { - if let Err(why) = ipc.send(IpcPacket::Playing( - track_id.to_uri().expect("to not fail"), - position_ms, - duration_ms, - )) { - error!("Failed to send playing packet: {}", why); - } - } - - PlayerEvent::Paused { - play_request_id: _, - track_id, - position_ms, - duration_ms, - } => { - if let Err(why) = ipc.send(IpcPacket::Paused( - track_id.to_uri().expect("to not fail"), - position_ms, - duration_ms, - )) { - error!("Failed to send paused packet: {}", why); - } - } - - PlayerEvent::Changed { - old_track_id: _, - new_track_id, - } => { - if let Err(why) = ipc.send(IpcPacket::TrackChange( - new_track_id.to_uri().expect("to not fail"), - )) { - error!("Failed to send track change packet: {}", why); - } - } - - PlayerEvent::Stopped { - play_request_id: _, - track_id: _, - } => { - if let Err(why) = ipc.send(IpcPacket::Stopped) { - error!("Failed to send player stopped packet: {}", why); - } - } - - _ => {} - }; - } - - debug!("Player stopped"); - }); - - self.spirc = Some(spirc); - tokio::spawn(spirc_task); - } - - pub fn next(&mut self) { - if let Some(spirc) = &self.spirc { - spirc.next(); - } - } - - pub fn previous(&mut self) { - if let Some(spirc) = &self.spirc { - spirc.prev(); - } - } - - pub fn pause(&mut self) { - if let Some(spirc) = &self.spirc { - spirc.pause(); - } - } - - pub fn resume(&mut self) { - if let Some(spirc) = &self.spirc { - spirc.play(); - } - } - - pub fn stop(&mut self) { - if let Some(spirc) = self.spirc.take() { - spirc.shutdown(); - } - } -} - -pub async fn main() { - let args = std::env::args().collect::>(); - - let tx_name = args[2].clone(); - let rx_name = args[3].clone(); - - // Create IPC communication channel - let client = ipc::Client::connect(tx_name, rx_name).expect("Failed to connect to IPC"); - - // Create the player - let mut player = SpoticordPlayer::new(client.clone()); - - loop { - let message = match client.try_recv() { - Ok(message) => message, - Err(why) => { - if let TryRecvError::Empty = why { - // No message, wait a bit and try again - tokio::time::sleep(Duration::from_millis(25)).await; - - continue; - } else if let TryRecvError::IpcError(IpcError::Disconnected) = &why { - debug!("IPC connection closed, goodbye"); - break; - } - - error!("Failed to receive message: {}", why); - break; - } - }; - - match message { - IpcPacket::Connect(token, device_name) => { - debug!("Connecting to Spotify with device name {}", device_name); - - player.start(token, device_name).await; - } - - IpcPacket::Disconnect => { - debug!("Disconnecting from Spotify"); - - player.stop(); - } - - IpcPacket::Next => { - player.next(); - } - - IpcPacket::Previous => { - player.previous(); - } - - IpcPacket::Pause => { - player.pause(); - } - - IpcPacket::Resume => { - player.resume(); - } - - IpcPacket::Quit => { - debug!("Received quit packet, exiting"); - - exit(0); - } - - _ => { - warn!("Received unknown packet: {:?}", message); - } - } - } -} diff --git a/src/player/mod.rs b/src/player/mod.rs new file mode 100644 index 0000000..d78dcb5 --- /dev/null +++ b/src/player/mod.rs @@ -0,0 +1,99 @@ +pub mod stream; + +use librespot::{ + connect::spirc::Spirc, + core::{config::ConnectConfig, session::Session}, + discovery::Credentials, + playback::{ + config::{Bitrate, PlayerConfig, VolumeCtrl}, + mixer::{self, MixerConfig}, + player::{Player as SpotifyPlayer, PlayerEvent}, + }, +}; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::{ + audio::{SinkEvent, StreamSink}, + librespot_ext::discovery::CredentialsExt, + utils, +}; + +use self::stream::Stream; + +pub struct Player { + stream: Stream, + session: Option, +} + +impl Player { + pub fn create() -> Self { + Self { + stream: Stream::new(), + session: None, + } + } + + pub async fn start( + &mut self, + token: &str, + device_name: &str, + ) -> Result< + ( + Spirc, + (UnboundedReceiver, UnboundedReceiver), + ), + Box, + > { + let username = utils::spotify::get_username(token).await?; + + let player_config = PlayerConfig { + bitrate: Bitrate::Bitrate96, + ..Default::default() + }; + + let credentials = Credentials::with_token(username, token); + + // Shutdown old session (cannot be done in the stop function) + if let Some(session) = self.session.take() { + session.shutdown() + } + + // Connect the session + let (session, _) = Session::connect(Default::default(), credentials, None, false).await?; + self.session = Some(session.clone()); + + let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig { + volume_ctrl: VolumeCtrl::Linear, + ..Default::default() + }); + + let stream = self.get_stream(); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (player, receiver) = SpotifyPlayer::new( + player_config, + session.clone(), + mixer.get_soft_volume(), + move || Box::new(StreamSink::new(stream, tx)), + ); + + let (spirc, spirc_task) = Spirc::new( + ConnectConfig { + name: device_name.into(), + // 50% + initial_volume: Some(65535 / 2), + ..Default::default() + }, + session.clone(), + player, + mixer, + ); + + tokio::spawn(spirc_task); + + Ok((spirc, (receiver, rx))) + } + + pub fn get_stream(&self) -> Stream { + self.stream.clone() + } +} diff --git a/src/player/stream.rs b/src/player/stream.rs new file mode 100644 index 0000000..e82e36a --- /dev/null +++ b/src/player/stream.rs @@ -0,0 +1,76 @@ +use std::{ + io::{Read, Seek, Write}, + sync::{Arc, Condvar, Mutex}, +}; + +use songbird::input::reader::MediaSource; + +const MAX_SIZE: usize = 1 * 1024 * 1024; + +#[derive(Clone)] +pub struct Stream { + inner: Arc<(Mutex>, Condvar)>, +} + +impl Stream { + pub fn new() -> Self { + Self { + inner: Arc::new((Mutex::new(Vec::new()), Condvar::new())), + } + } +} + +impl Read for Stream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let (mutex, condvar) = &*self.inner; + let mut buffer = mutex.lock().expect("Mutex was poisoned"); + + log::trace!("Read!"); + + while buffer.is_empty() { + buffer = condvar.wait(buffer).expect("Mutex was poisoned"); + } + + let max_read = usize::min(buf.len(), buffer.len()); + buf[0..max_read].copy_from_slice(&buffer[0..max_read]); + buffer.drain(0..max_read); + + Ok(max_read) + } +} + +impl Write for Stream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let (mutex, condvar) = &*self.inner; + let mut buffer = mutex.lock().expect("Mutex was poisoned"); + + while buffer.len() + buf.len() > MAX_SIZE { + buffer = condvar.wait(buffer).unwrap(); + } + + buffer.extend_from_slice(buf); + condvar.notify_all(); + + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl Seek for Stream { + fn seek(&mut self, _: std::io::SeekFrom) -> std::io::Result { + Ok(0) + } +} + +impl MediaSource for Stream { + fn byte_len(&self) -> Option { + None + } + + fn is_seekable(&self) -> bool { + false + } +} diff --git a/src/session/manager.rs b/src/session/manager.rs index 222a4d3..acaef69 100644 --- a/src/session/manager.rs +++ b/src/session/manager.rs @@ -25,8 +25,8 @@ pub enum SessionCreateError { #[error("Failed to join voice channel {0} ({1})")] JoinError(ChannelId, GuildId), - #[error("Failed to start player process")] - ForkError, + #[error("Failed to start the player")] + PlayerStartError, } #[derive(Clone)] diff --git a/src/session/mod.rs b/src/session/mod.rs index f4be02c..9210903 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -6,13 +6,17 @@ use self::{ pbi::PlaybackInfo, }; use crate::{ + audio::SinkEvent, consts::DISCONNECT_TIME, database::{Database, DatabaseError}, - ipc::{self, packet::IpcPacket, Client}, + player::Player, utils::{embed::Status, spotify}, }; -use ipc_channel::ipc::{IpcError, TryRecvError}; -use librespot::core::spotify_id::{SpotifyAudioType, SpotifyId}; +use librespot::{ + connect::spirc::Spirc, + core::spotify_id::{SpotifyAudioType, SpotifyId}, + playback::player::PlayerEvent, +}; use log::*; use serenity::{ async_trait, @@ -22,20 +26,13 @@ use serenity::{ }; use songbird::{ create_player, - input::{children_to_reader, Codec, Container, Input}, + input::{Codec, Container, Input, Reader}, tracks::TrackHandle, Call, Event, EventContext, EventHandler, }; -use std::{ - process::{Command, Stdio}, - sync::Arc, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; use tokio::sync::Mutex; -#[cfg(feature = "metrics")] -use crate::metrics::MetricsManager; - #[derive(Clone)] pub struct SpoticordSession(Arc>); @@ -56,14 +53,11 @@ struct InnerSpoticordSession { disconnect_handle: Option>, - client: Option, + spirc: Option, /// Whether the session has been disconnected /// If this is true then this instance should no longer be used and dropped disconnected: bool, - - #[cfg(feature = "metrics")] - metrics: MetricsManager, } impl SpoticordSession { @@ -81,12 +75,6 @@ impl SpoticordSession { .expect("to contain a value") .clone(); - #[cfg(feature = "metrics")] - let metrics = data - .get::() - .expect("to contain a value") - .clone(); - // Join the voice channel let songbird = songbird::get(ctx).await.expect("to be present").clone(); @@ -108,15 +96,11 @@ impl SpoticordSession { track: None, playback_info: None, disconnect_handle: None, - client: None, + spirc: None, disconnected: false, - - #[cfg(feature = "metrics")] - metrics, }; let mut instance = Self(Arc::new(RwLock::new(inner))); - instance.create_player(ctx).await?; let mut call = call.lock().await; @@ -165,39 +149,31 @@ impl SpoticordSession { } /// Advance to the next track - pub async fn next(&mut self) -> Result<(), IpcError> { - if let Some(ref client) = self.0.read().await.client { - return client.send(IpcPacket::Next); + pub async fn next(&mut self) { + if let Some(ref spirc) = self.0.read().await.spirc { + spirc.next(); } - - Ok(()) } /// Rewind to the previous track - pub async fn previous(&mut self) -> Result<(), IpcError> { - if let Some(ref client) = self.0.read().await.client { - return client.send(IpcPacket::Previous); + pub async fn previous(&mut self) { + if let Some(ref spirc) = self.0.read().await.spirc { + spirc.prev(); } - - Ok(()) } /// Pause the current track - pub async fn pause(&mut self) -> Result<(), IpcError> { - if let Some(ref client) = self.0.read().await.client { - return client.send(IpcPacket::Pause); + pub async fn pause(&mut self) { + if let Some(ref spirc) = self.0.read().await.spirc { + spirc.pause(); } - - Ok(()) } /// Resume the current track - pub async fn resume(&mut self) -> Result<(), IpcError> { - if let Some(ref client) = self.0.read().await.client { - return client.send(IpcPacket::Resume); + pub async fn resume(&mut self) { + if let Some(ref spirc) = self.0.read().await.spirc { + spirc.play(); } - - Ok(()) } async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> { @@ -232,52 +208,17 @@ impl SpoticordSession { } }; - // Create IPC oneshot server - let (server, tx_name, rx_name) = match ipc::Server::create() { - Ok(server) => server, - Err(why) => { - error!("Failed to create IPC server: {:?}", why); - return Err(SessionCreateError::ForkError); - } - }; - - // Spawn player process - let child = - match Command::new(std::env::current_exe().expect("to know the current executable path")) - .args([ - "--player", - &tx_name, - &rx_name, - "--debug-guild-id", - &self.guild_id().await.to_string(), - ]) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - { - Ok(child) => child, - Err(why) => { - error!("Failed to start player process: {:?}", why); - return Err(SessionCreateError::ForkError); - } - }; - - // Establish bi-directional IPC channel - let client = match server.accept() { - Ok(client) => client, - Err(why) => { - error!("Failed to accept IPC connection: {:?}", why); - - return Err(SessionCreateError::ForkError); - } - }; - - // Pipe player audio to the voice channel - let reader = children_to_reader::(vec![child]); + // Create player + let mut player = Player::create(); // Create track (paused, fixes audio glitches) - let (mut track, track_handle) = - create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None)); + let (mut track, track_handle) = create_player(Input::new( + true, + Reader::Extension(Box::new(player.get_stream())), + Codec::Pcm, + Container::Raw, + None, + )); track.pause(); let call = self.call().await; @@ -286,11 +227,20 @@ impl SpoticordSession { // Set call audio to track call.play_only(track); + let (spirc, (mut player_rx, mut sink_rx)) = match player.start(&token, &user.device_name).await + { + Ok(v) => v, + Err(why) => { + error!("Failed to start the player: {:?}", why); + + return Err(SessionCreateError::PlayerStartError); + } + }; + // Handle IPC packets // This will automatically quit once the IPC connection is closed tokio::spawn({ let track = track_handle.clone(); - let client = client.clone(); let ctx = ctx.clone(); let instance = self.clone(); let inner = self.0.clone(); @@ -315,138 +265,228 @@ impl SpoticordSession { break; } - let msg = match client.try_recv() { - Ok(msg) => msg, - Err(why) => { - if let TryRecvError::Empty = why { - // No message, wait a bit and try again - tokio::time::sleep(Duration::from_millis(25)).await; + tokio::select! { + event = player_rx.recv() => { + let Some(event) = event else { break; }; - continue; - } else if let TryRecvError::IpcError(IpcError::Disconnected) = &why { - trace!("IPC connection closed, exiting IPC handler"); - break; + match event { + PlayerEvent::Playing { + play_request_id: _, + track_id, + position_ms, + duration_ms, + } => { + let was_none = instance + .update_playback(duration_ms, position_ms, true) + .await; + + if was_none { + // Stop player if update track fails + if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + return; + } + } + } + + PlayerEvent::Paused { + play_request_id: _, + track_id, + position_ms, + duration_ms, + } => { + instance.start_disconnect_timer().await; + + let was_none = instance + .update_playback(duration_ms, position_ms, false) + .await; + + if was_none { + // Stop player if update track fails + + if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + return; + } + } + } + + PlayerEvent::Changed { + old_track_id: _, + new_track_id, + } => { + let instance = instance.clone(); + let ctx = ctx.clone(); + + // Fetch track info + // This is done in a separate task to avoid blocking the IPC handler + tokio::spawn(async move { + if let Err(why) = instance.update_track(&ctx, &owner_id, new_track_id).await { + error!("Failed to update track: {:?}", why); + + instance.player_stopped().await; + } + }); + } + + PlayerEvent::Stopped { + play_request_id: _, + track_id: _, + } => { + check_result(track.pause()); + + { + let mut inner = inner.write().await; + inner.playback_info.take(); + } + + instance.start_disconnect_timer().await; + } + + _ => {} + }; + } + + event = sink_rx.recv() => { + let Some(event) = event else { break; }; + + let check_result = |result| { + if let Err(why) = result { + error!("Failed to issue track command: {:?}", why); + } + }; + + + match event { + SinkEvent::Start => { + check_result(track.play()); + } + + SinkEvent::Stop => { + check_result(track.pause()); + } } - - error!("Failed to receive IPC message: {:?}", why); - break; } }; - match msg { - // Session connect error - IpcPacket::ConnectError(why) => { - error!("Failed to connect to Spotify: {:?}", why); + // match event { + // // Session connect error + // IpcPacket::ConnectError(why) => { + // error!("Failed to connect to Spotify: {:?}", why); - // Notify the user in the text channel - if let Err(why) = instance - .text_channel_id() - .await - .send_message(&instance.http().await, |message| { - message.embed(|embed| { - embed.title("Failed to connect to Spotify"); - embed.description(why); - embed.footer(|footer| footer.text("Please try again")); - embed.color(Status::Error as u64); + // // Notify the user in the text channel + // if let Err(why) = instance + // .text_channel_id() + // .await + // .send_message(&instance.http().await, |message| { + // message.embed(|embed| { + // embed.title("Failed to connect to Spotify"); + // embed.description(why); + // embed.footer(|footer| footer.text("Please try again")); + // embed.color(Status::Error as u64); - embed - }); + // embed + // }); - message - }) - .await - { - error!("Failed to send error message: {:?}", why); - } + // message + // }) + // .await + // { + // error!("Failed to send error message: {:?}", why); + // } - break; - } + // break; + // } - // Sink requests playback to start/resume - IpcPacket::StartPlayback => { - check_result(track.play()); - } + // // Sink requests playback to start/resume + // IpcPacket::StartPlayback => { + // check_result(track.play()); + // } - // Sink requests playback to pause - IpcPacket::StopPlayback => { - check_result(track.pause()); - } + // // Sink requests playback to pause + // IpcPacket::StopPlayback => { + // check_result(track.pause()); + // } - // A new track has been set by the player - IpcPacket::TrackChange(track) => { - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); + // // A new track has been set by the player + // IpcPacket::TrackChange(track) => { + // // Convert to SpotifyId + // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - let instance = instance.clone(); - let ctx = ctx.clone(); + // let instance = instance.clone(); + // let ctx = ctx.clone(); - // Fetch track info - // This is done in a separate task to avoid blocking the IPC handler - tokio::spawn(async move { - if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - error!("Failed to update track: {:?}", why); + // // Fetch track info + // // This is done in a separate task to avoid blocking the IPC handler + // tokio::spawn(async move { + // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + // error!("Failed to update track: {:?}", why); - instance.player_stopped().await; - } - }); - } + // instance.player_stopped().await; + // } + // }); + // } - // The player has started playing a track - IpcPacket::Playing(track, position_ms, duration_ms) => { - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); + // // The player has started playing a track + // IpcPacket::Playing(track, position_ms, duration_ms) => { + // // Convert to SpotifyId + // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - let was_none = instance - .update_playback(duration_ms, position_ms, true) - .await; + // let was_none = instance + // .update_playback(duration_ms, position_ms, true) + // .await; - if was_none { - // Stop player if update track fails - if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - error!("Failed to update track: {:?}", why); + // if was_none { + // // Stop player if update track fails + // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + // error!("Failed to update track: {:?}", why); - instance.player_stopped().await; - return; - } - } - } + // instance.player_stopped().await; + // return; + // } + // } + // } - IpcPacket::Paused(track, position_ms, duration_ms) => { - instance.start_disconnect_timer().await; + // IpcPacket::Paused(track, position_ms, duration_ms) => { + // instance.start_disconnect_timer().await; - // Convert to SpotifyId - let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); + // // Convert to SpotifyId + // let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri"); - let was_none = instance - .update_playback(duration_ms, position_ms, false) - .await; + // let was_none = instance + // .update_playback(duration_ms, position_ms, false) + // .await; - if was_none { - // Stop player if update track fails + // if was_none { + // // Stop player if update track fails - if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { - error!("Failed to update track: {:?}", why); + // if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await { + // error!("Failed to update track: {:?}", why); - instance.player_stopped().await; - return; - } - } - } + // instance.player_stopped().await; + // return; + // } + // } + // } - IpcPacket::Stopped => { - check_result(track.pause()); + // IpcPacket::Stopped => { + // check_result(track.pause()); - { - let mut inner = inner.write().await; - inner.playback_info.take(); - } + // { + // let mut inner = inner.write().await; + // inner.playback_info.take(); + // } - instance.start_disconnect_timer().await; - } + // instance.start_disconnect_timer().await; + // } - // Ignore other packets - _ => {} - } + // // Ignore other packets + // _ => {} + // } } // Clean up session @@ -456,15 +496,10 @@ impl SpoticordSession { } }); - // Inform the player process to connect to Spotify - if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) { - error!("Failed to send IpcPacket::Connect packet: {:?}", why); - } - // Update inner client and track let mut inner = self.0.write().await; inner.track = Some(track_handle); - inner.client = Some(client); + inner.spirc = Some(spirc); Ok(()) } @@ -537,14 +572,6 @@ impl SpoticordSession { pbi.update_track_episode(spotify_id, track, episode); } - // Send track play event to metrics - #[cfg(feature = "metrics")] - { - if let Some(ref pbi) = inner.playback_info { - inner.metrics.track_play(pbi); - } - } - Ok(()) } @@ -552,15 +579,11 @@ impl SpoticordSession { async fn player_stopped(&self) { let mut inner = self.0.write().await; - if let Some(client) = inner.client.take() { - // Ask player to quit (will cause defunct process) - if let Err(why) = client.send(IpcPacket::Quit) { - error!("Failed to send quit packet: {:?}", why); - } + if let Some(spirc) = inner.spirc.take() { + spirc.shutdown(); } if let Some(track) = inner.track.take() { - // Stop the playback, and freeing the child handle, removing the defunct process if let Err(why) = track.stop() { error!("Failed to stop track: {:?}", why); } @@ -598,11 +621,7 @@ impl SpoticordSession { inner.disconnect_no_abort().await; } - // Stop the disconnect timer, if one is running - let mut inner = self.0.write().await; - if let Some(handle) = inner.disconnect_handle.take() { - handle.abort(); - } + self.stop_disconnect_timer().await; } // Update playback info (duration, position, playing state) @@ -613,25 +632,33 @@ impl SpoticordSession { pbi.is_none() }; - let mut inner = self.0.write().await; + { + let mut inner = self.0.write().await; - if is_none { - inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing)); - } else { - // Update position, duration and playback state - inner - .playback_info - .as_mut() - .expect("to contain a value") - .update_pos_dur(position_ms, duration_ms, playing); + if is_none { + inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing)); + } else { + // Update position, duration and playback state + inner + .playback_info + .as_mut() + .expect("to contain a value") + .update_pos_dur(position_ms, duration_ms, playing); + }; }; + if playing { + self.stop_disconnect_timer().await; + } + is_none } /// Start the disconnect timer, which will disconnect the bot from the voice channel after a /// certain amount of time async fn start_disconnect_timer(&self) { + self.stop_disconnect_timer().await; + let inner_arc = self.0.clone(); let mut inner = inner_arc.write().await; @@ -640,11 +667,6 @@ impl SpoticordSession { return; } - // Abort the previous timer, if one is running - if let Some(handle) = inner.disconnect_handle.take() { - handle.abort(); - } - inner.disconnect_handle = Some(tokio::spawn({ let inner = inner_arc.clone(); let instance = self.clone(); @@ -681,6 +703,15 @@ impl SpoticordSession { })); } + /// Stop the disconnect timer (if one is running) + async fn stop_disconnect_timer(&self) { + let mut inner = self.0.write().await; + if let Some(handle) = inner.disconnect_handle.take() { + handle.abort(); + } + } + + /// Disconnect from the VC and send a message to the text channel pub async fn disconnect_with_message(&self, content: &str) { { let mut inner = self.0.write().await; @@ -707,12 +738,7 @@ impl SpoticordSession { } // Finally we stop and remove the disconnect timer - let mut inner = self.0.write().await; - - // Stop the disconnect timer, if one is running - if let Some(handle) = inner.disconnect_handle.take() { - handle.abort(); - } + self.stop_disconnect_timer().await; } /* Inner getters */ @@ -767,15 +793,11 @@ impl InnerSpoticordSession { let mut call = self.call.lock().await; - if let Some(client) = self.client.take() { - // Ask player to quit (will cause defunct process) - if let Err(why) = client.send(IpcPacket::Quit) { - error!("Failed to send quit packet: {:?}", why); - } + if let Some(spirc) = self.spirc.take() { + spirc.shutdown(); } if let Some(track) = self.track.take() { - // Stop the playback, and freeing the child handle, removing the defunct process if let Err(why) = track.stop() { error!("Failed to stop track: {:?}", why); }