Set up to be able to add a track to a running pipeline

This commit is contained in:
Savanni D'Gerinel 2024-09-05 00:42:55 -04:00
parent 7da7ffcaa5
commit 118428d545

View File

@ -9,7 +9,9 @@ use gstreamer::{prelude::*, ClockTime, MessageType, MessageView};
use thiserror::Error;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::types::{AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec};
use crate::types::{
AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec,
};
#[derive(Debug, Error, PartialEq)]
pub enum AudioError {
@ -62,7 +64,7 @@ impl AudioControl {
status_tx
.send(AudioStatusMessage::Status(AudioState {
playing: self.backend.playing(),
tracks: self.backend.tracks()
tracks: self.backend.tracks(),
}))
.await
.expect("to successfully send a message");
@ -228,20 +230,25 @@ impl Default for GStreamerBackend {
let state = Arc::new(RwLock::new(GStreamerBackendState::default()));
let monitor = std::thread::spawn({
let pipeline = pipeline.clone();
let pipeline_object = pipeline.clone().upcast::<gstreamer::Object>();
let state = state.clone();
let bus = bus.clone();
move || loop {
/*
if let Some(msg) = bus.timed_pop_filtered(
ClockTime::NONE,
gstreamer::ClockTime::from_mseconds(100),
&[
MessageType::Error,
MessageType::Eos,
MessageType::StateChanged,
],
) {
*/
if let Some(msg) = bus.timed_pop(gstreamer::ClockTime::from_mseconds(100)) {
match msg.view() {
MessageView::StateChanged(st) => {
println!("state changed: {:?}", st);
if msg.src() == Some(&pipeline_object) {
state.write().unwrap().playing =
st.current() == gstreamer::State::Playing;
@ -253,10 +260,16 @@ impl Default for GStreamerBackend {
MessageView::Eos(_) => {
println!("EOS");
}
_ => {
unreachable!();
msg => {
println!("{:?}", msg);
}
}
} else {
if state.read().unwrap().playing {
let mut q = gstreamer::query::Position::new(gstreamer::Format::Time);
pipeline.query(&mut q);
println!("Position: {:?}", q.result());
}
}
}
});
@ -312,14 +325,21 @@ impl AudioControlBackend for GStreamerBackend {
fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> {
let mut st = self.state.write().unwrap();
st.tracks.insert(track.path.clone(), TrackInfo {
path: track.path.clone(),
volume: track.volume,
progress: Progress {
current: Duration::from_secs(0),
length: Duration::from_secs(1),
st.tracks.insert(
track.path.clone(),
TrackInfo {
path: track.path.clone(),
volume: track.volume,
progress: Progress {
current: Duration::from_secs(0),
length: Duration::from_secs(1),
},
},
});
);
if st.playing {
self.pipeline.set_state(gstreamer::State::Paused);
}
let source = gstreamer::ElementFactory::find("filesrc")
.unwrap()
@ -353,11 +373,21 @@ impl AudioControlBackend for GStreamerBackend {
self.pipeline.add(&volume).unwrap();
volume.link(&self.mixer).unwrap();
decoder.connect_pad_added(move |_, pad| {
let next_pad = volume.static_pad("sink").unwrap();
pad.link(&next_pad).unwrap();
decoder.connect_pad_added({
let volume = volume.clone();
move |_, pad| {
let next_pad = volume.static_pad("sink").unwrap();
pad.link(&next_pad).unwrap();
}
});
if st.playing {
source.set_state(gstreamer::State::Paused).unwrap();
decoder.set_state(gstreamer::State::Paused).unwrap();
volume.set_state(gstreamer::State::Paused).unwrap();
self.pipeline.set_state(gstreamer::State::Playing).unwrap();
}
Ok(())
}