Thread model and error handling cleanup #30

Merged
savanni merged 3 commits from refactoring/music-tokio into main 2023-03-11 20:15:47 +00:00
7 changed files with 112 additions and 159 deletions
Showing only changes of commit 725c96bc1b - Show all commits

View File

@ -1,3 +1,8 @@
export type Response<A> =
| { type: "Success"; content: A }
| { type: "Failure"; content: string }
| { type: "Fatal"; content: string };
export interface TrackInfo {
id: string;
track_number?: number;
@ -8,11 +13,28 @@ export interface TrackInfo {
}
export const getTracks = (): Promise<TrackInfo[]> =>
fetch("/api/v1/tracks").then((r) => r.json());
fetch("/api/v1/tracks")
.then((r) => r.json())
.then((result: Response<TrackInfo[]>) => {
switch (result.type) {
case "Success":
return result.content || [];
case "Failure":
console.log("failed: ", result.content);
return [];
case "Fatal":
console.log("fatal: ", result.content);
return [];
}
});
export const playTrack = (id: string): Promise<Response> =>
export const playTrack = (id: string): Promise<void> =>
fetch("/api/v1/play", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ id: id }),
})
.then((r) => r.json())
.then((result: Response<null>) => {
console.log("result: ", result);
});

View File

@ -664,6 +664,7 @@ dependencies = [
"mime_guess",
"rusqlite",
"serde",
"serde_json",
"thiserror",
"tokio",
"url",

View File

@ -13,6 +13,7 @@ mime_guess = { version = "2.0" }
mime = { version = "0.3" }
rusqlite = { version = "0.28" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
thiserror = { version = "1.0" }
tokio = { version = "1.24", features = ["full"] }
url = { version = "2.3" }

View File

@ -4,8 +4,9 @@ use music_player::{
database::{MemoryIndex, MusicIndex},
media::{TrackId, TrackInfo},
scanner::FileScanner,
Error, FatalError,
};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
@ -15,15 +16,35 @@ use std::{
use warp::Filter;
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TrackRequest {
id: String,
}
fn tracks(index: &Arc<impl MusicIndex>) -> Vec<TrackInfo> {
match index.list_tracks() {
Flow::Ok(tracks) => tracks,
Flow::Err(err) => panic!("error: {}", err),
Flow::Fatal(err) => panic!("fatal: {}", err),
#[derive(Serialize)]
#[serde(tag = "type", content = "content")]
enum Response<A: Serialize> {
Success(A),
Failure(String),
Fatal(String),
}
impl<A: Serialize> From<Result<A, Error>> for Response<A> {
fn from(res: Result<A, Error>) -> Self {
match res {
Ok(val) => Self::Success(val),
Err(err) => Self::Failure(format!("{}", err)),
}
}
}
impl<A: Serialize> From<Flow<A, FatalError, Error>> for Response<A> {
fn from(res: Flow<A, FatalError, Error>) -> Self {
match res {
Flow::Ok(val) => Self::Success(val),
Flow::Fatal(fatal) => Self::Fatal(format!("{}", fatal)),
Flow::Err(err) => Self::Failure(format!("{}", err)),
}
}
}
@ -50,22 +71,18 @@ pub async fn main() {
.map(|b| PathBuf::from(b))
.unwrap();
gstreamer::init();
match gstreamer::init() {
Ok(()) => (),
Err(err) => panic!("failed to initialize gstreamer: {}", err),
}
let index = Arc::new(MemoryIndex::new());
let scanner = Arc::new(FileScanner::new(vec![music_root.clone()]));
let core = match Core::new(index.clone(), scanner) {
Flow::Ok(core) => Arc::new(RwLock::new(core)),
Flow::Err(error) => panic!("error: {}", error),
Flow::Fatal(error) => panic!("fatal: {}", error),
Ok(core) => Arc::new(RwLock::new(core)),
Err(error) => panic!("core failed to initialize: {}", error),
};
/*
let core_handle = tokio::spawn(move || core.main_loop());
*/
// let _handle = thread::spawn(move || core.start());
println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root);
let root = warp::path!().and(warp::get()).map({
@ -101,7 +118,13 @@ pub async fn main() {
let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({
let index = index.clone();
move || warp::reply::json(&tracks(&index))
move || {
warp::reply::json(&Response::from(
index
.list_tracks()
.map_err(|db_err| Error::DatabaseError(db_err)),
))
}
});
let play_track = warp::path!("api" / "v1" / "play")
@ -110,18 +133,15 @@ pub async fn main() {
.map({
let core = core.clone();
move |body: TrackRequest| {
let result = core.write().unwrap().play_track(TrackId::from(body.id));
println!("Play result: {:?}", result);
warp::reply::json(&("ok".to_owned()))
warp::reply::json(&Response::from(
core.write().unwrap().play_track(TrackId::from(body.id)),
))
}
});
let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({
let core = core.clone();
move || {
core.write().unwrap().stop_playback();
warp::reply::json(&("ok".to_owned()))
}
move || warp::reply::json(&Response::from(core.write().unwrap().stop_playback()))
});
/*

View File

@ -43,11 +43,8 @@ pub enum PlaybackMsg {
pub struct Core {
db: Arc<dyn MusicIndex>,
playback_controller: Option<Playback>,
next_scan: Arc<RwLock<Instant>>,
scanner_handle: thread::JoinHandle<()>,
}
@ -55,7 +52,7 @@ impl Core {
pub fn new(
db: Arc<dyn MusicIndex>,
scanner: Arc<dyn MusicScanner>,
) -> Flow<Core, FatalError, Error> {
) -> Result<Core, FatalError> {
let db = db;
let next_scan = Arc::new(RwLock::new(Instant::now()));
@ -65,7 +62,7 @@ impl Core {
thread::spawn(move || scan_loop(scanner, db, next_scan))
};
ok(Core {
Ok(Core {
db,
playback_controller: None,
next_scan,
@ -73,63 +70,26 @@ impl Core {
})
}
/*
pub fn main_loop(&self) -> Flow<(), FatalError, Error> {
// let (scanner_tx, _scanner_rx) = channel();
loop {
println!("loop");
/*
match self.control_rx.recv_timeout(Duration::from_millis(1000)) {
Ok(ControlMsg::PlayTrack(id)) => {
let _ = self.play_track(id);
}
Ok(ControlMsg::Exit) => return ok(()),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return ok(()),
}
*/
thread::sleep(Duration::from_secs(1));
}
}
*/
pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> {
self.db.list_tracks().map_err(Error::DatabaseError)
}
pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> {
self.stop_playback();
self.playback_controller = Some(return_error!(Playback::new(id)));
ok(())
pub fn play_track<'a>(&'a mut self, id: TrackId) -> Result<(), Error> {
self.stop_playback()?;
self.playback_controller = Some(Playback::new(id)?);
Ok(())
}
pub fn stop_playback<'a>(&'a mut self) {
pub fn stop_playback<'a>(&'a mut self) -> Result<(), Error> {
match self.playback_controller {
Some(ref controller) => controller.stop(),
Some(ref controller) => controller.stop()?,
None => (),
}
self.playback_controller = None;
Ok(())
}
}
/*
#[derive(Clone)]
pub struct CoreAPI {
control_tx: Arc<Mutex<Sender<ControlMsg>>>,
}
impl CoreAPI {
pub async fn play_track(&self, id: TrackId) -> () {
self.control_tx
.lock()
.unwrap()
.send(ControlMsg::PlayTrack(id))
.await
.unwrap()
}
}
*/
pub fn scan_loop(
scanner: Arc<dyn MusicScanner>,
db: Arc<dyn MusicIndex>,
@ -163,7 +123,6 @@ mod test {
use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner};
use std::collections::HashSet;
/*
fn with_example_index<F>(f: F)
where
F: Fn(Core),
@ -171,12 +130,11 @@ mod test {
let index = MemoryIndex::new();
let scanner = MockScanner::new();
match Core::new(Arc::new(index), Arc::new(scanner)) {
Flow::Ok((core, api)) => {
Ok(core) => {
thread::sleep(Duration::from_millis(10));
f(core)
}
Flow::Err(error) => panic!("{:?}", error),
Flow::Fatal(error) => panic!("{:?}", error),
Err(error) => panic!("{:?}", error),
}
}
@ -204,5 +162,4 @@ mod test {
Flow::Err(err) => panic!("error: {:?}", err),
})
}
*/
}

View File

@ -4,6 +4,7 @@ use crate::{
};
use flow::{error, ok, Flow};
use rusqlite::Connection;
use serde::Serialize;
use std::collections::HashMap;
use std::{
path::PathBuf,
@ -11,12 +12,12 @@ use std::{
};
use thiserror::Error;
#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Error, PartialEq, Serialize)]
pub enum DatabaseError {
#[error("database is unreadable")]
DatabaseUnreadable,
#[error("unhandled database problem: {0}")]
UnhandledError(rusqlite::Error),
UnhandledError(String),
}
pub trait MusicIndex: Sync + Send {
@ -94,7 +95,7 @@ impl Database {
pub fn new(path: PathBuf) -> Flow<Database, FatalError, DatabaseError> {
let connection = match Connection::open(path.clone()) {
Ok(connection) => connection,
Err(err) => return error(DatabaseError::UnhandledError(err)),
Err(err) => return error(DatabaseError::UnhandledError(err.to_string())),
};
ok(Database {
path,

View File

@ -20,35 +20,47 @@ pub enum PlaybackStatus {
}
pub struct Playback {
handle: tokio::task::JoinHandle<()>,
events_handle: tokio::task::JoinHandle<()>,
pipeline: gstreamer::Element,
}
impl Playback {
pub fn new(id: TrackId) -> Flow<Self, FatalError, Error> {
pub fn new(id: TrackId) -> Result<Self, Error> {
let pb = PathBuf::from(id.as_ref());
let path = pb
.iter()
.skip(1)
.map(|component| encode(&component.to_string_lossy()).into_owned())
.collect::<PathBuf>();
let pipeline = return_error!(Flow::from(
gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display()))
.map_err(|err| Error::GlibError(err))
));
let pipeline = gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display()))
.map_err(|err| Error::GlibError(err))?;
pipeline.set_state(gstreamer::State::Playing);
let handle = tokio::task::spawn_blocking({
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|err| Error::CannotPlay(err.to_string()))?;
let events_handle = tokio::task::spawn_blocking({
let pipeline = pipeline.clone();
move || pipeline_status(pipeline)
move || pipeline_events(pipeline)
});
ok(Self { handle, pipeline })
Ok(Self {
events_handle,
pipeline,
})
}
pub fn stop(&self) {
self.handle.abort();
self.pipeline.set_state(gstreamer::State::Paused);
pub fn stop(&self) -> Result<(), Error> {
self.events_handle.abort();
self.pipeline
.set_state(gstreamer::State::Paused)
.map_err(|_| Error::CannotStop)?;
Ok(())
}
pub fn position(&self) -> (Option<ClockTime>, Option<ClockTime>) {
let position = self.pipeline.query_position();
let duration = self.pipeline.query_duration();
(position, duration)
}
}
@ -58,7 +70,7 @@ impl Drop for Playback {
}
}
fn pipeline_status(pipeline: gstreamer::Element) {
fn pipeline_events(pipeline: gstreamer::Element) {
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() {
@ -75,64 +87,3 @@ fn pipeline_status(pipeline: gstreamer::Element) {
}
}
}
/*
fn play_track(id: TrackId) -> Flow<gstreamer::Element, FatalError, Error> {
println!("setting up to play {}", playbin);
println!("ready to play");
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|err| Error::CannotPlay(err.to_string()))
));
println!("playing started");
ok(pipeline)
}
*/
/*
fn play_track(id: TrackId) -> Flow<(), FatalError, Error> {
let playbin = format!("playbin uri=file://{}", id.as_ref());
let pipeline = return_error!(Flow::from(
gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err))
));
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|_| Error::CannotPlay)
));
let message_handler = {
let pipeline = pipeline.clone();
thread::spawn(move || {
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() {
MessageView::Eos(_) => (),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
msg => println!("{:?}", msg),
}
}
})
};
let query_handler = {
let pipeline = pipeline.clone();
thread::spawn(move || loop {
let position: Option<ClockTime> = pipeline.query_position();
let duration: Option<ClockTime> = pipeline.query_duration();
println!("Position {:?} {:?}", position, duration);
thread::sleep(Duration::from_millis(100));
})
};
ok(())
}
*/