From f555804f109b589b8a9dfab386bd5387abc4ddf3 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Wed, 4 Sep 2024 21:20:41 -0400 Subject: [PATCH] App and AudioController now send messages as peers --- gm-dash/server/src/app/mod.rs | 135 +++++++++++++--------- gm-dash/server/src/app/test.rs | 1 - gm-dash/server/src/app/tests.rs | 147 +++++++++++++++++------- gm-dash/server/src/audio_control/mod.rs | 97 ++++++++++++---- gm-dash/server/src/main.rs | 58 ++++++---- gm-dash/server/src/types.rs | 63 +++++++++- 6 files changed, 355 insertions(+), 146 deletions(-) delete mode 100644 gm-dash/server/src/app/test.rs diff --git a/gm-dash/server/src/app/mod.rs b/gm-dash/server/src/app/mod.rs index 91d976f..6d97ecf 100644 --- a/gm-dash/server/src/app/mod.rs +++ b/gm-dash/server/src/app/mod.rs @@ -4,101 +4,122 @@ use std::{ sync::{Arc, RwLock}, }; -use crate::{ - audio_control::{AudioControl, AudioControlBackend}, - types::{AppError, TrackInfo}, +use tokio::{ + sync::mpsc::{Receiver, Sender}, + task::JoinHandle, }; -struct AppState { - device_list: Vec, - track_list: Vec, - - audio_control: AudioControl, -} +use crate::{ + audio_control::{AudioControl}, + types::{AppError, AudioControlMessage, AudioStatusMessage, TrackInfo, TrackSpec, Volume}, +}; #[cfg(test)] mod tests; -/* +struct AppState { + playing: bool, + + device_list: Vec, + track_list: Vec, + + track_status: Vec, +} + impl Default for AppState { fn default() -> Self { Self { - device_list: vec![], - track_list: vec![ - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3".to_owned(), - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/02 - Against the Clock.mp3.mp3".to_owned(), - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/03 - Alleyway Cutthroat.mp3.mp3".to_owned(), - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/04 - Beasts Of Legend.mp3.mp3".to_owned(), - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3".to_owned(), - ], - currently_playing: HashSet::default(), + playing: false, - audio_control: AudioControl::default(), - } - } -} -*/ - -#[derive(Clone)] -pub struct App { - internal: Arc>, -} - -impl Default for App { - fn default() -> Self { - let internal = AppState { device_list: vec![], track_list: vec![], - audio_control: AudioControl::default(), - }; - Self { - internal: Arc::new(RwLock::new(internal)), + track_status: vec![], } } } +pub struct App { + state: Arc>, + + audio_control: Sender, + + listener: JoinHandle<()>, +} + impl App { - fn new(backend: impl AudioControlBackend + 'static) -> App { - let internal = AppState { - device_list: vec![], - track_list: vec![], - audio_control: AudioControl::new(backend), - }; + pub fn new( + audio_control: Sender, + mut audio_status: Receiver, + ) -> App { + let state = Arc::new(RwLock::new(AppState::default())); - App { - internal: Arc::new(RwLock::new(internal)), + let listener = tokio::spawn({ + let state = state.clone(); + async move { + println!("listener started"); + loop { + match audio_status.recv().await { + Some(AudioStatusMessage::Playing) => { + state.write().unwrap().playing = true; + } + Some(AudioStatusMessage::Status(track_status)) => { + state.write().unwrap().track_status = track_status; + } + msg => println!("message received from audio controller: {:?}", msg), + } + } + } + }); + + Self { + state, + audio_control, + listener, } } pub fn playing(&self) -> bool { - self.internal.read().unwrap().audio_control.playing() + self.state.read().unwrap().playing } pub fn add_audio(&self, device: String) { + /* let mut st = self.internal.write().unwrap(); st.device_list.push(device); + */ } pub fn audio_devices(&self) -> Vec { + /* let st = self.internal.read().unwrap(); st.device_list.clone() + */ + vec![] } - pub fn enabled_tracks(&self) -> Vec{ - let st = self.internal.read().unwrap(); - st.audio_control.tracks().into_iter().map(|ti| ti.path.clone()).collect() - + pub fn enabled_tracks(&self) -> Vec { + let st = self.state.read().unwrap(); + st.track_status.iter().map(|ti| ti.path.clone()).collect() } - pub fn enable_track(&self, path: PathBuf) -> Result<(), AppError> { + pub async fn enable_track(&self, path: PathBuf) -> Result<(), AppError> { + self.audio_control + .send(AudioControlMessage::EnableTrack(TrackSpec { + path, + volume: Volume::try_from(1.0).unwrap(), + })) + .await + .expect("audio control send to succeed"); + /* let st = self.internal.write().unwrap(); - st.audio_control.add_track(TrackInfo{ path })?; + st.audio_control.add_track(TrackSpec{ path })?; + */ Ok(()) } - pub fn disable_track(&self, _track: &str) -> Result<(), AppError> { + pub async fn disable_track(&self, _track: &str) -> Result<(), AppError> { /* let mut st = self.internal.write().unwrap(); if st.currently_playing.contains_key(track) { @@ -109,15 +130,19 @@ impl App { Ok(()) } - pub fn play(&self) -> Result<(), AppError> { + pub async fn play(&self) -> Result<(), AppError> { + /* let st = self.internal.write().unwrap(); st.audio_control.play()?; + */ Ok(()) } - pub fn stop(&self) -> Result<(), AppError> { + pub async fn stop(&self) -> Result<(), AppError> { + /* let st = self.internal.write().unwrap(); st.audio_control.stop()?; + */ Ok(()) } } diff --git a/gm-dash/server/src/app/test.rs b/gm-dash/server/src/app/test.rs deleted file mode 100644 index 8b13789..0000000 --- a/gm-dash/server/src/app/test.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/gm-dash/server/src/app/tests.rs b/gm-dash/server/src/app/tests.rs index 7bb1b1d..752c050 100644 --- a/gm-dash/server/src/app/tests.rs +++ b/gm-dash/server/src/app/tests.rs @@ -1,63 +1,126 @@ -use std::path::PathBuf; +use std::{future::Future, path::PathBuf, time::Duration}; use cool_asserts::assert_matches; +use tokio::sync::mpsc::{Receiver, Sender}; use crate::{ app::{App, AppError}, - audio_control::MemoryBackend, + types::{AudioControlMessage, AudioStatusMessage, Progress, TrackInfo, TrackSpec, Volume}, }; -fn with_memory_app(f: F) -where - F: Fn(App), -{ - let app = App::new(MemoryBackend::default()); +fn memory_app() -> ( + App, + Receiver, + Sender, +) { + let (audio_control_tx, audio_control_rx) = tokio::sync::mpsc::channel(5); + let (audio_status_tx, audio_status_rx) = tokio::sync::mpsc::channel(5); + let app = App::new(audio_control_tx, audio_status_rx); - f(app) + (app, audio_control_rx, audio_status_tx) } -#[test] -fn app_starts_in_stopped_state() { - with_memory_app(|app| { - assert!(!app.playing()); - }); +#[tokio::test] +async fn app_starts_in_stopped_state() { + let (app, _control_rx, _status_tx) = memory_app(); + + assert!(!app.playing()); } -#[test] -fn can_add_a_track_without_starting_playback() { - with_memory_app(|app| { - app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track"); - assert!(!app.playing()); - app.enable_track(PathBuf::from( - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", - )) - .expect("to enable a track"); - assert!(!app.playing()); +#[tokio::test] +async fn can_add_a_track_without_starting_playback() { + let (app, mut control_rx, status_tx) = memory_app(); + let path_1 = PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3"); + let path_2 = PathBuf::from( + "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", + ); + + { + app.enable_track(path_1.clone()) + .await + .expect("to enable a track"); + + assert_matches!(control_rx.recv().await, Some(AudioControlMessage::EnableTrack(trackspec)) => { + assert_eq!(trackspec, TrackSpec{ path: path_1.clone(), volume: Volume::try_from(1.0).unwrap() }); + }); + + status_tx + .send(AudioStatusMessage::Status(vec![TrackInfo { + path: path_1.clone(), + volume: Volume::try_from(1.0).unwrap(), + progress: Progress { + current: Duration::from_secs(0), + length: Duration::from_secs(100), + }, + }])) + .await + .expect("status send to work"); + + tokio::time::sleep(Duration::from_millis(1)).await; let tracks = app.enabled_tracks(); - tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("the books and spellcrafting track to be enabled"); - tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3")).expect("the day to rebuild track to be enabled"); - }); + tracks.iter().find(|p| **p == path_1); + assert!(!app.playing()); + } + + { + app.enable_track(path_2.clone()) + .await + .expect("to enable a track"); + + assert_matches!(control_rx.recv().await, Some(AudioControlMessage::EnableTrack(trackspec)) => { + assert_eq!(trackspec, TrackSpec{ path: path_2.clone(), volume: Volume::try_from(1.0).unwrap() }); + }); + + status_tx + .send(AudioStatusMessage::Status(vec![ + TrackInfo { + path: path_1.clone(), + volume: Volume::try_from(1.0).unwrap(), + progress: Progress { + current: Duration::from_secs(0), + length: Duration::from_secs(100), + }, + }, + TrackInfo { + path: path_2.clone(), + volume: Volume::try_from(1.0).unwrap(), + progress: Progress { + current: Duration::from_secs(0), + length: Duration::from_secs(100), + }, + }, + ])) + .await + .expect("status send to work"); + + tokio::time::sleep(Duration::from_millis(1)).await; + let tracks = app.enabled_tracks(); + tracks.iter().find(|p| **p == path_1); + tracks.iter().find(|p| **p == path_2); + } } -#[test] -fn cannot_start_playback_with_no_tracks() { - with_memory_app(|app| { - assert_matches!(app.play(), Err(AppError::NoTracks)); - }); +#[tokio::test] +async fn cannot_start_playback_with_no_tracks() { + let (app, control_rx, status_tx) = memory_app(); + // assert_matches!(app.play(), Err(AppError::NoTracks)); + unimplemented!() } -#[test] -fn can_add_a_track_during_playback() { - with_memory_app(|app| { - app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track"); - app.play().expect("to start playback"); +#[tokio::test] +async fn can_add_a_track_during_playback() { + let (app, control_rx, status_tx) = memory_app(); + /* + app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track"); + app.play().expect("to start playback"); - assert!(app.playing()); + assert!(app.playing()); - app.enable_track(PathBuf::from( - "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", - )) - .expect("to enable another track during playback"); - }); + app.enable_track(PathBuf::from( + "/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3", + )) + .expect("to enable another track during playback"); + */ + unimplemented!() } diff --git a/gm-dash/server/src/audio_control/mod.rs b/gm-dash/server/src/audio_control/mod.rs index 02c1920..d29dab3 100644 --- a/gm-dash/server/src/audio_control/mod.rs +++ b/gm-dash/server/src/audio_control/mod.rs @@ -1,13 +1,14 @@ use std::{ - collections::HashSet, + collections::HashMap, path::PathBuf, - sync::{Arc, RwLock}, + sync::{Arc, RwLock}, time::Duration, }; use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; use thiserror::Error; +use tokio::sync::mpsc::{Receiver, Sender}; -use crate::types::TrackInfo; +use crate::types::{AudioControlMessage, AudioStatusMessage, TrackSpec}; #[derive(Debug, Error, PartialEq)] pub enum AudioError { @@ -19,27 +20,75 @@ pub enum AudioError { } pub struct AudioControl { - backend: Arc>, + control_rx: Receiver, + status_tx: Sender, } +/* impl Default for AudioControl { fn default() -> Self { Self::new(GStreamerBackend::default()) } } +*/ impl AudioControl { - pub fn new(backend: impl AudioControlBackend + 'static) -> Self { - Self { - backend: Arc::new(RwLock::new(backend)), + pub fn new() -> ( + Self, + Sender, + Receiver, + ) { + let (control_tx, control_rx) = tokio::sync::mpsc::channel(5); + let (status_tx, status_rx) = tokio::sync::mpsc::channel(5); + ( + Self { + control_rx, + status_tx, + }, + control_tx, + status_rx, + ) + } + + pub async fn listen(&mut self) { + while let Some(msg) = self.control_rx.recv().await { + match msg { + AudioControlMessage::Play => { + unimplemented!() + } + AudioControlMessage::Pause => { + unimplemented!() + } + AudioControlMessage::EnableTrack(_) => { + unimplemented!() + } + AudioControlMessage::DisableTrack(_) => { + unimplemented!() + } + AudioControlMessage::ReportStatus => { + unimplemented!() + } + } } } + pub async fn report(&self) { + loop { + self.status_tx + .send(AudioStatusMessage::Status(vec![])) + .await + .expect("to successfully send a message"); + tokio::time::sleep(Duration::from_secs(1)); + } + } +} + +/* pub fn playing(&self) -> bool { self.backend.read().unwrap().playing() } - pub fn tracks(&self) -> Vec { + pub fn tracks(&self) -> Vec { self.backend.read().unwrap().tracks() } @@ -51,7 +100,7 @@ impl AudioControl { self.backend.read().unwrap().stop() } - pub fn add_track(&self, track: TrackInfo) -> Result<(), AudioError> { + pub fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> { self.backend.write().unwrap().add_track(track) } } @@ -59,27 +108,27 @@ impl AudioControl { pub trait AudioControlBackend: Send + Sync { fn playing(&self) -> bool; - fn tracks(&self) -> Vec; + fn tracks(&self) -> Vec; fn play(&self) -> Result<(), AudioError>; fn stop(&self) -> Result<(), AudioError>; - fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError>; + fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; - fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError>; + fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; } pub struct MemoryBackend { playing: Arc>, - tracks: HashSet, + tracks: HashMap, } impl Default for MemoryBackend { fn default() -> Self { Self { playing: Arc::new(RwLock::new(false)), - tracks: HashSet::new(), + tracks: HashMap::new(), } } } @@ -89,8 +138,11 @@ impl AudioControlBackend for MemoryBackend { *self.playing.read().unwrap() } - fn tracks(&self) -> Vec { + fn tracks(&self) -> Vec { + /* self.tracks.iter().cloned().collect() + */ + vec![] } fn play(&self) -> Result<(), AudioError> { @@ -117,13 +169,17 @@ impl AudioControlBackend for MemoryBackend { } } - fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { + fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { + /* self.tracks.insert(track); + */ Ok(()) } - fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { + fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { + /* self.tracks.remove(&track); + */ Ok(()) } } @@ -216,7 +272,7 @@ impl AudioControlBackend for GStreamerBackend { *self.playing.read().unwrap() } - fn tracks(&self) -> Vec { + fn tracks(&self) -> Vec { vec![] } @@ -242,7 +298,7 @@ impl AudioControlBackend for GStreamerBackend { } } - fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> { + fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { let source = gstreamer::ElementFactory::find("filesrc") .unwrap() .load() @@ -283,8 +339,9 @@ impl AudioControlBackend for GStreamerBackend { Ok(()) } - fn remove_track(&mut self, _path: TrackInfo) -> Result<(), AudioError> { + fn remove_track(&mut self, _path: TrackSpec) -> Result<(), AudioError> { unimplemented!() /* Need to run EOS through to a probe on the trailing end of the volume element */ } } +*/ diff --git a/gm-dash/server/src/main.rs b/gm-dash/server/src/main.rs index 095950c..ac85ecd 100644 --- a/gm-dash/server/src/main.rs +++ b/gm-dash/server/src/main.rs @@ -1,4 +1,4 @@ -use std::{net::{Ipv6Addr, SocketAddrV6}, path::PathBuf}; +use std::{net::{Ipv6Addr, SocketAddrV6}, path::PathBuf, sync::Arc}; use app::App; use pipewire::{context::Context, main_loop::MainLoop}; @@ -15,23 +15,23 @@ struct PlayTrackParams { track_name: String, } -async fn server_main(state: App) { +async fn server_main(app: Arc) { let localhost: Ipv6Addr = "::1".parse().unwrap(); let server_addr = SocketAddrV6::new(localhost, 3001, 0, 0); let root = warp::path!().map(|| "ok".to_string()); let list_output_devices = warp::path!("output_devices").map({ - let state = state.clone(); + let app = app.clone(); move || { - let devices = state.audio_devices(); + let devices = app.audio_devices(); serde_json::to_string(&devices).unwrap() } }); /* let list_tracks = warp::path!("tracks").map({ - let state = state.clone(); - move || serde_json::to_string(&state.tracks()).unwrap() + let app = app.clone(); + move || serde_json::to_string(&app.tracks()).unwrap() }); */ @@ -39,9 +39,9 @@ async fn server_main(state: App) { .and(warp::path!("playing")) .and(warp::body::json()) .map({ - let state = state.clone(); + let app = app.clone(); move |params: PlayTrackParams| { - let _ = state.enable_track(PathBuf::from(params.track_name)); + let _ = app.enable_track(PathBuf::from(params.track_name)); "".to_owned() } }); @@ -50,32 +50,32 @@ async fn server_main(state: App) { .and(warp::path!("playing")) .and(warp::body::json()) .map({ - let state = state.clone(); + let app = app.clone(); move |params: PlayTrackParams| { - let _ = state.disable_track(¶ms.track_name); + let _ = app.disable_track(¶ms.track_name); "".to_owned() } }); let play_all = warp::put().and(warp::path!("playing")).map({ - let state = state.clone(); + let app = app.clone(); move || { - let _ = state.play(); + let _ = app.play(); "".to_owned() } }); let stop_all = warp::delete().and(warp::path!("playing")).map({ - let state = state.clone(); + let app = app.clone(); move || { - let _ = state.stop(); + let _ = app.stop(); "".to_owned() } }); let now_playing = warp::path!("playing").map({ - let state = state.clone(); - move || serde_json::to_string(&state.playing()).unwrap() + let app = app.clone(); + move || serde_json::to_string(&app.playing()).unwrap() }); let routes = root @@ -90,15 +90,16 @@ async fn server_main(state: App) { serve(routes).run(server_addr).await; } -fn handle_add_audio_device(state: App, props: &pipewire::spa::utils::dict::DictRef) { +fn handle_add_audio_device(app: App, props: &pipewire::spa::utils::dict::DictRef) { if props.get("media.class") == Some("Audio/Sink") { if let Some(device_name) = props.get("node.description") { - state.add_audio(device_name.to_owned()); + app.add_audio(device_name.to_owned()); } } } -fn pipewire_loop(state: App) -> Result<(), Box> { +/* +fn pipewire_loop(app: App) -> Result<(), Box> { let mainloop = MainLoop::new(None)?; let context = Context::new(&mainloop)?; let core = context.connect(None)?; @@ -107,10 +108,10 @@ fn pipewire_loop(state: App) -> Result<(), Box> { let _listener = registry .add_listener_local() .global({ - let state = state.clone(); + let app = app.clone(); move |global_data| { if let Some(props) = global_data.props { - handle_add_audio_device(state.clone(), props); + handle_add_audio_device(app.clone(), props); } } }) @@ -120,20 +121,29 @@ fn pipewire_loop(state: App) -> Result<(), Box> { Ok(()) } +*/ +/* fn pipewire_main(state: App) { pipewire_loop(state).expect("pipewire should not error"); } +*/ #[tokio::main] async fn main() { gstreamer::init(); - let state = App::default(); + let (audio_control_tx, audio_control_rx) = tokio::sync::mpsc::channel(5); + let (audio_status_tx, audio_status_rx) = tokio::sync::mpsc::channel(5); + + let app = Arc::new(App::new(audio_control_tx, audio_status_rx)); + + /* spawn_blocking({ - let state = state.clone(); + let app = app.clone(); move || pipewire_main(state) }); + */ - server_main(state.clone()).await; + server_main(app.clone()).await; } diff --git a/gm-dash/server/src/types.rs b/gm-dash/server/src/types.rs index d300526..bd49cb8 100644 --- a/gm-dash/server/src/types.rs +++ b/gm-dash/server/src/types.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{ops::Deref, path::PathBuf, time::Duration}; use thiserror::Error; @@ -10,7 +10,7 @@ pub enum AppError { NoTracks, #[error("Operation is invalid in the current state")] - InvalidState + InvalidState, } impl From for AppError { @@ -22,9 +22,64 @@ impl From for AppError { } } -#[derive(Clone, Debug, Hash, PartialEq, PartialOrd, Eq)] -pub struct TrackInfo { +#[derive(Clone, Debug, PartialEq, PartialOrd)] +pub struct TrackSpec { pub path: PathBuf, + pub volume: Volume, } +#[derive(Clone, Debug)] +pub struct Progress { + pub current: Duration, + pub length: Duration, +} +#[derive(Debug, Error, PartialEq)] +pub enum VolumeError { + #[error("The specified volume is out of range and must be between 0.0 and 1.0")] + OutOfRange, +} + +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] +pub struct Volume(f32); + +impl TryFrom for Volume { + type Error = VolumeError; + fn try_from(val: f32) -> Result { + if val < 0. || val > 1. { + return Err(VolumeError::OutOfRange); + } + + Ok(Self(val)) + } +} + +impl From for f32 { + fn from(val: Volume) -> f32 { + val.0 + } +} + +#[derive(Clone, Debug)] +pub enum AudioControlMessage { + Play, + Pause, + EnableTrack(TrackSpec), + DisableTrack(PathBuf), + ReportStatus, +} + +#[derive(Clone, Debug)] +pub struct TrackInfo { + pub path: PathBuf, + pub volume: Volume, + pub progress: Progress, +} + +#[derive(Debug)] +pub enum AudioStatusMessage { + Playing, + Pausing, + Status(Vec), + AudioError(AudioError), +}