From 8260b4e2f322f028d01a755289045a491ce1a329 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Sat, 11 Mar 2023 13:02:52 -0500 Subject: [PATCH] Overhaul the threading model to use more async tasks and less threads --- music-player/server/Cargo.toml | 3 + music-player/server/src/bin/server.rs | 37 +++++-- music-player/server/src/core.rs | 154 +++++++++++++------------- music-player/server/src/lib.rs | 2 +- music-player/server/src/playback.rs | 91 ++++++++------- 5 files changed, 157 insertions(+), 130 deletions(-) diff --git a/music-player/server/Cargo.toml b/music-player/server/Cargo.toml index ea753c8..a55ec1d 100644 --- a/music-player/server/Cargo.toml +++ b/music-player/server/Cargo.toml @@ -20,4 +20,7 @@ uuid = { version = "1", features = ["v4"] } warp = { version = "0.3" } urlencoding = { version = "2.1" } +[target.armv7-unknown-linux-gnueabi] +linker = "arm-linux-gnueabi-gcc" + [lib] diff --git a/music-player/server/src/bin/server.rs b/music-player/server/src/bin/server.rs index fc0a0a9..3197f7f 100644 --- a/music-player/server/src/bin/server.rs +++ b/music-player/server/src/bin/server.rs @@ -9,7 +9,7 @@ use serde::Deserialize; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, - sync::Arc, + sync::{Arc, RwLock}, thread, }; use warp::Filter; @@ -33,8 +33,7 @@ impl Static { fn read(self, root: PathBuf) -> String { let mut path = root; path.push(self.0); - println!("path: {:?}", path); - std::fs::read_to_string(path).expect("to find the file") + std::fs::read_to_string(path.clone()).expect(&format!("to find {:?}", path)) } } @@ -51,15 +50,21 @@ pub async fn main() { .map(|b| PathBuf::from(b)) .unwrap(); + gstreamer::init(); + let index = Arc::new(MemoryIndex::new()); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); - let (core, api) = match Core::new(index.clone(), scanner) { - Flow::Ok((core, api)) => (core, api), + let core = match Core::new(index.clone(), scanner) { + Flow::Ok(core) => Arc::new(RwLock::new(core)), Flow::Err(error) => panic!("error: {}", error), Flow::Fatal(error) => panic!("fatal: {}", error), }; - let _handle = thread::spawn(move || core.start()); + /* + let core_handle = tokio::spawn(move || core.main_loop()); + */ + + // let _handle = thread::spawn(move || core.start()); println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); @@ -79,8 +84,6 @@ pub async fn main() { .map(|m| m.essence_str().to_owned()) .unwrap_or("text/plain".to_owned()); println!("mime_type: {:?}", mime_type); - // let mut path = PathBuf::from("assets"); - // path.push(filename); warp::http::Response::builder() .header("content-type", mime_type) .body(Static(PathBuf::from(filename)).read(bundle_root.clone())) @@ -105,14 +108,22 @@ pub async fn main() { .and(warp::post()) .and(warp::body::json()) .map({ - let api = api.clone(); + let core = core.clone(); move |body: TrackRequest| { - let result = api.play_track(TrackId::from(body.id)); + let result = core.write().unwrap().play_track(TrackId::from(body.id)); println!("Play result: {:?}", result); warp::reply::json(&("ok".to_owned())) } }); + let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({ + let core = core.clone(); + move || { + core.write().unwrap().stop_playback(); + warp::reply::json(&("ok".to_owned())) + } + }); + /* let tracks_for_artist = warp::path!("api" / "v1" / "artist" / String) .and(warp::get()) @@ -139,7 +150,11 @@ pub async fn main() { .or(queue) .or(playing_status); */ - let routes = root.or(assets).or(track_list).or(play_track); + let routes = root + .or(assets) + .or(track_list) + .or(play_track) + .or(stop_playback); let server = warp::serve(routes); server .run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8002)) diff --git a/music-player/server/src/core.rs b/music-player/server/src/core.rs index a70303c..590c3a3 100644 --- a/music-player/server/src/core.rs +++ b/music-player/server/src/core.rs @@ -8,30 +8,32 @@ use crate::{ use flow::{ok, return_error, Flow}; use gstreamer::{format::ClockTime, prelude::*, MessageView}; use std::{ - sync::{ - mpsc::{channel, Receiver, RecvTimeoutError, Sender}, - Arc, Mutex, - }, + sync::{Arc, Mutex, RwLock}, thread, - thread::JoinHandle, time::{Duration, Instant}, }; +use tokio::{ + sync::mpsc::{channel, Receiver, Sender}, + task::AbortHandle, +}; fn scan_frequency() -> Duration { Duration::from_secs(60) } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum ControlMsg { PlayTrack(TrackId), Exit, } +#[derive(Clone, Debug)] pub enum TrackMsg { UpdateInProgress, UpdateComplete, } +#[derive(Clone, Debug)] pub enum PlaybackMsg { PositionUpdate, Playing, @@ -41,53 +43,42 @@ pub enum PlaybackMsg { pub struct Core { db: Arc, - scanner: Arc, - control_rx: Receiver, - playback_controller: Playback, + playback_controller: Option, + + next_scan: Arc>, + + scanner_handle: thread::JoinHandle<()>, } impl Core { pub fn new( db: Arc, scanner: Arc, - ) -> Flow<(Core, CoreAPI), FatalError, Error> { - let (control_tx, control_rx) = channel::(); + ) -> Flow { let db = db; - let playback_controller = Playback::new(); + let next_scan = Arc::new(RwLock::new(Instant::now())); + let scanner_handle = { + let db = db.clone(); + let next_scan = next_scan.clone(); + thread::spawn(move || scan_loop(scanner, db, next_scan)) + }; - ok(( - Core { - db, - scanner, - control_rx, - playback_controller, - }, - CoreAPI { - control_tx: Arc::new(Mutex::new(control_tx)), - }, - )) + ok(Core { + db, + playback_controller: None, + next_scan, + scanner_handle, + }) } - pub fn start(&self) -> Flow<(), FatalError, Error> { - gstreamer::init(); - let (scanner_tx, _scanner_rx) = channel(); - let mut next_scan = Instant::now(); + /* + pub fn main_loop(&self) -> Flow<(), FatalError, Error> { + // let (scanner_tx, _scanner_rx) = channel(); loop { - if Instant::now() >= next_scan { - let scan_start = Instant::now(); - let _ = scanner_tx.send(TrackMsg::UpdateInProgress); - for track in self.scanner.scan() { - match track { - Ok(track) => self.db.add_track(track), - Err(_) => ok(()), - }; - } - let _ = scanner_tx.send(TrackMsg::UpdateComplete); - next_scan = Instant::now() + scan_frequency(); - println!("scan duration: {:?}", Instant::now() - scan_start); - } + println!("loop"); + /* match self.control_rx.recv_timeout(Duration::from_millis(1000)) { Ok(ControlMsg::PlayTrack(id)) => { let _ = self.play_track(id); @@ -96,66 +87,75 @@ impl Core { Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => return ok(()), } + */ + thread::sleep(Duration::from_secs(1)); } } + */ pub fn list_tracks<'a>(&'a self) -> Flow, FatalError, Error> { self.db.list_tracks().map_err(Error::DatabaseError) } - pub fn play_track<'a>(&'a self, id: TrackId) -> Flow<(), FatalError, Error> { - /* - println!("play_track: {}", id.as_ref()); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&format!("playbin uri={}", id.as_str())) - .map_err(|err| Error::CannotPlay(err.to_string()),) - )); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|err| Error::CannotPlay(err.to_string())) - )); - { - let pipeline = pipeline.clone(); - thread::spawn(move || { - println!("starting"); - let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match msg.view() { - MessageView::Eos(_) => (), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - } - msg => println!("{:?}", msg), - } - } - }); - } - */ - self.playback_controller.play_track(id); + pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> { + self.stop_playback(); + self.playback_controller = Some(return_error!(Playback::new(id))); ok(()) } + + pub fn stop_playback<'a>(&'a mut self) { + match self.playback_controller { + Some(ref controller) => controller.stop(), + None => (), + } + self.playback_controller = None; + } } +/* #[derive(Clone)] pub struct CoreAPI { control_tx: Arc>>, } impl CoreAPI { - pub fn play_track(&self, id: TrackId) -> () { + pub async fn play_track(&self, id: TrackId) -> () { self.control_tx .lock() .unwrap() .send(ControlMsg::PlayTrack(id)) + .await .unwrap() } } +*/ + +pub fn scan_loop( + scanner: Arc, + db: Arc, + next_scan: Arc>, +) { + loop { + if Instant::now() >= *next_scan.read().unwrap() { + let scan_start = Instant::now(); + let mut counter = 0; + for track in scanner.scan() { + counter += 1; + match track { + Ok(track) => db.add_track(track), + Err(_) => ok(()), + }; + } + *next_scan.write().unwrap() = Instant::now() + scan_frequency(); + println!( + "scanned {} files in {:?}", + counter, + Instant::now() - scan_start + ); + } + thread::sleep(Duration::from_secs(1)); + } +} #[cfg(test)] mod test { @@ -163,6 +163,7 @@ mod test { use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use std::collections::HashSet; + /* fn with_example_index(f: F) where F: Fn(Core), @@ -203,4 +204,5 @@ mod test { Flow::Err(err) => panic!("error: {:?}", err), }) } + */ } diff --git a/music-player/server/src/lib.rs b/music-player/server/src/lib.rs index 4171597..edf10d8 100644 --- a/music-player/server/src/lib.rs +++ b/music-player/server/src/lib.rs @@ -12,7 +12,7 @@ pub enum Error { DatabaseError(DatabaseError), #[error("Cannot play track")] - CannotPlay, + CannotPlay(String), #[error("Cannot stop playback")] CannotStop, diff --git a/music-player/server/src/playback.rs b/music-player/server/src/playback.rs index 6442acd..57df72d 100644 --- a/music-player/server/src/playback.rs +++ b/music-player/server/src/playback.rs @@ -3,10 +3,10 @@ use flow::{ok, return_error, Flow}; use gstreamer::{format::ClockTime, prelude::*, MessageView, StateChangeError}; use std::{ path::PathBuf, - sync::mpsc::{channel, Receiver, Sender}, thread::{self, JoinHandle}, time::Duration, }; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use urlencoding::encode; pub enum PlaybackControl { @@ -20,68 +20,75 @@ pub enum PlaybackStatus { } pub struct Playback { - handle: JoinHandle>, - control_tx: Sender, + handle: tokio::task::JoinHandle<()>, + pipeline: gstreamer::Element, } impl Playback { - pub fn new() -> Playback { - let (control_tx, control_rx) = channel::(); + pub fn new(id: TrackId) -> Flow { + let pb = PathBuf::from(id.as_ref()); + let path = pb + .iter() + .skip(1) + .map(|component| encode(&component.to_string_lossy()).into_owned()) + .collect::(); + let pipeline = return_error!(Flow::from( + gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) + .map_err(|err| Error::GlibError(err)) + )); - let handle = thread::spawn(move || { - let mut pipeline = None; - loop { - match control_rx.recv().unwrap() { - PlaybackControl::PlayTrack(id) => match play_track(id) { - Flow::Ok(pipeline_) => pipeline = Some(pipeline_), - Flow::Fatal(err) => panic!("fatal error: {:?}", err), - Flow::Err(err) => panic!("playback error: {:?}", err), - }, - PlaybackControl::Stop => { - if let Some(ref pipeline) = pipeline { - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Paused) - .map_err(|_| Error::CannotStop) - )); - } - } - PlaybackControl::Exit => return ok(()), - } - } + pipeline.set_state(gstreamer::State::Playing); + let handle = tokio::task::spawn_blocking({ + let pipeline = pipeline.clone(); + move || pipeline_status(pipeline) }); - Self { handle, control_tx } + ok(Self { handle, pipeline }) } - pub fn play_track(&self, id: TrackId) { - self.control_tx - .send(PlaybackControl::PlayTrack(id)) - .unwrap(); + pub fn stop(&self) { + self.handle.abort(); + self.pipeline.set_state(gstreamer::State::Paused); } } +impl Drop for Playback { + fn drop(&mut self) { + self.stop(); + } +} + +fn pipeline_status(pipeline: gstreamer::Element) { + let bus = pipeline.bus().unwrap(); + for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { + match msg.view() { + MessageView::Eos(_) => (), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + } + msg => println!("{:?}", msg), + } + } +} + +/* fn play_track(id: TrackId) -> Flow { - let pb = PathBuf::from(id.as_ref()); - let path = pb - .iter() - .skip(1) - .map(|component| encode(&component.to_string_lossy()).into_owned()) - .collect::(); - let playbin = format!("playbin uri=file:///{}", path.display()); println!("setting up to play {}", playbin); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err)) - )); println!("ready to play"); return_error!(Flow::from( pipeline .set_state(gstreamer::State::Playing) - .map_err(|_| Error::CannotPlay) + .map_err(|err| Error::CannotPlay(err.to_string())) )); println!("playing started"); ok(pipeline) } +*/ /* fn play_track(id: TrackId) -> Flow<(), FatalError, Error> {