Merge pull request #18 from SpoticordMusic/dev

Prepare update Spoticord to v2.1.0
main
Daniel 2023-09-20 20:18:37 +02:00 committed by GitHub
commit 9c8222b38a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1391 additions and 2207 deletions

1
.github/FUNDING.yml vendored
View File

@ -1 +0,0 @@
patreon: rodabafilms

View File

@ -28,7 +28,7 @@ jobs:
uses: docker/build-push-action@v2
with:
context: .
file: ./Dockerfile.metrics
file: ./Dockerfile
tags: |
${{ secrets.REGISTRY_URL }}/spoticord/spoticord:latest
push: ${{ github.ref == 'refs/heads/main' }}

View File

@ -72,10 +72,11 @@ cargo run
```
# Features
As of now, Spoticord has one optional feature: `metrics`. This feature enables pushing metrics about the bot, like how many servers it is in, which tracks are being played and which commands are being executed. The metrics are designed to be pushed to a [Prometheus Pushgateway](https://prometheus.io/docs/instrumenting/pushing/). If you want to enable this feature, you can do so by running the following command:
As of now, Spoticord has one optional feature: `stats`. This feature enables collecting a few statistics, total and active servers. These statistics will be sent to a redis server, where they then can be read for whatever purpose. If you want to enable this feature, you can do so by running the following command:
```sh
cargo build --release --features metrics
cargo build [--release] --features metrics
```
# MSRV

1251
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "spoticord"
version = "2.0.0"
version = "2.1.0"
edition = "2021"
rust-version = "1.65.0"
@ -9,24 +9,28 @@ name = "spoticord"
path = "src/main.rs"
[features]
default = []
metrics = ["lazy_static", "prometheus"]
stats = ["redis"]
[dependencies]
anyhow = "1.0.75"
dotenv = "0.15.0"
env_logger = "0.10.0"
ipc-channel = { version = "0.16.0", features = ["async"] }
lazy_static = { version = "1.4.0", optional = true }
hex = "0.4.3"
librespot = { version = "0.4.2", default-features = false }
log = "0.4.17"
prometheus = { version = "0.13.3", optional = true, features = ["push", "process"] }
reqwest = "0.11.18"
log = "0.4.20"
protobuf = "2.28.0"
redis = { version = "0.23.3", optional = true }
reqwest = "0.11.20"
samplerate = "0.2.4"
serde = "1.0.163"
serde_json = "1.0.96"
serenity = { version = "0.11.5", features = ["framework", "cache", "standard_framework"], default-features = false }
serde = "1.0.188"
serde_json = "1.0.107"
serenity = { version = "0.11.6", features = ["framework", "cache", "standard_framework"], default-features = false }
songbird = "0.3.2"
thiserror = "1.0.40"
time = "0.3.21"
tokio = { version = "1.28.1", features = ["rt", "full"] }
zerocopy = "0.6.1"
thiserror = "1.0.48"
time = "0.3.28"
tokio = { version = "1.32.0", features = ["rt", "full"] }
zerocopy = "0.7.5"
[profile.release]
opt-level = 3
lto = true

View File

@ -7,7 +7,9 @@ WORKDIR /app
RUN apt-get update && apt-get install -y cmake
COPY . .
RUN cargo install --path .
# Remove `--features stats` if you want to deploy without stats collection
RUN cargo install --path . --features stats
# Runtime
FROM debian:buster-slim

View File

@ -1,23 +0,0 @@
# Builder
FROM rust:1.65-buster as builder
WORKDIR /app
# Add extra build dependencies here
RUN apt-get update && apt-get install -y cmake
COPY . .
RUN cargo install --path . --features metrics
# Runtime
FROM debian:buster-slim
WORKDIR /app
# Add extra runtime dependencies here
RUN apt-get update && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
# Copy spoticord binary from builder
COPY --from=builder /usr/local/cargo/bin/spoticord ./spoticord
CMD ["./spoticord"]

View File

@ -16,7 +16,7 @@ Spoticord uses environment variables to configure itself. The following variable
Additionally you can configure the following variables:
- `GUILD_ID`: The ID of the Discord server where this bot will create commands for. This is used during testing to prevent the bot from creating slash commands in other servers, as well as getting the commands quicker. This variable is optional, and if not set, the bot will create commands in all servers it is in (this may take up to 15 minutes).
- `METRICS_URL`: The connection URL of a Prometheus Push Gateway server used for pushing metrics. This variable is required when compiling with the `metrics` feature.
- `KV_URL`: The connection URL of a redis-server instance used for storing realtime data. This variable is required when compiling with the `stats` feature.
#### Providing environment variables
You can provide environment variables in a `.env` file at the root of the working directory of Spoticord.

View File

@ -1,88 +0,0 @@
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult};
use librespot::playback::convert::Converter;
use librespot::playback::decoder::AudioPacket;
use log::error;
use std::io::{Stdout, Write};
use crate::ipc;
use crate::ipc::packet::IpcPacket;
pub struct StdoutSink {
client: ipc::Client,
output: Option<Box<Stdout>>,
}
impl StdoutSink {
pub fn new(client: ipc::Client) -> Self {
StdoutSink {
client,
output: None,
}
}
}
impl Sink for StdoutSink {
fn start(&mut self) -> SinkResult<()> {
if let Err(why) = self.client.send(IpcPacket::StartPlayback) {
error!("Failed to send start playback packet: {}", why);
return Err(SinkError::ConnectionRefused(why.to_string()));
}
self.output.get_or_insert(Box::new(std::io::stdout()));
Ok(())
}
fn stop(&mut self) -> SinkResult<()> {
if let Err(why) = self.client.send(IpcPacket::StopPlayback) {
error!("Failed to send stop playback packet: {}", why);
return Err(SinkError::ConnectionRefused(why.to_string()));
}
self
.output
.take()
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
.flush()
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
Ok(())
}
fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> {
use zerocopy::AsBytes;
if let AudioPacket::Samples(samples) = packet {
let samples_f32: &[f32] = &converter.f64_to_f32(&samples);
let resampled = samplerate::convert(
44100,
48000,
2,
samplerate::ConverterType::Linear,
samples_f32,
)
.expect("to succeed");
let samples_i16 =
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
self.write_bytes(samples_i16.as_bytes())?;
}
Ok(())
}
}
impl SinkAsBytes for StdoutSink {
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
self
.output
.as_deref_mut()
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
.write_all(data)
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
Ok(())
}
}

View File

@ -1 +1,86 @@
pub mod backend;
pub mod stream;
use self::stream::Stream;
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult};
use librespot::playback::convert::Converter;
use librespot::playback::decoder::AudioPacket;
use log::error;
use std::io::Write;
use tokio::sync::mpsc::UnboundedSender;
pub enum SinkEvent {
Start,
Stop,
}
pub struct StreamSink {
stream: Stream,
sender: UnboundedSender<SinkEvent>,
}
impl StreamSink {
pub fn new(stream: Stream, sender: UnboundedSender<SinkEvent>) -> Self {
Self { stream, sender }
}
}
impl Sink for StreamSink {
fn start(&mut self) -> SinkResult<()> {
if let Err(why) = self.sender.send(SinkEvent::Start) {
// WARNING: Returning an error causes librespot-playback to exit the process with status 1
error!("Failed to send start playback event: {why}");
return Err(SinkError::ConnectionRefused(why.to_string()));
}
Ok(())
}
fn stop(&mut self) -> SinkResult<()> {
if let Err(why) = self.sender.send(SinkEvent::Stop) {
// WARNING: Returning an error causes librespot-playback to exit the process with status 1
error!("Failed to send start playback event: {why}");
return Err(SinkError::ConnectionRefused(why.to_string()));
}
self.stream.flush().ok();
Ok(())
}
fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> {
use zerocopy::AsBytes;
let AudioPacket::Samples(samples) = packet else { return Ok(()); };
let samples_f32: &[f32] = &converter.f64_to_f32(&samples);
let resampled = samplerate::convert(
44100,
48000,
2,
samplerate::ConverterType::Linear,
samples_f32,
)
.expect("to succeed");
let samples_i16 =
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
self.write_bytes(samples_i16.as_bytes())?;
Ok(())
}
}
impl SinkAsBytes for StreamSink {
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
self
.stream
.write_all(data)
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
Ok(())
}
}

View File

@ -0,0 +1,89 @@
use std::{
io::{Read, Seek, Write},
sync::{Arc, Condvar, Mutex},
};
use songbird::input::reader::MediaSource;
/// The lower the value, the less latency
///
/// Too low of a value results in unpredictable audio
const MAX_SIZE: usize = 32 * 1024;
#[derive(Clone)]
pub struct Stream {
inner: Arc<(Mutex<Vec<u8>>, Condvar)>,
}
impl Stream {
pub fn new() -> Self {
Self {
inner: Arc::new((Mutex::new(Vec::new()), Condvar::new())),
}
}
}
impl Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let (mutex, condvar) = &*self.inner;
let mut buffer = mutex.lock().expect("Mutex was poisoned");
// Prevent Discord jitter by filling buffer with zeroes if we don't have any audio
// (i.e. when you skip too far ahead in a song which hasn't been downloaded yet)
if buffer.is_empty() {
buf.fill(0);
condvar.notify_all();
return Ok(buf.len());
}
let max_read = usize::min(buf.len(), buffer.len());
buf[0..max_read].copy_from_slice(&buffer[0..max_read]);
buffer.drain(0..max_read);
condvar.notify_all();
Ok(max_read)
}
}
impl Write for Stream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let (mutex, condvar) = &*self.inner;
let mut buffer = mutex.lock().expect("Mutex was poisoned");
while buffer.len() + buf.len() > MAX_SIZE {
buffer = condvar.wait(buffer).expect("Mutex was poisoned");
}
buffer.extend_from_slice(buf);
condvar.notify_all();
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
let (mutex, condvar) = &*self.inner;
let mut buffer = mutex.lock().expect("Mutex was poisoned");
buffer.clear();
condvar.notify_all();
Ok(())
}
}
impl Seek for Stream {
fn seek(&mut self, _: std::io::SeekFrom) -> std::io::Result<u64> {
Ok(0)
}
}
impl MediaSource for Stream {
fn byte_len(&self) -> Option<u64> {
None
}
fn is_seekable(&self) -> bool {
false
}
}

View File

@ -25,7 +25,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
author
.name("Maintained by: RoDaBaFilms")
.url("https://rodabafilms.com/")
.icon_url("https://rodabafilms.com/logo_2021_nobg.png")
.icon_url("https://cdn.discordapp.com/avatars/389786424142200835/6bfe3840b0aa6a1baf432bb251b70c9f.webp?size=128")
})
.description(format!("Current version: {}\n\nSpoticord is open source, check out [our GitHub](https://github.com/SpoticordMusic)", VERSION))
.color(Status::Info as u64)

View File

@ -338,7 +338,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
.title("Connected to voice channel")
.icon_url("https://spoticord.com/speaker.png")
.description(format!("Come listen along in <#{}>", channel_id))
.footer("Spotify will automatically start playing on Spoticord")
.footer("You must manually go to Spotify and select your device")
.status(Status::Info)
.build(),
)

View File

@ -1,6 +1,6 @@
use std::time::Duration;
use librespot::core::spotify_id::{SpotifyAudioType, SpotifyId};
use librespot::core::spotify_id::SpotifyId;
use log::error;
use serenity::{
builder::{CreateApplicationCommand, CreateButton, CreateComponents, CreateEmbed},
@ -82,15 +82,6 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
}
};
let spotify_id = match pbi.spotify_id {
Some(spotify_id) => spotify_id,
None => {
not_playing.await;
return;
}
};
// Get owner of session
let owner = match utils::discord::get_user(&ctx, owner).await {
Some(user) => user,
@ -119,7 +110,7 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
};
// Get metadata
let (title, description, audio_type, thumbnail) = get_metadata(spotify_id, &pbi);
let (title, description, thumbnail) = get_metadata(&pbi);
if let Err(why) = command
.create_interaction_response(&ctx.http, |response| {
@ -129,8 +120,8 @@ pub fn command(ctx: Context, command: ApplicationCommandInteraction) -> CommandO
message
.set_embed(build_playing_embed(
title,
audio_type,
spotify_id,
pbi.get_type(),
pbi.spotify_id,
description,
owner,
thumbnail,
@ -280,35 +271,24 @@ pub fn component(ctx: Context, mut interaction: MessageComponentInteraction) ->
};
// Send the desired command to the session
let success = match interaction.data.custom_id.as_str() {
match interaction.data.custom_id.as_str() {
"playing::btn_pause_play" => {
if pbi.is_playing {
session.pause().await.is_ok()
session.pause().await
} else {
session.resume().await.is_ok()
session.resume().await
}
}
"playing::btn_previous_track" => session.previous().await.is_ok(),
"playing::btn_previous_track" => session.previous().await,
"playing::btn_next_track" => session.next().await.is_ok(),
"playing::btn_next_track" => session.next().await,
_ => {
error!("Unknown custom_id: {}", interaction.data.custom_id);
false
}
};
if !success {
error_message(
"Cannot change playback state",
"An error occurred while trying to change the playback state",
)
.await;
return;
}
interaction.defer(&ctx.http).await.ok();
tokio::time::sleep(Duration::from_millis(
if interaction.data.custom_id == "playing::btn_pause_play" {
@ -420,20 +400,7 @@ async fn update_embed(interaction: &mut MessageComponentInteraction, ctx: &Conte
}
};
let spotify_id = match pbi.spotify_id {
Some(spotify_id) => spotify_id,
None => {
error_edit(
"Cannot change playback state",
"I'm currently not playing any music in this server",
)
.await;
return;
}
};
let (title, description, audio_type, thumbnail) = get_metadata(spotify_id, &pbi);
let (title, description, thumbnail) = get_metadata(&pbi);
if let Err(why) = interaction
.message
@ -441,8 +408,8 @@ async fn update_embed(interaction: &mut MessageComponentInteraction, ctx: &Conte
message
.set_embed(build_playing_embed(
title,
audio_type,
spotify_id,
pbi.get_type(),
pbi.spotify_id,
description,
owner,
thumbnail,
@ -488,20 +455,9 @@ fn build_playing_embed(
embed
}
fn get_metadata(spotify_id: SpotifyId, pbi: &PlaybackInfo) -> (String, String, String, String) {
// Get audio type
let audio_type = if spotify_id.audio_type == SpotifyAudioType::Track {
"track"
} else {
"episode"
};
fn get_metadata(pbi: &PlaybackInfo) -> (String, String, String) {
// Create title
let title = format!(
"{} - {}",
pbi.get_artists().as_deref().unwrap_or("ID"),
pbi.get_name().as_deref().unwrap_or("ID")
);
let title = format!("{} - {}", pbi.get_artists(), pbi.get_name());
// Create description
let mut description = String::new();
@ -529,5 +485,5 @@ fn get_metadata(spotify_id: SpotifyId, pbi: &PlaybackInfo) -> (String, String, S
// Get the thumbnail image
let thumbnail = pbi.get_thumbnail_url().expect("to contain a value");
(title, description, audio_type.to_string(), thumbnail)
(title, description, thumbnail)
}

View File

@ -15,9 +15,6 @@ use serenity::{
prelude::{Context, EventHandler},
};
#[cfg(feature = "metrics")]
use crate::metrics::MetricsManager;
// If the GUILD_ID environment variable is set, only allow commands from that guild
macro_rules! enforce_guild {
($interaction:ident) => {
@ -99,12 +96,6 @@ impl Handler {
let data = ctx.data.read().await;
let command_manager = data.get::<CommandManager>().expect("to contain a value");
#[cfg(feature = "metrics")]
{
let metrics = data.get::<MetricsManager>().expect("to contain a value");
metrics.command_exec(&command.data.name);
}
command_manager.execute_command(&ctx, command).await;
}

View File

@ -1,6 +1,10 @@
#[cfg(not(debug_assertions))]
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
#[cfg(debug_assertions)]
pub const VERSION: &str = concat!(env!("CARGO_PKG_VERSION"), "-dev");
pub const MOTD: &str = "some good 'ol music";
/// The time it takes for Spoticord to disconnect when no music is being played
pub const DISCONNECT_TIME: u64 = 5 * 60;

View File

@ -1,69 +0,0 @@
use std::sync::{Arc, Mutex};
use ipc_channel::ipc::{self, IpcError, IpcOneShotServer, IpcReceiver, IpcSender, TryRecvError};
use self::packet::IpcPacket;
pub mod packet;
pub struct Server {
tx: IpcOneShotServer<IpcSender<IpcPacket>>,
rx: IpcOneShotServer<IpcReceiver<IpcPacket>>,
}
impl Server {
pub fn create() -> Result<(Self, String, String), IpcError> {
let (tx, tx_name) = IpcOneShotServer::new().map_err(IpcError::Io)?;
let (rx, rx_name) = IpcOneShotServer::new().map_err(IpcError::Io)?;
Ok((Self { tx, rx }, tx_name, rx_name))
}
pub fn accept(self) -> Result<Client, IpcError> {
let (_, tx) = self.tx.accept().map_err(IpcError::Bincode)?;
let (_, rx) = self.rx.accept().map_err(IpcError::Bincode)?;
Ok(Client::new(tx, rx))
}
}
#[derive(Clone)]
pub struct Client {
tx: Arc<Mutex<IpcSender<IpcPacket>>>,
rx: Arc<Mutex<IpcReceiver<IpcPacket>>>,
}
impl Client {
pub fn new(tx: IpcSender<IpcPacket>, rx: IpcReceiver<IpcPacket>) -> Client {
Client {
tx: Arc::new(Mutex::new(tx)),
rx: Arc::new(Mutex::new(rx)),
}
}
pub fn connect(tx_name: impl Into<String>, rx_name: impl Into<String>) -> Result<Self, IpcError> {
let (tx, remote_rx) = ipc::channel().map_err(IpcError::Io)?;
let (remote_tx, rx) = ipc::channel().map_err(IpcError::Io)?;
let ttx = IpcSender::connect(tx_name.into()).map_err(IpcError::Io)?;
let trx = IpcSender::connect(rx_name.into()).map_err(IpcError::Io)?;
ttx.send(remote_tx).map_err(IpcError::Bincode)?;
trx.send(remote_rx).map_err(IpcError::Bincode)?;
Ok(Client::new(tx, rx))
}
pub fn send(&self, packet: IpcPacket) -> Result<(), IpcError> {
self
.tx
.lock()
.expect("to be able to lock")
.send(packet)
.map_err(IpcError::Bincode)
}
pub fn try_recv(&self) -> Result<IpcPacket, TryRecvError> {
self.rx.lock().expect("to be able to lock").try_recv()
}
}

View File

@ -1,46 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum IpcPacket {
/// Quit the player process
Quit,
/// Connect to Spotify with the given token and device name
Connect(String, String),
/// Disconnect from Spotify (unused)
Disconnect,
/// Unable to connect to Spotify
ConnectError(String),
/// The audio sink has started writing
StartPlayback,
/// The audio sink has stopped writing
StopPlayback,
/// The current Spotify track was changed
TrackChange(String),
/// Spotify playback was started/resumed
Playing(String, u32, u32),
/// Spotify playback was paused
Paused(String, u32, u32),
/// Sent when the user has switched their Spotify device away from Spoticord
Stopped,
/// Request the player to advance to the next track
Next,
/// Request the player to go back to the previous track
Previous,
/// Request the player to pause playback
Pause,
/// Request the player to resume playback
Resume,
}

View File

@ -6,25 +6,24 @@ use serenity::{framework::StandardFramework, prelude::GatewayIntents, Client};
use songbird::SerenityInit;
use std::{any::Any, env, process::exit};
#[cfg(feature = "metrics")]
use metrics::MetricsManager;
#[cfg(unix)]
use tokio::signal::unix::SignalKind;
#[cfg(feature = "metrics")]
mod metrics;
mod audio;
mod bot;
mod consts;
mod database;
mod ipc;
mod librespot_ext;
mod player;
mod session;
mod utils;
#[cfg(feature = "stats")]
mod stats;
#[cfg(feature = "stats")]
use crate::stats::StatsManager;
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
@ -41,20 +40,6 @@ async fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
if args.len() > 2 && &args[1] == "--player" {
// Woah! We're running in player mode!
debug!("Starting Spoticord player");
player::main().await;
debug!("Player exited, shutting down");
return;
}
info!("It's a good day");
info!(" - Spoticord {}", time::OffsetDateTime::now_utc().year());
@ -72,11 +57,10 @@ async fn main() {
let token = env::var("DISCORD_TOKEN").expect("a token in the environment");
let db_url = env::var("DATABASE_URL").expect("a database URL in the environment");
#[cfg(feature = "metrics")]
let metrics_manager = {
let metrics_url = env::var("METRICS_URL").expect("a prometheus pusher URL in the environment");
MetricsManager::new(metrics_url)
};
#[cfg(feature = "stats")]
let stats_manager =
StatsManager::new(env::var("KV_URL").expect("a redis URL in the environment"))
.expect("Failed to connect to redis");
let session_manager = SessionManager::new();
@ -97,13 +81,12 @@ async fn main() {
data.insert::<Database>(Database::new(db_url, None));
data.insert::<CommandManager>(CommandManager::new());
data.insert::<SessionManager>(session_manager.clone());
#[cfg(feature = "metrics")]
data.insert::<MetricsManager>(metrics_manager.clone());
}
let shard_manager = client.shard_manager.clone();
let _cache = client.cache_and_http.cache.clone();
#[cfg(feature = "stats")]
let cache = client.cache_and_http.cache.clone();
#[cfg(unix)]
let mut term: Option<Box<dyn Any + Send>> = Some(Box::new(
@ -119,32 +102,25 @@ async fn main() {
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
#[cfg(feature = "metrics")]
#[cfg(feature = "stats")]
{
let guild_count = _cache.guilds().len();
let guild_count = cache.guilds().len();
let active_count = session_manager.get_active_session_count().await;
let total_count = session_manager.get_session_count().await;
metrics_manager.set_server_count(guild_count);
metrics_manager.set_active_sessions(active_count);
metrics_manager.set_total_sessions(total_count);
if let Err(why) = stats_manager.set_server_count(guild_count) {
error!("Failed to update server count: {why}");
}
// Yes, I like to handle my s's when I'm working with amounts
debug!(
"Updated metrics: {} guild{}, {} active session{}, {} total session{}",
guild_count,
if guild_count == 1 { "" } else { "s" },
active_count,
if active_count == 1 { "" } else { "s" },
total_count,
if total_count == 1 { "" } else { "s" }
);
if let Err(why) = stats_manager.set_active_count(active_count) {
error!("Failed to update active count: {why}");
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Received interrupt signal, shutting down...");
session_manager.shutdown().await;
shard_manager.lock().await.shutdown_all().await;
break;
@ -164,11 +140,9 @@ async fn main() {
}, if term.is_some() => {
info!("Received terminate signal, shutting down...");
session_manager.shutdown().await;
shard_manager.lock().await.shutdown_all().await;
#[cfg(feature = "metrics")]
metrics_manager.stop();
break;
}
}

View File

@ -1,114 +0,0 @@
use std::{
collections::hash_map::RandomState,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use lazy_static::lazy_static;
use prometheus::{
opts, push_metrics, register_int_counter_vec, register_int_gauge, IntCounterVec, IntGauge,
};
use serenity::prelude::TypeMapKey;
use crate::session::pbi::PlaybackInfo;
lazy_static! {
static ref TOTAL_SERVERS: IntGauge =
register_int_gauge!("total_servers", "Total number of servers Spoticord is in").unwrap();
static ref ACTIVE_SESSIONS: IntGauge = register_int_gauge!(
"active_sessions",
"Total number of servers with an active Spoticord session"
)
.unwrap();
static ref TOTAL_SESSIONS: IntGauge = register_int_gauge!(
"total_sessions",
"Total number of servers with Spoticord in a voice channel"
)
.unwrap();
static ref TRACKS_PLAYED: IntCounterVec =
register_int_counter_vec!(opts!("tracks_played", "Tracks Played"), &["type"]).unwrap();
static ref COMMANDS_EXECUTED: IntCounterVec = register_int_counter_vec!(
opts!("commands_executed", "Commands Executed"),
&["command"]
)
.unwrap();
}
#[derive(Clone)]
pub struct MetricsManager {
should_stop: Arc<AtomicBool>,
}
impl MetricsManager {
pub fn new(pusher_url: impl Into<String>) -> Self {
let instance = Self {
should_stop: Arc::new(AtomicBool::new(false)),
};
thread::spawn({
let instance = instance.clone();
let pusher_url = pusher_url.into();
move || loop {
thread::sleep(Duration::from_secs(5));
if instance.should_stop() {
break;
}
if let Err(why) = push_metrics::<RandomState>(
"spoticord_metrics",
Default::default(),
&pusher_url,
prometheus::gather(),
None,
) {
log::error!("Failed to push metrics: {}", why);
}
}
});
instance
}
pub fn should_stop(&self) -> bool {
self.should_stop.load(Ordering::Relaxed)
}
pub fn stop(&self) {
self.should_stop.store(true, Ordering::Relaxed);
}
pub fn set_server_count(&self, count: usize) {
TOTAL_SERVERS.set(count as i64);
}
pub fn set_total_sessions(&self, count: usize) {
TOTAL_SESSIONS.set(count as i64);
}
pub fn set_active_sessions(&self, count: usize) {
ACTIVE_SESSIONS.set(count as i64);
}
pub fn track_play(&self, track: &PlaybackInfo) {
let track_type = match track.get_type() {
Some(track_type) => track_type,
None => return,
};
TRACKS_PLAYED.with_label_values(&[&track_type]).inc();
}
pub fn command_exec(&self, command: &str) {
COMMANDS_EXECUTED.with_label_values(&[command]).inc();
}
}
impl TypeMapKey for MetricsManager {
type Value = MetricsManager;
}

View File

@ -1,328 +0,0 @@
use std::{process::exit, time::Duration};
use ipc_channel::ipc::{IpcError, TryRecvError};
use librespot::{
connect::spirc::Spirc,
core::{
config::{ConnectConfig, SessionConfig},
session::Session,
},
discovery::Credentials,
playback::{
config::{Bitrate, PlayerConfig},
mixer::{self, MixerConfig},
player::{Player, PlayerEvent},
},
};
use log::{debug, error, warn};
use serde_json::json;
use crate::{
audio::backend::StdoutSink,
ipc::{self, packet::IpcPacket},
librespot_ext::discovery::CredentialsExt,
utils,
};
pub struct SpoticordPlayer {
client: ipc::Client,
session: Option<Session>,
spirc: Option<Spirc>,
}
impl SpoticordPlayer {
pub fn new(client: ipc::Client) -> Self {
Self {
client,
session: None,
spirc: None,
}
}
pub async fn start(&mut self, token: impl Into<String>, device_name: impl Into<String>) {
let token = token.into();
// Get the username (required for librespot)
let username = utils::spotify::get_username(&token)
.await
.expect("to get the username");
let session_config = SessionConfig::default();
let player_config = PlayerConfig {
bitrate: Bitrate::Bitrate96,
..PlayerConfig::default()
};
// Log in using the token
let credentials = Credentials::with_token(username, &token);
// Shutdown old session (cannot be done in the stop function)
if let Some(session) = self.session.take() {
session.shutdown();
}
// Connect the session
let (session, _) = match Session::connect(session_config, credentials, None, false).await {
Ok((session, credentials)) => (session, credentials),
Err(why) => {
error!("Failed to create Spotify session: {}", why);
self
.client
.send(IpcPacket::ConnectError(why.to_string()))
.ok();
return;
}
};
// Store session for later use
self.session = Some(session.clone());
// Volume mixer
let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig {
volume_ctrl: librespot::playback::config::VolumeCtrl::Linear,
..MixerConfig::default()
});
let client = self.client.clone();
// Create the player
let (player, mut receiver) = Player::new(
player_config,
session.clone(),
mixer.get_soft_volume(),
move || Box::new(StdoutSink::new(client)),
);
let (spirc, spirc_task) = Spirc::new(
ConnectConfig {
name: device_name.into(),
// 50%
initial_volume: Some(65535 / 2),
..ConnectConfig::default()
},
session.clone(),
player,
mixer,
);
let device_id = session.device_id().to_owned();
let ipc = self.client.clone();
// IPC Handler
tokio::spawn(async move {
let client = reqwest::Client::new();
let mut retries = 10;
// Try to switch to the device
loop {
match client
.put("https://api.spotify.com/v1/me/player")
.bearer_auth(token.clone())
.json(&json!({
"device_ids": [device_id],
}))
.send()
.await
{
Ok(resp) => {
if resp.status() == 202 {
debug!("Successfully switched to device");
break;
}
retries -= 1;
if retries == 0 {
error!("Failed to switch to device");
ipc
.send(IpcPacket::ConnectError(
"Switch to Spoticord device timed out".to_string(),
))
.ok();
break;
}
}
Err(why) => {
error!("Failed to set device: {}", why);
ipc.send(IpcPacket::ConnectError(why.to_string())).ok();
break;
}
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
// Do IPC stuff with these events
loop {
let event = match receiver.recv().await {
Some(event) => event,
None => break,
};
match event {
PlayerEvent::Playing {
play_request_id: _,
track_id,
position_ms,
duration_ms,
} => {
if let Err(why) = ipc.send(IpcPacket::Playing(
track_id.to_uri().expect("to not fail"),
position_ms,
duration_ms,
)) {
error!("Failed to send playing packet: {}", why);
}
}
PlayerEvent::Paused {
play_request_id: _,
track_id,
position_ms,
duration_ms,
} => {
if let Err(why) = ipc.send(IpcPacket::Paused(
track_id.to_uri().expect("to not fail"),
position_ms,
duration_ms,
)) {
error!("Failed to send paused packet: {}", why);
}
}
PlayerEvent::Changed {
old_track_id: _,
new_track_id,
} => {
if let Err(why) = ipc.send(IpcPacket::TrackChange(
new_track_id.to_uri().expect("to not fail"),
)) {
error!("Failed to send track change packet: {}", why);
}
}
PlayerEvent::Stopped {
play_request_id: _,
track_id: _,
} => {
if let Err(why) = ipc.send(IpcPacket::Stopped) {
error!("Failed to send player stopped packet: {}", why);
}
}
_ => {}
};
}
debug!("Player stopped");
});
self.spirc = Some(spirc);
tokio::spawn(spirc_task);
}
pub fn next(&mut self) {
if let Some(spirc) = &self.spirc {
spirc.next();
}
}
pub fn previous(&mut self) {
if let Some(spirc) = &self.spirc {
spirc.prev();
}
}
pub fn pause(&mut self) {
if let Some(spirc) = &self.spirc {
spirc.pause();
}
}
pub fn resume(&mut self) {
if let Some(spirc) = &self.spirc {
spirc.play();
}
}
pub fn stop(&mut self) {
if let Some(spirc) = self.spirc.take() {
spirc.shutdown();
}
}
}
pub async fn main() {
let args = std::env::args().collect::<Vec<String>>();
let tx_name = args[2].clone();
let rx_name = args[3].clone();
// Create IPC communication channel
let client = ipc::Client::connect(tx_name, rx_name).expect("Failed to connect to IPC");
// Create the player
let mut player = SpoticordPlayer::new(client.clone());
loop {
let message = match client.try_recv() {
Ok(message) => message,
Err(why) => {
if let TryRecvError::Empty = why {
// No message, wait a bit and try again
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
} else if let TryRecvError::IpcError(IpcError::Disconnected) = &why {
debug!("IPC connection closed, goodbye");
break;
}
error!("Failed to receive message: {}", why);
break;
}
};
match message {
IpcPacket::Connect(token, device_name) => {
debug!("Connecting to Spotify with device name {}", device_name);
player.start(token, device_name).await;
}
IpcPacket::Disconnect => {
debug!("Disconnecting from Spotify");
player.stop();
}
IpcPacket::Next => {
player.next();
}
IpcPacket::Previous => {
player.previous();
}
IpcPacket::Pause => {
player.pause();
}
IpcPacket::Resume => {
player.resume();
}
IpcPacket::Quit => {
debug!("Received quit packet, exiting");
exit(0);
}
_ => {
warn!("Received unknown packet: {:?}", message);
}
}
}
}

406
src/player/mod.rs 100644
View File

@ -0,0 +1,406 @@
use std::{io::Write, sync::Arc};
use anyhow::{anyhow, Result};
use librespot::{
connect::spirc::Spirc,
core::{
config::{ConnectConfig, SessionConfig},
session::Session,
spotify_id::{SpotifyAudioType, SpotifyId},
},
discovery::Credentials,
playback::{
config::{Bitrate, PlayerConfig, VolumeCtrl},
mixer::{self, MixerConfig},
player::{Player as SpotifyPlayer, PlayerEvent as SpotifyEvent},
},
protocol::metadata::{Episode, Track},
};
use log::error;
use protobuf::Message;
use songbird::tracks::TrackHandle;
use tokio::sync::{
broadcast::{Receiver, Sender},
mpsc::UnboundedReceiver,
Mutex,
};
use crate::{
audio::{stream::Stream, SinkEvent, StreamSink},
librespot_ext::discovery::CredentialsExt,
session::pbi::{CurrentTrack, PlaybackInfo},
utils,
};
enum Event {
Player(SpotifyEvent),
Sink(SinkEvent),
Command(PlayerCommand),
}
#[derive(Clone)]
enum PlayerCommand {
Next,
Previous,
Pause,
Play,
Shutdown,
}
#[derive(Clone, Debug)]
pub enum PlayerEvent {
Pause,
Play,
Stopped,
}
#[derive(Clone)]
pub struct Player {
tx: Sender<PlayerCommand>,
pbi: Arc<Mutex<Option<PlaybackInfo>>>,
}
impl Player {
pub async fn create(
stream: Stream,
token: &str,
device_name: &str,
track: TrackHandle,
) -> Result<(Self, Receiver<PlayerEvent>)> {
let username = utils::spotify::get_username(token).await?;
let player_config = PlayerConfig {
bitrate: Bitrate::Bitrate96,
..Default::default()
};
let credentials = Credentials::with_token(username, token);
let (session, _) = Session::connect(
SessionConfig {
ap_port: Some(9999), // Force the use of ap.spotify.com, which has the lowest latency
..Default::default()
},
credentials,
None,
false,
)
.await?;
let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig {
volume_ctrl: VolumeCtrl::Linear,
..Default::default()
});
let (tx, rx_sink) = tokio::sync::mpsc::unbounded_channel();
let (player, rx_player) =
SpotifyPlayer::new(player_config, session.clone(), mixer.get_soft_volume(), {
let stream = stream.clone();
move || Box::new(StreamSink::new(stream, tx))
});
let (spirc, spirc_task) = Spirc::new(
ConnectConfig {
name: device_name.into(),
// 50%
initial_volume: Some(65535 / 2),
// Default Spotify behaviour
autoplay: true,
..Default::default()
},
session.clone(),
player,
mixer,
);
let (tx, rx) = tokio::sync::broadcast::channel(10);
let (tx_ev, rx_ev) = tokio::sync::broadcast::channel(10);
let pbi = Arc::new(Mutex::new(None));
let player_task = PlayerTask {
pbi: pbi.clone(),
session: session.clone(),
rx_player,
rx_sink,
rx,
tx: tx_ev,
spirc,
track,
stream,
};
tokio::spawn(spirc_task);
tokio::spawn(player_task.run());
Ok((Self { pbi, tx }, rx_ev))
}
pub fn next(&self) {
self.tx.send(PlayerCommand::Next).ok();
}
pub fn prev(&self) {
self.tx.send(PlayerCommand::Previous).ok();
}
pub fn pause(&self) {
self.tx.send(PlayerCommand::Pause).ok();
}
pub fn play(&self) {
self.tx.send(PlayerCommand::Play).ok();
}
pub fn shutdown(&self) {
self.tx.send(PlayerCommand::Shutdown).ok();
}
pub async fn pbi(&self) -> Option<PlaybackInfo> {
self.pbi.lock().await.as_ref().cloned()
}
}
struct PlayerTask {
stream: Stream,
session: Session,
spirc: Spirc,
track: TrackHandle,
rx_player: UnboundedReceiver<SpotifyEvent>,
rx_sink: UnboundedReceiver<SinkEvent>,
rx: Receiver<PlayerCommand>,
tx: Sender<PlayerEvent>,
pbi: Arc<Mutex<Option<PlaybackInfo>>>,
}
impl PlayerTask {
pub async fn run(mut self) {
let check_result = |result| {
if let Err(why) = result {
error!("Failed to issue track command: {:?}", why);
}
};
loop {
match self.next().await {
// Spotify player events
Some(Event::Player(event)) => match event {
SpotifyEvent::Playing {
play_request_id: _,
track_id,
position_ms,
duration_ms,
} => {
self
.update_pbi(track_id, position_ms, duration_ms, true)
.await;
self.tx.send(PlayerEvent::Play).ok();
}
SpotifyEvent::Paused {
play_request_id: _,
track_id,
position_ms,
duration_ms,
} => {
self
.update_pbi(track_id, position_ms, duration_ms, false)
.await;
self.tx.send(PlayerEvent::Pause).ok();
}
SpotifyEvent::Changed {
old_track_id: _,
new_track_id,
} => {
if let Ok(current) = self.resolve_audio_info(new_track_id).await {
let mut pbi = self.pbi.lock().await;
if let Some(pbi) = pbi.as_mut() {
pbi.update_track(new_track_id, current);
}
}
}
SpotifyEvent::Stopped {
play_request_id: _,
track_id: _,
} => {
check_result(self.track.pause());
self.tx.send(PlayerEvent::Pause).ok();
}
_ => {}
},
// Audio sink events
Some(Event::Sink(event)) => match event {
SinkEvent::Start => {
check_result(self.track.play());
}
SinkEvent::Stop => {
// EXPERIMENT: It may be beneficial to *NOT* pause songbird here
// We already have a fallback if no audio is present in the buffer (write all zeroes aka silence)
// So commenting this out may help prevent a substantial portion of jitter
// This comes at a cost of more bandwidth, though opus should compress it down to almost nothing
// check_result(track.pause());
self.tx.send(PlayerEvent::Pause).ok();
}
},
// The `Player` has instructed us to do something
Some(Event::Command(command)) => match command {
PlayerCommand::Next => self.spirc.next(),
PlayerCommand::Previous => self.spirc.prev(),
PlayerCommand::Pause => self.spirc.pause(),
PlayerCommand::Play => self.spirc.play(),
PlayerCommand::Shutdown => break,
},
None => {
// One of the channels died
log::debug!("Channel died");
break;
}
}
}
self.tx.send(PlayerEvent::Stopped).ok();
}
async fn next(&mut self) -> Option<Event> {
tokio::select! {
event = self.rx_player.recv() => {
event.map(Event::Player)
}
event = self.rx_sink.recv() => {
event.map(Event::Sink)
}
command = self.rx.recv() => {
command.ok().map(Event::Command)
}
}
}
/// Update current playback info, or return early if not necessary
async fn update_pbi(
&self,
spotify_id: SpotifyId,
position_ms: u32,
duration_ms: u32,
playing: bool,
) {
let mut pbi = self.pbi.lock().await;
if let Some(pbi) = pbi.as_mut() {
pbi.update_pos_dur(position_ms, duration_ms, playing);
}
if !pbi
.as_ref()
.map(|pbi| pbi.spotify_id == spotify_id)
.unwrap_or(true)
{
return;
}
if let Ok(current) = self.resolve_audio_info(spotify_id).await {
match pbi.as_mut() {
Some(pbi) => {
pbi.update_track(spotify_id, current);
pbi.update_pos_dur(position_ms, duration_ms, true);
}
None => {
*pbi = Some(PlaybackInfo::new(
duration_ms,
position_ms,
true,
current,
spotify_id,
));
}
}
} else {
log::error!("Failed to resolve audio info");
}
}
/// Retrieve the metadata for a `SpotifyId`
async fn resolve_audio_info(&self, spotify_id: SpotifyId) -> Result<CurrentTrack> {
match spotify_id.audio_type {
SpotifyAudioType::Track => self.resolve_track_info(spotify_id).await,
SpotifyAudioType::Podcast => self.resolve_episode_info(spotify_id).await,
SpotifyAudioType::NonPlayable => Err(anyhow!("Cannot resolve non-playable audio type")),
}
}
/// Retrieve the metadata for a Spotify Track
async fn resolve_track_info(&self, spotify_id: SpotifyId) -> Result<CurrentTrack> {
let result = self
.session
.mercury()
.get(format!("hm://metadata/3/track/{}", spotify_id.to_base16()?))
.await
.map_err(|_| anyhow!("Mercury metadata request failed"))?;
if result.status_code != 200 {
return Err(anyhow!("Mercury metadata request invalid status code"));
}
let message = match result.payload.get(0) {
Some(v) => v,
None => return Err(anyhow!("Mercury metadata request invalid payload")),
};
let proto_track = Track::parse_from_bytes(message)?;
Ok(CurrentTrack::Track(proto_track))
}
/// Retrieve the metadata for a Spotify Podcast
async fn resolve_episode_info(&self, spotify_id: SpotifyId) -> Result<CurrentTrack> {
let result = self
.session
.mercury()
.get(format!(
"hm://metadata/3/episode/{}",
spotify_id.to_base16()?
))
.await
.map_err(|_| anyhow!("Mercury metadata request failed"))?;
if result.status_code != 200 {
return Err(anyhow!("Mercury metadata request invalid status code"));
}
let message = match result.payload.get(0) {
Some(v) => v,
None => return Err(anyhow!("Mercury metadata request invalid payload")),
};
let proto_episode = Episode::parse_from_bytes(message)?;
Ok(CurrentTrack::Episode(proto_episode))
}
}
impl Drop for PlayerTask {
fn drop(&mut self) {
log::trace!("drop PlayerTask");
self.track.stop().ok();
self.spirc.shutdown();
self.session.shutdown();
self.stream.flush().ok();
}
}

View File

@ -25,8 +25,8 @@ pub enum SessionCreateError {
#[error("Failed to join voice channel {0} ({1})")]
JoinError(ChannelId, GuildId),
#[error("Failed to start player process")]
ForkError,
#[error("Failed to start the player")]
PlayerStartError,
}
#[derive(Clone)]
@ -115,6 +115,10 @@ impl InnerSessionManager {
count
}
pub fn sessions(&self) -> Vec<SpoticordSession> {
self.sessions.values().cloned().collect()
}
}
impl SessionManager {
@ -183,4 +187,13 @@ impl SessionManager {
pub async fn get_active_session_count(&self) -> usize {
self.0.read().await.get_active_session_count().await
}
/// Tell all sessions to instantly shut down
pub async fn shutdown(&self) {
let sessions = self.0.read().await.sessions();
for session in sessions {
session.disconnect().await;
}
}
}

View File

@ -6,13 +6,12 @@ use self::{
pbi::PlaybackInfo,
};
use crate::{
audio::stream::Stream,
consts::DISCONNECT_TIME,
database::{Database, DatabaseError},
ipc::{self, packet::IpcPacket, Client},
utils::{embed::Status, spotify},
player::{Player, PlayerEvent},
utils::embed::Status,
};
use ipc_channel::ipc::{IpcError, TryRecvError};
use librespot::core::spotify_id::{SpotifyAudioType, SpotifyId};
use log::*;
use serenity::{
async_trait,
@ -22,19 +21,16 @@ use serenity::{
};
use songbird::{
create_player,
input::{children_to_reader, Codec, Container, Input},
input::{Codec, Container, Input, Reader},
tracks::TrackHandle,
Call, Event, EventContext, EventHandler,
};
use std::{
process::{Command, Stdio},
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use tokio::sync::Mutex;
#[cfg(feature = "metrics")]
use crate::metrics::MetricsManager;
use tokio::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard};
#[derive(Clone)]
pub struct SpoticordSession(Arc<RwLock<InnerSpoticordSession>>);
@ -51,19 +47,13 @@ struct InnerSpoticordSession {
call: Arc<Mutex<Call>>,
track: Option<TrackHandle>,
playback_info: Option<PlaybackInfo>,
player: Option<Player>,
disconnect_handle: Option<tokio::task::JoinHandle<()>>,
client: Option<Client>,
/// Whether the session has been disconnected
/// If this is true then this instance should no longer be used and dropped
disconnected: bool,
#[cfg(feature = "metrics")]
metrics: MetricsManager,
}
impl SpoticordSession {
@ -81,12 +71,6 @@ impl SpoticordSession {
.expect("to contain a value")
.clone();
#[cfg(feature = "metrics")]
let metrics = data
.get::<MetricsManager>()
.expect("to contain a value")
.clone();
// Join the voice channel
let songbird = songbird::get(ctx).await.expect("to be present").clone();
@ -106,17 +90,12 @@ impl SpoticordSession {
session_manager: session_manager.clone(),
call: call.clone(),
track: None,
playback_info: None,
player: None,
disconnect_handle: None,
client: None,
disconnected: false,
#[cfg(feature = "metrics")]
metrics,
};
let mut instance = Self(Arc::new(RwLock::new(inner)));
instance.create_player(ctx).await?;
let mut call = call.lock().await;
@ -148,14 +127,13 @@ impl SpoticordSession {
.clone();
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.owner = Some(owner_id);
}
{
let inner = self.0.clone();
let inner = inner.read().await;
session_manager.set_owner(owner_id, inner.guild_id).await;
let guild_id = self.acquire_read().await.guild_id;
session_manager.set_owner(owner_id, guild_id).await;
}
// Create the player
@ -165,39 +143,31 @@ impl SpoticordSession {
}
/// Advance to the next track
pub async fn next(&mut self) -> Result<(), IpcError> {
if let Some(ref client) = self.0.read().await.client {
return client.send(IpcPacket::Next);
pub async fn next(&mut self) {
if let Some(ref player) = self.acquire_read().await.player {
player.next();
}
Ok(())
}
/// Rewind to the previous track
pub async fn previous(&mut self) -> Result<(), IpcError> {
if let Some(ref client) = self.0.read().await.client {
return client.send(IpcPacket::Previous);
pub async fn previous(&mut self) {
if let Some(ref player) = self.acquire_read().await.player {
player.prev();
}
Ok(())
}
/// Pause the current track
pub async fn pause(&mut self) -> Result<(), IpcError> {
if let Some(ref client) = self.0.read().await.client {
return client.send(IpcPacket::Pause);
pub async fn pause(&mut self) {
if let Some(ref player) = self.acquire_read().await.player {
player.pause();
}
Ok(())
}
/// Resume the current track
pub async fn resume(&mut self) -> Result<(), IpcError> {
if let Some(ref client) = self.0.read().await.client {
return client.send(IpcPacket::Resume);
pub async fn resume(&mut self) {
if let Some(ref player) = self.acquire_read().await.player {
player.play();
}
Ok(())
}
async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> {
@ -232,52 +202,17 @@ impl SpoticordSession {
}
};
// Create IPC oneshot server
let (server, tx_name, rx_name) = match ipc::Server::create() {
Ok(server) => server,
Err(why) => {
error!("Failed to create IPC server: {:?}", why);
return Err(SessionCreateError::ForkError);
}
};
// Spawn player process
let child =
match Command::new(std::env::current_exe().expect("to know the current executable path"))
.args([
"--player",
&tx_name,
&rx_name,
"--debug-guild-id",
&self.guild_id().await.to_string(),
])
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
{
Ok(child) => child,
Err(why) => {
error!("Failed to start player process: {:?}", why);
return Err(SessionCreateError::ForkError);
}
};
// Establish bi-directional IPC channel
let client = match server.accept() {
Ok(client) => client,
Err(why) => {
error!("Failed to accept IPC connection: {:?}", why);
return Err(SessionCreateError::ForkError);
}
};
// Pipe player audio to the voice channel
let reader = children_to_reader::<f32>(vec![child]);
// Create stream
let stream = Stream::new();
// Create track (paused, fixes audio glitches)
let (mut track, track_handle) =
create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None));
let (mut track, track_handle) = create_player(Input::new(
true,
Reader::Extension(Box::new(stream.clone())),
Codec::Pcm,
Container::Raw,
None,
));
track.pause();
let call = self.call().await;
@ -286,281 +221,56 @@ impl SpoticordSession {
// Set call audio to track
call.play_only(track);
// Handle IPC packets
// This will automatically quit once the IPC connection is closed
let (player, mut rx) =
match Player::create(stream, &token, &user.device_name, track_handle.clone()).await {
Ok(v) => v,
Err(why) => {
error!("Failed to start the player: {:?}", why);
return Err(SessionCreateError::PlayerStartError);
}
};
tokio::spawn({
let track = track_handle.clone();
let client = client.clone();
let ctx = ctx.clone();
let instance = self.clone();
let inner = self.0.clone();
let session = self.clone();
async move {
let check_result = |result| {
if let Err(why) = result {
error!("Failed to issue track command: {:?}", why);
}
};
loop {
// Required for IpcPacket::TrackChange to work
tokio::task::yield_now().await;
// Check if the session has been disconnected
let disconnected = {
let inner = inner.read().await;
inner.disconnected
};
if disconnected {
break;
}
let msg = match client.try_recv() {
Ok(msg) => msg,
Err(why) => {
if let TryRecvError::Empty = why {
// No message, wait a bit and try again
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
} else if let TryRecvError::IpcError(IpcError::Disconnected) = &why {
trace!("IPC connection closed, exiting IPC handler");
match rx.recv().await {
Ok(event) => match event {
PlayerEvent::Pause => session.start_disconnect_timer().await,
PlayerEvent::Play => session.stop_disconnect_timer().await,
PlayerEvent::Stopped => {
session.player_stopped().await;
break;
}
error!("Failed to receive IPC message: {:?}", why);
break;
}
};
match msg {
// Session connect error
IpcPacket::ConnectError(why) => {
error!("Failed to connect to Spotify: {:?}", why);
// Notify the user in the text channel
if let Err(why) = instance
.text_channel_id()
.await
.send_message(&instance.http().await, |message| {
message.embed(|embed| {
embed.title("Failed to connect to Spotify");
embed.description(why);
embed.footer(|footer| footer.text("Please try again"));
embed.color(Status::Error as u64);
embed
});
message
})
.await
{
error!("Failed to send error message: {:?}", why);
}
},
Err(why) => {
error!("Communication with player abruptly ended: {why}");
session.player_stopped().await;
break;
}
// Sink requests playback to start/resume
IpcPacket::StartPlayback => {
check_result(track.play());
}
// Sink requests playback to pause
IpcPacket::StopPlayback => {
check_result(track.pause());
}
// A new track has been set by the player
IpcPacket::TrackChange(track) => {
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
let instance = instance.clone();
let ctx = ctx.clone();
// Fetch track info
// This is done in a separate task to avoid blocking the IPC handler
tokio::spawn(async move {
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
}
});
}
// The player has started playing a track
IpcPacket::Playing(track, position_ms, duration_ms) => {
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
let was_none = instance
.update_playback(duration_ms, position_ms, true)
.await;
if was_none {
// Stop player if update track fails
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
return;
}
}
}
IpcPacket::Paused(track, position_ms, duration_ms) => {
instance.start_disconnect_timer().await;
// Convert to SpotifyId
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
let was_none = instance
.update_playback(duration_ms, position_ms, false)
.await;
if was_none {
// Stop player if update track fails
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
error!("Failed to update track: {:?}", why);
instance.player_stopped().await;
return;
}
}
}
IpcPacket::Stopped => {
check_result(track.pause());
{
let mut inner = inner.write().await;
inner.playback_info.take();
}
instance.start_disconnect_timer().await;
}
// Ignore other packets
_ => {}
}
}
// Clean up session
if !inner.read().await.disconnected {
instance.player_stopped().await;
}
}
});
// Inform the player process to connect to Spotify
if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) {
error!("Failed to send IpcPacket::Connect packet: {:?}", why);
}
// Start DC timer by default, as automatic device switching is now gone
self.start_disconnect_timer().await;
// Update inner client and track
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.track = Some(track_handle);
inner.client = Some(client);
Ok(())
}
/// Update current track
async fn update_track(
&self,
ctx: &Context,
owner_id: &UserId,
spotify_id: SpotifyId,
) -> Result<(), String> {
let should_update = {
let pbi = self.playback_info().await;
if let Some(pbi) = pbi {
pbi.spotify_id.is_none() || pbi.spotify_id != Some(spotify_id)
} else {
false
}
};
if !should_update {
return Ok(());
}
let data = ctx.data.read().await;
let database = data.get::<Database>().expect("to contain a value");
let token = match database.get_access_token(&owner_id.to_string()).await {
Ok(token) => token,
Err(why) => {
error!("Failed to get access token: {:?}", why);
return Err("Failed to get access token".to_string());
}
};
let mut track: Option<spotify::Track> = None;
let mut episode: Option<spotify::Episode> = None;
if spotify_id.audio_type == SpotifyAudioType::Track {
let track_info = match spotify::get_track_info(&token, spotify_id).await {
Ok(track) => track,
Err(why) => {
error!("Failed to get track info: {:?}", why);
return Err("Failed to get track info".to_string());
}
};
trace!("Received track info: {:?}", track_info);
track = Some(track_info);
} else if spotify_id.audio_type == SpotifyAudioType::Podcast {
let episode_info = match spotify::get_episode_info(&token, spotify_id).await {
Ok(episode) => episode,
Err(why) => {
error!("Failed to get episode info: {:?}", why);
return Err("Failed to get episode info".to_string());
}
};
trace!("Received episode info: {:?}", episode_info);
episode = Some(episode_info);
}
// Update track/episode
let mut inner = self.0.write().await;
if let Some(pbi) = inner.playback_info.as_mut() {
pbi.update_track_episode(spotify_id, track, episode);
}
// Send track play event to metrics
#[cfg(feature = "metrics")]
{
if let Some(ref pbi) = inner.playback_info {
inner.metrics.track_play(pbi);
}
}
inner.player = Some(player);
Ok(())
}
/// Called when the player must stop, but not leave the call
async fn player_stopped(&self) {
let mut inner = self.0.write().await;
if let Some(client) = inner.client.take() {
// Ask player to quit (will cause defunct process)
if let Err(why) = client.send(IpcPacket::Quit) {
error!("Failed to send quit packet: {:?}", why);
}
}
let mut inner = self.acquire_write().await;
if let Some(track) = inner.track.take() {
// Stop the playback, and freeing the child handle, removing the defunct process
if let Err(why) = track.stop() {
error!("Failed to stop track: {:?}", why);
}
@ -571,8 +281,10 @@ impl SpoticordSession {
inner.session_manager.remove_owner(owner_id).await;
}
// Clear playback info
inner.playback_info = None;
// Disconnect from Spotify
if let Some(player) = inner.player.take() {
player.shutdown();
}
// Unlock to prevent deadlock in start_disconnect_timer
drop(inner);
@ -594,60 +306,27 @@ impl SpoticordSession {
// read lock to read the current owner.
// This would deadlock if we have an active write lock
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
inner.disconnect_no_abort().await;
}
// Stop the disconnect timer, if one is running
let mut inner = self.0.write().await;
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
}
// Update playback info (duration, position, playing state)
async fn update_playback(&self, duration_ms: u32, position_ms: u32, playing: bool) -> bool {
let is_none = {
let pbi = self.playback_info().await;
pbi.is_none()
};
let mut inner = self.0.write().await;
if is_none {
inner.playback_info = Some(PlaybackInfo::new(duration_ms, position_ms, playing));
} else {
// Update position, duration and playback state
inner
.playback_info
.as_mut()
.expect("to contain a value")
.update_pos_dur(position_ms, duration_ms, playing);
};
is_none
self.stop_disconnect_timer().await;
}
/// Start the disconnect timer, which will disconnect the bot from the voice channel after a
/// certain amount of time
async fn start_disconnect_timer(&self) {
let inner_arc = self.0.clone();
let mut inner = inner_arc.write().await;
self.stop_disconnect_timer().await;
let mut inner = self.acquire_write().await;
// Check if we are already disconnected
if inner.disconnected {
return;
}
// Abort the previous timer, if one is running
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
inner.disconnect_handle = Some(tokio::spawn({
let inner = inner_arc.clone();
let instance = self.clone();
let session = self.clone();
async move {
let mut timer = tokio::time::interval(Duration::from_secs(DISCONNECT_TIME));
@ -659,19 +338,15 @@ impl SpoticordSession {
// Make sure this task has not been aborted, if it has this will automatically stop execution.
tokio::task::yield_now().await;
let is_playing = {
let inner = inner.read().await;
if let Some(ref pbi) = inner.playback_info {
pbi.is_playing
} else {
false
}
};
let is_playing = session
.playback_info()
.await
.map(|pbi| pbi.is_playing)
.unwrap_or(false);
if !is_playing {
info!("Player is not playing, disconnecting");
instance
session
.disconnect_with_message(
"The player has been inactive for too long, and has been disconnected.",
)
@ -681,9 +356,18 @@ impl SpoticordSession {
}));
}
/// Stop the disconnect timer (if one is running)
async fn stop_disconnect_timer(&self) {
let mut inner = self.acquire_write().await;
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
}
/// Disconnect from the VC and send a message to the text channel
pub async fn disconnect_with_message(&self, content: &str) {
{
let mut inner = self.0.write().await;
let mut inner = self.acquire_write().await;
// Firstly we disconnect
inner.disconnect_no_abort().await;
@ -707,58 +391,107 @@ impl SpoticordSession {
}
// Finally we stop and remove the disconnect timer
let mut inner = self.0.write().await;
// Stop the disconnect timer, if one is running
if let Some(handle) = inner.disconnect_handle.take() {
handle.abort();
}
self.stop_disconnect_timer().await;
}
/* Inner getters */
/// Get the owner
pub async fn owner(&self) -> Option<UserId> {
self.0.read().await.owner
self.acquire_read().await.owner
}
/// Get the session manager
pub async fn session_manager(&self) -> SessionManager {
self.0.read().await.session_manager.clone()
self.acquire_read().await.session_manager.clone()
}
/// Get the guild id
pub async fn guild_id(&self) -> GuildId {
self.0.read().await.guild_id
self.acquire_read().await.guild_id
}
/// Get the channel id
pub async fn channel_id(&self) -> ChannelId {
self.0.read().await.channel_id
self.acquire_read().await.channel_id
}
/// Get the channel id
#[allow(dead_code)]
pub async fn text_channel_id(&self) -> ChannelId {
self.0.read().await.text_channel_id
self.acquire_read().await.text_channel_id
}
/// Get the playback info
pub async fn playback_info(&self) -> Option<PlaybackInfo> {
self.0.read().await.playback_info.clone()
let handle = self.acquire_read().await;
let player = handle.player.as_ref()?;
player.pbi().await
}
pub async fn call(&self) -> Arc<Mutex<Call>> {
self.0.read().await.call.clone()
self.acquire_read().await.call.clone()
}
#[allow(dead_code)]
pub async fn http(&self) -> Arc<Http> {
self.0.read().await.http.clone()
self.acquire_read().await.http.clone()
}
async fn acquire_read(&self) -> ReadLock {
ReadLock(self.0.read().await)
}
async fn acquire_write(&self) -> WriteLock {
WriteLock(self.0.write().await)
}
}
struct ReadLock<'a>(RwLockReadGuard<'a, InnerSpoticordSession>);
impl<'a> Deref for ReadLock<'a> {
type Target = RwLockReadGuard<'a, InnerSpoticordSession>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> DerefMut for ReadLock<'a> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
struct WriteLock<'a>(RwLockWriteGuard<'a, InnerSpoticordSession>);
impl<'a> Deref for WriteLock<'a> {
type Target = RwLockWriteGuard<'a, InnerSpoticordSession>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> DerefMut for WriteLock<'a> {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl InnerSpoticordSession {
/// Internal version of disconnect, which does not abort the disconnect timer
async fn disconnect_no_abort(&mut self) {
// Disconnect from Spotify
if let Some(player) = self.player.take() {
player.shutdown();
}
self.disconnected = true;
self
.session_manager
@ -767,15 +500,7 @@ impl InnerSpoticordSession {
let mut call = self.call.lock().await;
if let Some(client) = self.client.take() {
// Ask player to quit (will cause defunct process)
if let Err(why) = client.send(IpcPacket::Quit) {
error!("Failed to send quit packet: {:?}", why);
}
}
if let Some(track) = self.track.take() {
// Stop the playback, and freeing the child handle, removing the defunct process
if let Err(why) = track.stop() {
error!("Failed to stop track: {:?}", why);
}
@ -789,12 +514,6 @@ impl InnerSpoticordSession {
}
}
impl Drop for InnerSpoticordSession {
fn drop(&mut self) {
trace!("Dropping inner session");
}
}
#[async_trait]
impl EventHandler for SpoticordSession {
async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {

View File

@ -1,28 +1,41 @@
use librespot::core::spotify_id::SpotifyId;
use librespot::{
core::spotify_id::SpotifyId,
protocol::metadata::{Episode, Track},
};
use crate::utils::{self, spotify};
use crate::utils;
#[derive(Clone)]
pub struct PlaybackInfo {
last_updated: u128,
position_ms: u32,
pub track: Option<spotify::Track>,
pub episode: Option<spotify::Episode>,
pub spotify_id: Option<SpotifyId>,
pub track: CurrentTrack,
pub spotify_id: SpotifyId,
pub duration_ms: u32,
pub is_playing: bool,
}
#[derive(Clone)]
pub enum CurrentTrack {
Track(Track),
Episode(Episode),
}
impl PlaybackInfo {
/// Create a new instance of PlaybackInfo
pub fn new(duration_ms: u32, position_ms: u32, is_playing: bool) -> Self {
pub fn new(
duration_ms: u32,
position_ms: u32,
is_playing: bool,
track: CurrentTrack,
spotify_id: SpotifyId,
) -> Self {
Self {
last_updated: utils::get_time_ms(),
track: None,
episode: None,
spotify_id: None,
track,
spotify_id,
duration_ms,
position_ms,
is_playing,
@ -39,15 +52,9 @@ impl PlaybackInfo {
}
/// Update spotify id, track and episode
pub fn update_track_episode(
&mut self,
spotify_id: SpotifyId,
track: Option<spotify::Track>,
episode: Option<spotify::Episode>,
) {
self.spotify_id = Some(spotify_id);
pub fn update_track(&mut self, spotify_id: SpotifyId, track: CurrentTrack) {
self.spotify_id = spotify_id;
self.track = track;
self.episode = episode;
}
/// Get the current playback position
@ -63,71 +70,73 @@ impl PlaybackInfo {
}
/// Get the name of the track or episode
pub fn get_name(&self) -> Option<String> {
if let Some(track) = &self.track {
Some(track.name.clone())
} else {
self.episode.as_ref().map(|episode| episode.name.clone())
pub fn get_name(&self) -> String {
match &self.track {
CurrentTrack::Track(track) => track.get_name().to_string(),
CurrentTrack::Episode(episode) => episode.get_name().to_string(),
}
}
/// Get the artist(s) or show name of the current track
pub fn get_artists(&self) -> Option<String> {
if let Some(track) = &self.track {
Some(
track
.artists
.iter()
.map(|a| a.name.clone())
.collect::<Vec<String>>()
.join(", "),
)
} else {
self
.episode
.as_ref()
.map(|episode| episode.show.name.clone())
pub fn get_artists(&self) -> String {
match &self.track {
CurrentTrack::Track(track) => track
.get_artist()
.iter()
.map(|a| a.get_name().to_string())
.collect::<Vec<_>>()
.join(", "),
CurrentTrack::Episode(episode) => episode.get_show().get_name().to_string(),
}
}
/// Get the album art url
pub fn get_thumbnail_url(&self) -> Option<String> {
if let Some(track) = &self.track {
let mut images = track.album.images.clone();
images.sort_by(|a, b| b.width.cmp(&a.width));
let file_id = match &self.track {
CurrentTrack::Track(track) => {
let mut images = track.get_album().get_cover_group().get_image().to_vec();
images.sort_by_key(|b| std::cmp::Reverse(b.get_width()));
images.get(0).as_ref().map(|image| image.url.clone())
} else if let Some(episode) = &self.episode {
let mut images = episode.show.images.clone();
images.sort_by(|a, b| b.width.cmp(&a.width));
images
.get(0)
.as_ref()
.map(|image| image.get_file_id())
.map(hex::encode)
}
CurrentTrack::Episode(episode) => {
let mut images = episode.get_covers().get_image().to_vec();
images.sort_by_key(|b| std::cmp::Reverse(b.get_width()));
images.get(0).as_ref().map(|image| image.url.clone())
} else {
None
}
images
.get(0)
.as_ref()
.map(|image| image.get_file_id())
.map(hex::encode)
}
};
file_id.map(|id| format!("https://i.scdn.co/image/{id}"))
}
/// Get the type of audio (track or episode)
#[allow(dead_code)]
pub fn get_type(&self) -> Option<String> {
if self.track.is_some() {
Some("track".into())
} else if self.episode.is_some() {
Some("episode".into())
} else {
None
pub fn get_type(&self) -> String {
match &self.track {
CurrentTrack::Track(_) => "track".to_string(),
CurrentTrack::Episode(_) => "episode".to_string(),
}
}
/// Get the public facing url of the track or episode
#[allow(dead_code)]
pub fn get_url(&self) -> Option<&str> {
if let Some(ref track) = self.track {
Some(track.external_urls.spotify.as_str())
} else if let Some(ref episode) = self.episode {
Some(episode.external_urls.spotify.as_str())
} else {
None
match &self.track {
CurrentTrack::Track(track) => track
.get_external_id()
.iter()
.find(|id| id.get_typ() == "spotify")
.map(|v| v.get_id()),
CurrentTrack::Episode(episode) => Some(episode.get_external_url()),
}
}
}

26
src/stats.rs 100644
View File

@ -0,0 +1,26 @@
use redis::{Client, Commands, RedisResult as Result};
#[derive(Clone)]
pub struct StatsManager {
redis: Client,
}
impl StatsManager {
pub fn new(url: impl AsRef<str>) -> Result<Self> {
let redis = Client::open(url.as_ref())?;
Ok(StatsManager { redis })
}
pub fn set_server_count(&self, count: usize) -> Result<()> {
let mut con = self.redis.get_connection()?;
con.set("sc-bot-total-servers", count.to_string())
}
pub fn set_active_count(&self, count: usize) -> Result<()> {
let mut con = self.redis.get_connection()?;
con.set("sc-bot-active-servers", count.to_string())
}
}

View File

@ -1,55 +1,8 @@
use std::error::Error;
use librespot::core::spotify_id::SpotifyId;
use anyhow::{anyhow, Result};
use log::{error, trace};
use serde::Deserialize;
use serde_json::Value;
#[derive(Debug, Clone, Deserialize)]
pub struct Artist {
pub name: String,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Image {
pub url: String,
pub height: u32,
pub width: u32,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Album {
pub name: String,
pub images: Vec<Image>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct ExternalUrls {
pub spotify: String,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Track {
pub name: String,
pub artists: Vec<Artist>,
pub album: Album,
pub external_urls: ExternalUrls,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Show {
pub name: String,
pub images: Vec<Image>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct Episode {
pub name: String,
pub show: Show,
pub external_urls: ExternalUrls,
}
pub async fn get_username(token: impl Into<String>) -> Result<String, String> {
pub async fn get_username(token: impl Into<String>) -> Result<String> {
let token = token.into();
let client = reqwest::Client::new();
@ -65,7 +18,7 @@ pub async fn get_username(token: impl Into<String>) -> Result<String, String> {
Ok(response) => response,
Err(why) => {
error!("Failed to get username: {}", why);
return Err(format!("{}", why));
return Err(why.into());
}
};
@ -76,7 +29,7 @@ pub async fn get_username(token: impl Into<String>) -> Result<String, String> {
if response.status() != 200 {
error!("Failed to get username: {}", response.status());
return Err(format!(
return Err(anyhow!(
"Failed to get track info: Invalid status code: {}",
response.status()
));
@ -86,7 +39,7 @@ pub async fn get_username(token: impl Into<String>) -> Result<String, String> {
Ok(body) => body,
Err(why) => {
error!("Failed to parse body: {}", why);
return Err(format!("{}", why));
return Err(why.into());
}
};
@ -96,82 +49,6 @@ pub async fn get_username(token: impl Into<String>) -> Result<String, String> {
}
error!("Missing 'id' field in body: {:#?}", body);
return Err("Failed to parse body: Invalid body received".to_string());
}
}
pub async fn get_track_info(
token: impl Into<String>,
track: SpotifyId,
) -> Result<Track, Box<dyn Error>> {
let token = token.into();
let client = reqwest::Client::new();
let mut retries = 3;
loop {
let response = client
.get(format!(
"https://api.spotify.com/v1/tracks/{}",
track.to_base62()?
))
.bearer_auth(&token)
.send()
.await?;
if response.status().as_u16() >= 500 && retries > 0 {
retries -= 1;
continue;
}
if response.status() != 200 {
return Err(
format!(
"Failed to get track info: Invalid status code: {}",
response.status()
)
.into(),
);
}
return Ok(response.json().await?);
}
}
pub async fn get_episode_info(
token: impl Into<String>,
episode: SpotifyId,
) -> Result<Episode, Box<dyn Error>> {
let token = token.into();
let client = reqwest::Client::new();
let mut retries = 3;
loop {
let response = client
.get(format!(
"https://api.spotify.com/v1/episodes/{}",
episode.to_base62()?
))
.bearer_auth(&token)
.send()
.await?;
if response.status().as_u16() >= 500 && retries > 0 {
retries -= 1;
continue;
}
if response.status() != 200 {
return Err(
format!(
"Failed to get episode info: Invalid status code: {}",
response.status()
)
.into(),
);
}
return Ok(response.json().await?);
return Err(anyhow!("Failed to parse body: Invalid body received"));
}
}