From cb2560a8138024a62064c060bbfa83b2ffa3be0d Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Sat, 11 Mar 2023 14:46:51 -0500 Subject: [PATCH] Work out improved error reporting --- music-player/server/Cargo.lock | 1 + music-player/server/Cargo.toml | 1 + music-player/server/src/bin/server.rs | 66 +++++++++++------ music-player/server/src/core.rs | 65 +++-------------- music-player/server/src/database.rs | 7 +- music-player/server/src/playback.rs | 101 +++++++------------------- 6 files changed, 85 insertions(+), 156 deletions(-) diff --git a/music-player/server/Cargo.lock b/music-player/server/Cargo.lock index ac1e348..4ab9465 100644 --- a/music-player/server/Cargo.lock +++ b/music-player/server/Cargo.lock @@ -664,6 +664,7 @@ dependencies = [ "mime_guess", "rusqlite", "serde", + "serde_json", "thiserror", "tokio", "url", diff --git a/music-player/server/Cargo.toml b/music-player/server/Cargo.toml index a55ec1d..3887ae0 100644 --- a/music-player/server/Cargo.toml +++ b/music-player/server/Cargo.toml @@ -13,6 +13,7 @@ mime_guess = { version = "2.0" } mime = { version = "0.3" } rusqlite = { version = "0.28" } serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } thiserror = { version = "1.0" } tokio = { version = "1.24", features = ["full"] } url = { version = "2.3" } diff --git a/music-player/server/src/bin/server.rs b/music-player/server/src/bin/server.rs index 3197f7f..ca69e4a 100644 --- a/music-player/server/src/bin/server.rs +++ b/music-player/server/src/bin/server.rs @@ -4,8 +4,9 @@ use music_player::{ database::{MemoryIndex, MusicIndex}, media::{TrackId, TrackInfo}, scanner::FileScanner, + Error, FatalError, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, @@ -19,11 +20,29 @@ struct TrackRequest { id: String, } -fn tracks(index: &Arc) -> Vec { - match index.list_tracks() { - Flow::Ok(tracks) => tracks, - Flow::Err(err) => panic!("error: {}", err), - Flow::Fatal(err) => panic!("fatal: {}", err), +#[derive(Serialize)] +enum Response { + Success(A), + Failure(String), + Fatal(String), +} + +impl From> for Response { + fn from(res: Result) -> Self { + match res { + Ok(val) => Self::Success(val), + Err(err) => Self::Failure(format!("{}", err)), + } + } +} + +impl From> for Response { + fn from(res: Flow) -> Self { + match res { + Flow::Ok(val) => Self::Success(val), + Flow::Fatal(fatal) => Self::Fatal(format!("{}", fatal)), + Flow::Err(err) => Self::Failure(format!("{}", err)), + } } } @@ -50,22 +69,18 @@ pub async fn main() { .map(|b| PathBuf::from(b)) .unwrap(); - gstreamer::init(); + match gstreamer::init() { + Ok(()) => (), + Err(err) => panic!("failed to initialize gstreamer: {}", err), + } let index = Arc::new(MemoryIndex::new()); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); let core = match Core::new(index.clone(), scanner) { - Flow::Ok(core) => Arc::new(RwLock::new(core)), - Flow::Err(error) => panic!("error: {}", error), - Flow::Fatal(error) => panic!("fatal: {}", error), + Ok(core) => Arc::new(RwLock::new(core)), + Err(error) => panic!("core failed to initialize: {}", error), }; - /* - let core_handle = tokio::spawn(move || core.main_loop()); - */ - - // let _handle = thread::spawn(move || core.start()); - println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); let root = warp::path!().and(warp::get()).map({ @@ -101,7 +116,13 @@ pub async fn main() { let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({ let index = index.clone(); - move || warp::reply::json(&tracks(&index)) + move || { + warp::reply::json(&Response::from( + index + .list_tracks() + .map_err(|db_err| Error::DatabaseError(db_err)), + )) + } }); let play_track = warp::path!("api" / "v1" / "play") @@ -110,18 +131,15 @@ pub async fn main() { .map({ let core = core.clone(); move |body: TrackRequest| { - let result = core.write().unwrap().play_track(TrackId::from(body.id)); - println!("Play result: {:?}", result); - warp::reply::json(&("ok".to_owned())) + warp::reply::json(&Response::from( + core.write().unwrap().play_track(TrackId::from(body.id)), + )) } }); let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({ let core = core.clone(); - move || { - core.write().unwrap().stop_playback(); - warp::reply::json(&("ok".to_owned())) - } + move || warp::reply::json(&Response::from(core.write().unwrap().stop_playback())) }); /* diff --git a/music-player/server/src/core.rs b/music-player/server/src/core.rs index 590c3a3..a62eb9b 100644 --- a/music-player/server/src/core.rs +++ b/music-player/server/src/core.rs @@ -43,11 +43,8 @@ pub enum PlaybackMsg { pub struct Core { db: Arc, - playback_controller: Option, - next_scan: Arc>, - scanner_handle: thread::JoinHandle<()>, } @@ -55,7 +52,7 @@ impl Core { pub fn new( db: Arc, scanner: Arc, - ) -> Flow { + ) -> Result { let db = db; let next_scan = Arc::new(RwLock::new(Instant::now())); @@ -65,7 +62,7 @@ impl Core { thread::spawn(move || scan_loop(scanner, db, next_scan)) }; - ok(Core { + Ok(Core { db, playback_controller: None, next_scan, @@ -73,63 +70,26 @@ impl Core { }) } - /* - pub fn main_loop(&self) -> Flow<(), FatalError, Error> { - // let (scanner_tx, _scanner_rx) = channel(); - loop { - println!("loop"); - /* - match self.control_rx.recv_timeout(Duration::from_millis(1000)) { - Ok(ControlMsg::PlayTrack(id)) => { - let _ = self.play_track(id); - } - Ok(ControlMsg::Exit) => return ok(()), - Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => return ok(()), - } - */ - thread::sleep(Duration::from_secs(1)); - } - } - */ - pub fn list_tracks<'a>(&'a self) -> Flow, FatalError, Error> { self.db.list_tracks().map_err(Error::DatabaseError) } - pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> { - self.stop_playback(); - self.playback_controller = Some(return_error!(Playback::new(id))); - ok(()) + pub fn play_track<'a>(&'a mut self, id: TrackId) -> Result<(), Error> { + self.stop_playback()?; + self.playback_controller = Some(Playback::new(id)?); + Ok(()) } - pub fn stop_playback<'a>(&'a mut self) { + pub fn stop_playback<'a>(&'a mut self) -> Result<(), Error> { match self.playback_controller { - Some(ref controller) => controller.stop(), + Some(ref controller) => controller.stop()?, None => (), } self.playback_controller = None; + Ok(()) } } -/* -#[derive(Clone)] -pub struct CoreAPI { - control_tx: Arc>>, -} - -impl CoreAPI { - pub async fn play_track(&self, id: TrackId) -> () { - self.control_tx - .lock() - .unwrap() - .send(ControlMsg::PlayTrack(id)) - .await - .unwrap() - } -} -*/ - pub fn scan_loop( scanner: Arc, db: Arc, @@ -163,7 +123,6 @@ mod test { use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use std::collections::HashSet; - /* fn with_example_index(f: F) where F: Fn(Core), @@ -171,12 +130,11 @@ mod test { let index = MemoryIndex::new(); let scanner = MockScanner::new(); match Core::new(Arc::new(index), Arc::new(scanner)) { - Flow::Ok((core, api)) => { + Ok(core) => { thread::sleep(Duration::from_millis(10)); f(core) } - Flow::Err(error) => panic!("{:?}", error), - Flow::Fatal(error) => panic!("{:?}", error), + Err(error) => panic!("{:?}", error), } } @@ -204,5 +162,4 @@ mod test { Flow::Err(err) => panic!("error: {:?}", err), }) } - */ } diff --git a/music-player/server/src/database.rs b/music-player/server/src/database.rs index 4407c9d..7d8d660 100644 --- a/music-player/server/src/database.rs +++ b/music-player/server/src/database.rs @@ -4,6 +4,7 @@ use crate::{ }; use flow::{error, ok, Flow}; use rusqlite::Connection; +use serde::Serialize; use std::collections::HashMap; use std::{ path::PathBuf, @@ -11,12 +12,12 @@ use std::{ }; use thiserror::Error; -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Error, PartialEq, Serialize)] pub enum DatabaseError { #[error("database is unreadable")] DatabaseUnreadable, #[error("unhandled database problem: {0}")] - UnhandledError(rusqlite::Error), + UnhandledError(String), } pub trait MusicIndex: Sync + Send { @@ -94,7 +95,7 @@ impl Database { pub fn new(path: PathBuf) -> Flow { let connection = match Connection::open(path.clone()) { Ok(connection) => connection, - Err(err) => return error(DatabaseError::UnhandledError(err)), + Err(err) => return error(DatabaseError::UnhandledError(err.to_string())), }; ok(Database { path, diff --git a/music-player/server/src/playback.rs b/music-player/server/src/playback.rs index 57df72d..0efb3f3 100644 --- a/music-player/server/src/playback.rs +++ b/music-player/server/src/playback.rs @@ -20,35 +20,47 @@ pub enum PlaybackStatus { } pub struct Playback { - handle: tokio::task::JoinHandle<()>, + events_handle: tokio::task::JoinHandle<()>, pipeline: gstreamer::Element, } impl Playback { - pub fn new(id: TrackId) -> Flow { + pub fn new(id: TrackId) -> Result { let pb = PathBuf::from(id.as_ref()); let path = pb .iter() .skip(1) .map(|component| encode(&component.to_string_lossy()).into_owned()) .collect::(); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) - .map_err(|err| Error::GlibError(err)) - )); + let pipeline = gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) + .map_err(|err| Error::GlibError(err))?; - pipeline.set_state(gstreamer::State::Playing); - let handle = tokio::task::spawn_blocking({ + pipeline + .set_state(gstreamer::State::Playing) + .map_err(|err| Error::CannotPlay(err.to_string()))?; + let events_handle = tokio::task::spawn_blocking({ let pipeline = pipeline.clone(); - move || pipeline_status(pipeline) + move || pipeline_events(pipeline) }); - ok(Self { handle, pipeline }) + Ok(Self { + events_handle, + pipeline, + }) } - pub fn stop(&self) { - self.handle.abort(); - self.pipeline.set_state(gstreamer::State::Paused); + pub fn stop(&self) -> Result<(), Error> { + self.events_handle.abort(); + self.pipeline + .set_state(gstreamer::State::Paused) + .map_err(|_| Error::CannotStop)?; + Ok(()) + } + + pub fn position(&self) -> (Option, Option) { + let position = self.pipeline.query_position(); + let duration = self.pipeline.query_duration(); + (position, duration) } } @@ -58,7 +70,7 @@ impl Drop for Playback { } } -fn pipeline_status(pipeline: gstreamer::Element) { +fn pipeline_events(pipeline: gstreamer::Element) { let bus = pipeline.bus().unwrap(); for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { match msg.view() { @@ -75,64 +87,3 @@ fn pipeline_status(pipeline: gstreamer::Element) { } } } - -/* -fn play_track(id: TrackId) -> Flow { - println!("setting up to play {}", playbin); - println!("ready to play"); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|err| Error::CannotPlay(err.to_string())) - )); - println!("playing started"); - ok(pipeline) -} -*/ - -/* -fn play_track(id: TrackId) -> Flow<(), FatalError, Error> { - let playbin = format!("playbin uri=file://{}", id.as_ref()); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err)) - )); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|_| Error::CannotPlay) - )); - - let message_handler = { - let pipeline = pipeline.clone(); - thread::spawn(move || { - let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match msg.view() { - MessageView::Eos(_) => (), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - } - msg => println!("{:?}", msg), - } - } - }) - }; - - let query_handler = { - let pipeline = pipeline.clone(); - thread::spawn(move || loop { - let position: Option = pipeline.query_position(); - let duration: Option = pipeline.query_duration(); - println!("Position {:?} {:?}", position, duration); - thread::sleep(Duration::from_millis(100)); - }) - }; - - ok(()) -} -*/