Speculative server architecture

This commit is contained in:
Savanni D'Gerinel 2024-09-04 01:41:54 -04:00
parent 36d489e8a2
commit f941d1fb66
8 changed files with 319 additions and 54 deletions

2
Cargo.lock generated
View File

@ -1946,11 +1946,13 @@ dependencies = [
name = "gm-dash" name = "gm-dash"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cool_asserts",
"glib 0.18.5", "glib 0.18.5",
"gstreamer", "gstreamer",
"pipewire", "pipewire",
"serde 1.0.209", "serde 1.0.209",
"serde_json", "serde_json",
"thiserror",
"tokio", "tokio",
"warp", "warp",
] ]

View File

@ -13,3 +13,6 @@ serde_json = { version = "1.0.127" }
tokio = { version = "1.39.3", features = ["full"] } tokio = { version = "1.39.3", features = ["full"] }
warp = { version = "0.3.7" } warp = { version = "0.3.7" }
glib = { version = "0.18" } glib = { version = "0.18" }
thiserror = "1.0.63"
cool_asserts = "2.0.3"

View File

@ -1,18 +1,25 @@
use std::{ use std::{
collections::HashSet, collections::HashMap,
path::PathBuf,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use crate::audio_control::AudioControl; use crate::{
audio_control::{AudioControl, AudioControlBackend},
types::{AppError, TrackInfo},
};
struct AppState { struct AppState {
device_list: Vec<String>, device_list: Vec<String>,
track_list: Vec<String>, track_list: Vec<PathBuf>,
currently_playing: HashSet<String>,
audio_control: AudioControl, audio_control: AudioControl,
} }
#[cfg(test)]
mod tests;
/*
impl Default for AppState { impl Default for AppState {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -30,20 +37,44 @@ impl Default for AppState {
} }
} }
} }
*/
#[derive(Clone)] #[derive(Clone)]
pub struct App { pub struct App {
internal: Arc<RwLock<AppState>>, internal: Arc<RwLock<AppState>>,
} }
impl Default for App {
fn default() -> Self {
let internal = AppState {
device_list: vec![],
track_list: vec![],
audio_control: AudioControl::default(),
};
Self {
internal: Arc::new(RwLock::new(internal)),
}
}
}
impl App { impl App {
fn new() -> App { fn new(backend: impl AudioControlBackend + 'static) -> App {
let internal = AppState::default(); let internal = AppState {
device_list: vec![],
track_list: vec![],
audio_control: AudioControl::new(backend),
};
App { App {
internal: Arc::new(RwLock::new(internal)), internal: Arc::new(RwLock::new(internal)),
} }
} }
pub fn playing(&self) -> bool {
self.internal.read().unwrap().audio_control.playing()
}
pub fn add_audio(&self, device: String) { pub fn add_audio(&self, device: String) {
let mut st = self.internal.write().unwrap(); let mut st = self.internal.write().unwrap();
st.device_list.push(device); st.device_list.push(device);
@ -54,47 +85,39 @@ impl App {
st.device_list.clone() st.device_list.clone()
} }
pub fn tracks(&self) -> Vec<String> { pub fn enabled_tracks(&self) -> Vec<PathBuf>{
let st = self.internal.read().unwrap(); let st = self.internal.read().unwrap();
st.track_list.clone() st.audio_control.tracks().into_iter().map(|ti| ti.path.clone()).collect()
} }
pub fn enable_track(&self, track: &str) -> Result<(), String> { pub fn enable_track(&self, path: PathBuf) -> Result<(), AppError> {
let mut st = self.internal.write().unwrap(); let st = self.internal.write().unwrap();
if !st.currently_playing.contains(track) { st.audio_control.add_track(TrackInfo{ path })?;
st.currently_playing.insert(track.to_owned());
}
Ok(()) Ok(())
} }
pub fn disable_track(&self, track: &str) -> Result<(), String> { pub fn disable_track(&self, _track: &str) -> Result<(), AppError> {
/*
let mut st = self.internal.write().unwrap(); let mut st = self.internal.write().unwrap();
if st.currently_playing.contains(track) { if st.currently_playing.contains_key(track) {
st.currently_playing.remove(track); st.currently_playing.remove(track);
} }
Ok(()) Ok(())
} */
pub fn play(&self) -> Result<(), String> {
let st = self.internal.write().unwrap();
st.audio_control.play();
Ok(()) Ok(())
} }
pub fn stop(&self) -> Result<(), String> { pub fn play(&self) -> Result<(), AppError> {
let st = self.internal.write().unwrap(); let st = self.internal.write().unwrap();
st.audio_control.stop(); st.audio_control.play()?;
Ok(()) Ok(())
} }
pub fn playing(&self) -> Vec<String> { pub fn stop(&self) -> Result<(), AppError> {
let st = self.internal.read().unwrap(); let st = self.internal.write().unwrap();
st.currently_playing.iter().cloned().collect() st.audio_control.stop()?;
} Ok(())
}
impl Default for App {
fn default() -> App {
App::new()
} }
} }

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,63 @@
use std::path::PathBuf;
use cool_asserts::assert_matches;
use crate::{
app::{App, AppError},
audio_control::MemoryBackend,
};
fn with_memory_app<F>(f: F)
where
F: Fn(App),
{
let app = App::new(MemoryBackend::default());
f(app)
}
#[test]
fn app_starts_in_stopped_state() {
with_memory_app(|app| {
assert!(!app.playing());
});
}
#[test]
fn can_add_a_track_without_starting_playback() {
with_memory_app(|app| {
app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track");
assert!(!app.playing());
app.enable_track(PathBuf::from(
"/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3",
))
.expect("to enable a track");
assert!(!app.playing());
let tracks = app.enabled_tracks();
tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("the books and spellcrafting track to be enabled");
tracks.iter().find(|p| **p == PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3")).expect("the day to rebuild track to be enabled");
});
}
#[test]
fn cannot_start_playback_with_no_tracks() {
with_memory_app(|app| {
assert_matches!(app.play(), Err(AppError::NoTracks));
});
}
#[test]
fn can_add_a_track_during_playback() {
with_memory_app(|app| {
app.enable_track(PathBuf::from("/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/05 - Books and Spellcrafting.mp3.mp3")).expect("to enable a track");
app.play().expect("to start playback");
assert!(app.playing());
app.enable_track(PathBuf::from(
"/home/savanni/Music/Travis Savoie/RPG Toolkit Volume II/01 - A Day to Rebuild.mp3.mp3",
))
.expect("to enable another track during playback");
});
}

View File

@ -1,18 +1,144 @@
use std::{
collections::HashSet,
path::PathBuf,
sync::{Arc, RwLock},
};
use gstreamer::{prelude::*, ClockTime, MessageType, MessageView}; use gstreamer::{prelude::*, ClockTime, MessageType, MessageView};
use std::sync::{Arc, RwLock}; use thiserror::Error;
use crate::types::TrackInfo;
#[derive(Debug, Error, PartialEq)]
pub enum AudioError {
#[error("No tracks are available to play")]
NoTracks,
#[error("Cannot perform operation in the current state")]
InvalidState,
}
pub struct AudioControl { pub struct AudioControl {
backend: Arc<RwLock<dyn AudioControlBackend>>,
}
impl Default for AudioControl {
fn default() -> Self {
Self::new(GStreamerBackend::default())
}
}
impl AudioControl {
pub fn new(backend: impl AudioControlBackend + 'static) -> Self {
Self {
backend: Arc::new(RwLock::new(backend)),
}
}
pub fn playing(&self) -> bool {
self.backend.read().unwrap().playing()
}
pub fn tracks(&self) -> Vec<TrackInfo> {
self.backend.read().unwrap().tracks()
}
pub fn play(&self) -> Result<(), AudioError> {
self.backend.read().unwrap().play()
}
pub fn stop(&self) -> Result<(), AudioError> {
self.backend.read().unwrap().stop()
}
pub fn add_track(&self, track: TrackInfo) -> Result<(), AudioError> {
self.backend.write().unwrap().add_track(track)
}
}
pub trait AudioControlBackend: Send + Sync {
fn playing(&self) -> bool;
fn tracks(&self) -> Vec<TrackInfo>;
fn play(&self) -> Result<(), AudioError>;
fn stop(&self) -> Result<(), AudioError>;
fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError>;
fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError>;
}
pub struct MemoryBackend {
playing: Arc<RwLock<bool>>,
tracks: HashSet<TrackInfo>,
}
impl Default for MemoryBackend {
fn default() -> Self {
Self {
playing: Arc::new(RwLock::new(false)),
tracks: HashSet::new(),
}
}
}
impl AudioControlBackend for MemoryBackend {
fn playing(&self) -> bool {
*self.playing.read().unwrap()
}
fn tracks(&self) -> Vec<TrackInfo> {
self.tracks.iter().cloned().collect()
}
fn play(&self) -> Result<(), AudioError> {
if self.tracks.is_empty() {
return Err(AudioError::NoTracks);
}
let mut playing = self.playing.write().unwrap();
if *playing {
return Err(AudioError::InvalidState);
}
*playing = true;
Ok(())
}
fn stop(&self) -> Result<(), AudioError> {
let mut playing = self.playing.write().unwrap();
if *playing {
*playing = false;
Ok(())
} else {
Err(AudioError::InvalidState)
}
}
fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> {
self.tracks.insert(track);
Ok(())
}
fn remove_track(&mut self, track: TrackInfo) -> Result<(), AudioError> {
self.tracks.remove(&track);
Ok(())
}
}
pub struct GStreamerBackend {
bus: gstreamer::Bus, bus: gstreamer::Bus,
pipeline: gstreamer::Pipeline, pipeline: gstreamer::Pipeline,
mixer: gstreamer::Element, mixer: gstreamer::Element,
audio_sink: gstreamer::Element, audio_sink: gstreamer::Element,
monitor: std::thread::JoinHandle<()>,
bus_monitor: std::thread::JoinHandle<()>,
playing: Arc<RwLock<bool>>, playing: Arc<RwLock<bool>>,
} }
impl Default for AudioControl { impl Default for GStreamerBackend {
fn default() -> Self { fn default() -> Self {
let pipeline = gstreamer::Pipeline::new(); let pipeline = gstreamer::Pipeline::new();
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
@ -38,7 +164,7 @@ impl Default for AudioControl {
let playing = Arc::new(RwLock::new(false)); let playing = Arc::new(RwLock::new(false));
let bus_monitor = std::thread::spawn({ let monitor = std::thread::spawn({
let pipeline_object = pipeline.clone().upcast::<gstreamer::Object>(); let pipeline_object = pipeline.clone().upcast::<gstreamer::Object>();
let playing = playing.clone(); let playing = playing.clone();
let bus = bus.clone(); let bus = bus.clone();
@ -54,7 +180,8 @@ impl Default for AudioControl {
match msg.view() { match msg.view() {
MessageView::StateChanged(st) => { MessageView::StateChanged(st) => {
if msg.src() == Some(&pipeline_object) { if msg.src() == Some(&pipeline_object) {
*playing.write().unwrap() = st.current() == gstreamer::State::Playing; *playing.write().unwrap() =
st.current() == gstreamer::State::Playing;
} }
} }
MessageView::Error(err) => { MessageView::Error(err) => {
@ -77,41 +204,51 @@ impl Default for AudioControl {
mixer, mixer,
audio_sink, audio_sink,
bus_monitor, monitor,
playing, playing: Arc::new(RwLock::new(false)),
} }
} }
} }
impl AudioControl { impl AudioControlBackend for GStreamerBackend {
pub fn playing(&self) -> bool { fn playing(&self) -> bool {
*self.playing.read().unwrap() *self.playing.read().unwrap()
} }
pub fn play(&self) { fn tracks(&self) -> Vec<TrackInfo> {
vec![]
}
fn play(&self) -> Result<(), AudioError> {
let mut playing = self.playing.write().unwrap(); let mut playing = self.playing.write().unwrap();
if !*playing { if !*playing {
// self.pipeline.set_state(gstreamer::State::Playing).unwrap(); // self.pipeline.set_state(gstreamer::State::Playing).unwrap();
*playing = true; *playing = true;
Ok(())
} else {
Err(AudioError::InvalidState)
} }
} }
pub fn stop(&self) { fn stop(&self) -> Result<(), AudioError> {
let mut playing = self.playing.write().unwrap(); let mut playing = self.playing.write().unwrap();
if *playing { if *playing {
// self.pipeline.set_state(gstreamer::State::Paused).unwrap(); // self.pipeline.set_state(gstreamer::State::Paused).unwrap();
*playing = false; *playing = false;
Ok(())
} else {
Err(AudioError::InvalidState)
} }
} }
pub fn add_track(&mut self, path: String) { fn add_track(&mut self, track: TrackInfo) -> Result<(), AudioError> {
let source = gstreamer::ElementFactory::find("filesrc") let source = gstreamer::ElementFactory::find("filesrc")
.unwrap() .unwrap()
.load() .load()
.unwrap() .unwrap()
.create() .create()
.property("location", path) .property("location", track.path.to_str().unwrap())
.build() .build()
.unwrap(); .unwrap();
self.pipeline.add(&source).unwrap(); self.pipeline.add(&source).unwrap();
@ -142,9 +279,12 @@ impl AudioControl {
let next_pad = volume.static_pad("sink").unwrap(); let next_pad = volume.static_pad("sink").unwrap();
pad.link(&next_pad).unwrap(); pad.link(&next_pad).unwrap();
}); });
Ok(())
} }
pub fn remove_track(&mut self, path: String) { fn remove_track(&mut self, _path: TrackInfo) -> Result<(), AudioError> {
unimplemented!()
/* Need to run EOS through to a probe on the trailing end of the volume element */ /* Need to run EOS through to a probe on the trailing end of the volume element */
} }
} }

View File

@ -1,13 +1,14 @@
use std::{net::{Ipv6Addr, SocketAddrV6}, path::PathBuf};
use app::App;
use pipewire::{context::Context, main_loop::MainLoop}; use pipewire::{context::Context, main_loop::MainLoop};
use serde::Deserialize; use serde::Deserialize;
use std::net::{Ipv6Addr, SocketAddrV6};
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use warp::{serve, Filter}; use warp::{serve, Filter};
mod audio_control; mod audio_control;
mod app; mod app;
use app::App; mod types;
#[derive(Deserialize)] #[derive(Deserialize)]
struct PlayTrackParams { struct PlayTrackParams {
@ -27,10 +28,12 @@ async fn server_main(state: App) {
} }
}); });
/*
let list_tracks = warp::path!("tracks").map({ let list_tracks = warp::path!("tracks").map({
let state = state.clone(); let state = state.clone();
move || serde_json::to_string(&state.tracks()).unwrap() move || serde_json::to_string(&state.tracks()).unwrap()
}); });
*/
let enable_track = warp::put() let enable_track = warp::put()
.and(warp::path!("playing")) .and(warp::path!("playing"))
@ -38,7 +41,7 @@ async fn server_main(state: App) {
.map({ .map({
let state = state.clone(); let state = state.clone();
move |params: PlayTrackParams| { move |params: PlayTrackParams| {
state.enable_track(&params.track_name); let _ = state.enable_track(PathBuf::from(params.track_name));
"".to_owned() "".to_owned()
} }
}); });
@ -49,7 +52,7 @@ async fn server_main(state: App) {
.map({ .map({
let state = state.clone(); let state = state.clone();
move |params: PlayTrackParams| { move |params: PlayTrackParams| {
state.disable_track(&params.track_name); let _ = state.disable_track(&params.track_name);
"".to_owned() "".to_owned()
} }
}); });
@ -57,7 +60,7 @@ async fn server_main(state: App) {
let play_all = warp::put().and(warp::path!("playing")).map({ let play_all = warp::put().and(warp::path!("playing")).map({
let state = state.clone(); let state = state.clone();
move || { move || {
state.play(); let _ = state.play();
"".to_owned() "".to_owned()
} }
}); });
@ -65,7 +68,7 @@ async fn server_main(state: App) {
let stop_all = warp::delete().and(warp::path!("playing")).map({ let stop_all = warp::delete().and(warp::path!("playing")).map({
let state = state.clone(); let state = state.clone();
move || { move || {
state.stop(); let _ = state.stop();
"".to_owned() "".to_owned()
} }
}); });
@ -77,7 +80,7 @@ async fn server_main(state: App) {
let routes = root let routes = root
.or(list_output_devices) .or(list_output_devices)
.or(list_tracks) // .or(list_tracks)
.or(enable_track) .or(enable_track)
.or(disable_track) .or(disable_track)
.or(play_all) .or(play_all)

View File

@ -0,0 +1,30 @@
use std::path::PathBuf;
use thiserror::Error;
use crate::audio_control::AudioError;
#[derive(Debug, Error, PartialEq)]
pub enum AppError {
#[error("Operation invalid with no tracks enabled")]
NoTracks,
#[error("Operation is invalid in the current state")]
InvalidState
}
impl From<AudioError> for AppError {
fn from(err: AudioError) -> Self {
match err {
AudioError::NoTracks => Self::NoTracks,
AudioError::InvalidState => Self::InvalidState,
}
}
}
#[derive(Clone, Debug, Hash, PartialEq, PartialOrd, Eq)]
pub struct TrackInfo {
pub path: PathBuf,
}