Hello there I'm back
parent
7ae2184679
commit
1ffb07300f
|
@ -28,7 +28,7 @@ jobs:
|
||||||
uses: docker/build-push-action@v2
|
uses: docker/build-push-action@v2
|
||||||
with:
|
with:
|
||||||
context: .
|
context: .
|
||||||
file: ./Dockerfile.metrics
|
file: ./Dockerfile
|
||||||
tags: |
|
tags: |
|
||||||
${{ secrets.REGISTRY_URL }}/spoticord/spoticord:latest
|
${{ secrets.REGISTRY_URL }}/spoticord/spoticord:latest
|
||||||
push: ${{ github.ref == 'refs/heads/main' }}
|
push: ${{ github.ref == 'refs/heads/main' }}
|
||||||
|
|
|
@ -71,13 +71,6 @@ If you are actively developing Spoticord, you can use the following command to b
|
||||||
cargo run
|
cargo run
|
||||||
```
|
```
|
||||||
|
|
||||||
# Features
|
|
||||||
As of now, Spoticord has one optional feature: `metrics`. This feature enables pushing metrics about the bot, like how many servers it is in, which tracks are being played and which commands are being executed. The metrics are designed to be pushed to a [Prometheus Pushgateway](https://prometheus.io/docs/instrumenting/pushing/). If you want to enable this feature, you can do so by running the following command:
|
|
||||||
|
|
||||||
```sh
|
|
||||||
cargo build --release --features metrics
|
|
||||||
```
|
|
||||||
|
|
||||||
# MSRV
|
# MSRV
|
||||||
|
|
||||||
The current minimum supported rust version is `1.65.0`.
|
The current minimum supported rust version is `1.65.0`.
|
||||||
|
|
|
@ -2603,7 +2603,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "spoticord"
|
name = "spoticord"
|
||||||
version = "2.0.0"
|
version = "2.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"env_logger 0.10.0",
|
"env_logger 0.10.0",
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "spoticord"
|
name = "spoticord"
|
||||||
version = "2.0.0"
|
version = "2.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.65.0"
|
rust-version = "1.65.0"
|
||||||
|
|
||||||
|
@ -8,10 +8,6 @@ rust-version = "1.65.0"
|
||||||
name = "spoticord"
|
name = "spoticord"
|
||||||
path = "src/main.rs"
|
path = "src/main.rs"
|
||||||
|
|
||||||
[features]
|
|
||||||
default = []
|
|
||||||
metrics = ["lazy_static", "prometheus"]
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
env_logger = "0.10.0"
|
env_logger = "0.10.0"
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
# Builder
|
|
||||||
FROM rust:1.65-buster as builder
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Add extra build dependencies here
|
|
||||||
RUN apt-get update && apt-get install -y cmake
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
RUN cargo install --path . --features metrics
|
|
||||||
|
|
||||||
# Runtime
|
|
||||||
FROM debian:buster-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Add extra runtime dependencies here
|
|
||||||
RUN apt-get update && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
|
|
||||||
|
|
||||||
# Copy spoticord binary from builder
|
|
||||||
COPY --from=builder /usr/local/cargo/bin/spoticord ./spoticord
|
|
||||||
|
|
||||||
CMD ["./spoticord"]
|
|
|
@ -16,7 +16,6 @@ Spoticord uses environment variables to configure itself. The following variable
|
||||||
|
|
||||||
Additionally you can configure the following variables:
|
Additionally you can configure the following variables:
|
||||||
- `GUILD_ID`: The ID of the Discord server where this bot will create commands for. This is used during testing to prevent the bot from creating slash commands in other servers, as well as getting the commands quicker. This variable is optional, and if not set, the bot will create commands in all servers it is in (this may take up to 15 minutes).
|
- `GUILD_ID`: The ID of the Discord server where this bot will create commands for. This is used during testing to prevent the bot from creating slash commands in other servers, as well as getting the commands quicker. This variable is optional, and if not set, the bot will create commands in all servers it is in (this may take up to 15 minutes).
|
||||||
- `METRICS_URL`: The connection URL of a Prometheus Push Gateway server used for pushing metrics. This variable is required when compiling with the `metrics` feature.
|
|
||||||
|
|
||||||
#### Providing environment variables
|
#### Providing environment variables
|
||||||
You can provide environment variables in a `.env` file at the root of the working directory of Spoticord.
|
You can provide environment variables in a `.env` file at the root of the working directory of Spoticord.
|
||||||
|
|
|
@ -1,88 +0,0 @@
|
||||||
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult};
|
|
||||||
use librespot::playback::convert::Converter;
|
|
||||||
use librespot::playback::decoder::AudioPacket;
|
|
||||||
use log::error;
|
|
||||||
use std::io::{Stdout, Write};
|
|
||||||
|
|
||||||
use crate::ipc;
|
|
||||||
use crate::ipc::packet::IpcPacket;
|
|
||||||
|
|
||||||
pub struct StdoutSink {
|
|
||||||
client: ipc::Client,
|
|
||||||
output: Option<Box<Stdout>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StdoutSink {
|
|
||||||
pub fn new(client: ipc::Client) -> Self {
|
|
||||||
StdoutSink {
|
|
||||||
client,
|
|
||||||
output: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Sink for StdoutSink {
|
|
||||||
fn start(&mut self) -> SinkResult<()> {
|
|
||||||
if let Err(why) = self.client.send(IpcPacket::StartPlayback) {
|
|
||||||
error!("Failed to send start playback packet: {}", why);
|
|
||||||
return Err(SinkError::ConnectionRefused(why.to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.output.get_or_insert(Box::new(std::io::stdout()));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn stop(&mut self) -> SinkResult<()> {
|
|
||||||
if let Err(why) = self.client.send(IpcPacket::StopPlayback) {
|
|
||||||
error!("Failed to send stop playback packet: {}", why);
|
|
||||||
return Err(SinkError::ConnectionRefused(why.to_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
self
|
|
||||||
.output
|
|
||||||
.take()
|
|
||||||
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
|
|
||||||
.flush()
|
|
||||||
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> {
|
|
||||||
use zerocopy::AsBytes;
|
|
||||||
|
|
||||||
if let AudioPacket::Samples(samples) = packet {
|
|
||||||
let samples_f32: &[f32] = &converter.f64_to_f32(&samples);
|
|
||||||
|
|
||||||
let resampled = samplerate::convert(
|
|
||||||
44100,
|
|
||||||
48000,
|
|
||||||
2,
|
|
||||||
samplerate::ConverterType::Linear,
|
|
||||||
samples_f32,
|
|
||||||
)
|
|
||||||
.expect("to succeed");
|
|
||||||
|
|
||||||
let samples_i16 =
|
|
||||||
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
|
|
||||||
|
|
||||||
self.write_bytes(samples_i16.as_bytes())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SinkAsBytes for StdoutSink {
|
|
||||||
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
|
|
||||||
self
|
|
||||||
.output
|
|
||||||
.as_deref_mut()
|
|
||||||
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
|
|
||||||
.write_all(data)
|
|
||||||
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
161
src/audio/mod.rs
161
src/audio/mod.rs
|
@ -1 +1,160 @@
|
||||||
pub mod backend;
|
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkError, SinkResult};
|
||||||
|
use librespot::playback::convert::Converter;
|
||||||
|
use librespot::playback::decoder::AudioPacket;
|
||||||
|
use log::error;
|
||||||
|
use std::io::{Stdout, Write};
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
|
use crate::ipc;
|
||||||
|
use crate::ipc::packet::IpcPacket;
|
||||||
|
use crate::player::stream::Stream;
|
||||||
|
|
||||||
|
pub struct StdoutSink {
|
||||||
|
client: ipc::Client,
|
||||||
|
output: Option<Box<Stdout>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StdoutSink {
|
||||||
|
pub fn new(client: ipc::Client) -> Self {
|
||||||
|
StdoutSink {
|
||||||
|
client,
|
||||||
|
output: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sink for StdoutSink {
|
||||||
|
fn start(&mut self) -> SinkResult<()> {
|
||||||
|
if let Err(why) = self.client.send(IpcPacket::StartPlayback) {
|
||||||
|
error!("Failed to send start playback packet: {}", why);
|
||||||
|
return Err(SinkError::ConnectionRefused(why.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.output.get_or_insert(Box::new(std::io::stdout()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&mut self) -> SinkResult<()> {
|
||||||
|
if let Err(why) = self.client.send(IpcPacket::StopPlayback) {
|
||||||
|
error!("Failed to send stop playback packet: {}", why);
|
||||||
|
return Err(SinkError::ConnectionRefused(why.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
self
|
||||||
|
.output
|
||||||
|
.take()
|
||||||
|
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
|
||||||
|
.flush()
|
||||||
|
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> {
|
||||||
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
|
if let AudioPacket::Samples(samples) = packet {
|
||||||
|
let samples_f32: &[f32] = &converter.f64_to_f32(&samples);
|
||||||
|
|
||||||
|
let resampled = samplerate::convert(
|
||||||
|
44100,
|
||||||
|
48000,
|
||||||
|
2,
|
||||||
|
samplerate::ConverterType::Linear,
|
||||||
|
samples_f32,
|
||||||
|
)
|
||||||
|
.expect("to succeed");
|
||||||
|
|
||||||
|
let samples_i16 =
|
||||||
|
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
|
||||||
|
|
||||||
|
self.write_bytes(samples_i16.as_bytes())?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SinkAsBytes for StdoutSink {
|
||||||
|
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
|
||||||
|
self
|
||||||
|
.output
|
||||||
|
.as_deref_mut()
|
||||||
|
.ok_or_else(|| SinkError::NotConnected("StdoutSink is not connected".to_string()))?
|
||||||
|
.write_all(data)
|
||||||
|
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum SinkEvent {
|
||||||
|
Start,
|
||||||
|
Stop,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StreamSink {
|
||||||
|
stream: Stream,
|
||||||
|
sender: UnboundedSender<SinkEvent>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StreamSink {
|
||||||
|
pub fn new(stream: Stream, sender: UnboundedSender<SinkEvent>) -> Self {
|
||||||
|
Self { stream, sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sink for StreamSink {
|
||||||
|
fn start(&mut self) -> SinkResult<()> {
|
||||||
|
if let Err(why) = self.sender.send(SinkEvent::Start) {
|
||||||
|
error!("Failed to send start playback event: {why}");
|
||||||
|
return Err(SinkError::ConnectionRefused(why.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&mut self) -> SinkResult<()> {
|
||||||
|
if let Err(why) = self.sender.send(SinkEvent::Stop) {
|
||||||
|
error!("Failed to send start playback event: {why}");
|
||||||
|
return Err(SinkError::ConnectionRefused(why.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write(&mut self, packet: AudioPacket, converter: &mut Converter) -> SinkResult<()> {
|
||||||
|
use zerocopy::AsBytes;
|
||||||
|
|
||||||
|
let AudioPacket::Samples(samples) = packet else { return Ok(()); };
|
||||||
|
let samples_f32: &[f32] = &converter.f64_to_f32(&samples);
|
||||||
|
|
||||||
|
let resampled = samplerate::convert(
|
||||||
|
44100,
|
||||||
|
48000,
|
||||||
|
2,
|
||||||
|
samplerate::ConverterType::Linear,
|
||||||
|
samples_f32,
|
||||||
|
)
|
||||||
|
.expect("to succeed");
|
||||||
|
|
||||||
|
let samples_i16 =
|
||||||
|
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
|
||||||
|
|
||||||
|
self.write_bytes(samples_i16.as_bytes())?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SinkAsBytes for StreamSink {
|
||||||
|
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
|
||||||
|
self
|
||||||
|
.stream
|
||||||
|
.write_all(data)
|
||||||
|
.map_err(|why| SinkError::OnWrite(why.to_string()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -280,35 +280,24 @@ pub fn component(ctx: Context, mut interaction: MessageComponentInteraction) ->
|
||||||
};
|
};
|
||||||
|
|
||||||
// Send the desired command to the session
|
// Send the desired command to the session
|
||||||
let success = match interaction.data.custom_id.as_str() {
|
match interaction.data.custom_id.as_str() {
|
||||||
"playing::btn_pause_play" => {
|
"playing::btn_pause_play" => {
|
||||||
if pbi.is_playing {
|
if pbi.is_playing {
|
||||||
session.pause().await.is_ok()
|
session.pause().await
|
||||||
} else {
|
} else {
|
||||||
session.resume().await.is_ok()
|
session.resume().await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"playing::btn_previous_track" => session.previous().await.is_ok(),
|
"playing::btn_previous_track" => session.previous().await,
|
||||||
|
|
||||||
"playing::btn_next_track" => session.next().await.is_ok(),
|
"playing::btn_next_track" => session.next().await,
|
||||||
|
|
||||||
_ => {
|
_ => {
|
||||||
error!("Unknown custom_id: {}", interaction.data.custom_id);
|
error!("Unknown custom_id: {}", interaction.data.custom_id);
|
||||||
false
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if !success {
|
|
||||||
error_message(
|
|
||||||
"Cannot change playback state",
|
|
||||||
"An error occurred while trying to change the playback state",
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
interaction.defer(&ctx.http).await.ok();
|
interaction.defer(&ctx.http).await.ok();
|
||||||
tokio::time::sleep(Duration::from_millis(
|
tokio::time::sleep(Duration::from_millis(
|
||||||
if interaction.data.custom_id == "playing::btn_pause_play" {
|
if interaction.data.custom_id == "playing::btn_pause_play" {
|
||||||
|
|
|
@ -15,9 +15,6 @@ use serenity::{
|
||||||
prelude::{Context, EventHandler},
|
prelude::{Context, EventHandler},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
use crate::metrics::MetricsManager;
|
|
||||||
|
|
||||||
// If the GUILD_ID environment variable is set, only allow commands from that guild
|
// If the GUILD_ID environment variable is set, only allow commands from that guild
|
||||||
macro_rules! enforce_guild {
|
macro_rules! enforce_guild {
|
||||||
($interaction:ident) => {
|
($interaction:ident) => {
|
||||||
|
@ -99,12 +96,6 @@ impl Handler {
|
||||||
let data = ctx.data.read().await;
|
let data = ctx.data.read().await;
|
||||||
let command_manager = data.get::<CommandManager>().expect("to contain a value");
|
let command_manager = data.get::<CommandManager>().expect("to contain a value");
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
{
|
|
||||||
let metrics = data.get::<MetricsManager>().expect("to contain a value");
|
|
||||||
metrics.command_exec(&command.data.name);
|
|
||||||
}
|
|
||||||
|
|
||||||
command_manager.execute_command(&ctx, command).await;
|
command_manager.execute_command(&ctx, command).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
56
src/main.rs
56
src/main.rs
|
@ -6,15 +6,9 @@ use serenity::{framework::StandardFramework, prelude::GatewayIntents, Client};
|
||||||
use songbird::SerenityInit;
|
use songbird::SerenityInit;
|
||||||
use std::{any::Any, env, process::exit};
|
use std::{any::Any, env, process::exit};
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
use metrics::MetricsManager;
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
use tokio::signal::unix::SignalKind;
|
use tokio::signal::unix::SignalKind;
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
mod metrics;
|
|
||||||
|
|
||||||
mod audio;
|
mod audio;
|
||||||
mod bot;
|
mod bot;
|
||||||
mod consts;
|
mod consts;
|
||||||
|
@ -41,20 +35,6 @@ async fn main() {
|
||||||
|
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let args: Vec<String> = env::args().collect();
|
|
||||||
|
|
||||||
if args.len() > 2 && &args[1] == "--player" {
|
|
||||||
// Woah! We're running in player mode!
|
|
||||||
|
|
||||||
debug!("Starting Spoticord player");
|
|
||||||
|
|
||||||
player::main().await;
|
|
||||||
|
|
||||||
debug!("Player exited, shutting down");
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("It's a good day");
|
info!("It's a good day");
|
||||||
info!(" - Spoticord {}", time::OffsetDateTime::now_utc().year());
|
info!(" - Spoticord {}", time::OffsetDateTime::now_utc().year());
|
||||||
|
|
||||||
|
@ -72,12 +52,6 @@ async fn main() {
|
||||||
let token = env::var("DISCORD_TOKEN").expect("a token in the environment");
|
let token = env::var("DISCORD_TOKEN").expect("a token in the environment");
|
||||||
let db_url = env::var("DATABASE_URL").expect("a database URL in the environment");
|
let db_url = env::var("DATABASE_URL").expect("a database URL in the environment");
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
let metrics_manager = {
|
|
||||||
let metrics_url = env::var("METRICS_URL").expect("a prometheus pusher URL in the environment");
|
|
||||||
MetricsManager::new(metrics_url)
|
|
||||||
};
|
|
||||||
|
|
||||||
let session_manager = SessionManager::new();
|
let session_manager = SessionManager::new();
|
||||||
|
|
||||||
// Create client
|
// Create client
|
||||||
|
@ -97,9 +71,6 @@ async fn main() {
|
||||||
data.insert::<Database>(Database::new(db_url, None));
|
data.insert::<Database>(Database::new(db_url, None));
|
||||||
data.insert::<CommandManager>(CommandManager::new());
|
data.insert::<CommandManager>(CommandManager::new());
|
||||||
data.insert::<SessionManager>(session_manager.clone());
|
data.insert::<SessionManager>(session_manager.clone());
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
data.insert::<MetricsManager>(metrics_manager.clone());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let shard_manager = client.shard_manager.clone();
|
let shard_manager = client.shard_manager.clone();
|
||||||
|
@ -118,30 +89,6 @@ async fn main() {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
{
|
|
||||||
let guild_count = _cache.guilds().len();
|
|
||||||
let active_count = session_manager.get_active_session_count().await;
|
|
||||||
let total_count = session_manager.get_session_count().await;
|
|
||||||
|
|
||||||
metrics_manager.set_server_count(guild_count);
|
|
||||||
metrics_manager.set_active_sessions(active_count);
|
|
||||||
metrics_manager.set_total_sessions(total_count);
|
|
||||||
|
|
||||||
// Yes, I like to handle my s's when I'm working with amounts
|
|
||||||
debug!(
|
|
||||||
"Updated metrics: {} guild{}, {} active session{}, {} total session{}",
|
|
||||||
guild_count,
|
|
||||||
if guild_count == 1 { "" } else { "s" },
|
|
||||||
active_count,
|
|
||||||
if active_count == 1 { "" } else { "s" },
|
|
||||||
total_count,
|
|
||||||
if total_count == 1 { "" } else { "s" }
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = tokio::signal::ctrl_c() => {
|
_ = tokio::signal::ctrl_c() => {
|
||||||
info!("Received interrupt signal, shutting down...");
|
info!("Received interrupt signal, shutting down...");
|
||||||
|
|
||||||
|
@ -166,9 +113,6 @@ async fn main() {
|
||||||
|
|
||||||
shard_manager.lock().await.shutdown_all().await;
|
shard_manager.lock().await.shutdown_all().await;
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
metrics_manager.stop();
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
114
src/metrics.rs
114
src/metrics.rs
|
@ -1,114 +0,0 @@
|
||||||
use std::{
|
|
||||||
collections::hash_map::RandomState,
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
thread,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use prometheus::{
|
|
||||||
opts, push_metrics, register_int_counter_vec, register_int_gauge, IntCounterVec, IntGauge,
|
|
||||||
};
|
|
||||||
use serenity::prelude::TypeMapKey;
|
|
||||||
|
|
||||||
use crate::session::pbi::PlaybackInfo;
|
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
static ref TOTAL_SERVERS: IntGauge =
|
|
||||||
register_int_gauge!("total_servers", "Total number of servers Spoticord is in").unwrap();
|
|
||||||
static ref ACTIVE_SESSIONS: IntGauge = register_int_gauge!(
|
|
||||||
"active_sessions",
|
|
||||||
"Total number of servers with an active Spoticord session"
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
static ref TOTAL_SESSIONS: IntGauge = register_int_gauge!(
|
|
||||||
"total_sessions",
|
|
||||||
"Total number of servers with Spoticord in a voice channel"
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
static ref TRACKS_PLAYED: IntCounterVec =
|
|
||||||
register_int_counter_vec!(opts!("tracks_played", "Tracks Played"), &["type"]).unwrap();
|
|
||||||
static ref COMMANDS_EXECUTED: IntCounterVec = register_int_counter_vec!(
|
|
||||||
opts!("commands_executed", "Commands Executed"),
|
|
||||||
&["command"]
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct MetricsManager {
|
|
||||||
should_stop: Arc<AtomicBool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MetricsManager {
|
|
||||||
pub fn new(pusher_url: impl Into<String>) -> Self {
|
|
||||||
let instance = Self {
|
|
||||||
should_stop: Arc::new(AtomicBool::new(false)),
|
|
||||||
};
|
|
||||||
|
|
||||||
thread::spawn({
|
|
||||||
let instance = instance.clone();
|
|
||||||
let pusher_url = pusher_url.into();
|
|
||||||
|
|
||||||
move || loop {
|
|
||||||
thread::sleep(Duration::from_secs(5));
|
|
||||||
|
|
||||||
if instance.should_stop() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(why) = push_metrics::<RandomState>(
|
|
||||||
"spoticord_metrics",
|
|
||||||
Default::default(),
|
|
||||||
&pusher_url,
|
|
||||||
prometheus::gather(),
|
|
||||||
None,
|
|
||||||
) {
|
|
||||||
log::error!("Failed to push metrics: {}", why);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
instance
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn should_stop(&self) -> bool {
|
|
||||||
self.should_stop.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stop(&self) {
|
|
||||||
self.should_stop.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_server_count(&self, count: usize) {
|
|
||||||
TOTAL_SERVERS.set(count as i64);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_total_sessions(&self, count: usize) {
|
|
||||||
TOTAL_SESSIONS.set(count as i64);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_active_sessions(&self, count: usize) {
|
|
||||||
ACTIVE_SESSIONS.set(count as i64);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn track_play(&self, track: &PlaybackInfo) {
|
|
||||||
let track_type = match track.get_type() {
|
|
||||||
Some(track_type) => track_type,
|
|
||||||
None => return,
|
|
||||||
};
|
|
||||||
|
|
||||||
TRACKS_PLAYED.with_label_values(&[&track_type]).inc();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn command_exec(&self, command: &str) {
|
|
||||||
COMMANDS_EXECUTED.with_label_values(&[command]).inc();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TypeMapKey for MetricsManager {
|
|
||||||
type Value = MetricsManager;
|
|
||||||
}
|
|
328
src/player.rs
328
src/player.rs
|
@ -1,328 +0,0 @@
|
||||||
use std::{process::exit, time::Duration};
|
|
||||||
|
|
||||||
use ipc_channel::ipc::{IpcError, TryRecvError};
|
|
||||||
use librespot::{
|
|
||||||
connect::spirc::Spirc,
|
|
||||||
core::{
|
|
||||||
config::{ConnectConfig, SessionConfig},
|
|
||||||
session::Session,
|
|
||||||
},
|
|
||||||
discovery::Credentials,
|
|
||||||
playback::{
|
|
||||||
config::{Bitrate, PlayerConfig},
|
|
||||||
mixer::{self, MixerConfig},
|
|
||||||
player::{Player, PlayerEvent},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use log::{debug, error, warn};
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
audio::backend::StdoutSink,
|
|
||||||
ipc::{self, packet::IpcPacket},
|
|
||||||
librespot_ext::discovery::CredentialsExt,
|
|
||||||
utils,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct SpoticordPlayer {
|
|
||||||
client: ipc::Client,
|
|
||||||
session: Option<Session>,
|
|
||||||
spirc: Option<Spirc>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SpoticordPlayer {
|
|
||||||
pub fn new(client: ipc::Client) -> Self {
|
|
||||||
Self {
|
|
||||||
client,
|
|
||||||
session: None,
|
|
||||||
spirc: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start(&mut self, token: impl Into<String>, device_name: impl Into<String>) {
|
|
||||||
let token = token.into();
|
|
||||||
|
|
||||||
// Get the username (required for librespot)
|
|
||||||
let username = utils::spotify::get_username(&token)
|
|
||||||
.await
|
|
||||||
.expect("to get the username");
|
|
||||||
|
|
||||||
let session_config = SessionConfig::default();
|
|
||||||
let player_config = PlayerConfig {
|
|
||||||
bitrate: Bitrate::Bitrate96,
|
|
||||||
..PlayerConfig::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Log in using the token
|
|
||||||
let credentials = Credentials::with_token(username, &token);
|
|
||||||
|
|
||||||
// Shutdown old session (cannot be done in the stop function)
|
|
||||||
if let Some(session) = self.session.take() {
|
|
||||||
session.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect the session
|
|
||||||
let (session, _) = match Session::connect(session_config, credentials, None, false).await {
|
|
||||||
Ok((session, credentials)) => (session, credentials),
|
|
||||||
Err(why) => {
|
|
||||||
error!("Failed to create Spotify session: {}", why);
|
|
||||||
|
|
||||||
self
|
|
||||||
.client
|
|
||||||
.send(IpcPacket::ConnectError(why.to_string()))
|
|
||||||
.ok();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Store session for later use
|
|
||||||
self.session = Some(session.clone());
|
|
||||||
|
|
||||||
// Volume mixer
|
|
||||||
let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig {
|
|
||||||
volume_ctrl: librespot::playback::config::VolumeCtrl::Linear,
|
|
||||||
..MixerConfig::default()
|
|
||||||
});
|
|
||||||
|
|
||||||
let client = self.client.clone();
|
|
||||||
|
|
||||||
// Create the player
|
|
||||||
let (player, mut receiver) = Player::new(
|
|
||||||
player_config,
|
|
||||||
session.clone(),
|
|
||||||
mixer.get_soft_volume(),
|
|
||||||
move || Box::new(StdoutSink::new(client)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let (spirc, spirc_task) = Spirc::new(
|
|
||||||
ConnectConfig {
|
|
||||||
name: device_name.into(),
|
|
||||||
// 50%
|
|
||||||
initial_volume: Some(65535 / 2),
|
|
||||||
..ConnectConfig::default()
|
|
||||||
},
|
|
||||||
session.clone(),
|
|
||||||
player,
|
|
||||||
mixer,
|
|
||||||
);
|
|
||||||
|
|
||||||
let device_id = session.device_id().to_owned();
|
|
||||||
let ipc = self.client.clone();
|
|
||||||
|
|
||||||
// IPC Handler
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
|
|
||||||
let mut retries = 10;
|
|
||||||
|
|
||||||
// Try to switch to the device
|
|
||||||
loop {
|
|
||||||
match client
|
|
||||||
.put("https://api.spotify.com/v1/me/player")
|
|
||||||
.bearer_auth(token.clone())
|
|
||||||
.json(&json!({
|
|
||||||
"device_ids": [device_id],
|
|
||||||
}))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(resp) => {
|
|
||||||
if resp.status() == 202 {
|
|
||||||
debug!("Successfully switched to device");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
retries -= 1;
|
|
||||||
|
|
||||||
if retries == 0 {
|
|
||||||
error!("Failed to switch to device");
|
|
||||||
ipc
|
|
||||||
.send(IpcPacket::ConnectError(
|
|
||||||
"Switch to Spoticord device timed out".to_string(),
|
|
||||||
))
|
|
||||||
.ok();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(why) => {
|
|
||||||
error!("Failed to set device: {}", why);
|
|
||||||
ipc.send(IpcPacket::ConnectError(why.to_string())).ok();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do IPC stuff with these events
|
|
||||||
loop {
|
|
||||||
let event = match receiver.recv().await {
|
|
||||||
Some(event) => event,
|
|
||||||
None => break,
|
|
||||||
};
|
|
||||||
|
|
||||||
match event {
|
|
||||||
PlayerEvent::Playing {
|
|
||||||
play_request_id: _,
|
|
||||||
track_id,
|
|
||||||
position_ms,
|
|
||||||
duration_ms,
|
|
||||||
} => {
|
|
||||||
if let Err(why) = ipc.send(IpcPacket::Playing(
|
|
||||||
track_id.to_uri().expect("to not fail"),
|
|
||||||
position_ms,
|
|
||||||
duration_ms,
|
|
||||||
)) {
|
|
||||||
error!("Failed to send playing packet: {}", why);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PlayerEvent::Paused {
|
|
||||||
play_request_id: _,
|
|
||||||
track_id,
|
|
||||||
position_ms,
|
|
||||||
duration_ms,
|
|
||||||
} => {
|
|
||||||
if let Err(why) = ipc.send(IpcPacket::Paused(
|
|
||||||
track_id.to_uri().expect("to not fail"),
|
|
||||||
position_ms,
|
|
||||||
duration_ms,
|
|
||||||
)) {
|
|
||||||
error!("Failed to send paused packet: {}", why);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PlayerEvent::Changed {
|
|
||||||
old_track_id: _,
|
|
||||||
new_track_id,
|
|
||||||
} => {
|
|
||||||
if let Err(why) = ipc.send(IpcPacket::TrackChange(
|
|
||||||
new_track_id.to_uri().expect("to not fail"),
|
|
||||||
)) {
|
|
||||||
error!("Failed to send track change packet: {}", why);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
PlayerEvent::Stopped {
|
|
||||||
play_request_id: _,
|
|
||||||
track_id: _,
|
|
||||||
} => {
|
|
||||||
if let Err(why) = ipc.send(IpcPacket::Stopped) {
|
|
||||||
error!("Failed to send player stopped packet: {}", why);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => {}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Player stopped");
|
|
||||||
});
|
|
||||||
|
|
||||||
self.spirc = Some(spirc);
|
|
||||||
tokio::spawn(spirc_task);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn next(&mut self) {
|
|
||||||
if let Some(spirc) = &self.spirc {
|
|
||||||
spirc.next();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn previous(&mut self) {
|
|
||||||
if let Some(spirc) = &self.spirc {
|
|
||||||
spirc.prev();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pause(&mut self) {
|
|
||||||
if let Some(spirc) = &self.spirc {
|
|
||||||
spirc.pause();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn resume(&mut self) {
|
|
||||||
if let Some(spirc) = &self.spirc {
|
|
||||||
spirc.play();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stop(&mut self) {
|
|
||||||
if let Some(spirc) = self.spirc.take() {
|
|
||||||
spirc.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn main() {
|
|
||||||
let args = std::env::args().collect::<Vec<String>>();
|
|
||||||
|
|
||||||
let tx_name = args[2].clone();
|
|
||||||
let rx_name = args[3].clone();
|
|
||||||
|
|
||||||
// Create IPC communication channel
|
|
||||||
let client = ipc::Client::connect(tx_name, rx_name).expect("Failed to connect to IPC");
|
|
||||||
|
|
||||||
// Create the player
|
|
||||||
let mut player = SpoticordPlayer::new(client.clone());
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let message = match client.try_recv() {
|
|
||||||
Ok(message) => message,
|
|
||||||
Err(why) => {
|
|
||||||
if let TryRecvError::Empty = why {
|
|
||||||
// No message, wait a bit and try again
|
|
||||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
} else if let TryRecvError::IpcError(IpcError::Disconnected) = &why {
|
|
||||||
debug!("IPC connection closed, goodbye");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
error!("Failed to receive message: {}", why);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match message {
|
|
||||||
IpcPacket::Connect(token, device_name) => {
|
|
||||||
debug!("Connecting to Spotify with device name {}", device_name);
|
|
||||||
|
|
||||||
player.start(token, device_name).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Disconnect => {
|
|
||||||
debug!("Disconnecting from Spotify");
|
|
||||||
|
|
||||||
player.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Next => {
|
|
||||||
player.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Previous => {
|
|
||||||
player.previous();
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Pause => {
|
|
||||||
player.pause();
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Resume => {
|
|
||||||
player.resume();
|
|
||||||
}
|
|
||||||
|
|
||||||
IpcPacket::Quit => {
|
|
||||||
debug!("Received quit packet, exiting");
|
|
||||||
|
|
||||||
exit(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => {
|
|
||||||
warn!("Received unknown packet: {:?}", message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
pub mod stream;
|
||||||
|
|
||||||
|
use librespot::{
|
||||||
|
connect::spirc::Spirc,
|
||||||
|
core::{config::ConnectConfig, session::Session},
|
||||||
|
discovery::Credentials,
|
||||||
|
playback::{
|
||||||
|
config::{Bitrate, PlayerConfig, VolumeCtrl},
|
||||||
|
mixer::{self, MixerConfig},
|
||||||
|
player::{Player as SpotifyPlayer, PlayerEvent},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
audio::{SinkEvent, StreamSink},
|
||||||
|
librespot_ext::discovery::CredentialsExt,
|
||||||
|
utils,
|
||||||
|
};
|
||||||
|
|
||||||
|
use self::stream::Stream;
|
||||||
|
|
||||||
|
pub struct Player {
|
||||||
|
stream: Stream,
|
||||||
|
session: Option<Session>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Player {
|
||||||
|
pub fn create() -> Self {
|
||||||
|
Self {
|
||||||
|
stream: Stream::new(),
|
||||||
|
session: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(
|
||||||
|
&mut self,
|
||||||
|
token: &str,
|
||||||
|
device_name: &str,
|
||||||
|
) -> Result<
|
||||||
|
(
|
||||||
|
Spirc,
|
||||||
|
(UnboundedReceiver<PlayerEvent>, UnboundedReceiver<SinkEvent>),
|
||||||
|
),
|
||||||
|
Box<dyn std::error::Error>,
|
||||||
|
> {
|
||||||
|
let username = utils::spotify::get_username(token).await?;
|
||||||
|
|
||||||
|
let player_config = PlayerConfig {
|
||||||
|
bitrate: Bitrate::Bitrate96,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let credentials = Credentials::with_token(username, token);
|
||||||
|
|
||||||
|
// Shutdown old session (cannot be done in the stop function)
|
||||||
|
if let Some(session) = self.session.take() {
|
||||||
|
session.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect the session
|
||||||
|
let (session, _) = Session::connect(Default::default(), credentials, None, false).await?;
|
||||||
|
self.session = Some(session.clone());
|
||||||
|
|
||||||
|
let mixer = (mixer::find(Some("softvol")).expect("to exist"))(MixerConfig {
|
||||||
|
volume_ctrl: VolumeCtrl::Linear,
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
|
||||||
|
let stream = self.get_stream();
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (player, receiver) = SpotifyPlayer::new(
|
||||||
|
player_config,
|
||||||
|
session.clone(),
|
||||||
|
mixer.get_soft_volume(),
|
||||||
|
move || Box::new(StreamSink::new(stream, tx)),
|
||||||
|
);
|
||||||
|
|
||||||
|
let (spirc, spirc_task) = Spirc::new(
|
||||||
|
ConnectConfig {
|
||||||
|
name: device_name.into(),
|
||||||
|
// 50%
|
||||||
|
initial_volume: Some(65535 / 2),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
session.clone(),
|
||||||
|
player,
|
||||||
|
mixer,
|
||||||
|
);
|
||||||
|
|
||||||
|
tokio::spawn(spirc_task);
|
||||||
|
|
||||||
|
Ok((spirc, (receiver, rx)))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_stream(&self) -> Stream {
|
||||||
|
self.stream.clone()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
use std::{
|
||||||
|
io::{Read, Seek, Write},
|
||||||
|
sync::{Arc, Condvar, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
|
use songbird::input::reader::MediaSource;
|
||||||
|
|
||||||
|
const MAX_SIZE: usize = 1 * 1024 * 1024;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Stream {
|
||||||
|
inner: Arc<(Mutex<Vec<u8>>, Condvar)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: Arc::new((Mutex::new(Vec::new()), Condvar::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for Stream {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
let (mutex, condvar) = &*self.inner;
|
||||||
|
let mut buffer = mutex.lock().expect("Mutex was poisoned");
|
||||||
|
|
||||||
|
log::trace!("Read!");
|
||||||
|
|
||||||
|
while buffer.is_empty() {
|
||||||
|
buffer = condvar.wait(buffer).expect("Mutex was poisoned");
|
||||||
|
}
|
||||||
|
|
||||||
|
let max_read = usize::min(buf.len(), buffer.len());
|
||||||
|
buf[0..max_read].copy_from_slice(&buffer[0..max_read]);
|
||||||
|
buffer.drain(0..max_read);
|
||||||
|
|
||||||
|
Ok(max_read)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Write for Stream {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
let (mutex, condvar) = &*self.inner;
|
||||||
|
let mut buffer = mutex.lock().expect("Mutex was poisoned");
|
||||||
|
|
||||||
|
while buffer.len() + buf.len() > MAX_SIZE {
|
||||||
|
buffer = condvar.wait(buffer).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.extend_from_slice(buf);
|
||||||
|
condvar.notify_all();
|
||||||
|
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Seek for Stream {
|
||||||
|
fn seek(&mut self, _: std::io::SeekFrom) -> std::io::Result<u64> {
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MediaSource for Stream {
|
||||||
|
fn byte_len(&self) -> Option<u64> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_seekable(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,8 +25,8 @@ pub enum SessionCreateError {
|
||||||
#[error("Failed to join voice channel {0} ({1})")]
|
#[error("Failed to join voice channel {0} ({1})")]
|
||||||
JoinError(ChannelId, GuildId),
|
JoinError(ChannelId, GuildId),
|
||||||
|
|
||||||
#[error("Failed to start player process")]
|
#[error("Failed to start the player")]
|
||||||
ForkError,
|
PlayerStartError,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
@ -6,13 +6,17 @@ use self::{
|
||||||
pbi::PlaybackInfo,
|
pbi::PlaybackInfo,
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
|
audio::SinkEvent,
|
||||||
consts::DISCONNECT_TIME,
|
consts::DISCONNECT_TIME,
|
||||||
database::{Database, DatabaseError},
|
database::{Database, DatabaseError},
|
||||||
ipc::{self, packet::IpcPacket, Client},
|
player::Player,
|
||||||
utils::{embed::Status, spotify},
|
utils::{embed::Status, spotify},
|
||||||
};
|
};
|
||||||
use ipc_channel::ipc::{IpcError, TryRecvError};
|
use librespot::{
|
||||||
use librespot::core::spotify_id::{SpotifyAudioType, SpotifyId};
|
connect::spirc::Spirc,
|
||||||
|
core::spotify_id::{SpotifyAudioType, SpotifyId},
|
||||||
|
playback::player::PlayerEvent,
|
||||||
|
};
|
||||||
use log::*;
|
use log::*;
|
||||||
use serenity::{
|
use serenity::{
|
||||||
async_trait,
|
async_trait,
|
||||||
|
@ -22,20 +26,13 @@ use serenity::{
|
||||||
};
|
};
|
||||||
use songbird::{
|
use songbird::{
|
||||||
create_player,
|
create_player,
|
||||||
input::{children_to_reader, Codec, Container, Input},
|
input::{Codec, Container, Input, Reader},
|
||||||
tracks::TrackHandle,
|
tracks::TrackHandle,
|
||||||
Call, Event, EventContext, EventHandler,
|
Call, Event, EventContext, EventHandler,
|
||||||
};
|
};
|
||||||
use std::{
|
use std::{sync::Arc, time::Duration};
|
||||||
process::{Command, Stdio},
|
|
||||||
sync::Arc,
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
use crate::metrics::MetricsManager;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct SpoticordSession(Arc<RwLock<InnerSpoticordSession>>);
|
pub struct SpoticordSession(Arc<RwLock<InnerSpoticordSession>>);
|
||||||
|
|
||||||
|
@ -56,14 +53,11 @@ struct InnerSpoticordSession {
|
||||||
|
|
||||||
disconnect_handle: Option<tokio::task::JoinHandle<()>>,
|
disconnect_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
|
||||||
client: Option<Client>,
|
spirc: Option<Spirc>,
|
||||||
|
|
||||||
/// Whether the session has been disconnected
|
/// Whether the session has been disconnected
|
||||||
/// If this is true then this instance should no longer be used and dropped
|
/// If this is true then this instance should no longer be used and dropped
|
||||||
disconnected: bool,
|
disconnected: bool,
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
metrics: MetricsManager,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpoticordSession {
|
impl SpoticordSession {
|
||||||
|
@ -81,12 +75,6 @@ impl SpoticordSession {
|
||||||
.expect("to contain a value")
|
.expect("to contain a value")
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
let metrics = data
|
|
||||||
.get::<MetricsManager>()
|
|
||||||
.expect("to contain a value")
|
|
||||||
.clone();
|
|
||||||
|
|
||||||
// Join the voice channel
|
// Join the voice channel
|
||||||
let songbird = songbird::get(ctx).await.expect("to be present").clone();
|
let songbird = songbird::get(ctx).await.expect("to be present").clone();
|
||||||
|
|
||||||
|
@ -108,15 +96,11 @@ impl SpoticordSession {
|
||||||
track: None,
|
track: None,
|
||||||
playback_info: None,
|
playback_info: None,
|
||||||
disconnect_handle: None,
|
disconnect_handle: None,
|
||||||
client: None,
|
spirc: None,
|
||||||
disconnected: false,
|
disconnected: false,
|
||||||
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
metrics,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut instance = Self(Arc::new(RwLock::new(inner)));
|
let mut instance = Self(Arc::new(RwLock::new(inner)));
|
||||||
|
|
||||||
instance.create_player(ctx).await?;
|
instance.create_player(ctx).await?;
|
||||||
|
|
||||||
let mut call = call.lock().await;
|
let mut call = call.lock().await;
|
||||||
|
@ -165,39 +149,31 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Advance to the next track
|
/// Advance to the next track
|
||||||
pub async fn next(&mut self) -> Result<(), IpcError> {
|
pub async fn next(&mut self) {
|
||||||
if let Some(ref client) = self.0.read().await.client {
|
if let Some(ref spirc) = self.0.read().await.spirc {
|
||||||
return client.send(IpcPacket::Next);
|
spirc.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rewind to the previous track
|
/// Rewind to the previous track
|
||||||
pub async fn previous(&mut self) -> Result<(), IpcError> {
|
pub async fn previous(&mut self) {
|
||||||
if let Some(ref client) = self.0.read().await.client {
|
if let Some(ref spirc) = self.0.read().await.spirc {
|
||||||
return client.send(IpcPacket::Previous);
|
spirc.prev();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pause the current track
|
/// Pause the current track
|
||||||
pub async fn pause(&mut self) -> Result<(), IpcError> {
|
pub async fn pause(&mut self) {
|
||||||
if let Some(ref client) = self.0.read().await.client {
|
if let Some(ref spirc) = self.0.read().await.spirc {
|
||||||
return client.send(IpcPacket::Pause);
|
spirc.pause();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resume the current track
|
/// Resume the current track
|
||||||
pub async fn resume(&mut self) -> Result<(), IpcError> {
|
pub async fn resume(&mut self) {
|
||||||
if let Some(ref client) = self.0.read().await.client {
|
if let Some(ref spirc) = self.0.read().await.spirc {
|
||||||
return client.send(IpcPacket::Resume);
|
spirc.play();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> {
|
async fn create_player(&mut self, ctx: &Context) -> Result<(), SessionCreateError> {
|
||||||
|
@ -232,52 +208,17 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Create IPC oneshot server
|
// Create player
|
||||||
let (server, tx_name, rx_name) = match ipc::Server::create() {
|
let mut player = Player::create();
|
||||||
Ok(server) => server,
|
|
||||||
Err(why) => {
|
|
||||||
error!("Failed to create IPC server: {:?}", why);
|
|
||||||
return Err(SessionCreateError::ForkError);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Spawn player process
|
|
||||||
let child =
|
|
||||||
match Command::new(std::env::current_exe().expect("to know the current executable path"))
|
|
||||||
.args([
|
|
||||||
"--player",
|
|
||||||
&tx_name,
|
|
||||||
&rx_name,
|
|
||||||
"--debug-guild-id",
|
|
||||||
&self.guild_id().await.to_string(),
|
|
||||||
])
|
|
||||||
.stdout(Stdio::piped())
|
|
||||||
.stderr(Stdio::inherit())
|
|
||||||
.spawn()
|
|
||||||
{
|
|
||||||
Ok(child) => child,
|
|
||||||
Err(why) => {
|
|
||||||
error!("Failed to start player process: {:?}", why);
|
|
||||||
return Err(SessionCreateError::ForkError);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Establish bi-directional IPC channel
|
|
||||||
let client = match server.accept() {
|
|
||||||
Ok(client) => client,
|
|
||||||
Err(why) => {
|
|
||||||
error!("Failed to accept IPC connection: {:?}", why);
|
|
||||||
|
|
||||||
return Err(SessionCreateError::ForkError);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Pipe player audio to the voice channel
|
|
||||||
let reader = children_to_reader::<f32>(vec![child]);
|
|
||||||
|
|
||||||
// Create track (paused, fixes audio glitches)
|
// Create track (paused, fixes audio glitches)
|
||||||
let (mut track, track_handle) =
|
let (mut track, track_handle) = create_player(Input::new(
|
||||||
create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None));
|
true,
|
||||||
|
Reader::Extension(Box::new(player.get_stream())),
|
||||||
|
Codec::Pcm,
|
||||||
|
Container::Raw,
|
||||||
|
None,
|
||||||
|
));
|
||||||
track.pause();
|
track.pause();
|
||||||
|
|
||||||
let call = self.call().await;
|
let call = self.call().await;
|
||||||
|
@ -286,11 +227,20 @@ impl SpoticordSession {
|
||||||
// Set call audio to track
|
// Set call audio to track
|
||||||
call.play_only(track);
|
call.play_only(track);
|
||||||
|
|
||||||
|
let (spirc, (mut player_rx, mut sink_rx)) = match player.start(&token, &user.device_name).await
|
||||||
|
{
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(why) => {
|
||||||
|
error!("Failed to start the player: {:?}", why);
|
||||||
|
|
||||||
|
return Err(SessionCreateError::PlayerStartError);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Handle IPC packets
|
// Handle IPC packets
|
||||||
// This will automatically quit once the IPC connection is closed
|
// This will automatically quit once the IPC connection is closed
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let track = track_handle.clone();
|
let track = track_handle.clone();
|
||||||
let client = client.clone();
|
|
||||||
let ctx = ctx.clone();
|
let ctx = ctx.clone();
|
||||||
let instance = self.clone();
|
let instance = self.clone();
|
||||||
let inner = self.0.clone();
|
let inner = self.0.clone();
|
||||||
|
@ -315,87 +265,17 @@ impl SpoticordSession {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let msg = match client.try_recv() {
|
tokio::select! {
|
||||||
Ok(msg) => msg,
|
event = player_rx.recv() => {
|
||||||
Err(why) => {
|
let Some(event) = event else { break; };
|
||||||
if let TryRecvError::Empty = why {
|
|
||||||
// No message, wait a bit and try again
|
|
||||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
||||||
|
|
||||||
continue;
|
|
||||||
} else if let TryRecvError::IpcError(IpcError::Disconnected) = &why {
|
|
||||||
trace!("IPC connection closed, exiting IPC handler");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
error!("Failed to receive IPC message: {:?}", why);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match msg {
|
|
||||||
// Session connect error
|
|
||||||
IpcPacket::ConnectError(why) => {
|
|
||||||
error!("Failed to connect to Spotify: {:?}", why);
|
|
||||||
|
|
||||||
// Notify the user in the text channel
|
|
||||||
if let Err(why) = instance
|
|
||||||
.text_channel_id()
|
|
||||||
.await
|
|
||||||
.send_message(&instance.http().await, |message| {
|
|
||||||
message.embed(|embed| {
|
|
||||||
embed.title("Failed to connect to Spotify");
|
|
||||||
embed.description(why);
|
|
||||||
embed.footer(|footer| footer.text("Please try again"));
|
|
||||||
embed.color(Status::Error as u64);
|
|
||||||
|
|
||||||
embed
|
|
||||||
});
|
|
||||||
|
|
||||||
message
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!("Failed to send error message: {:?}", why);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sink requests playback to start/resume
|
|
||||||
IpcPacket::StartPlayback => {
|
|
||||||
check_result(track.play());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sink requests playback to pause
|
|
||||||
IpcPacket::StopPlayback => {
|
|
||||||
check_result(track.pause());
|
|
||||||
}
|
|
||||||
|
|
||||||
// A new track has been set by the player
|
|
||||||
IpcPacket::TrackChange(track) => {
|
|
||||||
// Convert to SpotifyId
|
|
||||||
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
|
||||||
|
|
||||||
let instance = instance.clone();
|
|
||||||
let ctx = ctx.clone();
|
|
||||||
|
|
||||||
// Fetch track info
|
|
||||||
// This is done in a separate task to avoid blocking the IPC handler
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
|
|
||||||
error!("Failed to update track: {:?}", why);
|
|
||||||
|
|
||||||
instance.player_stopped().await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// The player has started playing a track
|
|
||||||
IpcPacket::Playing(track, position_ms, duration_ms) => {
|
|
||||||
// Convert to SpotifyId
|
|
||||||
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
|
||||||
|
|
||||||
|
match event {
|
||||||
|
PlayerEvent::Playing {
|
||||||
|
play_request_id: _,
|
||||||
|
track_id,
|
||||||
|
position_ms,
|
||||||
|
duration_ms,
|
||||||
|
} => {
|
||||||
let was_none = instance
|
let was_none = instance
|
||||||
.update_playback(duration_ms, position_ms, true)
|
.update_playback(duration_ms, position_ms, true)
|
||||||
.await;
|
.await;
|
||||||
|
@ -411,12 +291,14 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
IpcPacket::Paused(track, position_ms, duration_ms) => {
|
PlayerEvent::Paused {
|
||||||
|
play_request_id: _,
|
||||||
|
track_id,
|
||||||
|
position_ms,
|
||||||
|
duration_ms,
|
||||||
|
} => {
|
||||||
instance.start_disconnect_timer().await;
|
instance.start_disconnect_timer().await;
|
||||||
|
|
||||||
// Convert to SpotifyId
|
|
||||||
let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
|
||||||
|
|
||||||
let was_none = instance
|
let was_none = instance
|
||||||
.update_playback(duration_ms, position_ms, false)
|
.update_playback(duration_ms, position_ms, false)
|
||||||
.await;
|
.await;
|
||||||
|
@ -433,7 +315,28 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
IpcPacket::Stopped => {
|
PlayerEvent::Changed {
|
||||||
|
old_track_id: _,
|
||||||
|
new_track_id,
|
||||||
|
} => {
|
||||||
|
let instance = instance.clone();
|
||||||
|
let ctx = ctx.clone();
|
||||||
|
|
||||||
|
// Fetch track info
|
||||||
|
// This is done in a separate task to avoid blocking the IPC handler
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(why) = instance.update_track(&ctx, &owner_id, new_track_id).await {
|
||||||
|
error!("Failed to update track: {:?}", why);
|
||||||
|
|
||||||
|
instance.player_stopped().await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
PlayerEvent::Stopped {
|
||||||
|
play_request_id: _,
|
||||||
|
track_id: _,
|
||||||
|
} => {
|
||||||
check_result(track.pause());
|
check_result(track.pause());
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -444,9 +347,146 @@ impl SpoticordSession {
|
||||||
instance.start_disconnect_timer().await;
|
instance.start_disconnect_timer().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore other packets
|
|
||||||
_ => {}
|
_ => {}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event = sink_rx.recv() => {
|
||||||
|
let Some(event) = event else { break; };
|
||||||
|
|
||||||
|
let check_result = |result| {
|
||||||
|
if let Err(why) = result {
|
||||||
|
error!("Failed to issue track command: {:?}", why);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
match event {
|
||||||
|
SinkEvent::Start => {
|
||||||
|
check_result(track.play());
|
||||||
|
}
|
||||||
|
|
||||||
|
SinkEvent::Stop => {
|
||||||
|
check_result(track.pause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// match event {
|
||||||
|
// // Session connect error
|
||||||
|
// IpcPacket::ConnectError(why) => {
|
||||||
|
// error!("Failed to connect to Spotify: {:?}", why);
|
||||||
|
|
||||||
|
// // Notify the user in the text channel
|
||||||
|
// if let Err(why) = instance
|
||||||
|
// .text_channel_id()
|
||||||
|
// .await
|
||||||
|
// .send_message(&instance.http().await, |message| {
|
||||||
|
// message.embed(|embed| {
|
||||||
|
// embed.title("Failed to connect to Spotify");
|
||||||
|
// embed.description(why);
|
||||||
|
// embed.footer(|footer| footer.text("Please try again"));
|
||||||
|
// embed.color(Status::Error as u64);
|
||||||
|
|
||||||
|
// embed
|
||||||
|
// });
|
||||||
|
|
||||||
|
// message
|
||||||
|
// })
|
||||||
|
// .await
|
||||||
|
// {
|
||||||
|
// error!("Failed to send error message: {:?}", why);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Sink requests playback to start/resume
|
||||||
|
// IpcPacket::StartPlayback => {
|
||||||
|
// check_result(track.play());
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Sink requests playback to pause
|
||||||
|
// IpcPacket::StopPlayback => {
|
||||||
|
// check_result(track.pause());
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // A new track has been set by the player
|
||||||
|
// IpcPacket::TrackChange(track) => {
|
||||||
|
// // Convert to SpotifyId
|
||||||
|
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
||||||
|
|
||||||
|
// let instance = instance.clone();
|
||||||
|
// let ctx = ctx.clone();
|
||||||
|
|
||||||
|
// // Fetch track info
|
||||||
|
// // This is done in a separate task to avoid blocking the IPC handler
|
||||||
|
// tokio::spawn(async move {
|
||||||
|
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
|
||||||
|
// error!("Failed to update track: {:?}", why);
|
||||||
|
|
||||||
|
// instance.player_stopped().await;
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // The player has started playing a track
|
||||||
|
// IpcPacket::Playing(track, position_ms, duration_ms) => {
|
||||||
|
// // Convert to SpotifyId
|
||||||
|
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
||||||
|
|
||||||
|
// let was_none = instance
|
||||||
|
// .update_playback(duration_ms, position_ms, true)
|
||||||
|
// .await;
|
||||||
|
|
||||||
|
// if was_none {
|
||||||
|
// // Stop player if update track fails
|
||||||
|
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
|
||||||
|
// error!("Failed to update track: {:?}", why);
|
||||||
|
|
||||||
|
// instance.player_stopped().await;
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// IpcPacket::Paused(track, position_ms, duration_ms) => {
|
||||||
|
// instance.start_disconnect_timer().await;
|
||||||
|
|
||||||
|
// // Convert to SpotifyId
|
||||||
|
// let track_id = SpotifyId::from_uri(&track).expect("to be a valid uri");
|
||||||
|
|
||||||
|
// let was_none = instance
|
||||||
|
// .update_playback(duration_ms, position_ms, false)
|
||||||
|
// .await;
|
||||||
|
|
||||||
|
// if was_none {
|
||||||
|
// // Stop player if update track fails
|
||||||
|
|
||||||
|
// if let Err(why) = instance.update_track(&ctx, &owner_id, track_id).await {
|
||||||
|
// error!("Failed to update track: {:?}", why);
|
||||||
|
|
||||||
|
// instance.player_stopped().await;
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// IpcPacket::Stopped => {
|
||||||
|
// check_result(track.pause());
|
||||||
|
|
||||||
|
// {
|
||||||
|
// let mut inner = inner.write().await;
|
||||||
|
// inner.playback_info.take();
|
||||||
|
// }
|
||||||
|
|
||||||
|
// instance.start_disconnect_timer().await;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // Ignore other packets
|
||||||
|
// _ => {}
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up session
|
// Clean up session
|
||||||
|
@ -456,15 +496,10 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Inform the player process to connect to Spotify
|
|
||||||
if let Err(why) = client.send(IpcPacket::Connect(token, user.device_name)) {
|
|
||||||
error!("Failed to send IpcPacket::Connect packet: {:?}", why);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update inner client and track
|
// Update inner client and track
|
||||||
let mut inner = self.0.write().await;
|
let mut inner = self.0.write().await;
|
||||||
inner.track = Some(track_handle);
|
inner.track = Some(track_handle);
|
||||||
inner.client = Some(client);
|
inner.spirc = Some(spirc);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -537,14 +572,6 @@ impl SpoticordSession {
|
||||||
pbi.update_track_episode(spotify_id, track, episode);
|
pbi.update_track_episode(spotify_id, track, episode);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send track play event to metrics
|
|
||||||
#[cfg(feature = "metrics")]
|
|
||||||
{
|
|
||||||
if let Some(ref pbi) = inner.playback_info {
|
|
||||||
inner.metrics.track_play(pbi);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -552,15 +579,11 @@ impl SpoticordSession {
|
||||||
async fn player_stopped(&self) {
|
async fn player_stopped(&self) {
|
||||||
let mut inner = self.0.write().await;
|
let mut inner = self.0.write().await;
|
||||||
|
|
||||||
if let Some(client) = inner.client.take() {
|
if let Some(spirc) = inner.spirc.take() {
|
||||||
// Ask player to quit (will cause defunct process)
|
spirc.shutdown();
|
||||||
if let Err(why) = client.send(IpcPacket::Quit) {
|
|
||||||
error!("Failed to send quit packet: {:?}", why);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(track) = inner.track.take() {
|
if let Some(track) = inner.track.take() {
|
||||||
// Stop the playback, and freeing the child handle, removing the defunct process
|
|
||||||
if let Err(why) = track.stop() {
|
if let Err(why) = track.stop() {
|
||||||
error!("Failed to stop track: {:?}", why);
|
error!("Failed to stop track: {:?}", why);
|
||||||
}
|
}
|
||||||
|
@ -598,11 +621,7 @@ impl SpoticordSession {
|
||||||
inner.disconnect_no_abort().await;
|
inner.disconnect_no_abort().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the disconnect timer, if one is running
|
self.stop_disconnect_timer().await;
|
||||||
let mut inner = self.0.write().await;
|
|
||||||
if let Some(handle) = inner.disconnect_handle.take() {
|
|
||||||
handle.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update playback info (duration, position, playing state)
|
// Update playback info (duration, position, playing state)
|
||||||
|
@ -613,6 +632,7 @@ impl SpoticordSession {
|
||||||
pbi.is_none()
|
pbi.is_none()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
{
|
||||||
let mut inner = self.0.write().await;
|
let mut inner = self.0.write().await;
|
||||||
|
|
||||||
if is_none {
|
if is_none {
|
||||||
|
@ -625,6 +645,11 @@ impl SpoticordSession {
|
||||||
.expect("to contain a value")
|
.expect("to contain a value")
|
||||||
.update_pos_dur(position_ms, duration_ms, playing);
|
.update_pos_dur(position_ms, duration_ms, playing);
|
||||||
};
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
if playing {
|
||||||
|
self.stop_disconnect_timer().await;
|
||||||
|
}
|
||||||
|
|
||||||
is_none
|
is_none
|
||||||
}
|
}
|
||||||
|
@ -632,6 +657,8 @@ impl SpoticordSession {
|
||||||
/// Start the disconnect timer, which will disconnect the bot from the voice channel after a
|
/// Start the disconnect timer, which will disconnect the bot from the voice channel after a
|
||||||
/// certain amount of time
|
/// certain amount of time
|
||||||
async fn start_disconnect_timer(&self) {
|
async fn start_disconnect_timer(&self) {
|
||||||
|
self.stop_disconnect_timer().await;
|
||||||
|
|
||||||
let inner_arc = self.0.clone();
|
let inner_arc = self.0.clone();
|
||||||
let mut inner = inner_arc.write().await;
|
let mut inner = inner_arc.write().await;
|
||||||
|
|
||||||
|
@ -640,11 +667,6 @@ impl SpoticordSession {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abort the previous timer, if one is running
|
|
||||||
if let Some(handle) = inner.disconnect_handle.take() {
|
|
||||||
handle.abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
inner.disconnect_handle = Some(tokio::spawn({
|
inner.disconnect_handle = Some(tokio::spawn({
|
||||||
let inner = inner_arc.clone();
|
let inner = inner_arc.clone();
|
||||||
let instance = self.clone();
|
let instance = self.clone();
|
||||||
|
@ -681,6 +703,15 @@ impl SpoticordSession {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stop the disconnect timer (if one is running)
|
||||||
|
async fn stop_disconnect_timer(&self) {
|
||||||
|
let mut inner = self.0.write().await;
|
||||||
|
if let Some(handle) = inner.disconnect_handle.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Disconnect from the VC and send a message to the text channel
|
||||||
pub async fn disconnect_with_message(&self, content: &str) {
|
pub async fn disconnect_with_message(&self, content: &str) {
|
||||||
{
|
{
|
||||||
let mut inner = self.0.write().await;
|
let mut inner = self.0.write().await;
|
||||||
|
@ -707,12 +738,7 @@ impl SpoticordSession {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally we stop and remove the disconnect timer
|
// Finally we stop and remove the disconnect timer
|
||||||
let mut inner = self.0.write().await;
|
self.stop_disconnect_timer().await;
|
||||||
|
|
||||||
// Stop the disconnect timer, if one is running
|
|
||||||
if let Some(handle) = inner.disconnect_handle.take() {
|
|
||||||
handle.abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Inner getters */
|
/* Inner getters */
|
||||||
|
@ -767,15 +793,11 @@ impl InnerSpoticordSession {
|
||||||
|
|
||||||
let mut call = self.call.lock().await;
|
let mut call = self.call.lock().await;
|
||||||
|
|
||||||
if let Some(client) = self.client.take() {
|
if let Some(spirc) = self.spirc.take() {
|
||||||
// Ask player to quit (will cause defunct process)
|
spirc.shutdown();
|
||||||
if let Err(why) = client.send(IpcPacket::Quit) {
|
|
||||||
error!("Failed to send quit packet: {:?}", why);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(track) = self.track.take() {
|
if let Some(track) = self.track.take() {
|
||||||
// Stop the playback, and freeing the child handle, removing the defunct process
|
|
||||||
if let Err(why) = track.stop() {
|
if let Err(why) = track.stop() {
|
||||||
error!("Failed to stop track: {:?}", why);
|
error!("Failed to stop track: {:?}", why);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue