Work out improved error reporting

This commit is contained in:
Savanni D'Gerinel 2023-03-11 14:46:51 -05:00
parent 8260b4e2f3
commit cb2560a813
6 changed files with 85 additions and 156 deletions

View File

@ -664,6 +664,7 @@ dependencies = [
"mime_guess", "mime_guess",
"rusqlite", "rusqlite",
"serde", "serde",
"serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"url", "url",

View File

@ -13,6 +13,7 @@ mime_guess = { version = "2.0" }
mime = { version = "0.3" } mime = { version = "0.3" }
rusqlite = { version = "0.28" } rusqlite = { version = "0.28" }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = { version = "1.0" } thiserror = { version = "1.0" }
tokio = { version = "1.24", features = ["full"] } tokio = { version = "1.24", features = ["full"] }
url = { version = "2.3" } url = { version = "2.3" }

View File

@ -4,8 +4,9 @@ use music_player::{
database::{MemoryIndex, MusicIndex}, database::{MemoryIndex, MusicIndex},
media::{TrackId, TrackInfo}, media::{TrackId, TrackInfo},
scanner::FileScanner, scanner::FileScanner,
Error, FatalError,
}; };
use serde::Deserialize; use serde::{Deserialize, Serialize};
use std::{ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf, path::PathBuf,
@ -19,11 +20,29 @@ struct TrackRequest {
id: String, id: String,
} }
fn tracks(index: &Arc<impl MusicIndex>) -> Vec<TrackInfo> { #[derive(Serialize)]
match index.list_tracks() { enum Response<A: Serialize> {
Flow::Ok(tracks) => tracks, Success(A),
Flow::Err(err) => panic!("error: {}", err), Failure(String),
Flow::Fatal(err) => panic!("fatal: {}", err), Fatal(String),
}
impl<A: Serialize> From<Result<A, Error>> for Response<A> {
fn from(res: Result<A, Error>) -> Self {
match res {
Ok(val) => Self::Success(val),
Err(err) => Self::Failure(format!("{}", err)),
}
}
}
impl<A: Serialize> From<Flow<A, FatalError, Error>> for Response<A> {
fn from(res: Flow<A, FatalError, Error>) -> Self {
match res {
Flow::Ok(val) => Self::Success(val),
Flow::Fatal(fatal) => Self::Fatal(format!("{}", fatal)),
Flow::Err(err) => Self::Failure(format!("{}", err)),
}
} }
} }
@ -50,22 +69,18 @@ pub async fn main() {
.map(|b| PathBuf::from(b)) .map(|b| PathBuf::from(b))
.unwrap(); .unwrap();
gstreamer::init(); match gstreamer::init() {
Ok(()) => (),
Err(err) => panic!("failed to initialize gstreamer: {}", err),
}
let index = Arc::new(MemoryIndex::new()); let index = Arc::new(MemoryIndex::new());
let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()]));
let core = match Core::new(index.clone(), scanner) { let core = match Core::new(index.clone(), scanner) {
Flow::Ok(core) => Arc::new(RwLock::new(core)), Ok(core) => Arc::new(RwLock::new(core)),
Flow::Err(error) => panic!("error: {}", error), Err(error) => panic!("core failed to initialize: {}", error),
Flow::Fatal(error) => panic!("fatal: {}", error),
}; };
/*
let core_handle = tokio::spawn(move || core.main_loop());
*/
// let _handle = thread::spawn(move || core.start());
println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root);
let root = warp::path!().and(warp::get()).map({ let root = warp::path!().and(warp::get()).map({
@ -101,7 +116,13 @@ pub async fn main() {
let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({ let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({
let index = index.clone(); let index = index.clone();
move || warp::reply::json(&tracks(&index)) move || {
warp::reply::json(&Response::from(
index
.list_tracks()
.map_err(|db_err| Error::DatabaseError(db_err)),
))
}
}); });
let play_track = warp::path!("api" / "v1" / "play") let play_track = warp::path!("api" / "v1" / "play")
@ -110,18 +131,15 @@ pub async fn main() {
.map({ .map({
let core = core.clone(); let core = core.clone();
move |body: TrackRequest| { move |body: TrackRequest| {
let result = core.write().unwrap().play_track(TrackId::from(body.id)); warp::reply::json(&Response::from(
println!("Play result: {:?}", result); core.write().unwrap().play_track(TrackId::from(body.id)),
warp::reply::json(&("ok".to_owned())) ))
} }
}); });
let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({ let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({
let core = core.clone(); let core = core.clone();
move || { move || warp::reply::json(&Response::from(core.write().unwrap().stop_playback()))
core.write().unwrap().stop_playback();
warp::reply::json(&("ok".to_owned()))
}
}); });
/* /*

View File

@ -43,11 +43,8 @@ pub enum PlaybackMsg {
pub struct Core { pub struct Core {
db: Arc<dyn MusicIndex>, db: Arc<dyn MusicIndex>,
playback_controller: Option<Playback>, playback_controller: Option<Playback>,
next_scan: Arc<RwLock<Instant>>, next_scan: Arc<RwLock<Instant>>,
scanner_handle: thread::JoinHandle<()>, scanner_handle: thread::JoinHandle<()>,
} }
@ -55,7 +52,7 @@ impl Core {
pub fn new( pub fn new(
db: Arc<dyn MusicIndex>, db: Arc<dyn MusicIndex>,
scanner: Arc<dyn MusicScanner>, scanner: Arc<dyn MusicScanner>,
) -> Flow<Core, FatalError, Error> { ) -> Result<Core, FatalError> {
let db = db; let db = db;
let next_scan = Arc::new(RwLock::new(Instant::now())); let next_scan = Arc::new(RwLock::new(Instant::now()));
@ -65,7 +62,7 @@ impl Core {
thread::spawn(move || scan_loop(scanner, db, next_scan)) thread::spawn(move || scan_loop(scanner, db, next_scan))
}; };
ok(Core { Ok(Core {
db, db,
playback_controller: None, playback_controller: None,
next_scan, next_scan,
@ -73,63 +70,26 @@ impl Core {
}) })
} }
/*
pub fn main_loop(&self) -> Flow<(), FatalError, Error> {
// let (scanner_tx, _scanner_rx) = channel();
loop {
println!("loop");
/*
match self.control_rx.recv_timeout(Duration::from_millis(1000)) {
Ok(ControlMsg::PlayTrack(id)) => {
let _ = self.play_track(id);
}
Ok(ControlMsg::Exit) => return ok(()),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return ok(()),
}
*/
thread::sleep(Duration::from_secs(1));
}
}
*/
pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> { pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> {
self.db.list_tracks().map_err(Error::DatabaseError) self.db.list_tracks().map_err(Error::DatabaseError)
} }
pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> { pub fn play_track<'a>(&'a mut self, id: TrackId) -> Result<(), Error> {
self.stop_playback(); self.stop_playback()?;
self.playback_controller = Some(return_error!(Playback::new(id))); self.playback_controller = Some(Playback::new(id)?);
ok(()) Ok(())
} }
pub fn stop_playback<'a>(&'a mut self) { pub fn stop_playback<'a>(&'a mut self) -> Result<(), Error> {
match self.playback_controller { match self.playback_controller {
Some(ref controller) => controller.stop(), Some(ref controller) => controller.stop()?,
None => (), None => (),
} }
self.playback_controller = None; self.playback_controller = None;
Ok(())
} }
} }
/*
#[derive(Clone)]
pub struct CoreAPI {
control_tx: Arc<Mutex<Sender<ControlMsg>>>,
}
impl CoreAPI {
pub async fn play_track(&self, id: TrackId) -> () {
self.control_tx
.lock()
.unwrap()
.send(ControlMsg::PlayTrack(id))
.await
.unwrap()
}
}
*/
pub fn scan_loop( pub fn scan_loop(
scanner: Arc<dyn MusicScanner>, scanner: Arc<dyn MusicScanner>,
db: Arc<dyn MusicIndex>, db: Arc<dyn MusicIndex>,
@ -163,7 +123,6 @@ mod test {
use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner};
use std::collections::HashSet; use std::collections::HashSet;
/*
fn with_example_index<F>(f: F) fn with_example_index<F>(f: F)
where where
F: Fn(Core), F: Fn(Core),
@ -171,12 +130,11 @@ mod test {
let index = MemoryIndex::new(); let index = MemoryIndex::new();
let scanner = MockScanner::new(); let scanner = MockScanner::new();
match Core::new(Arc::new(index), Arc::new(scanner)) { match Core::new(Arc::new(index), Arc::new(scanner)) {
Flow::Ok((core, api)) => { Ok(core) => {
thread::sleep(Duration::from_millis(10)); thread::sleep(Duration::from_millis(10));
f(core) f(core)
} }
Flow::Err(error) => panic!("{:?}", error), Err(error) => panic!("{:?}", error),
Flow::Fatal(error) => panic!("{:?}", error),
} }
} }
@ -204,5 +162,4 @@ mod test {
Flow::Err(err) => panic!("error: {:?}", err), Flow::Err(err) => panic!("error: {:?}", err),
}) })
} }
*/
} }

View File

@ -4,6 +4,7 @@ use crate::{
}; };
use flow::{error, ok, Flow}; use flow::{error, ok, Flow};
use rusqlite::Connection; use rusqlite::Connection;
use serde::Serialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::{ use std::{
path::PathBuf, path::PathBuf,
@ -11,12 +12,12 @@ use std::{
}; };
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error, PartialEq)] #[derive(Debug, Error, PartialEq, Serialize)]
pub enum DatabaseError { pub enum DatabaseError {
#[error("database is unreadable")] #[error("database is unreadable")]
DatabaseUnreadable, DatabaseUnreadable,
#[error("unhandled database problem: {0}")] #[error("unhandled database problem: {0}")]
UnhandledError(rusqlite::Error), UnhandledError(String),
} }
pub trait MusicIndex: Sync + Send { pub trait MusicIndex: Sync + Send {
@ -94,7 +95,7 @@ impl Database {
pub fn new(path: PathBuf) -> Flow<Database, FatalError, DatabaseError> { pub fn new(path: PathBuf) -> Flow<Database, FatalError, DatabaseError> {
let connection = match Connection::open(path.clone()) { let connection = match Connection::open(path.clone()) {
Ok(connection) => connection, Ok(connection) => connection,
Err(err) => return error(DatabaseError::UnhandledError(err)), Err(err) => return error(DatabaseError::UnhandledError(err.to_string())),
}; };
ok(Database { ok(Database {
path, path,

View File

@ -20,35 +20,47 @@ pub enum PlaybackStatus {
} }
pub struct Playback { pub struct Playback {
handle: tokio::task::JoinHandle<()>, events_handle: tokio::task::JoinHandle<()>,
pipeline: gstreamer::Element, pipeline: gstreamer::Element,
} }
impl Playback { impl Playback {
pub fn new(id: TrackId) -> Flow<Self, FatalError, Error> { pub fn new(id: TrackId) -> Result<Self, Error> {
let pb = PathBuf::from(id.as_ref()); let pb = PathBuf::from(id.as_ref());
let path = pb let path = pb
.iter() .iter()
.skip(1) .skip(1)
.map(|component| encode(&component.to_string_lossy()).into_owned()) .map(|component| encode(&component.to_string_lossy()).into_owned())
.collect::<PathBuf>(); .collect::<PathBuf>();
let pipeline = return_error!(Flow::from( let pipeline = gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display()))
gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) .map_err(|err| Error::GlibError(err))?;
.map_err(|err| Error::GlibError(err))
));
pipeline.set_state(gstreamer::State::Playing); pipeline
let handle = tokio::task::spawn_blocking({ .set_state(gstreamer::State::Playing)
.map_err(|err| Error::CannotPlay(err.to_string()))?;
let events_handle = tokio::task::spawn_blocking({
let pipeline = pipeline.clone(); let pipeline = pipeline.clone();
move || pipeline_status(pipeline) move || pipeline_events(pipeline)
}); });
ok(Self { handle, pipeline }) Ok(Self {
events_handle,
pipeline,
})
} }
pub fn stop(&self) { pub fn stop(&self) -> Result<(), Error> {
self.handle.abort(); self.events_handle.abort();
self.pipeline.set_state(gstreamer::State::Paused); self.pipeline
.set_state(gstreamer::State::Paused)
.map_err(|_| Error::CannotStop)?;
Ok(())
}
pub fn position(&self) -> (Option<ClockTime>, Option<ClockTime>) {
let position = self.pipeline.query_position();
let duration = self.pipeline.query_duration();
(position, duration)
} }
} }
@ -58,7 +70,7 @@ impl Drop for Playback {
} }
} }
fn pipeline_status(pipeline: gstreamer::Element) { fn pipeline_events(pipeline: gstreamer::Element) {
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() { match msg.view() {
@ -75,64 +87,3 @@ fn pipeline_status(pipeline: gstreamer::Element) {
} }
} }
} }
/*
fn play_track(id: TrackId) -> Flow<gstreamer::Element, FatalError, Error> {
println!("setting up to play {}", playbin);
println!("ready to play");
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|err| Error::CannotPlay(err.to_string()))
));
println!("playing started");
ok(pipeline)
}
*/
/*
fn play_track(id: TrackId) -> Flow<(), FatalError, Error> {
let playbin = format!("playbin uri=file://{}", id.as_ref());
let pipeline = return_error!(Flow::from(
gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err))
));
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|_| Error::CannotPlay)
));
let message_handler = {
let pipeline = pipeline.clone();
thread::spawn(move || {
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() {
MessageView::Eos(_) => (),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
msg => println!("{:?}", msg),
}
}
})
};
let query_handler = {
let pipeline = pipeline.clone();
thread::spawn(move || loop {
let position: Option<ClockTime> = pipeline.query_position();
let duration: Option<ClockTime> = pipeline.query_duration();
println!("Position {:?} {:?}", position, duration);
thread::sleep(Duration::from_millis(100));
})
};
ok(())
}
*/