Rewrite SpoticordSession and small QoL updates

main
DaXcess 2022-11-23 18:08:57 +01:00
parent 2223de1c30
commit a53c885b6c
No known key found for this signature in database
GPG Key ID: CF78CC72F0FD5EAD
10 changed files with 532 additions and 427 deletions

View File

@ -1,2 +1,2 @@
[target.x86_64-pc-windows-gnu] [target.x86_64-pc-windows-gnu]
rustflags = "-C link-args=-lssp" # Does does compile without this line rustflags = "-C link-args=-lssp" # Does not compile without this line

1
.github/FUNDING.yml vendored 100644
View File

@ -0,0 +1 @@
patreon: rodabafilms

View File

@ -1,4 +1,4 @@
use log::{error, trace}; use log::error;
use serenity::{ use serenity::{
builder::CreateApplicationCommand, builder::CreateApplicationCommand,
model::prelude::{interaction::application_command::ApplicationCommandInteraction, Channel}, model::prelude::{interaction::application_command::ApplicationCommandInteraction, Channel},
@ -167,13 +167,13 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
} }
let data = ctx.data.read().await; let data = ctx.data.read().await;
let mut session_manager = data.get::<SessionManager>().unwrap().clone(); let session_manager = data.get::<SessionManager>().unwrap().clone();
// Check if another session is already active in this server // Check if another session is already active in this server
let mut session_opt = session_manager.get_session(guild.id).await; let mut session_opt = session_manager.get_session(guild.id).await;
if let Some(session) = &session_opt { if let Some(session) = &session_opt {
if let Some(owner) = session.get_owner().await { if let Some(owner) = session.owner().await {
let msg = if owner == command.user.id { let msg = if owner == command.user.id {
"You are already controlling the bot" "You are already controlling the bot"
} else { } else {
@ -206,7 +206,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
.description( .description(
format!( format!(
"You are already playing music in another server ({}).\nStop playing in that server first before joining this one.", "You are already playing music in another server ({}).\nStop playing in that server first before joining this one.",
ctx.cache.guild(session.get_guild_id()).unwrap().name ctx.cache.guild(session.guild_id().await).unwrap().name
)).status(Status::Error).build(), )).status(Status::Error).build(),
true, true,
) )
@ -218,8 +218,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
defer_message(&ctx, &command, false).await; defer_message(&ctx, &command, false).await;
if let Some(session) = &session_opt { if let Some(session) = &session_opt {
trace!("{} != {}", session.get_channel_id(), channel_id); if session.channel_id().await != channel_id {
if session.get_channel_id() != channel_id {
session.disconnect().await; session.disconnect().await;
session_opt = None; session_opt = None;
@ -228,7 +227,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
} }
} }
if let Some(session) = &session_opt { if let Some(session) = session_opt.as_mut() {
if let Err(why) = session.update_owner(&ctx, command.user.id).await { if let Err(why) = session.update_owner(&ctx, command.user.id).await {
// Need to link first // Need to link first
if let SessionCreateError::NoSpotifyError = why { if let SessionCreateError::NoSpotifyError = why {

View File

@ -36,7 +36,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
} }
}; };
if let Some(owner) = session.get_owner().await { if let Some(owner) = session.owner().await {
if owner != command.user.id { if owner != command.user.id {
// This message was generated by AI, and I love it. // This message was generated by AI, and I love it.
respond_message( respond_message(

View File

@ -48,7 +48,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
} }
}; };
let owner = match session.get_owner().await { let owner = match session.owner().await {
Some(owner) => owner, Some(owner) => owner,
None => { None => {
not_playing.await; not_playing.await;
@ -58,7 +58,7 @@ pub fn run(ctx: Context, command: ApplicationCommandInteraction) -> CommandOutpu
}; };
// Get Playback Info from session // Get Playback Info from session
let pbi = match session.get_playback_info().await { let pbi = match session.playback_info().await {
Some(pbi) => pbi, Some(pbi) => pbi,
None => { None => {
not_playing.await; not_playing.await;

View File

@ -2,14 +2,22 @@ use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum IpcPacket { pub enum IpcPacket {
/// Quit the player process
Quit, Quit,
/// Connect to Spotify with the given token and device name
Connect(String, String), Connect(String, String),
/// Disconnect from Spotify (unused)
Disconnect, Disconnect,
/// Unable to connect to Spotify
ConnectError(String), ConnectError(String),
/// The audio sink has started writing
StartPlayback, StartPlayback,
/// The audio sink has stopped writing
StopPlayback, StopPlayback,
/// The current Spotify track was changed /// The current Spotify track was changed

View File

@ -1,4 +1,4 @@
use std::time::Duration; use std::{process::exit, time::Duration};
use ipc_channel::ipc::{IpcError, TryRecvError}; use ipc_channel::ipc::{IpcError, TryRecvError};
use librespot::{ use librespot::{
@ -95,8 +95,8 @@ impl SpoticordPlayer {
let (spirc, spirc_task) = Spirc::new( let (spirc, spirc_task) = Spirc::new(
ConnectConfig { ConnectConfig {
name: device_name.into(), name: device_name.into(),
// 75% // 50%
initial_volume: Some((65535 / 4) * 3), initial_volume: Some(65535 / 2),
..ConnectConfig::default() ..ConnectConfig::default()
}, },
session.clone(), session.clone(),
@ -275,8 +275,7 @@ pub async fn main() {
IpcPacket::Quit => { IpcPacket::Quit => {
debug!("Received quit packet, exiting"); debug!("Received quit packet, exiting");
player.stop(); exit(0);
break;
} }
_ => { _ => {

View File

@ -10,6 +10,9 @@ use super::SpoticordSession;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum SessionCreateError { pub enum SessionCreateError {
#[error("This session has no owner assigned")]
NoOwnerError,
#[error("The user has not linked their Spotify account")] #[error("The user has not linked their Spotify account")]
NoSpotifyError, NoSpotifyError,
@ -24,20 +27,22 @@ pub enum SessionCreateError {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct SessionManager { pub struct SessionManager(Arc<tokio::sync::RwLock<InnerSessionManager>>);
sessions: Arc<tokio::sync::RwLock<HashMap<GuildId, Arc<SpoticordSession>>>>,
owner_map: Arc<tokio::sync::RwLock<HashMap<UserId, GuildId>>>,
}
impl TypeMapKey for SessionManager { impl TypeMapKey for SessionManager {
type Value = SessionManager; type Value = SessionManager;
} }
impl SessionManager { pub struct InnerSessionManager {
pub fn new() -> SessionManager { sessions: HashMap<GuildId, SpoticordSession>,
SessionManager { owner_map: HashMap<UserId, GuildId>,
sessions: Arc::new(tokio::sync::RwLock::new(HashMap::new())), }
owner_map: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
impl InnerSessionManager {
pub fn new() -> Self {
Self {
sessions: HashMap::new(),
owner_map: HashMap::new(),
} }
} }
@ -54,75 +59,60 @@ impl SessionManager {
let session = let session =
SpoticordSession::new(ctx, guild_id, channel_id, text_channel_id, owner_id).await?; SpoticordSession::new(ctx, guild_id, channel_id, text_channel_id, owner_id).await?;
let mut sessions = self.sessions.write().await; self.sessions.insert(guild_id, session);
let mut owner_map = self.owner_map.write().await; self.owner_map.insert(owner_id, guild_id);
sessions.insert(guild_id, Arc::new(session));
owner_map.insert(owner_id, guild_id);
Ok(()) Ok(())
} }
/// Remove a session /// Remove a session
pub async fn remove_session(&mut self, guild_id: GuildId) { pub async fn remove_session(&mut self, guild_id: GuildId) {
let mut sessions = self.sessions.write().await; if let Some(session) = self.sessions.get(&guild_id) {
if let Some(owner) = session.owner().await {
if let Some(session) = sessions.get(&guild_id) { self.owner_map.remove(&owner);
if let Some(owner) = session.get_owner().await {
let mut owner_map = self.owner_map.write().await;
owner_map.remove(&owner);
} }
} }
sessions.remove(&guild_id); self.sessions.remove(&guild_id);
} }
/// Remove owner from owner map. /// Remove owner from owner map.
/// Used whenever a user stops playing music without leaving the bot. /// Used whenever a user stops playing music without leaving the bot.
pub async fn remove_owner(&mut self, owner_id: UserId) { pub fn remove_owner(&mut self, owner_id: UserId) {
let mut owner_map = self.owner_map.write().await; self.owner_map.remove(&owner_id);
owner_map.remove(&owner_id);
} }
/// Set the owner of a session /// Set the owner of a session
/// Used when a user joins a session that is already active /// Used when a user joins a session that is already active
pub async fn set_owner(&mut self, owner_id: UserId, guild_id: GuildId) { pub fn set_owner(&mut self, owner_id: UserId, guild_id: GuildId) {
let mut owner_map = self.owner_map.write().await; self.owner_map.insert(owner_id, guild_id);
owner_map.insert(owner_id, guild_id);
} }
/// Get a session by its guild ID /// Get a session by its guild ID
pub async fn get_session(&self, guild_id: GuildId) -> Option<Arc<SpoticordSession>> { pub fn get_session(&self, guild_id: GuildId) -> Option<SpoticordSession> {
let sessions = self.sessions.read().await; self.sessions.get(&guild_id).cloned()
sessions.get(&guild_id).cloned()
} }
/// Find a Spoticord session by their current owner's ID /// Find a Spoticord session by their current owner's ID
pub async fn find(&self, owner_id: UserId) -> Option<Arc<SpoticordSession>> { pub fn find(&self, owner_id: UserId) -> Option<SpoticordSession> {
let sessions = self.sessions.read().await; let guild_id = self.owner_map.get(&owner_id)?;
let owner_map = self.owner_map.read().await;
let guild_id = owner_map.get(&owner_id)?; self.sessions.get(&guild_id).cloned()
sessions.get(&guild_id).cloned()
} }
/// Get the amount of sessions /// Get the amount of sessions
pub async fn get_session_count(&self) -> usize { pub fn get_session_count(&self) -> usize {
let sessions = self.sessions.read().await; self.sessions.len()
sessions.len()
} }
/// Get the amount of sessions with an owner /// Get the amount of sessions with an owner
pub async fn get_active_session_count(&self) -> usize { pub async fn get_active_session_count(&self) -> usize {
let sessions = self.sessions.read().await;
let mut count: usize = 0; let mut count: usize = 0;
for session in sessions.values() { for session in self.sessions.values() {
if session.owner.read().await.is_some() { let session = session.0.read().await;
if session.owner.is_some() {
count += 1; count += 1;
} }
} }
@ -130,3 +120,65 @@ impl SessionManager {
count count
} }
} }
impl SessionManager {
pub fn new() -> Self {
Self(Arc::new(tokio::sync::RwLock::new(
InnerSessionManager::new(),
)))
}
/// Creates a new session for the given user in the given guild.
pub async fn create_session(
&self,
ctx: &Context,
guild_id: GuildId,
channel_id: ChannelId,
text_channel_id: ChannelId,
owner_id: UserId,
) -> Result<(), SessionCreateError> {
self
.0
.write()
.await
.create_session(ctx, guild_id, channel_id, text_channel_id, owner_id)
.await
}
/// Remove a session
pub async fn remove_session(&self, guild_id: GuildId) {
self.0.write().await.remove_session(guild_id).await;
}
/// Remove owner from owner map.
/// Used whenever a user stops playing music without leaving the bot.
pub async fn remove_owner(&self, owner_id: UserId) {
self.0.write().await.remove_owner(owner_id);
}
/// Set the owner of a session
/// Used when a user joins a session that is already active
pub async fn set_owner(&self, owner_id: UserId, guild_id: GuildId) {
self.0.write().await.set_owner(owner_id, guild_id);
}
/// Get a session by its guild ID
pub async fn get_session(&self, guild_id: GuildId) -> Option<SpoticordSession> {
self.0.read().await.get_session(guild_id)
}
/// Find a Spoticord session by their current owner's ID
pub async fn find(&self, owner_id: UserId) -> Option<SpoticordSession> {
self.0.read().await.find(owner_id)
}
/// Get the amount of sessions
pub async fn get_session_count(&self) -> usize {
self.0.read().await.get_session_count()
}
/// Get the amount of sessions with an owner
pub async fn get_active_session_count(&self) -> usize {
self.0.read().await.get_active_session_count().await
}
}

View File

@ -34,8 +34,10 @@ pub mod manager;
mod pbi; mod pbi;
#[derive(Clone)] #[derive(Clone)]
pub struct SpoticordSession { pub struct SpoticordSession(Arc<RwLock<InnerSpoticordSession>>);
owner: Arc<RwLock<Option<UserId>>>,
struct InnerSpoticordSession {
owner: Option<UserId>,
guild_id: GuildId, guild_id: GuildId,
channel_id: ChannelId, channel_id: ChannelId,
text_channel_id: ChannelId, text_channel_id: ChannelId,
@ -45,13 +47,13 @@ pub struct SpoticordSession {
session_manager: SessionManager, session_manager: SessionManager,
call: Arc<Mutex<Call>>, call: Arc<Mutex<Call>>,
track: TrackHandle, track: Option<TrackHandle>,
playback_info: Arc<RwLock<Option<PlaybackInfo>>>, playback_info: Option<PlaybackInfo>,
disconnect_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>, disconnect_handle: Option<tokio::task::JoinHandle<()>>,
client: Client, client: Option<Client>,
} }
impl SpoticordSession { impl SpoticordSession {
@ -64,9 +66,87 @@ impl SpoticordSession {
) -> Result<SpoticordSession, SessionCreateError> { ) -> Result<SpoticordSession, SessionCreateError> {
// Get the Spotify token of the owner // Get the Spotify token of the owner
let data = ctx.data.read().await; let data = ctx.data.read().await;
let database = data.get::<Database>().unwrap();
let session_manager = data.get::<SessionManager>().unwrap().clone(); let session_manager = data.get::<SessionManager>().unwrap().clone();
// Join the voice channel
let songbird = songbird::get(ctx).await.unwrap().clone();
let (call, result) = songbird.join(guild_id, channel_id).await;
if let Err(why) = result {
error!("Error joining voice channel: {:?}", why);
return Err(SessionCreateError::JoinError(channel_id, guild_id));
}
let inner = InnerSpoticordSession {
owner: Some(owner_id.clone()),
guild_id,
channel_id,
text_channel_id,
http: ctx.http.clone(),
session_manager: session_manager.clone(),
call: call.clone(),
track: None,
playback_info: None,
disconnect_handle: None,
client: None,
};
let mut instance = Self(Arc::new(RwLock::new(inner)));
instance.create_player(ctx).await?;
let mut call = call.lock().await;
// Set up events
call.add_global_event(
songbird::Event::Core(songbird::CoreEvent::DriverDisconnect),
instance.clone(),
);
call.add_global_event(
songbird::Event::Core(songbird::CoreEvent::ClientDisconnect),
instance.clone(),
);
Ok(instance)
}
pub async fn update_owner(
&mut self,
ctx: &Context,
owner_id: UserId,
) -> Result<(), SessionCreateError> {
// Get the Spotify token of the owner
let data = ctx.data.read().await;
let session_manager = data.get::<SessionManager>().unwrap().clone();
{
let mut inner = self.0.write().await;
inner.owner = Some(owner_id);
}
{
let inner = self.0.clone();
let inner = inner.read().await;
session_manager.set_owner(owner_id, inner.guild_id).await;
}
// Create the player
self.create_player(ctx).await?;
Ok(())
}
async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> {
let owner_id = match self.owner().await.clone() {
Some(owner_id) => owner_id,
None => return Err(SessionCreateError::NoOwnerError),
};
let data = ctx.data.read().await;
let database = data.get::<Database>().unwrap();
let token = match database.get_access_token(owner_id.to_string()).await { let token = match database.get_access_token(owner_id.to_string()).await {
Ok(token) => token, Ok(token) => token,
Err(why) => { Err(why) => {
@ -97,21 +177,15 @@ impl SpoticordSession {
} }
}; };
// Join the voice channel
let songbird = songbird::get(ctx).await.unwrap().clone();
let (call, result) = songbird.join(guild_id, channel_id).await;
if let Err(why) = result {
error!("Error joining voice channel: {:?}", why);
return Err(SessionCreateError::JoinError(channel_id, guild_id));
}
let mut call_mut = call.lock().await;
// Spawn player process // Spawn player process
let child = match Command::new(std::env::current_exe().unwrap()) let child = match Command::new(std::env::current_exe().unwrap())
.args(["--player", &tx_name, &rx_name]) .args([
"--player",
&tx_name,
&rx_name,
"--debug-guild-id",
&self.guild_id().await.to_string(),
])
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::inherit()) .stderr(Stdio::inherit())
.spawn() .spawn()
@ -141,252 +215,186 @@ impl SpoticordSession {
create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None)); create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None));
track.pause(); track.pause();
let call = self.call().await;
let mut call = call.lock().await;
// Set call audio to track // Set call audio to track
call_mut.play_only(track); call.play_only(track);
let instance = Self {
owner: Arc::new(RwLock::new(Some(owner_id.clone()))),
guild_id,
channel_id,
text_channel_id,
http: ctx.http.clone(),
session_manager: session_manager.clone(),
call: call.clone(),
track: track_handle.clone(),
playback_info: Arc::new(RwLock::new(None)),
disconnect_handle: Arc::new(Mutex::new(None)),
client: client.clone(),
};
// Clone variables for use in the IPC handler
let ipc_track = track_handle.clone();
let ipc_client = client.clone();
let ipc_context = ctx.clone();
let mut ipc_instance = instance.clone();
// Handle IPC packets // Handle IPC packets
// This will automatically quit once the IPC connection is closed // This will automatically quit once the IPC connection is closed
tokio::spawn(async move { tokio::spawn({
let check_result = |result| { let track = track_handle.clone();
if let Err(why) = result { let client = client.clone();
error!("Failed to issue track command: {:?}", why); let ctx = ctx.clone();
} let instance = self.clone();
}; let inner = self.0.clone();
loop { async move {
// Required for IpcPacket::TrackChange to work let check_result = |result| {
tokio::task::yield_now().await; if let Err(why) = result {
error!("Failed to issue track command: {:?}", why);
let msg = match ipc_client.try_recv() {
Ok(msg) => msg,
Err(why) => {
if let TryRecvError::Empty = why {
// No message, wait a bit and try again
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
} else if let TryRecvError::IpcError(why) = &why {
if let IpcError::Disconnected = why {
trace!("IPC connection closed, exiting IPC handler");
break;
}
}
error!("Failed to receive IPC message: {:?}", why);
break;
} }
}; };
trace!("Received IPC message: {:?}", msg); loop {
// Required for IpcPacket::TrackChange to work
tokio::task::yield_now().await;
match msg { let msg = match client.try_recv() {
// Session connect error Ok(msg) => msg,
IpcPacket::ConnectError(why) => { Err(why) => {
error!("Failed to connect to Spotify: {:?}", why); if let TryRecvError::Empty = why {
// No message, wait a bit and try again
tokio::time::sleep(Duration::from_millis(25)).await;
// Notify the user in the text channel continue;
if let Err(why) = ipc_instance } else if let TryRecvError::IpcError(why) = &why {
.text_channel_id if let IpcError::Disconnected = why {
.send_message(&ipc_instance.http, |message| { trace!("IPC connection closed, exiting IPC handler");
message.embed(|embed| { break;
embed.title("Failed to connect to Spotify"); }
embed.description(why);
embed.color(Status::Error as u64);
embed
});
message
})
.await
{
error!("Failed to send error message: {:?}", why);
}
// Clean up session
ipc_instance.player_stopped().await;
break;
}
// Sink requests playback to start/resume
IpcPacket::StartPlayback => {
check_result(ipc_track.play());
}
// Sink requests playback to pause
IpcPacket::StopPlayback => {
check_result(ipc_track.pause());
}
// A new track has been set by the player
IpcPacket::TrackChange(track) => {
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).unwrap();
let mut instance = ipc_instance.clone();
let context = ipc_context.clone();
// Fetch track info
// This is done in a separate task to avoid blocking the IPC handler
tokio::spawn(async move {
if let Err(why) = instance.update_track(&context, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
} }
});
}
// The player has started playing a track error!("Failed to receive IPC message: {:?}", why);
IpcPacket::Playing(track, position_ms, duration_ms) => { break;
// Convert to SpotifyId }
let track_id = SpotifyId::from_uri(&track).unwrap(); };
let was_none = ipc_instance trace!("Received IPC message: {:?}", msg);
.update_playback(duration_ms, position_ms, true)
.await;
if was_none { match msg {
// Stop player if update track fails // Session connect error
if let Err(why) = ipc_instance IpcPacket::ConnectError(why) => {
.update_track(&ipc_context, &owner_id, track_id) error!("Failed to connect to Spotify: {:?}", why);
// Notify the user in the text channel
if let Err(why) = instance
.text_channel_id()
.await
.send_message(&instance.http().await, |message| {
message.embed(|embed| {
embed.title("Failed to connect to Spotify");
embed.description(why);
embed.color(Status::Error as u64);
embed
});
message
})
.await .await
{ {
error!("Failed to update track: {:?}", why); error!("Failed to send error message: {:?}", why);
}
ipc_instance.player_stopped().await; // Clean up session
return; instance.player_stopped().await;
break;
}
// Sink requests playback to start/resume
IpcPacket::StartPlayback => {
check_result(track.play());
}
// Sink requests playback to pause
IpcPacket::StopPlayback => {
check_result(track.pause());
}
// A new track has been set by the player
IpcPacket::TrackChange(track) => {
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).unwrap();
let instance = instance.clone();
let ctx = ctx.clone();
// Fetch track info
// This is done in a separate task to avoid blocking the IPC handler
tokio::spawn(async move {
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
}
});
}
// The player has started playing a track
IpcPacket::Playing(track, position_ms, duration_ms) => {
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).unwrap();
let was_none = instance
.update_playback(duration_ms, position_ms, true)
.await;
if was_none {
// Stop player if update track fails
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
return;
}
} }
} }
}
IpcPacket::Paused(track, position_ms, duration_ms) => { IpcPacket::Paused(track, position_ms, duration_ms) => {
ipc_instance.start_disconnect_timer().await; instance.start_disconnect_timer().await;
// Convert to SpotifyId // Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).unwrap(); let track_id = SpotifyId::from_uri(&track).unwrap();
let was_none = ipc_instance let was_none = instance
.update_playback(duration_ms, position_ms, false) .update_playback(duration_ms, position_ms, false)
.await; .await;
if was_none { if was_none {
// Stop player if update track fails // Stop player if update track fails
if let Err(why) = ipc_instance
.update_track(&ipc_context, &owner_id, track_id)
.await
{
error!("Failed to update track: {:?}", why);
ipc_instance.player_stopped().await; if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
return; error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
return;
}
} }
} }
IpcPacket::Stopped => {
check_result(track.pause());
let mut inner = inner.write().await;
inner.playback_info.take();
instance.start_disconnect_timer().await;
}
// Ignore other packets
_ => {}
} }
IpcPacket::Stopped => {
check_result(ipc_track.pause());
ipc_instance.playback_info.write().await.take();
ipc_instance.start_disconnect_timer().await;
}
// Ignore other packets
_ => {}
} }
} }
}); });
// Set up events
call_mut.add_global_event(
songbird::Event::Core(songbird::CoreEvent::DriverDisconnect),
instance.clone(),
);
call_mut.add_global_event(
songbird::Event::Core(songbird::CoreEvent::ClientDisconnect),
instance.clone(),
);
// Inform the player process to connect to Spotify // Inform the player process to connect to Spotify
if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) { if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) {
error!("Failed to send IpcPacket::Connect packet: {:?}", why); error!("Failed to send IpcPacket::Connect packet: {:?}", why);
} }
Ok(instance) // Update inner client and track
} let mut inner = self.0.write().await;
inner.track = Some(track_handle);
pub async fn update_owner( inner.client = Some(client);
&self,
ctx: &Context,
owner_id: UserId,
) -> Result<(), SessionCreateError> {
// Get the Spotify token of the owner
let data = ctx.data.read().await;
let database = data.get::<Database>().unwrap();
let mut session_manager = data.get::<SessionManager>().unwrap().clone();
let token = match database.get_access_token(owner_id.to_string()).await {
Ok(token) => token,
Err(why) => {
if let DatabaseError::InvalidStatusCode(code) = why {
if code == 404 {
return Err(SessionCreateError::NoSpotifyError);
}
}
return Err(SessionCreateError::DatabaseError);
}
};
let user = match database.get_user(owner_id.to_string()).await {
Ok(user) => user,
Err(why) => {
error!("Failed to get user: {:?}", why);
return Err(SessionCreateError::DatabaseError);
}
};
{
let mut owner = self.owner.write().await;
*owner = Some(owner_id);
}
session_manager.set_owner(owner_id, self.guild_id).await;
// Inform the player process to connect to Spotify
if let Err(why) = self
.client
.send(IpcPacket::Connect(token, user.device_name))
{
error!("Failed to send IpcPacket::Connect packet: {:?}", why);
}
Ok(()) Ok(())
} }
// Update current track /// Update current track
async fn update_track( async fn update_track(
&self, &self,
ctx: &Context, ctx: &Context,
@ -394,9 +402,9 @@ impl SpoticordSession {
spotify_id: SpotifyId, spotify_id: SpotifyId,
) -> Result<(), String> { ) -> Result<(), String> {
let should_update = { let should_update = {
let pbi = self.playback_info.read().await; let pbi = self.playback_info().await;
if let Some(pbi) = &*pbi { if let Some(pbi) = pbi {
pbi.spotify_id.is_none() || pbi.spotify_id.unwrap() != spotify_id pbi.spotify_id.is_none() || pbi.spotify_id.unwrap() != spotify_id
} else { } else {
false false
@ -447,9 +455,10 @@ impl SpoticordSession {
episode = Some(episode_info); episode = Some(episode_info);
} }
let mut pbi = self.playback_info.write().await; // Update track/episode
let mut inner = self.0.write().await;
if let Some(pbi) = &mut *pbi { if let Some(pbi) = inner.playback_info.as_mut() {
pbi.update_track_episode(spotify_id, track, episode); pbi.update_track_episode(spotify_id, track, episode);
} }
@ -457,74 +466,176 @@ impl SpoticordSession {
} }
/// Called when the player must stop, but not leave the call /// Called when the player must stop, but not leave the call
async fn player_stopped(&mut self) { async fn player_stopped(&self) {
// Disconnect from Spotify let mut inner = self.0.write().await;
if let Err(why) = self.client.send(IpcPacket::Disconnect) {
error!("Failed to send disconnect packet: {:?}", why); if let Some(client) = inner.client.take() {
// Ask player to quit (will cause defunct process)
if let Err(why) = client.send(IpcPacket::Quit) {
error!("Failed to send quit packet: {:?}", why);
}
} }
/* So this is really annoying, but necessary. if let Some(track) = inner.track.take() {
* If we pause songbird too quickly, the player would still have // Stop the playback, and freeing the child handle, removing the defunct process
* some audio to write to stdout. Because songbird is paused, if let Err(why) = track.stop() {
* the audio is never read, and the player process will hang. error!("Failed to stop track: {:?}", why);
* }
* So yeah we just blatanly wait a second, and hope that's enough.
* Most likely causes issues when the system is a bit slow.
*/
tokio::time::sleep(Duration::from_millis(1000)).await;
if let Err(why) = self.track.pause() {
error!("Failed to pause track: {:?}", why);
} }
// Clear owner // Clear owner
let mut owner = self.owner.write().await; if let Some(owner_id) = inner.owner.take() {
if let Some(owner_id) = owner.take() { inner.session_manager.remove_owner(owner_id).await;
self.session_manager.remove_owner(owner_id).await;
} }
// Clear playback info // Clear playback info
let mut playback_info = self.playback_info.write().await; inner.playback_info = None;
*playback_info = None;
// Unlock to prevent deadlock in start_disconnect_timer
drop(inner);
// Disconnect automatically after some time // Disconnect automatically after some time
self.start_disconnect_timer().await; self.start_disconnect_timer().await;
} }
/// Internal version of disconnect, which does not abort the disconnect timer
async fn disconnect_no_abort(&self) {
self
.session_manager
.clone()
.remove_session(self.guild_id)
.await;
let mut call = self.call.lock().await;
self.track.stop().unwrap_or(());
call.remove_all_global_events();
if let Err(why) = call.leave().await {
error!("Failed to leave voice channel: {:?}", why);
}
}
// Disconnect from voice channel and remove session from manager // Disconnect from voice channel and remove session from manager
pub async fn disconnect(&self) { pub async fn disconnect(&self) {
info!("Disconnecting from voice channel {}", self.channel_id); info!(
"[{}] Disconnecting from voice channel {}",
self.guild_id().await,
self.channel_id().await
);
self.disconnect_no_abort().await; // We must run disconnect_no_abort within a read lock
// This is because `SessionManager::remove_session` will acquire a
// read lock to read the current owner.
// This would deadlock if we have an active write lock
{
let inner = self.0.read().await;
inner.disconnect_no_abort().await;
}
// Stop the disconnect timer, if one is running // Stop the disconnect timer, if one is running
let mut dc_handle = self.disconnect_handle.lock().await; let mut inner = self.0.write().await;
if let Some(handle) = inner.disconnect_handle.take() {
if let Some(handle) = dc_handle.take() {
handle.abort(); handle.abort();
} }
} }
/// Disconnect from voice channel with a message // Update playback info (duration, position, playing state)
pub async fn disconnect_with_message(&self, content: &str) { async fn update_playback(&self, duration_ms: u32, position_ms: u32, playing: bool) -> bool {
let is_none = {
let pbi = self.playback_info().await;
pbi.is_none()
};
let mut inner = self.0.write().await;
if is_none {
inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing));
} else {
// Update position, duration and playback state
inner
.playback_info
.as_mut()
.unwrap()
.update_pos_dur(position_ms, duration_ms, playing);
};
is_none
}
/// Start the disconnect timer, which will disconnect the bot from the voice channel after a
/// certain amount of time
async fn start_disconnect_timer(&self) {
let inner_arc = self.0.clone();
let mut inner = inner_arc.write().await;
// Abort the previous timer, if one is running
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
inner.disconnect_handle = Some(tokio::spawn({
let inner = inner_arc.clone();
async move {
let mut timer = tokio::time::interval(Duration::from_secs(DISCONNECT_TIME));
// Ignore first (immediate) tick
timer.tick().await;
timer.tick().await;
// Make sure this task has not been aborted, if it has this will automatically stop execution.
tokio::task::yield_now().await;
// Lock the inner mutex as late as possible to prevent any deadlocks
let mut inner = inner.write().await;
let is_playing = {
if let Some(ref pbi) = inner.playback_info {
pbi.is_playing
} else {
false
}
};
if !is_playing {
info!("Player is not playing, disconnecting");
inner
.disconnect_with_message(
"The player has been inactive for too long, and has been disconnected.",
)
.await;
}
}
}));
}
/* Inner getters */
/// Get the owner
pub async fn owner(&self) -> Option<UserId> {
self.0.read().await.owner
}
/// Get the session manager
pub async fn session_manager(&self) -> SessionManager {
self.0.read().await.session_manager.clone()
}
/// Get the guild id
pub async fn guild_id(&self) -> GuildId {
self.0.read().await.guild_id
}
/// Get the channel id
pub async fn channel_id(&self) -> ChannelId {
self.0.read().await.channel_id
}
/// Get the channel id
pub async fn text_channel_id(&self) -> ChannelId {
self.0.read().await.text_channel_id
}
/// Get the playback info
pub async fn playback_info(&self) -> Option<PlaybackInfo> {
self.0.read().await.playback_info.clone()
}
pub async fn call(&self) -> Arc<Mutex<Call>> {
self.0.read().await.call.clone()
}
pub async fn http(&self) -> Arc<Http> {
self.0.read().await.http.clone()
}
}
impl InnerSpoticordSession {
pub async fn disconnect_with_message(&mut self, content: &str) {
self.disconnect_no_abort().await; self.disconnect_no_abort().await;
if let Err(why) = self if let Err(why) = self
@ -544,102 +655,32 @@ impl SpoticordSession {
} }
// Stop the disconnect timer, if one is running // Stop the disconnect timer, if one is running
let mut dc_handle = self.disconnect_handle.lock().await; if let Some(handle) = self.disconnect_handle.take() {
if let Some(handle) = dc_handle.take() {
handle.abort(); handle.abort();
} }
} }
// Update playback info (duration, position, playing state) /// Internal version of disconnect, which does not abort the disconnect timer
async fn update_playback(&self, duration_ms: u32, position_ms: u32, playing: bool) -> bool { async fn disconnect_no_abort(&self) {
let is_none = { self.session_manager.remove_session(self.guild_id).await;
let pbi = self.playback_info.read().await;
pbi.is_none() let mut call = self.call.lock().await;
};
if is_none { if let Some(ref track) = self.track {
let mut pbi = self.playback_info.write().await; track.stop().unwrap_or(());
*pbi = Some(PlaybackInfo::new(duration_ms, position_ms, playing));
} else {
let mut pbi = self.playback_info.write().await;
// Update position, duration and playback state
pbi
.as_mut()
.unwrap()
.update_pos_dur(position_ms, duration_ms, playing)
.await;
};
is_none
}
/// Start the disconnect timer, which will disconnect the bot from the voice channel after a
/// certain amount of time
async fn start_disconnect_timer(&self) {
let pbi = self.playback_info.clone();
let instance = self.clone();
let mut handle = self.disconnect_handle.lock().await;
// Abort the previous timer, if one is running
if let Some(handle) = handle.take() {
handle.abort();
} }
*handle = Some(tokio::spawn(async move { call.remove_all_global_events();
let mut timer = tokio::time::interval(Duration::from_secs(DISCONNECT_TIME));
// Ignore first (immediate) tick if let Err(why) = call.leave().await {
timer.tick().await; error!("Failed to leave voice channel: {:?}", why);
timer.tick().await; }
// Make sure this task has not been aborted, if it has this will automatically stop execution.
tokio::task::yield_now().await;
let is_playing = {
let pbi = pbi.read().await;
if let Some(pbi) = &*pbi {
pbi.is_playing
} else {
false
}
};
if !is_playing {
info!("Player is not playing, disconnecting");
instance
.disconnect_with_message(
"The player has been inactive for too long, and has been disconnected.",
)
.await;
}
}));
} }
}
// Get the playback info for the current track impl Drop for InnerSpoticordSession {
pub async fn get_playback_info(&self) -> Option<PlaybackInfo> { fn drop(&mut self) {
self.playback_info.read().await.clone() trace!("Dropping inner session");
}
// Get the current owner of this session
pub async fn get_owner(&self) -> Option<UserId> {
let owner = self.owner.read().await;
*owner
}
// Get the server id this session is playing in
pub fn get_guild_id(&self) -> GuildId {
self.guild_id
}
// Get the channel id this session is playing in
pub fn get_channel_id(&self) -> ChannelId {
self.channel_id
} }
} }
@ -654,11 +695,16 @@ impl EventHandler for SpoticordSession {
EventContext::ClientDisconnect(who) => { EventContext::ClientDisconnect(who) => {
trace!("Client disconnected, {}", who.user_id.to_string()); trace!("Client disconnected, {}", who.user_id.to_string());
if let Some(session) = self.session_manager.find(UserId(who.user_id.0)).await { if let Some(session) = self
if session.get_guild_id() == self.guild_id && session.get_channel_id() == self.channel_id .session_manager()
.await
.find(UserId(who.user_id.0))
.await
{
if session.guild_id().await == self.guild_id().await
&& session.channel_id().await == self.channel_id().await
{ {
// Clone because haha immutable references self.player_stopped().await;
self.clone().player_stopped().await;
} }
} }
} }

View File

@ -30,7 +30,7 @@ impl PlaybackInfo {
} }
/// Update position, duration and playback state /// Update position, duration and playback state
pub async fn update_pos_dur(&mut self, position_ms: u32, duration_ms: u32, is_playing: bool) { pub fn update_pos_dur(&mut self, position_ms: u32, duration_ms: u32, is_playing: bool) {
self.position_ms = position_ms; self.position_ms = position_ms;
self.duration_ms = duration_ms; self.duration_ms = duration_ms;
self.is_playing = is_playing; self.is_playing = is_playing;