From 7da7ffcaa503303e4490f35543633d04fe6e3063 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Thu, 5 Sep 2024 00:16:28 -0400 Subject: [PATCH] Be able to add tracks, start, stop, and pause playback from the web api --- gm-dash/server/src/app/mod.rs | 44 +++++---- gm-dash/server/src/audio_control/mod.rs | 114 +++++++++++++++--------- gm-dash/server/src/main.rs | 70 +++++++++------ gm-dash/server/src/types.rs | 19 +++- 4 files changed, 165 insertions(+), 82 deletions(-) diff --git a/gm-dash/server/src/app/mod.rs b/gm-dash/server/src/app/mod.rs index aa5496d..3a5f43d 100644 --- a/gm-dash/server/src/app/mod.rs +++ b/gm-dash/server/src/app/mod.rs @@ -11,7 +11,9 @@ use tokio::{ use crate::{ audio_control::AudioControl, - types::{AppError, AudioControlMessage, AudioStatusMessage, TrackInfo, TrackSpec, Volume}, + types::{ + AppError, AudioControlMessage, AudioState, AudioStatusMessage, TrackInfo, TrackSpec, Volume, + }, }; #[cfg(test)] @@ -58,18 +60,21 @@ impl App { let state = state.clone(); async move { println!("listener started"); - loop { - match audio_status.recv().await { - Some(AudioStatusMessage::Playing) => { + while let Some(msg) = audio_status.recv().await { + match msg { + AudioStatusMessage::Playing => { state.write().unwrap().playing = true; } - Some(AudioStatusMessage::Status(track_status)) => { - println!("status: {:?}", track_status); - state.write().unwrap().track_status = track_status; + AudioStatusMessage::Status(AudioState { + playing: _playing, + tracks, + }) => { + state.write().unwrap().track_status = tracks; } msg => println!("message received from audio controller: {:?}", msg), } } + println!("listener exiting"); } }); @@ -105,6 +110,7 @@ impl App { } pub async fn enable_track(&self, path: PathBuf) -> Result<(), AppError> { + println!("enabling track: {}", path.display()); self.audio_control .send(AudioControlMessage::EnableTrack(TrackSpec { path, @@ -132,18 +138,26 @@ impl App { } pub async fn play(&self) -> Result<(), AppError> { - /* - let st = self.internal.write().unwrap(); - st.audio_control.play()?; - */ + self.audio_control + .send(AudioControlMessage::Play) + .await + .expect("audio control send to succeed"); Ok(()) } pub async fn stop(&self) -> Result<(), AppError> { - /* - let st = self.internal.write().unwrap(); - st.audio_control.stop()?; - */ + self.audio_control + .send(AudioControlMessage::Stop) + .await + .expect("audio control send to succeed"); + Ok(()) + } + + pub async fn pause(&self) -> Result<(), AppError> { + self.audio_control + .send(AudioControlMessage::Pause) + .await + .expect("audio control send to succeed"); Ok(()) } } diff --git a/gm-dash/server/src/audio_control/mod.rs b/gm-dash/server/src/audio_control/mod.rs index 9570d88..c777784 100644 --- a/gm-dash/server/src/audio_control/mod.rs +++ b/gm-dash/server/src/audio_control/mod.rs @@ -9,7 +9,7 @@ use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; -use crate::types::{AudioControlMessage, AudioStatusMessage, TrackSpec}; +use crate::types::{AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec}; #[derive(Debug, Error, PartialEq)] pub enum AudioError { @@ -20,33 +20,32 @@ pub enum AudioError { InvalidState, } -pub struct AudioControl {} - -/* -impl Default for AudioControl { - fn default() -> Self { - Self::new(GStreamerBackend::default()) - } +pub struct AudioControl { + backend: Box, } -*/ impl AudioControl { - pub fn new() -> Self { - Self {} + pub fn new(backend: impl AudioControlBackend + 'static) -> Self { + Self { + backend: Box::new(backend), + } } pub async fn listen(&self, mut control_rx: Receiver) { - println!("waiting for control message"); while let Some(msg) = control_rx.recv().await { + println!("control message: {:?}", msg); match msg { AudioControlMessage::Play => { - unimplemented!() + self.backend.play().unwrap(); + } + AudioControlMessage::Stop => { + self.backend.stop().unwrap(); } AudioControlMessage::Pause => { - unimplemented!() + self.backend.pause().unwrap(); } - AudioControlMessage::EnableTrack(_) => { - unimplemented!() + AudioControlMessage::EnableTrack(spec) => { + self.backend.add_track(spec).unwrap(); } AudioControlMessage::DisableTrack(_) => { unimplemented!() @@ -60,9 +59,11 @@ impl AudioControl { pub async fn report(&self, status_tx: Sender) { loop { - println!("sending status message"); status_tx - .send(AudioStatusMessage::Status(vec![])) + .send(AudioStatusMessage::Status(AudioState { + playing: self.backend.playing(), + tracks: self.backend.tracks() + })) .await .expect("to successfully send a message"); let _ = tokio::time::sleep(Duration::from_secs(1)).await; @@ -91,21 +92,25 @@ impl AudioControl { self.backend.write().unwrap().add_track(track) } } +*/ pub trait AudioControlBackend: Send + Sync { fn playing(&self) -> bool; - fn tracks(&self) -> Vec; + fn tracks(&self) -> Vec; fn play(&self) -> Result<(), AudioError>; fn stop(&self) -> Result<(), AudioError>; - fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; + fn pause(&self) -> Result<(), AudioError>; - fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; + fn add_track(&self, track: TrackSpec) -> Result<(), AudioError>; + + fn remove_track(&self, track: TrackSpec) -> Result<(), AudioError>; } +/* pub struct MemoryBackend { playing: Arc>, tracks: HashMap, @@ -170,6 +175,21 @@ impl AudioControlBackend for MemoryBackend { Ok(()) } } +*/ + +struct GStreamerBackendState { + playing: bool, + tracks: HashMap, +} + +impl Default for GStreamerBackendState { + fn default() -> Self { + Self { + playing: false, + tracks: HashMap::new(), + } + } +} pub struct GStreamerBackend { bus: gstreamer::Bus, @@ -178,7 +198,7 @@ pub struct GStreamerBackend { audio_sink: gstreamer::Element, monitor: std::thread::JoinHandle<()>, - playing: Arc>, + state: Arc>, } impl Default for GStreamerBackend { @@ -205,11 +225,11 @@ impl Default for GStreamerBackend { pipeline.add(&audio_sink).unwrap(); mixer.link(&audio_sink).unwrap(); - let playing = Arc::new(RwLock::new(false)); + let state = Arc::new(RwLock::new(GStreamerBackendState::default())); let monitor = std::thread::spawn({ let pipeline_object = pipeline.clone().upcast::(); - let playing = playing.clone(); + let state = state.clone(); let bus = bus.clone(); move || loop { if let Some(msg) = bus.timed_pop_filtered( @@ -223,7 +243,7 @@ impl Default for GStreamerBackend { match msg.view() { MessageView::StateChanged(st) => { if msg.src() == Some(&pipeline_object) { - *playing.write().unwrap() = + state.write().unwrap().playing = st.current() == gstreamer::State::Playing; } } @@ -249,25 +269,23 @@ impl Default for GStreamerBackend { monitor, - playing: Arc::new(RwLock::new(false)), + state, } } } impl AudioControlBackend for GStreamerBackend { fn playing(&self) -> bool { - *self.playing.read().unwrap() + self.state.read().unwrap().playing } - fn tracks(&self) -> Vec { + fn tracks(&self) -> Vec { vec![] } fn play(&self) -> Result<(), AudioError> { - let mut playing = self.playing.write().unwrap(); - if !*playing { - // self.pipeline.set_state(gstreamer::State::Playing).unwrap(); - *playing = true; + if !self.playing() { + self.pipeline.set_state(gstreamer::State::Playing).unwrap(); Ok(()) } else { Err(AudioError::InvalidState) @@ -275,17 +293,34 @@ impl AudioControlBackend for GStreamerBackend { } fn stop(&self) -> Result<(), AudioError> { - let mut playing = self.playing.write().unwrap(); - if *playing { - // self.pipeline.set_state(gstreamer::State::Paused).unwrap(); - *playing = false; + if self.playing() { + self.pipeline.set_state(gstreamer::State::Ready).unwrap(); Ok(()) } else { Err(AudioError::InvalidState) } } - fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { + fn pause(&self) -> Result<(), AudioError> { + if self.playing() { + self.pipeline.set_state(gstreamer::State::Paused).unwrap(); + Ok(()) + } else { + Err(AudioError::InvalidState) + } + } + + fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> { + let mut st = self.state.write().unwrap(); + st.tracks.insert(track.path.clone(), TrackInfo { + path: track.path.clone(), + volume: track.volume, + progress: Progress { + current: Duration::from_secs(0), + length: Duration::from_secs(1), + }, + }); + let source = gstreamer::ElementFactory::find("filesrc") .unwrap() .load() @@ -312,7 +347,7 @@ impl AudioControlBackend for GStreamerBackend { .unwrap() .create() .property("mute", false) - .property("volume", 0.75) + .property("volume", track.volume.as_f64()) .build() .unwrap(); self.pipeline.add(&volume).unwrap(); @@ -326,9 +361,8 @@ impl AudioControlBackend for GStreamerBackend { Ok(()) } - fn remove_track(&mut self, _path: TrackSpec) -> Result<(), AudioError> { + fn remove_track(&self, _path: TrackSpec) -> Result<(), AudioError> { unimplemented!() /* Need to run EOS through to a probe on the trailing end of the volume element */ } } -*/ diff --git a/gm-dash/server/src/main.rs b/gm-dash/server/src/main.rs index 7087a2b..7d8b7ec 100644 --- a/gm-dash/server/src/main.rs +++ b/gm-dash/server/src/main.rs @@ -1,11 +1,12 @@ use std::{ + convert::Infallible, net::{Ipv6Addr, SocketAddrV6}, path::PathBuf, sync::Arc, }; use app::App; -use audio_control::AudioControl; +use audio_control::{AudioControl, GStreamerBackend}; use pipewire::{context::Context, main_loop::MainLoop}; use serde::Deserialize; use tokio::task::spawn_blocking; @@ -20,6 +21,10 @@ struct PlayTrackParams { track_name: String, } +fn with_app(app: Arc) -> impl Filter,), Error = Infallible> + Clone { + warp::any().map(move || app.clone()) +} + async fn server_main(app: Arc) { let localhost: Ipv6Addr = "::1".parse().unwrap(); let server_addr = SocketAddrV6::new(localhost, 3001, 0, 0); @@ -43,40 +48,52 @@ async fn server_main(app: Arc) { let enable_track = warp::put() .and(warp::path!("playing")) .and(warp::body::json()) - .map({ - let app = app.clone(); - move |params: PlayTrackParams| { - let _ = app.enable_track(PathBuf::from(params.track_name)); - "".to_owned() - } + .and(with_app(app.clone())) + .then(|params: PlayTrackParams, app: Arc| async move { + println!("enable track"); + let _ = app.enable_track(PathBuf::from(params.track_name)).await; + "".to_owned() }); let disable_track = warp::delete() .and(warp::path!("playing")) .and(warp::body::json()) - .map({ - let app = app.clone(); - move |params: PlayTrackParams| { - let _ = app.disable_track(¶ms.track_name); + .and(with_app(app.clone())) + .then(|params: PlayTrackParams, app: Arc| async move { + let _ = app.disable_track(¶ms.track_name); + "".to_owned() + }); + + let play_all = warp::post() + .and(warp::path!("play")) + .and(with_app(app.clone())) + .then({ + |app: Arc| async move { + println!("play_all"); + let _ = app.play().await; "".to_owned() } }); - let play_all = warp::put().and(warp::path!("playing")).map({ - let app = app.clone(); - move || { - let _ = app.play(); - "".to_owned() - } - }); + let stop_all = warp::post() + .and(warp::path!("stop")) + .and(with_app(app.clone())) + .then({ + |app: Arc| async move { + let _ = app.stop().await; + "".to_owned() + } + }); - let stop_all = warp::delete().and(warp::path!("playing")).map({ - let app = app.clone(); - move || { - let _ = app.stop(); - "".to_owned() - } - }); + let pause = warp::post() + .and(warp::path!("pause")) + .and(with_app(app.clone())) + .then({ + |app: Arc| async move { + let _ = app.pause().await; + "".to_owned() + } + }); let now_playing = warp::path!("playing").map({ let app = app.clone(); @@ -90,6 +107,7 @@ async fn server_main(app: Arc) { .or(disable_track) .or(play_all) .or(stop_all) + .or(pause) .or(now_playing); serve(routes).run(server_addr).await; @@ -142,7 +160,7 @@ async fn main() { let (audio_status_tx, audio_status_rx) = tokio::sync::mpsc::channel(5); let app = Arc::new(App::new(audio_control_tx, audio_status_rx)); - let audio_controller = Arc::new(AudioControl::new()); + let audio_controller = Arc::new(AudioControl::new(GStreamerBackend::default())); tokio::spawn({ let audio_controller = audio_controller.clone(); async move { audio_controller.listen(audio_control_rx).await } diff --git a/gm-dash/server/src/types.rs b/gm-dash/server/src/types.rs index bd49cb8..5b3d242 100644 --- a/gm-dash/server/src/types.rs +++ b/gm-dash/server/src/types.rs @@ -43,6 +43,16 @@ pub enum VolumeError { #[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] pub struct Volume(f32); +impl Volume { + pub fn as_f32(&self) -> f32 { + self.0 + } + + pub fn as_f64(&self) -> f64 { + self.0.into() + } +} + impl TryFrom for Volume { type Error = VolumeError; fn try_from(val: f32) -> Result { @@ -64,6 +74,7 @@ impl From for f32 { pub enum AudioControlMessage { Play, Pause, + Stop, EnableTrack(TrackSpec), DisableTrack(PathBuf), ReportStatus, @@ -76,10 +87,16 @@ pub struct TrackInfo { pub progress: Progress, } +#[derive(Clone, Debug)] +pub struct AudioState { + pub playing: bool, + pub tracks: Vec, +} + #[derive(Debug)] pub enum AudioStatusMessage { Playing, Pausing, - Status(Vec), + Status(AudioState), AudioError(AudioError), }