diff --git a/gm-dash/server/src/audio_control/mod.rs b/gm-dash/server/src/audio_control/mod.rs index c777784..6279a8f 100644 --- a/gm-dash/server/src/audio_control/mod.rs +++ b/gm-dash/server/src/audio_control/mod.rs @@ -9,7 +9,9 @@ use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; use thiserror::Error; use tokio::sync::mpsc::{Receiver, Sender}; -use crate::types::{AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec}; +use crate::types::{ + AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec, +}; #[derive(Debug, Error, PartialEq)] pub enum AudioError { @@ -62,7 +64,7 @@ impl AudioControl { status_tx .send(AudioStatusMessage::Status(AudioState { playing: self.backend.playing(), - tracks: self.backend.tracks() + tracks: self.backend.tracks(), })) .await .expect("to successfully send a message"); @@ -228,20 +230,25 @@ impl Default for GStreamerBackend { let state = Arc::new(RwLock::new(GStreamerBackendState::default())); let monitor = std::thread::spawn({ + let pipeline = pipeline.clone(); let pipeline_object = pipeline.clone().upcast::(); let state = state.clone(); let bus = bus.clone(); move || loop { + /* if let Some(msg) = bus.timed_pop_filtered( - ClockTime::NONE, + gstreamer::ClockTime::from_mseconds(100), &[ MessageType::Error, MessageType::Eos, MessageType::StateChanged, ], ) { + */ + if let Some(msg) = bus.timed_pop(gstreamer::ClockTime::from_mseconds(100)) { match msg.view() { MessageView::StateChanged(st) => { + println!("state changed: {:?}", st); if msg.src() == Some(&pipeline_object) { state.write().unwrap().playing = st.current() == gstreamer::State::Playing; @@ -253,10 +260,16 @@ impl Default for GStreamerBackend { MessageView::Eos(_) => { println!("EOS"); } - _ => { - unreachable!(); + msg => { + println!("{:?}", msg); } } + } else { + if state.read().unwrap().playing { + let mut q = gstreamer::query::Position::new(gstreamer::Format::Time); + pipeline.query(&mut q); + println!("Position: {:?}", q.result()); + } } } }); @@ -312,14 +325,21 @@ impl AudioControlBackend for GStreamerBackend { fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> { let mut st = self.state.write().unwrap(); - st.tracks.insert(track.path.clone(), TrackInfo { - path: track.path.clone(), - volume: track.volume, - progress: Progress { - current: Duration::from_secs(0), - length: Duration::from_secs(1), + st.tracks.insert( + track.path.clone(), + TrackInfo { + path: track.path.clone(), + volume: track.volume, + progress: Progress { + current: Duration::from_secs(0), + length: Duration::from_secs(1), + }, }, - }); + ); + + if st.playing { + self.pipeline.set_state(gstreamer::State::Paused); + } let source = gstreamer::ElementFactory::find("filesrc") .unwrap() @@ -353,11 +373,21 @@ impl AudioControlBackend for GStreamerBackend { self.pipeline.add(&volume).unwrap(); volume.link(&self.mixer).unwrap(); - decoder.connect_pad_added(move |_, pad| { - let next_pad = volume.static_pad("sink").unwrap(); - pad.link(&next_pad).unwrap(); + decoder.connect_pad_added({ + let volume = volume.clone(); + move |_, pad| { + let next_pad = volume.static_pad("sink").unwrap(); + pad.link(&next_pad).unwrap(); + } }); + if st.playing { + source.set_state(gstreamer::State::Paused).unwrap(); + decoder.set_state(gstreamer::State::Paused).unwrap(); + volume.set_state(gstreamer::State::Paused).unwrap(); + self.pipeline.set_state(gstreamer::State::Playing).unwrap(); + } + Ok(()) }