Bring back 5 min timeout

main
DaXcess 2023-09-19 21:42:28 +02:00
parent 2cebeb41ab
commit 8508883320
No known key found for this signature in database
GPG Key ID: CF78CC72F0FD5EAD
2 changed files with 82 additions and 19 deletions

View File

@ -12,7 +12,7 @@ use librespot::{
playback::{ playback::{
config::{Bitrate, PlayerConfig, VolumeCtrl}, config::{Bitrate, PlayerConfig, VolumeCtrl},
mixer::{self, MixerConfig}, mixer::{self, MixerConfig},
player::{Player as SpotifyPlayer, PlayerEvent}, player::{Player as SpotifyPlayer, PlayerEvent as SpotifyEvent},
}, },
protocol::metadata::{Episode, Track}, protocol::metadata::{Episode, Track},
}; };
@ -33,7 +33,7 @@ use crate::{
}; };
enum Event { enum Event {
Player(PlayerEvent), Player(SpotifyEvent),
Sink(SinkEvent), Sink(SinkEvent),
Command(PlayerCommand), Command(PlayerCommand),
} }
@ -44,6 +44,14 @@ enum PlayerCommand {
Previous, Previous,
Pause, Pause,
Play, Play,
Shutdown,
}
#[derive(Clone, Debug)]
pub enum PlayerEvent {
Pause,
Play,
Stopped,
} }
#[derive(Clone)] #[derive(Clone)]
@ -59,7 +67,7 @@ impl Player {
token: &str, token: &str,
device_name: &str, device_name: &str,
track: TrackHandle, track: TrackHandle,
) -> Result<Self> { ) -> Result<(Self, Receiver<PlayerEvent>)> {
let username = utils::spotify::get_username(token).await?; let username = utils::spotify::get_username(token).await?;
let player_config = PlayerConfig { let player_config = PlayerConfig {
@ -107,6 +115,7 @@ impl Player {
); );
let (tx, rx) = tokio::sync::broadcast::channel(10); let (tx, rx) = tokio::sync::broadcast::channel(10);
let (tx_ev, rx_ev) = tokio::sync::broadcast::channel(10);
let pbi = Arc::new(Mutex::new(None)); let pbi = Arc::new(Mutex::new(None));
let player_task = PlayerTask { let player_task = PlayerTask {
@ -115,6 +124,7 @@ impl Player {
rx_player, rx_player,
rx_sink, rx_sink,
rx, rx,
tx: tx_ev,
spirc, spirc,
track, track,
stream, stream,
@ -123,7 +133,7 @@ impl Player {
tokio::spawn(spirc_task); tokio::spawn(spirc_task);
tokio::spawn(player_task.run()); tokio::spawn(player_task.run());
Ok(Self { pbi, tx }) Ok((Self { pbi, tx }, rx_ev))
} }
pub fn next(&self) { pub fn next(&self) {
@ -142,6 +152,10 @@ impl Player {
self.tx.send(PlayerCommand::Play).ok(); self.tx.send(PlayerCommand::Play).ok();
} }
pub fn shutdown(&self) {
self.tx.send(PlayerCommand::Shutdown).ok();
}
pub async fn pbi(&self) -> Option<PlaybackInfo> { pub async fn pbi(&self) -> Option<PlaybackInfo> {
self.pbi.lock().await.as_ref().cloned() self.pbi.lock().await.as_ref().cloned()
} }
@ -153,9 +167,10 @@ struct PlayerTask {
spirc: Spirc, spirc: Spirc,
track: TrackHandle, track: TrackHandle,
rx_player: UnboundedReceiver<PlayerEvent>, rx_player: UnboundedReceiver<SpotifyEvent>,
rx_sink: UnboundedReceiver<SinkEvent>, rx_sink: UnboundedReceiver<SinkEvent>,
rx: Receiver<PlayerCommand>, rx: Receiver<PlayerCommand>,
tx: Sender<PlayerEvent>,
pbi: Arc<Mutex<Option<PlaybackInfo>>>, pbi: Arc<Mutex<Option<PlaybackInfo>>>,
} }
@ -172,7 +187,7 @@ impl PlayerTask {
match self.next().await { match self.next().await {
// Spotify player events // Spotify player events
Some(Event::Player(event)) => match event { Some(Event::Player(event)) => match event {
PlayerEvent::Playing { SpotifyEvent::Playing {
play_request_id: _, play_request_id: _,
track_id, track_id,
position_ms, position_ms,
@ -181,9 +196,11 @@ impl PlayerTask {
self self
.update_pbi(track_id, position_ms, duration_ms, true) .update_pbi(track_id, position_ms, duration_ms, true)
.await; .await;
self.tx.send(PlayerEvent::Play).ok();
} }
PlayerEvent::Paused { SpotifyEvent::Paused {
play_request_id: _, play_request_id: _,
track_id, track_id,
position_ms, position_ms,
@ -192,9 +209,11 @@ impl PlayerTask {
self self
.update_pbi(track_id, position_ms, duration_ms, false) .update_pbi(track_id, position_ms, duration_ms, false)
.await; .await;
self.tx.send(PlayerEvent::Pause).ok();
} }
PlayerEvent::Changed { SpotifyEvent::Changed {
old_track_id: _, old_track_id: _,
new_track_id, new_track_id,
} => { } => {
@ -207,11 +226,13 @@ impl PlayerTask {
} }
} }
PlayerEvent::Stopped { SpotifyEvent::Stopped {
play_request_id: _, play_request_id: _,
track_id: _, track_id: _,
} => { } => {
check_result(self.track.stop()); check_result(self.track.pause());
self.tx.send(PlayerEvent::Pause).ok();
} }
_ => {} _ => {}
@ -230,6 +251,8 @@ impl PlayerTask {
// This comes at a cost of more bandwidth, though opus should compress it down to almost nothing // This comes at a cost of more bandwidth, though opus should compress it down to almost nothing
// check_result(track.pause()); // check_result(track.pause());
self.tx.send(PlayerEvent::Pause).ok();
} }
}, },
@ -239,6 +262,7 @@ impl PlayerTask {
PlayerCommand::Previous => self.spirc.prev(), PlayerCommand::Previous => self.spirc.prev(),
PlayerCommand::Pause => self.spirc.pause(), PlayerCommand::Pause => self.spirc.pause(),
PlayerCommand::Play => self.spirc.play(), PlayerCommand::Play => self.spirc.play(),
PlayerCommand::Shutdown => break,
}, },
None => { None => {
@ -248,6 +272,8 @@ impl PlayerTask {
} }
} }
} }
self.tx.send(PlayerEvent::Stopped).ok();
} }
async fn next(&mut self) -> Option<Event> { async fn next(&mut self) -> Option<Event> {

View File

@ -9,7 +9,7 @@ use crate::{
audio::stream::Stream, audio::stream::Stream,
consts::DISCONNECT_TIME, consts::DISCONNECT_TIME,
database::{Database, DatabaseError}, database::{Database, DatabaseError},
player::Player, player::{Player, PlayerEvent},
utils::embed::Status, utils::embed::Status,
}; };
use log::*; use log::*;
@ -221,8 +221,8 @@ impl SpoticordSession {
// Set call audio to track // Set call audio to track
call.play_only(track); call.play_only(track);
let player = match Player::create(stream, &token, &user.device_name, track_handle.clone()).await let (player, mut rx) =
{ match Player::create(stream, &token, &user.device_name, track_handle.clone()).await {
Ok(v) => v, Ok(v) => v,
Err(why) => { Err(why) => {
error!("Failed to start the player: {:?}", why); error!("Failed to start the player: {:?}", why);
@ -231,7 +231,34 @@ impl SpoticordSession {
} }
}; };
// Update inner client and track tokio::spawn({
let session = self.clone();
async move {
loop {
match rx.recv().await {
Ok(event) => match event {
PlayerEvent::Pause => session.start_disconnect_timer().await,
PlayerEvent::Play => session.stop_disconnect_timer().await,
PlayerEvent::Stopped => {
session.player_stopped().await;
break;
}
},
Err(why) => {
error!("Communication with player abruptly ended: {why}");
session.player_stopped().await;
break;
}
}
}
}
});
// Start DC timer by default, as automatic device switching is now gone
self.start_disconnect_timer().await;
let mut inner = self.acquire_write().await; let mut inner = self.acquire_write().await;
inner.track = Some(track_handle); inner.track = Some(track_handle);
inner.player = Some(player); inner.player = Some(player);
@ -254,6 +281,11 @@ impl SpoticordSession {
inner.session_manager.remove_owner(owner_id).await; inner.session_manager.remove_owner(owner_id).await;
} }
// Disconnect from Spotify
if let Some(player) = inner.player.take() {
player.shutdown();
}
// Unlock to prevent deadlock in start_disconnect_timer // Unlock to prevent deadlock in start_disconnect_timer
drop(inner); drop(inner);
@ -455,6 +487,11 @@ impl<'a> DerefMut for WriteLock<'a> {
impl InnerSpoticordSession { impl InnerSpoticordSession {
/// Internal version of disconnect, which does not abort the disconnect timer /// Internal version of disconnect, which does not abort the disconnect timer
async fn disconnect_no_abort(&mut self) { async fn disconnect_no_abort(&mut self) {
// Disconnect from Spotify
if let Some(player) = self.player.take() {
player.shutdown();
}
self.disconnected = true; self.disconnected = true;
self self
.session_manager .session_manager