Almost ready

main
DaXcess 2023-09-18 22:39:11 +02:00
parent 5154c220bf
commit 2e273cdcde
No known key found for this signature in database
GPG Key ID: CF78CC72F0FD5EAD
11 changed files with 189 additions and 181 deletions

1
.github/FUNDING.yml vendored
View File

@ -1 +0,0 @@
patreon: rodabafilms

27
Cargo.lock generated
View File

@ -1249,7 +1249,7 @@ dependencies = [
"shell-words",
"thiserror",
"tokio",
"zerocopy",
"zerocopy 0.6.4",
]
[[package]]
@ -2377,7 +2377,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"zerocopy",
"zerocopy 0.7.5",
]
[[package]]
@ -3137,7 +3137,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20707b61725734c595e840fb3704378a0cd2b9c74cc9e6e20724838fc6a1e2f9"
dependencies = [
"byteorder",
"zerocopy-derive",
"zerocopy-derive 0.6.4",
]
[[package]]
name = "zerocopy"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "870cdd4b8b867698aea998d95bcc06c1d75fe566267781ee6f5ae8c9c45a3930"
dependencies = [
"byteorder",
"zerocopy-derive 0.7.5",
]
[[package]]
@ -3151,6 +3161,17 @@ dependencies = [
"syn 2.0.37",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9c6f95fa5657518b36c6784ba7cdd89e8bdf9a16e58266085248bfb950860c5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
]
[[package]]
name = "zeroize"
version = "1.3.0"

View File

@ -13,14 +13,19 @@ dotenv = "0.15.0"
env_logger = "0.10.0"
lazy_static = { version = "1.4.0", optional = true }
librespot = { version = "0.4.2", default-features = false }
log = "0.4.17"
reqwest = "0.11.18"
log = "0.4.20"
reqwest = "0.11.20"
samplerate = "0.2.4"
serde = "1.0.163"
serde_json = "1.0.96"
serenity = { version = "0.11.5", features = ["framework", "cache", "standard_framework"], default-features = false }
serde = "1.0.188"
serde_json = "1.0.107"
serenity = { version = "0.11.6", features = ["framework", "cache", "standard_framework"], default-features = false }
songbird = "0.3.2"
thiserror = "1.0.40"
time = "0.3.21"
tokio = { version = "1.28.1", features = ["rt", "full"] }
zerocopy = "0.6.1"
thiserror = "1.0.48"
time = "0.3.28"
tokio = { version = "1.32.0", features = ["rt", "full"] }
zerocopy = "0.7.5"
[profile.release]
opt-level = 3
lto = true
debug = true

View File

@ -1,3 +1,7 @@
pub mod stream;
use self::stream::Stream;
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult};
use librespot::playback::convert::Converter;
use librespot::playback::decoder::AudioPacket;
@ -5,8 +9,6 @@ use log::error;
use std::io::Write;
use tokio::sync::mpsc::UnboundedSender;
use crate::player::stream::Stream;
pub enum SinkEvent {
Start,
Stop,
@ -26,6 +28,8 @@ impl StreamSink {
impl Sink for StreamSink {
fn start(&mut self) -> SinkResult<()> {
if let Err(why) = self.sender.send(SinkEvent::Start) {
// WARNING: Returning an error causes librespot-playback to exit the process with status 1
error!("Failed to send start playback event: {why}");
return Err(SinkError::ConnectionRefused(why.to_string()));
}
@ -35,10 +39,14 @@ impl Sink for StreamSink {
fn stop(&mut self) -> SinkResult<()> {
if let Err(why) = self.sender.send(SinkEvent::Stop) {
// WARNING: Returning an error causes librespot-playback to exit the process with status 1
error!("Failed to send start playback event: {why}");
return Err(SinkError::ConnectionRefused(why.to_string()));
}
self.stream.flush().ok();
Ok(())
}

View File

@ -5,8 +5,10 @@ use std::{
use songbird::input::reader::MediaSource;
// TODO: Find optimal value
const MAX_SIZE: usize = 1024 * 1024;
/// The lower the value, the less latency
///
/// Too low of a value results in unpredictable audio
const MAX_SIZE: usize = 32 * 1024;
#[derive(Clone)]
pub struct Stream {
@ -26,15 +28,19 @@ impl Read for Stream {
let (mutex, condvar) = &*self.inner;
let mut buffer = mutex.lock().expect("Mutex was poisoned");
log::trace!("Read!");
// Prevent Discord jitter by filling buffer with zeroes if we don't have any audio
// (i.e. when you skip too far ahead in a song which hasn't been downloaded yet)
if buffer.is_empty() {
buf.fill(0);
condvar.notify_all();
while buffer.is_empty() {
buffer = condvar.wait(buffer).expect("Mutex was poisoned");
return Ok(buf.len());
}
let max_read = usize::min(buf.len(), buffer.len());
buf[0..max_read].copy_from_slice(&buffer[0..max_read]);
buffer.drain(0..max_read);
condvar.notify_all();
Ok(max_read)
}
@ -56,6 +62,12 @@ impl Write for Stream {
}
fn flush(&mut self) -> std::io::Result<()> {
let (mutex, condvar) = &*self.inner;
let mut buffer = mutex.lock().expect("Mutex was poisoned");
buffer.clear();
condvar.notify_all();
Ok(())
}
}

View File

@ -25,7 +25,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
author
.name("Maintained by: RoDaBaFilms")
.url("https://rodabafilms.com/")
.icon_url("https://rodabafilms.com/logo_2021_nobg.png")
.icon_url("https://cdn.discordapp.com/avatars/389786424142200835/6bfe3840b0aa6a1baf432bb251b70c9f.webp?size=128")
})
.description(format!("Current version: {}\n\nSpoticord is open source, check out [our GitHub](https://github.com/SpoticordMusic)", VERSION))
.color(Status::Info as u64)

View File

@ -338,7 +338,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
.title("Connected to voice channel")
.icon_url("https://spoticord.com/speaker.png")
.description(format!("Come listen along in <#{}>", channel_id))
.footer("Spotify will automatically start playing on Spoticord")
.footer("You must manually go to Spotify and select your device")
.status(Status::Info)
.build(),
)

View File

@ -91,6 +91,7 @@ async fn main() {
_ = tokio::signal::ctrl_c() => {
info!("Received interrupt signal, shutting down...");
session_manager.shutdown().await;
shard_manager.lock().await.shutdown_all().await;
break;
@ -110,6 +111,7 @@ async fn main() {
}, if term.is_some() => {
info!("Received terminate signal, shutting down...");
session_manager.shutdown().await;
shard_manager.lock().await.shutdown_all().await;
break;

View File

@ -1,8 +1,9 @@
pub mod stream;
use librespot::{
connect::spirc::Spirc,
core::{config::ConnectConfig, session::Session},
core::{
config::{ConnectConfig, SessionConfig},
session::Session,
},
discovery::Credentials,
playback::{
config::{Bitrate, PlayerConfig, VolumeCtrl},
@ -13,13 +14,11 @@ use librespot::{
use tokio::sync::mpsc::UnboundedReceiver;
use crate::{
audio::{SinkEvent, StreamSink},
audio::{stream::Stream, SinkEvent, StreamSink},
librespot_ext::discovery::CredentialsExt,
utils,
};
use self::stream::Stream;
pub struct Player {
stream: Stream,
session: Option<Session>,
@ -59,7 +58,16 @@ impl Player {
}
// Connect the session
let (session, _) = Session::connect(Default::default(), credentials, None, false).await?;
let (session, _) = Session::connect(
SessionConfig {
ap_port: Some(9999), // Force the use of ap.spotify.com, which has the lowest latency
..Default::default()
},
credentials,
None,
false,
)
.await?;
self.session = Some(session.clone());
let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig {

View File

@ -115,6 +115,10 @@ impl InnerSessionManager {
count
}
pub fn sessions(&self) -> Vec<SpoticordSession> {
self.sessions.values().cloned().collect()
}
}
impl SessionManager {
@ -183,4 +187,13 @@ impl SessionManager {
pub async fn get_active_session_count(&self) -> usize {
self.0.read().await.get_active_session_count().await
}
/// Tell all sessions to instantly shut down
pub async fn shutdown(&self) {
let sessions = self.0.read().await.sessions();
for session in sessions {
session.disconnect().await;
}
}
}

View File

@ -30,8 +30,13 @@ use songbird::{
tracks::TrackHandle,
Call, Event, EventContext, EventHandler,
};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use std::{
io::Write,
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use tokio::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard};
#[derive(Clone)]
pub struct SpoticordSession(Arc<RwLock<InnerSpoticordSession>>);
@ -55,6 +60,8 @@ struct InnerSpoticordSession {
spirc: Option<Spirc>,
player: Option<Player>,
/// Whether the session has been disconnected
/// If this is true then this instance should no longer be used and dropped
disconnected: bool,
@ -97,6 +104,7 @@ impl SpoticordSession {
playback_info: None,
disconnect_handle: None,
spirc: None,
player: None,
disconnected: false,
};
@ -132,14 +140,13 @@ impl SpoticordSession {
.clone();
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.owner = Some(owner_id);
}
{
let inner = self.0.clone();
let inner = inner.read().await;
session_manager.set_owner(owner_id, inner.guild_id).await;
let guild_id = self.acquire_read().await.guild_id;
session_manager.set_owner(owner_id, guild_id).await;
}
// Create the player
@ -150,28 +157,28 @@ impl SpoticordSession {
/// Advance to the next track
pub async fn next(&mut self) {
if let Some(ref spirc) = self.0.read().await.spirc {
if let Some(ref spirc) = self.acquire_read().await.spirc {
spirc.next();
}
}
/// Rewind to the previous track
pub async fn previous(&mut self) {
if let Some(ref spirc) = self.0.read().await.spirc {
if let Some(ref spirc) = self.acquire_read().await.spirc {
spirc.prev();
}
}
/// Pause the current track
pub async fn pause(&mut self) {
if let Some(ref spirc) = self.0.read().await.spirc {
if let Some(ref spirc) = self.acquire_read().await.spirc {
spirc.pause();
}
}
/// Resume the current track
pub async fn resume(&mut self) {
if let Some(ref spirc) = self.0.read().await.spirc {
if let Some(ref spirc) = self.acquire_read().await.spirc {
spirc.play();
}
}
@ -237,8 +244,7 @@ impl SpoticordSession {
}
};
// Handle IPC packets
// This will automatically quit once the IPC connection is closed
// Handle events
tokio::spawn({
let track = track_handle.clone();
let ctx = ctx.clone();
@ -367,126 +373,16 @@ impl SpoticordSession {
}
SinkEvent::Stop => {
check_result(track.pause());
// EXPERIMENT: It may be beneficial to *NOT* pause songbird here
// We already have a fallback if no audio is present in the buffer (write all zeroes aka silence)
// So commenting this out may help prevent a substantial portion of jitter
// This comes at a cost of more bandwidth, though opus should compress it down to almost nothing
// check_result(track.pause());
}
}
}
};
// match event {
// // Session connect error
// IpcPacket::ConnectError(why) => {
// error!("Failed to connect to Spotify: {:?}", why);
// // Notify the user in the text channel
// if let Err(why) = instance
// .text_channel_id()
// .await
// .send_message(&instance.http().await, |message| {
// message.embed(|embed| {
// embed.title("Failed to connect to Spotify");
// embed.description(why);
// embed.footer(|footer| footer.text("Please try again"));
// embed.color(Status::Error as u64);
// embed
// });
// message
// })
// .await
// {
// error!("Failed to send error message: {:?}", why);
// }
// break;
// }
// // Sink requests playback to start/resume
// IpcPacket::StartPlayback => {
// check_result(track.play());
// }
// // Sink requests playback to pause
// IpcPacket::StopPlayback => {
// check_result(track.pause());
// }
// // A new track has been set by the player
// IpcPacket::TrackChange(track) => {
// // Convert to SpotifyId
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
// let instance = instance.clone();
// let ctx = ctx.clone();
// // Fetch track info
// // This is done in a separate task to avoid blocking the IPC handler
// tokio::spawn(async move {
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
// error!("Failed to update track: {:?}", why);
// instance.player_stopped().await;
// }
// });
// }
// // The player has started playing a track
// IpcPacket::Playing(track, position_ms, duration_ms) => {
// // Convert to SpotifyId
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
// let was_none = instance
// .update_playback(duration_ms, position_ms, true)
// .await;
// if was_none {
// // Stop player if update track fails
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
// error!("Failed to update track: {:?}", why);
// instance.player_stopped().await;
// return;
// }
// }
// }
// IpcPacket::Paused(track, position_ms, duration_ms) => {
// instance.start_disconnect_timer().await;
// // Convert to SpotifyId
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
// let was_none = instance
// .update_playback(duration_ms, position_ms, false)
// .await;
// if was_none {
// // Stop player if update track fails
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
// error!("Failed to update track: {:?}", why);
// instance.player_stopped().await;
// return;
// }
// }
// }
// IpcPacket::Stopped => {
// check_result(track.pause());
// {
// let mut inner = inner.write().await;
// inner.playback_info.take();
// }
// instance.start_disconnect_timer().await;
// }
// // Ignore other packets
// _ => {}
// }
}
// Clean up session
@ -497,9 +393,10 @@ impl SpoticordSession {
});
// Update inner client and track
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.track = Some(track_handle);
inner.spirc = Some(spirc);
inner.player = Some(player);
Ok(())
}
@ -566,7 +463,7 @@ impl SpoticordSession {
}
// Update track/episode
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
if let Some(pbi) = inner.playback_info.as_mut() {
pbi.update_track_episode(spotify_id, track, episode);
@ -577,7 +474,7 @@ impl SpoticordSession {
/// Called when the player must stop, but not leave the call
async fn player_stopped(&self) {
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
if let Some(spirc) = inner.spirc.take() {
spirc.shutdown();
@ -617,7 +514,7 @@ impl SpoticordSession {
// read lock to read the current owner.
// This would deadlock if we have an active write lock
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.disconnect_no_abort().await;
}
@ -633,7 +530,7 @@ impl SpoticordSession {
};
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
if is_none {
inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing));
@ -659,8 +556,8 @@ impl SpoticordSession {
async fn start_disconnect_timer(&self) {
self.stop_disconnect_timer().await;
let inner_arc = self.0.clone();
let mut inner = inner_arc.write().await;
let arc_handle = self.0.clone();
let mut inner = self.acquire_write().await;
// Check if we are already disconnected
if inner.disconnected {
@ -668,7 +565,7 @@ impl SpoticordSession {
}
inner.disconnect_handle = Some(tokio::spawn({
let inner = inner_arc.clone();
let inner = arc_handle.clone();
let instance = self.clone();
async move {
@ -705,7 +602,7 @@ impl SpoticordSession {
/// Stop the disconnect timer (if one is running)
async fn stop_disconnect_timer(&self) {
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
@ -714,7 +611,7 @@ impl SpoticordSession {
/// Disconnect from the VC and send a message to the text channel
pub async fn disconnect_with_message(&self, content: &str) {
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
// Firstly we disconnect
inner.disconnect_no_abort().await;
@ -745,48 +642,97 @@ impl SpoticordSession {
/// Get the owner
pub async fn owner(&self) -> Option<UserId> {
self.0.read().await.owner
self.acquire_read().await.owner
}
/// Get the session manager
pub async fn session_manager(&self) -> SessionManager {
self.0.read().await.session_manager.clone()
self.acquire_read().await.session_manager.clone()
}
/// Get the guild id
pub async fn guild_id(&self) -> GuildId {
self.0.read().await.guild_id
self.acquire_read().await.guild_id
}
/// Get the channel id
pub async fn channel_id(&self) -> ChannelId {
self.0.read().await.channel_id
self.acquire_read().await.channel_id
}
/// Get the channel id
#[allow(dead_code)]
pub async fn text_channel_id(&self) -> ChannelId {
self.0.read().await.text_channel_id
self.acquire_read().await.text_channel_id
}
/// Get the playback info
pub async fn playback_info(&self) -> Option<PlaybackInfo> {
self.0.read().await.playback_info.clone()
self.acquire_read().await.playback_info.clone()
}
pub async fn call(&self) -> Arc<Mutex<Call>> {
self.0.read().await.call.clone()
self.acquire_read().await.call.clone()
}
#[allow(dead_code)]
pub async fn http(&self) -> Arc<Http> {
self.0.read().await.http.clone()
self.acquire_read().await.http.clone()
}
async fn acquire_read(&self) -> ReadLock {
ReadLock(self.0.read().await)
}
async fn acquire_write(&self) -> WriteLock {
WriteLock(self.0.write().await)
}
}
struct ReadLock<'a>(RwLockReadGuard<'a, InnerSpoticordSession>);
impl<'a> Deref for ReadLock<'a> {
type Target = RwLockReadGuard<'a, InnerSpoticordSession>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> DerefMut for ReadLock<'a> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
struct WriteLock<'a>(RwLockWriteGuard<'a, InnerSpoticordSession>);
impl<'a> Deref for WriteLock<'a> {
type Target = RwLockWriteGuard<'a, InnerSpoticordSession>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> DerefMut for WriteLock<'a> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl InnerSpoticordSession {
/// Internal version of disconnect, which does not abort the disconnect timer
async fn disconnect_no_abort(&mut self) {
// Flush stream so that it is not permanently blocking the thread
if let Some(player) = self.player.take() {
player.get_stream().flush().ok();
}
self.disconnected = true;
self
.session_manager
@ -813,12 +759,6 @@ impl InnerSpoticordSession {
}
}
impl Drop for InnerSpoticordSession {
fn drop(&mut self) {
trace!("Dropping inner session");
}
}
#[async_trait]
impl EventHandler for SpoticordSession {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {