Big audio backend cleanup, small bugfix on auto-disconnect

main
DaXcess 2022-11-07 20:41:09 +01:00
parent a37daa23a0
commit 7405656854
2 changed files with 15 additions and 113 deletions

View File

@ -1,125 +1,30 @@
use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkResult}; use librespot::playback::audio_backend::{Sink, SinkAsBytes, SinkResult};
use librespot::playback::convert::Converter; use librespot::playback::convert::Converter;
use librespot::playback::decoder::AudioPacket; use librespot::playback::decoder::AudioPacket;
use log::{error, trace};
use std::io::Write; use std::io::Write;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use crate::ipc; use crate::ipc;
use crate::ipc::packet::IpcPacket; use crate::ipc::packet::IpcPacket;
pub struct StdoutSink { pub struct StdoutSink {
client: ipc::Client, client: ipc::Client,
buffer: Arc<Mutex<Vec<u8>>>,
is_stopped: Arc<Mutex<bool>>,
handle: Option<JoinHandle<()>>,
} }
const BUFFER_SIZE: usize = 7680;
impl StdoutSink { impl StdoutSink {
pub fn start_writer(&mut self) {
// With 48khz, 32-bit float, 2 channels, 1 second of audio is 384000 bytes
// 384000 / 50 = 7680 bytes per 20ms
let buffer = self.buffer.clone();
let is_stopped = self.is_stopped.clone();
let client = self.client.clone();
let handle = std::thread::spawn(move || {
let mut output = std::io::stdout();
let mut act_buffer = [0u8; BUFFER_SIZE];
// Use closure to make sure lock is released as fast as possible
let is_stopped = || {
let is_stopped = is_stopped.lock().unwrap();
*is_stopped
};
// Start songbird's playback
client.send(IpcPacket::StartPlayback).unwrap();
loop {
if is_stopped() {
break;
}
std::thread::sleep(Duration::from_millis(15));
let mut buffer = buffer.lock().unwrap();
let to_drain: usize;
if buffer.len() < BUFFER_SIZE {
// Copy the buffer into the action buffer
// Fill remaining length with zeroes
act_buffer[..buffer.len()].copy_from_slice(&buffer[..]);
act_buffer[buffer.len()..].fill(0);
to_drain = buffer.len();
} else {
act_buffer.copy_from_slice(&buffer[..BUFFER_SIZE]);
to_drain = BUFFER_SIZE;
}
output.write_all(&act_buffer).unwrap_or(());
buffer.drain(..to_drain);
}
});
self.handle = Some(handle);
}
pub fn stop_writer(&mut self) -> std::thread::Result<()> {
// Use closure to avoid deadlocking the mutex
let set_stopped = |value| {
let mut is_stopped = self.is_stopped.lock().unwrap();
*is_stopped = value;
};
// Notify thread to stop
set_stopped(true);
// Wait for thread to stop
let result = match self.handle.take() {
Some(handle) => handle.join(),
None => Ok(()),
};
// Reset stopped value
set_stopped(false);
result
}
pub fn new(client: ipc::Client) -> Self { pub fn new(client: ipc::Client) -> Self {
StdoutSink { StdoutSink { client }
client,
is_stopped: Arc::new(Mutex::new(false)),
buffer: Arc::new(Mutex::new(Vec::new())),
handle: None,
}
} }
} }
impl Sink for StdoutSink { impl Sink for StdoutSink {
fn start(&mut self) -> SinkResult<()> { fn start(&mut self) -> SinkResult<()> {
self.start_writer(); // TODO: Handle error
self.client.send(IpcPacket::StartPlayback).unwrap();
Ok(()) Ok(())
} }
fn stop(&mut self) -> SinkResult<()> { fn stop(&mut self) -> SinkResult<()> {
// Stop the writer thread
// This is done before pausing songbird, because else the writer thread
// might hang on writing to stdout
if let Err(why) = self.stop_writer() {
error!("Failed to stop stdout writer: {:?}", why);
} else {
trace!("Stopped stdout writer");
}
// Stop songbird's playback // Stop songbird's playback
self.client.send(IpcPacket::StopPlayback).unwrap(); self.client.send(IpcPacket::StopPlayback).unwrap();
@ -140,7 +45,11 @@ impl Sink for StdoutSink {
&samples_f32, &samples_f32,
) )
.unwrap(); .unwrap();
self.write_bytes(resampled.as_bytes())?;
let samples_i16 =
&converter.f64_to_s16(&resampled.iter().map(|v| *v as f64).collect::<Vec<f64>>());
self.write_bytes(samples_i16.as_bytes())?;
} }
Ok(()) Ok(())
@ -149,18 +58,7 @@ impl Sink for StdoutSink {
impl SinkAsBytes for StdoutSink { impl SinkAsBytes for StdoutSink {
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
let get_buffer_len = || { std::io::stdout().write_all(data).unwrap();
let buffer = self.buffer.lock().unwrap();
buffer.len()
};
while get_buffer_len() > BUFFER_SIZE * 2 {
std::thread::sleep(Duration::from_millis(15));
}
let mut buffer = self.buffer.lock().unwrap();
buffer.extend_from_slice(data);
Ok(()) Ok(())
} }

View File

@ -19,7 +19,7 @@ use serenity::{
}; };
use songbird::{ use songbird::{
create_player, create_player,
input::{children_to_reader, Input}, input::{children_to_reader, Codec, Container, Input},
tracks::TrackHandle, tracks::TrackHandle,
Call, Event, EventContext, EventHandler, Call, Event, EventContext, EventHandler,
}; };
@ -137,7 +137,8 @@ impl SpoticordSession {
let reader = children_to_reader::<f32>(vec![child]); let reader = children_to_reader::<f32>(vec![child]);
// Create track (paused, fixes audio glitches) // Create track (paused, fixes audio glitches)
let (mut track, track_handle) = create_player(Input::float_pcm(true, reader)); let (mut track, track_handle) =
create_player(Input::new(true, reader, Codec::Pcm, Container::Raw, None));
track.pause(); track.pause();
// Set call audio to track // Set call audio to track
@ -549,6 +550,9 @@ impl SpoticordSession {
loop { loop {
timer.tick().await; timer.tick().await;
// Make sure this task has not been aborted, if it has this will automatically stop execution.
tokio::task::yield_now().await;
let is_playing = { let is_playing = {
let pbi = pbi.read().await; let pbi = pbi.read().await;