From f941d1fb66d455f4de91ddf70abf585e50bd6dd9 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Wed, 4 Sep 2024 01:41:54 -0400 Subject: [PATCH] Speculative server architecture --- Cargo.lock | 2 + gm-dash/server/Cargo.toml | 3 + gm-dash/server/src/{app.rs => app/mod.rs} | 85 +++++---- gm-dash/server/src/app/test.rs | 1 + gm-dash/server/src/app/tests.rs | 63 +++++++ .../mod.rs} | 170 ++++++++++++++++-- gm-dash/server/src/main.rs | 19 +- gm-dash/server/src/types.rs | 30 ++++ 8 files changed, 319 insertions(+), 54 deletions(-) rename gm-dash/server/src/{app.rs => app/mod.rs} (56%) create mode 100644 gm-dash/server/src/app/test.rs create mode 100644 gm-dash/server/src/app/tests.rs rename gm-dash/server/src/{audio_control.rs => audio_control/mod.rs} (50%) create mode 100644 gm-dash/server/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index d5a3535..5551c55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1946,11 +1946,13 @@ dependencies = [ name = "gm-dash" version = "0.1.0" dependencies = [ + "cool_asserts", "glib 0.18.5", "gstreamer", "pipewire", "serde 1.0.209", "serde_json", + "thiserror", "tokio", "warp", ] diff --git a/gm-dash/server/Cargo.toml b/gm-dash/server/Cargo.toml index ea27496..d8c0bd4 100644 --- a/gm-dash/server/Cargo.toml +++ b/gm-dash/server/Cargo.toml @@ -13,3 +13,6 @@ serde_json = { version = "1.0.127" } tokio = { version = "1.39.3", features = ["full"] } warp = { version = "0.3.7" } glib = { version = "0.18" } +thiserror = "1.0.63" +cool_asserts = "2.0.3" + diff --git a/gm-dash/server/src/app.rs b/gm-dash/server/src/app/mod.rs similarity index 56% rename from gm-dash/server/src/app.rs rename to gm-dash/server/src/app/mod.rs index fb21386..91d976f 100644 --- a/gm-dash/server/src/app.rs +++ b/gm-dash/server/src/app/mod.rs @@ -1,18 +1,25 @@ use std::{ - collections::HashSet, + collections::HashMap, + path::PathBuf, sync::{Arc, RwLock}, }; -use crate::audio_control::AudioControl; +use crate::{ + audio_control::{AudioControl, AudioControlBackend}, + types::{AppError, TrackInfo}, +}; struct AppState { device_list: Vec, - track_list: Vec, - currently_playing: HashSet, + track_list: Vec, audio_control: AudioControl, } +#[cfg(test)] +mod tests; + +/* impl Default for AppState { fn default() -> Self { Self { @@ -30,20 +37,44 @@ impl Default for AppState { } } } +*/ #[derive(Clone)] pub struct App { internal: Arc>, } +impl Default for App { + fn default() -> Self { + let internal = AppState { + device_list: vec![], + track_list: vec![], + audio_control: AudioControl::default(), + }; + + Self { + internal: Arc::new(RwLock::new(internal)), + } + } +} + impl App { - fn new() -> App { - let internal = AppState::default(); + fn new(backend: impl AudioControlBackend + 'static) -> App { + let internal = AppState { + device_list: vec![], + track_list: vec![], + audio_control: AudioControl::new(backend), + }; + App { internal: Arc::new(RwLock::new(internal)), } } + pub fn playing(&self) -> bool { + self.internal.read().unwrap().audio_control.playing() + } + pub fn add_audio(&self, device: String) { let mut st = self.internal.write().unwrap(); st.device_list.push(device); @@ -54,47 +85,39 @@ impl App { st.device_list.clone() } - pub fn tracks(&self) -> Vec { + pub fn enabled_tracks(&self) -> Vec{ let st = self.internal.read().unwrap(); - st.track_list.clone() + st.audio_control.tracks().into_iter().map(|ti| ti.path.clone()).collect() + } - pub fn enable_track(&self, track: &str) -> Result<(), String> { - let mut st = self.internal.write().unwrap(); - if !st.currently_playing.contains(track) { - st.currently_playing.insert(track.to_owned()); - } + pub fn enable_track(&self, path: PathBuf) -> Result<(), AppError> { + let st = self.internal.write().unwrap(); + st.audio_control.add_track(TrackInfo{ path })?; + Ok(()) } - pub fn disable_track(&self, track: &str) -> Result<(), String> { + pub fn disable_track(&self, _track: &str) -> Result<(), AppError> { + /* let mut st = self.internal.write().unwrap(); - if st.currently_playing.contains(track) { + if st.currently_playing.contains_key(track) { st.currently_playing.remove(track); } Ok(()) - } - - pub fn play(&self) -> Result<(), String> { - let st = self.internal.write().unwrap(); - st.audio_control.play(); + */ Ok(()) } - pub fn stop(&self) -> Result<(), String> { + pub fn play(&self) -> Result<(), AppError> { let st = self.internal.write().unwrap(); - st.audio_control.stop(); + st.audio_control.play()?; Ok(()) } - pub fn playing(&self) -> Vec { - let st = self.internal.read().unwrap(); - st.currently_playing.iter().cloned().collect() - } -} - -impl Default for App { - fn default() -> App { - App::new() + pub fn stop(&self) -> Result<(), AppError> { + let st = self.internal.write().unwrap(); + st.audio_control.stop()?; + Ok(()) } } diff --git a/gm-dash/server/src/app/test.rs b/gm-dash/server/src/app/test.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/gm-dash/server/src/app/test.rs @@ -0,0 +1 @@ + diff --git a/gm-dash/server/src/app/tests.rs b/gm-dash/server/src/app/tests.rs new file mode 100644 index 0000000..7bb1b1d --- /dev/null +++ b/gm-dash/server/src/app/tests.rs @@ -0,0 +1,63 @@ +use std::path::PathBuf; + +use cool_asserts::assert_matches; + +use crate::{ + app::{App, AppError}, + audio_control::MemoryBackend, +}; + +fn with_memory_app(f: F) +where + F: Fn(App), +{ + let app = App::new(MemoryBackend::default()); + + f(app) +} + +#[test] +fn app_starts_in_stopped_state() { + with_memory_app(|app| { + assert!(!app.playing()); + }); +} + +#[test] +fn can_add_a_track_without_starting_playback() { + with_memory_app(|app| { + app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track"); + assert!(!app.playing()); + app.enable_track(PathBuf::from( + "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", + )) + .expect("to enable a track"); + assert!(!app.playing()); + + let tracks = app.enabled_tracks(); + tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("the books and spellcrafting track to be enabled"); + tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3")).expect("the day to rebuild track to be enabled"); + }); +} + +#[test] +fn cannot_start_playback_with_no_tracks() { + with_memory_app(|app| { + assert_matches!(app.play(), Err(AppError::NoTracks)); + }); +} + +#[test] +fn can_add_a_track_during_playback() { + with_memory_app(|app| { + app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track"); + app.play().expect("to start playback"); + + assert!(app.playing()); + + app.enable_track(PathBuf::from( + "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", + )) + .expect("to enable another track during playback"); + }); +} diff --git a/gm-dash/server/src/audio_control.rs b/gm-dash/server/src/audio_control/mod.rs similarity index 50% rename from gm-dash/server/src/audio_control.rs rename to gm-dash/server/src/audio_control/mod.rs index 8c1452d..02c1920 100644 --- a/gm-dash/server/src/audio_control.rs +++ b/gm-dash/server/src/audio_control/mod.rs @@ -1,18 +1,144 @@ +use std::{ + collections::HashSet, + path::PathBuf, + sync::{Arc, RwLock}, +}; + use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; -use std::sync::{Arc, RwLock}; +use thiserror::Error; + +use crate::types::TrackInfo; + +#[derive(Debug, Error, PartialEq)] +pub enum AudioError { + #[error("No tracks are available to play")] + NoTracks, + + #[error("Cannot perform operation in the current state")] + InvalidState, +} pub struct AudioControl { + backend: Arc>, +} + +impl Default for AudioControl { + fn default() -> Self { + Self::new(GStreamerBackend::default()) + } +} + +impl AudioControl { + pub fn new(backend: impl AudioControlBackend + 'static) -> Self { + Self { + backend: Arc::new(RwLock::new(backend)), + } + } + + pub fn playing(&self) -> bool { + self.backend.read().unwrap().playing() + } + + pub fn tracks(&self) -> Vec { + self.backend.read().unwrap().tracks() + } + + pub fn play(&self) -> Result<(), AudioError> { + self.backend.read().unwrap().play() + } + + pub fn stop(&self) -> Result<(), AudioError> { + self.backend.read().unwrap().stop() + } + + pub fn add_track(&self, track: TrackInfo) -> Result<(), AudioError> { + self.backend.write().unwrap().add_track(track) + } +} + +pub trait AudioControlBackend: Send + Sync { + fn playing(&self) -> bool; + + fn tracks(&self) -> Vec; + + fn play(&self) -> Result<(), AudioError>; + + fn stop(&self) -> Result<(), AudioError>; + + fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError>; + + fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError>; +} + +pub struct MemoryBackend { + playing: Arc>, + tracks: HashSet, +} + +impl Default for MemoryBackend { + fn default() -> Self { + Self { + playing: Arc::new(RwLock::new(false)), + tracks: HashSet::new(), + } + } +} + +impl AudioControlBackend for MemoryBackend { + fn playing(&self) -> bool { + *self.playing.read().unwrap() + } + + fn tracks(&self) -> Vec { + self.tracks.iter().cloned().collect() + } + + fn play(&self) -> Result<(), AudioError> { + if self.tracks.is_empty() { + return Err(AudioError::NoTracks); + } + + let mut playing = self.playing.write().unwrap(); + if *playing { + return Err(AudioError::InvalidState); + } + + *playing = true; + Ok(()) + } + + fn stop(&self) -> Result<(), AudioError> { + let mut playing = self.playing.write().unwrap(); + if *playing { + *playing = false; + Ok(()) + } else { + Err(AudioError::InvalidState) + } + } + + fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { + self.tracks.insert(track); + Ok(()) + } + + fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { + self.tracks.remove(&track); + Ok(()) + } +} + +pub struct GStreamerBackend { bus: gstreamer::Bus, pipeline: gstreamer::Pipeline, mixer: gstreamer::Element, audio_sink: gstreamer::Element, - - bus_monitor: std::thread::JoinHandle<()>, + monitor: std::thread::JoinHandle<()>, playing: Arc>, } -impl Default for AudioControl { +impl Default for GStreamerBackend { fn default() -> Self { let pipeline = gstreamer::Pipeline::new(); let bus = pipeline.bus().unwrap(); @@ -38,7 +164,7 @@ impl Default for AudioControl { let playing = Arc::new(RwLock::new(false)); - let bus_monitor = std::thread::spawn({ + let monitor = std::thread::spawn({ let pipeline_object = pipeline.clone().upcast::(); let playing = playing.clone(); let bus = bus.clone(); @@ -54,7 +180,8 @@ impl Default for AudioControl { match msg.view() { MessageView::StateChanged(st) => { if msg.src() == Some(&pipeline_object) { - *playing.write().unwrap() = st.current() == gstreamer::State::Playing; + *playing.write().unwrap() = + st.current() == gstreamer::State::Playing; } } MessageView::Error(err) => { @@ -77,41 +204,51 @@ impl Default for AudioControl { mixer, audio_sink, - bus_monitor, + monitor, - playing, + playing: Arc::new(RwLock::new(false)), } } } -impl AudioControl { - pub fn playing(&self) -> bool { +impl AudioControlBackend for GStreamerBackend { + fn playing(&self) -> bool { *self.playing.read().unwrap() } - pub fn play(&self) { + fn tracks(&self) -> Vec { + vec![] + } + + fn play(&self) -> Result<(), AudioError> { let mut playing = self.playing.write().unwrap(); if !*playing { // self.pipeline.set_state(gstreamer::State::Playing).unwrap(); *playing = true; + Ok(()) + } else { + Err(AudioError::InvalidState) } } - pub fn stop(&self) { + fn stop(&self) -> Result<(), AudioError> { let mut playing = self.playing.write().unwrap(); if *playing { // self.pipeline.set_state(gstreamer::State::Paused).unwrap(); *playing = false; + Ok(()) + } else { + Err(AudioError::InvalidState) } } - pub fn add_track(&mut self, path: String) { + fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { let source = gstreamer::ElementFactory::find("filesrc") .unwrap() .load() .unwrap() .create() - .property("location", path) + .property("location", track.path.to_str().unwrap()) .build() .unwrap(); self.pipeline.add(&source).unwrap(); @@ -142,9 +279,12 @@ impl AudioControl { let next_pad = volume.static_pad("sink").unwrap(); pad.link(&next_pad).unwrap(); }); + + Ok(()) } - pub fn remove_track(&mut self, path: String) { + fn remove_track(&mut self, _path: TrackInfo) -> Result<(), AudioError> { + unimplemented!() /* Need to run EOS through to a probe on the trailing end of the volume element */ } } diff --git a/gm-dash/server/src/main.rs b/gm-dash/server/src/main.rs index 4c85e72..095950c 100644 --- a/gm-dash/server/src/main.rs +++ b/gm-dash/server/src/main.rs @@ -1,13 +1,14 @@ +use std::{net::{Ipv6Addr, SocketAddrV6}, path::PathBuf}; + +use app::App; use pipewire::{context::Context, main_loop::MainLoop}; use serde::Deserialize; -use std::net::{Ipv6Addr, SocketAddrV6}; use tokio::task::spawn_blocking; use warp::{serve, Filter}; mod audio_control; - mod app; -use app::App; +mod types; #[derive(Deserialize)] struct PlayTrackParams { @@ -27,10 +28,12 @@ async fn server_main(state: App) { } }); + /* let list_tracks = warp::path!("tracks").map({ let state = state.clone(); move || serde_json::to_string(&state.tracks()).unwrap() }); + */ let enable_track = warp::put() .and(warp::path!("playing")) @@ -38,7 +41,7 @@ async fn server_main(state: App) { .map({ let state = state.clone(); move |params: PlayTrackParams| { - state.enable_track(¶ms.track_name); + let _ = state.enable_track(PathBuf::from(params.track_name)); "".to_owned() } }); @@ -49,7 +52,7 @@ async fn server_main(state: App) { .map({ let state = state.clone(); move |params: PlayTrackParams| { - state.disable_track(¶ms.track_name); + let _ = state.disable_track(¶ms.track_name); "".to_owned() } }); @@ -57,7 +60,7 @@ async fn server_main(state: App) { let play_all = warp::put().and(warp::path!("playing")).map({ let state = state.clone(); move || { - state.play(); + let _ = state.play(); "".to_owned() } }); @@ -65,7 +68,7 @@ async fn server_main(state: App) { let stop_all = warp::delete().and(warp::path!("playing")).map({ let state = state.clone(); move || { - state.stop(); + let _ = state.stop(); "".to_owned() } }); @@ -77,7 +80,7 @@ async fn server_main(state: App) { let routes = root .or(list_output_devices) - .or(list_tracks) + // .or(list_tracks) .or(enable_track) .or(disable_track) .or(play_all) diff --git a/gm-dash/server/src/types.rs b/gm-dash/server/src/types.rs new file mode 100644 index 0000000..d300526 --- /dev/null +++ b/gm-dash/server/src/types.rs @@ -0,0 +1,30 @@ +use std::path::PathBuf; + +use thiserror::Error; + +use crate::audio_control::AudioError; + +#[derive(Debug, Error, PartialEq)] +pub enum AppError { + #[error("Operation invalid with no tracks enabled")] + NoTracks, + + #[error("Operation is invalid in the current state")] + InvalidState +} + +impl From for AppError { + fn from(err: AudioError) -> Self { + match err { + AudioError::NoTracks => Self::NoTracks, + AudioError::InvalidState => Self::InvalidState, + } + } +} + +#[derive(Clone, Debug, Hash, PartialEq, PartialOrd, Eq)] +pub struct TrackInfo { + pub path: PathBuf, +} + +