use std::{ collections::HashMap, path::PathBuf, sync::{Arc, RwLock}, time::Duration, }; use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; use crate::types::{AudioControlMessage, AudioStatusMessage, TrackSpec}; #[derive(Debug, Error, PartialEq)] pub enum AudioError { #[error("No tracks are available to play")] NoTracks, #[error("Cannot perform operation in the current state")] InvalidState, } pub struct AudioControl {} /* impl Default for AudioControl { fn default() -> Self { Self::new(GStreamerBackend::default()) } } */ impl AudioControl { pub fn new() -> Self { Self {} } pub async fn listen(&self, mut control_rx: Receiver<AudioControlMessage>) { println!("waiting for control message"); while let Some(msg) = control_rx.recv().await { match msg { AudioControlMessage::Play => { unimplemented!() } AudioControlMessage::Pause => { unimplemented!() } AudioControlMessage::EnableTrack(_) => { unimplemented!() } AudioControlMessage::DisableTrack(_) => { unimplemented!() } AudioControlMessage::ReportStatus => { unimplemented!() } } } } pub async fn report(&self, status_tx: Sender<AudioStatusMessage>) { loop { println!("sending status message"); status_tx .send(AudioStatusMessage::Status(vec![])) .await .expect("to successfully send a message"); let _ = tokio::time::sleep(Duration::from_secs(1)).await; } } } /* pub fn playing(&self) -> bool { self.backend.read().unwrap().playing() } pub fn tracks(&self) -> Vec<TrackSpec> { self.backend.read().unwrap().tracks() } pub fn play(&self) -> Result<(), AudioError> { self.backend.read().unwrap().play() } pub fn stop(&self) -> Result<(), AudioError> { self.backend.read().unwrap().stop() } pub fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> { self.backend.write().unwrap().add_track(track) } } pub trait AudioControlBackend: Send + Sync { fn playing(&self) -> bool; fn tracks(&self) -> Vec<TrackSpec>; fn play(&self) -> Result<(), AudioError>; fn stop(&self) -> Result<(), AudioError>; fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError>; } pub struct MemoryBackend { playing: Arc<RwLock<bool>>, tracks: HashMap<PathBuf, TrackSpec>, } impl Default for MemoryBackend { fn default() -> Self { Self { playing: Arc::new(RwLock::new(false)), tracks: HashMap::new(), } } } impl AudioControlBackend for MemoryBackend { fn playing(&self) -> bool { *self.playing.read().unwrap() } fn tracks(&self) -> Vec<TrackSpec> { /* self.tracks.iter().cloned().collect() */ vec![] } fn play(&self) -> Result<(), AudioError> { if self.tracks.is_empty() { return Err(AudioError::NoTracks); } let mut playing = self.playing.write().unwrap(); if *playing { return Err(AudioError::InvalidState); } *playing = true; Ok(()) } fn stop(&self) -> Result<(), AudioError> { let mut playing = self.playing.write().unwrap(); if *playing { *playing = false; Ok(()) } else { Err(AudioError::InvalidState) } } fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { /* self.tracks.insert(track); */ Ok(()) } fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { /* self.tracks.remove(&track); */ Ok(()) } } pub struct GStreamerBackend { bus: gstreamer::Bus, pipeline: gstreamer::Pipeline, mixer: gstreamer::Element, audio_sink: gstreamer::Element, monitor: std::thread::JoinHandle<()>, playing: Arc<RwLock<bool>>, } impl Default for GStreamerBackend { fn default() -> Self { let pipeline = gstreamer::Pipeline::new(); let bus = pipeline.bus().unwrap(); let mixer = gstreamer::ElementFactory::find("audiomixer") .unwrap() .load() .unwrap() .create() .build() .unwrap(); pipeline.add(&mixer).unwrap(); let audio_sink = gstreamer::ElementFactory::find("pulsesink") .unwrap() .load() .unwrap() .create() .build() .unwrap(); pipeline.add(&audio_sink).unwrap(); mixer.link(&audio_sink).unwrap(); let playing = Arc::new(RwLock::new(false)); let monitor = std::thread::spawn({ let pipeline_object = pipeline.clone().upcast::<gstreamer::Object>(); let playing = playing.clone(); let bus = bus.clone(); move || loop { if let Some(msg) = bus.timed_pop_filtered( ClockTime::NONE, &[ MessageType::Error, MessageType::Eos, MessageType::StateChanged, ], ) { match msg.view() { MessageView::StateChanged(st) => { if msg.src() == Some(&pipeline_object) { *playing.write().unwrap() = st.current() == gstreamer::State::Playing; } } MessageView::Error(err) => { println!("error: {:?}", err); } MessageView::Eos(_) => { println!("EOS"); } _ => { unreachable!(); } } } } }); Self { bus, pipeline, mixer, audio_sink, monitor, playing: Arc::new(RwLock::new(false)), } } } impl AudioControlBackend for GStreamerBackend { fn playing(&self) -> bool { *self.playing.read().unwrap() } fn tracks(&self) -> Vec<TrackSpec> { vec![] } fn play(&self) -> Result<(), AudioError> { let mut playing = self.playing.write().unwrap(); if !*playing { // self.pipeline.set_state(gstreamer::State::Playing).unwrap(); *playing = true; Ok(()) } else { Err(AudioError::InvalidState) } } fn stop(&self) -> Result<(), AudioError> { let mut playing = self.playing.write().unwrap(); if *playing { // self.pipeline.set_state(gstreamer::State::Paused).unwrap(); *playing = false; Ok(()) } else { Err(AudioError::InvalidState) } } fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> { let source = gstreamer::ElementFactory::find("filesrc") .unwrap() .load() .unwrap() .create() .property("location", track.path.to_str().unwrap()) .build() .unwrap(); self.pipeline.add(&source).unwrap(); let decoder = gstreamer::ElementFactory::find("decodebin") .unwrap() .load() .unwrap() .create() .build() .unwrap(); self.pipeline.add(&decoder).unwrap(); source.link(&decoder).unwrap(); let volume = gstreamer::ElementFactory::find("volume") .unwrap() .load() .unwrap() .create() .property("mute", false) .property("volume", 0.75) .build() .unwrap(); self.pipeline.add(&volume).unwrap(); volume.link(&self.mixer).unwrap(); decoder.connect_pad_added(move |_, pad| { let next_pad = volume.static_pad("sink").unwrap(); pad.link(&next_pad).unwrap(); }); Ok(()) } fn remove_track(&mut self, _path: TrackSpec) -> Result<(), AudioError> { unimplemented!() /* Need to run EOS through to a probe on the trailing end of the volume element */ } } */