From 8260b4e2f322f028d01a755289045a491ce1a329 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Sat, 11 Mar 2023 13:02:52 -0500 Subject: [PATCH 1/3] Overhaul the threading model to use more async tasks and less threads --- music-player/server/Cargo.toml | 3 + music-player/server/src/bin/server.rs | 37 +++++-- music-player/server/src/core.rs | 154 +++++++++++++------------- music-player/server/src/lib.rs | 2 +- music-player/server/src/playback.rs | 91 ++++++++------- 5 files changed, 157 insertions(+), 130 deletions(-) diff --git a/music-player/server/Cargo.toml b/music-player/server/Cargo.toml index ea753c8..a55ec1d 100644 --- a/music-player/server/Cargo.toml +++ b/music-player/server/Cargo.toml @@ -20,4 +20,7 @@ uuid = { version = "1", features = ["v4"] } warp = { version = "0.3" } urlencoding = { version = "2.1" } +[target.armv7-unknown-linux-gnueabi] +linker = "arm-linux-gnueabi-gcc" + [lib] diff --git a/music-player/server/src/bin/server.rs b/music-player/server/src/bin/server.rs index fc0a0a9..3197f7f 100644 --- a/music-player/server/src/bin/server.rs +++ b/music-player/server/src/bin/server.rs @@ -9,7 +9,7 @@ use serde::Deserialize; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, - sync::Arc, + sync::{Arc, RwLock}, thread, }; use warp::Filter; @@ -33,8 +33,7 @@ impl Static { fn read(self, root: PathBuf) -> String { let mut path = root; path.push(self.0); - println!("path: {:?}", path); - std::fs::read_to_string(path).expect("to find the file") + std::fs::read_to_string(path.clone()).expect(&format!("to find {:?}", path)) } } @@ -51,15 +50,21 @@ pub async fn main() { .map(|b| PathBuf::from(b)) .unwrap(); + gstreamer::init(); + let index = Arc::new(MemoryIndex::new()); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); - let (core, api) = match Core::new(index.clone(), scanner) { - Flow::Ok((core, api)) => (core, api), + let core = match Core::new(index.clone(), scanner) { + Flow::Ok(core) => Arc::new(RwLock::new(core)), Flow::Err(error) => panic!("error: {}", error), Flow::Fatal(error) => panic!("fatal: {}", error), }; - let _handle = thread::spawn(move || core.start()); + /* + let core_handle = tokio::spawn(move || core.main_loop()); + */ + + // let _handle = thread::spawn(move || core.start()); println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); @@ -79,8 +84,6 @@ pub async fn main() { .map(|m| m.essence_str().to_owned()) .unwrap_or("text/plain".to_owned()); println!("mime_type: {:?}", mime_type); - // let mut path = PathBuf::from("assets"); - // path.push(filename); warp::http::Response::builder() .header("content-type", mime_type) .body(Static(PathBuf::from(filename)).read(bundle_root.clone())) @@ -105,14 +108,22 @@ pub async fn main() { .and(warp::post()) .and(warp::body::json()) .map({ - let api = api.clone(); + let core = core.clone(); move |body: TrackRequest| { - let result = api.play_track(TrackId::from(body.id)); + let result = core.write().unwrap().play_track(TrackId::from(body.id)); println!("Play result: {:?}", result); warp::reply::json(&("ok".to_owned())) } }); + let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({ + let core = core.clone(); + move || { + core.write().unwrap().stop_playback(); + warp::reply::json(&("ok".to_owned())) + } + }); + /* let tracks_for_artist = warp::path!("api" / "v1" / "artist" / String) .and(warp::get()) @@ -139,7 +150,11 @@ pub async fn main() { .or(queue) .or(playing_status); */ - let routes = root.or(assets).or(track_list).or(play_track); + let routes = root + .or(assets) + .or(track_list) + .or(play_track) + .or(stop_playback); let server = warp::serve(routes); server .run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8002)) diff --git a/music-player/server/src/core.rs b/music-player/server/src/core.rs index a70303c..590c3a3 100644 --- a/music-player/server/src/core.rs +++ b/music-player/server/src/core.rs @@ -8,30 +8,32 @@ use crate::{ use flow::{ok, return_error, Flow}; use gstreamer::{format::ClockTime, prelude::*, MessageView}; use std::{ - sync::{ - mpsc::{channel, Receiver, RecvTimeoutError, Sender}, - Arc, Mutex, - }, + sync::{Arc, Mutex, RwLock}, thread, - thread::JoinHandle, time::{Duration, Instant}, }; +use tokio::{ + sync::mpsc::{channel, Receiver, Sender}, + task::AbortHandle, +}; fn scan_frequency() -> Duration { Duration::from_secs(60) } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum ControlMsg { PlayTrack(TrackId), Exit, } +#[derive(Clone, Debug)] pub enum TrackMsg { UpdateInProgress, UpdateComplete, } +#[derive(Clone, Debug)] pub enum PlaybackMsg { PositionUpdate, Playing, @@ -41,53 +43,42 @@ pub enum PlaybackMsg { pub struct Core { db: Arc, - scanner: Arc, - control_rx: Receiver, - playback_controller: Playback, + playback_controller: Option, + + next_scan: Arc>, + + scanner_handle: thread::JoinHandle<()>, } impl Core { pub fn new( db: Arc, scanner: Arc, - ) -> Flow<(Core, CoreAPI), FatalError, Error> { - let (control_tx, control_rx) = channel::(); + ) -> Flow { let db = db; - let playback_controller = Playback::new(); + let next_scan = Arc::new(RwLock::new(Instant::now())); + let scanner_handle = { + let db = db.clone(); + let next_scan = next_scan.clone(); + thread::spawn(move || scan_loop(scanner, db, next_scan)) + }; - ok(( - Core { - db, - scanner, - control_rx, - playback_controller, - }, - CoreAPI { - control_tx: Arc::new(Mutex::new(control_tx)), - }, - )) + ok(Core { + db, + playback_controller: None, + next_scan, + scanner_handle, + }) } - pub fn start(&self) -> Flow<(), FatalError, Error> { - gstreamer::init(); - let (scanner_tx, _scanner_rx) = channel(); - let mut next_scan = Instant::now(); + /* + pub fn main_loop(&self) -> Flow<(), FatalError, Error> { + // let (scanner_tx, _scanner_rx) = channel(); loop { - if Instant::now() >= next_scan { - let scan_start = Instant::now(); - let _ = scanner_tx.send(TrackMsg::UpdateInProgress); - for track in self.scanner.scan() { - match track { - Ok(track) => self.db.add_track(track), - Err(_) => ok(()), - }; - } - let _ = scanner_tx.send(TrackMsg::UpdateComplete); - next_scan = Instant::now() + scan_frequency(); - println!("scan duration: {:?}", Instant::now() - scan_start); - } + println!("loop"); + /* match self.control_rx.recv_timeout(Duration::from_millis(1000)) { Ok(ControlMsg::PlayTrack(id)) => { let _ = self.play_track(id); @@ -96,66 +87,75 @@ impl Core { Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => return ok(()), } + */ + thread::sleep(Duration::from_secs(1)); } } + */ pub fn list_tracks<'a>(&'a self) -> Flow, FatalError, Error> { self.db.list_tracks().map_err(Error::DatabaseError) } - pub fn play_track<'a>(&'a self, id: TrackId) -> Flow<(), FatalError, Error> { - /* - println!("play_track: {}", id.as_ref()); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&format!("playbin uri={}", id.as_str())) - .map_err(|err| Error::CannotPlay(err.to_string()),) - )); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|err| Error::CannotPlay(err.to_string())) - )); - { - let pipeline = pipeline.clone(); - thread::spawn(move || { - println!("starting"); - let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match msg.view() { - MessageView::Eos(_) => (), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - } - msg => println!("{:?}", msg), - } - } - }); - } - */ - self.playback_controller.play_track(id); + pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> { + self.stop_playback(); + self.playback_controller = Some(return_error!(Playback::new(id))); ok(()) } + + pub fn stop_playback<'a>(&'a mut self) { + match self.playback_controller { + Some(ref controller) => controller.stop(), + None => (), + } + self.playback_controller = None; + } } +/* #[derive(Clone)] pub struct CoreAPI { control_tx: Arc>>, } impl CoreAPI { - pub fn play_track(&self, id: TrackId) -> () { + pub async fn play_track(&self, id: TrackId) -> () { self.control_tx .lock() .unwrap() .send(ControlMsg::PlayTrack(id)) + .await .unwrap() } } +*/ + +pub fn scan_loop( + scanner: Arc, + db: Arc, + next_scan: Arc>, +) { + loop { + if Instant::now() >= *next_scan.read().unwrap() { + let scan_start = Instant::now(); + let mut counter = 0; + for track in scanner.scan() { + counter += 1; + match track { + Ok(track) => db.add_track(track), + Err(_) => ok(()), + }; + } + *next_scan.write().unwrap() = Instant::now() + scan_frequency(); + println!( + "scanned {} files in {:?}", + counter, + Instant::now() - scan_start + ); + } + thread::sleep(Duration::from_secs(1)); + } +} #[cfg(test)] mod test { @@ -163,6 +163,7 @@ mod test { use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use std::collections::HashSet; + /* fn with_example_index(f: F) where F: Fn(Core), @@ -203,4 +204,5 @@ mod test { Flow::Err(err) => panic!("error: {:?}", err), }) } + */ } diff --git a/music-player/server/src/lib.rs b/music-player/server/src/lib.rs index 4171597..edf10d8 100644 --- a/music-player/server/src/lib.rs +++ b/music-player/server/src/lib.rs @@ -12,7 +12,7 @@ pub enum Error { DatabaseError(DatabaseError), #[error("Cannot play track")] - CannotPlay, + CannotPlay(String), #[error("Cannot stop playback")] CannotStop, diff --git a/music-player/server/src/playback.rs b/music-player/server/src/playback.rs index 6442acd..57df72d 100644 --- a/music-player/server/src/playback.rs +++ b/music-player/server/src/playback.rs @@ -3,10 +3,10 @@ use flow::{ok, return_error, Flow}; use gstreamer::{format::ClockTime, prelude::*, MessageView, StateChangeError}; use std::{ path::PathBuf, - sync::mpsc::{channel, Receiver, Sender}, thread::{self, JoinHandle}, time::Duration, }; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use urlencoding::encode; pub enum PlaybackControl { @@ -20,68 +20,75 @@ pub enum PlaybackStatus { } pub struct Playback { - handle: JoinHandle>, - control_tx: Sender, + handle: tokio::task::JoinHandle<()>, + pipeline: gstreamer::Element, } impl Playback { - pub fn new() -> Playback { - let (control_tx, control_rx) = channel::(); + pub fn new(id: TrackId) -> Flow { + let pb = PathBuf::from(id.as_ref()); + let path = pb + .iter() + .skip(1) + .map(|component| encode(&component.to_string_lossy()).into_owned()) + .collect::(); + let pipeline = return_error!(Flow::from( + gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) + .map_err(|err| Error::GlibError(err)) + )); - let handle = thread::spawn(move || { - let mut pipeline = None; - loop { - match control_rx.recv().unwrap() { - PlaybackControl::PlayTrack(id) => match play_track(id) { - Flow::Ok(pipeline_) => pipeline = Some(pipeline_), - Flow::Fatal(err) => panic!("fatal error: {:?}", err), - Flow::Err(err) => panic!("playback error: {:?}", err), - }, - PlaybackControl::Stop => { - if let Some(ref pipeline) = pipeline { - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Paused) - .map_err(|_| Error::CannotStop) - )); - } - } - PlaybackControl::Exit => return ok(()), - } - } + pipeline.set_state(gstreamer::State::Playing); + let handle = tokio::task::spawn_blocking({ + let pipeline = pipeline.clone(); + move || pipeline_status(pipeline) }); - Self { handle, control_tx } + ok(Self { handle, pipeline }) } - pub fn play_track(&self, id: TrackId) { - self.control_tx - .send(PlaybackControl::PlayTrack(id)) - .unwrap(); + pub fn stop(&self) { + self.handle.abort(); + self.pipeline.set_state(gstreamer::State::Paused); } } +impl Drop for Playback { + fn drop(&mut self) { + self.stop(); + } +} + +fn pipeline_status(pipeline: gstreamer::Element) { + let bus = pipeline.bus().unwrap(); + for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { + match msg.view() { + MessageView::Eos(_) => (), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + } + msg => println!("{:?}", msg), + } + } +} + +/* fn play_track(id: TrackId) -> Flow { - let pb = PathBuf::from(id.as_ref()); - let path = pb - .iter() - .skip(1) - .map(|component| encode(&component.to_string_lossy()).into_owned()) - .collect::(); - let playbin = format!("playbin uri=file:///{}", path.display()); println!("setting up to play {}", playbin); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err)) - )); println!("ready to play"); return_error!(Flow::from( pipeline .set_state(gstreamer::State::Playing) - .map_err(|_| Error::CannotPlay) + .map_err(|err| Error::CannotPlay(err.to_string())) )); println!("playing started"); ok(pipeline) } +*/ /* fn play_track(id: TrackId) -> Flow<(), FatalError, Error> { -- 2.44.1 From 725c96bc1b1809d0687f5f52fe0d62da350bd783 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Sat, 11 Mar 2023 14:46:51 -0500 Subject: [PATCH 2/3] Work out improved error reporting --- music-player/client/src/client.ts | 28 ++++++- music-player/server/Cargo.lock | 1 + music-player/server/Cargo.toml | 1 + music-player/server/src/bin/server.rs | 68 +++++++++++------ music-player/server/src/core.rs | 65 +++-------------- music-player/server/src/database.rs | 7 +- music-player/server/src/playback.rs | 101 +++++++------------------- 7 files changed, 112 insertions(+), 159 deletions(-) diff --git a/music-player/client/src/client.ts b/music-player/client/src/client.ts index c39a0ea..315d320 100644 --- a/music-player/client/src/client.ts +++ b/music-player/client/src/client.ts @@ -1,3 +1,8 @@ +export type Response = + | { type: "Success"; content: A } + | { type: "Failure"; content: string } + | { type: "Fatal"; content: string }; + export interface TrackInfo { id: string; track_number?: number; @@ -8,11 +13,28 @@ export interface TrackInfo { } export const getTracks = (): Promise => - fetch("/api/v1/tracks").then((r) => r.json()); + fetch("/api/v1/tracks") + .then((r) => r.json()) + .then((result: Response) => { + switch (result.type) { + case "Success": + return result.content || []; + case "Failure": + console.log("failed: ", result.content); + return []; + case "Fatal": + console.log("fatal: ", result.content); + return []; + } + }); -export const playTrack = (id: string): Promise => +export const playTrack = (id: string): Promise => fetch("/api/v1/play", { method: "POST", headers: { "content-type": "application/json" }, body: JSON.stringify({ id: id }), - }); + }) + .then((r) => r.json()) + .then((result: Response) => { + console.log("result: ", result); + }); diff --git a/music-player/server/Cargo.lock b/music-player/server/Cargo.lock index ac1e348..4ab9465 100644 --- a/music-player/server/Cargo.lock +++ b/music-player/server/Cargo.lock @@ -664,6 +664,7 @@ dependencies = [ "mime_guess", "rusqlite", "serde", + "serde_json", "thiserror", "tokio", "url", diff --git a/music-player/server/Cargo.toml b/music-player/server/Cargo.toml index a55ec1d..3887ae0 100644 --- a/music-player/server/Cargo.toml +++ b/music-player/server/Cargo.toml @@ -13,6 +13,7 @@ mime_guess = { version = "2.0" } mime = { version = "0.3" } rusqlite = { version = "0.28" } serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0" } thiserror = { version = "1.0" } tokio = { version = "1.24", features = ["full"] } url = { version = "2.3" } diff --git a/music-player/server/src/bin/server.rs b/music-player/server/src/bin/server.rs index 3197f7f..34c5c47 100644 --- a/music-player/server/src/bin/server.rs +++ b/music-player/server/src/bin/server.rs @@ -4,8 +4,9 @@ use music_player::{ database::{MemoryIndex, MusicIndex}, media::{TrackId, TrackInfo}, scanner::FileScanner, + Error, FatalError, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, @@ -15,15 +16,35 @@ use std::{ use warp::Filter; #[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] struct TrackRequest { id: String, } -fn tracks(index: &Arc) -> Vec { - match index.list_tracks() { - Flow::Ok(tracks) => tracks, - Flow::Err(err) => panic!("error: {}", err), - Flow::Fatal(err) => panic!("fatal: {}", err), +#[derive(Serialize)] +#[serde(tag = "type", content = "content")] +enum Response { + Success(A), + Failure(String), + Fatal(String), +} + +impl From> for Response { + fn from(res: Result) -> Self { + match res { + Ok(val) => Self::Success(val), + Err(err) => Self::Failure(format!("{}", err)), + } + } +} + +impl From> for Response { + fn from(res: Flow) -> Self { + match res { + Flow::Ok(val) => Self::Success(val), + Flow::Fatal(fatal) => Self::Fatal(format!("{}", fatal)), + Flow::Err(err) => Self::Failure(format!("{}", err)), + } } } @@ -50,22 +71,18 @@ pub async fn main() { .map(|b| PathBuf::from(b)) .unwrap(); - gstreamer::init(); + match gstreamer::init() { + Ok(()) => (), + Err(err) => panic!("failed to initialize gstreamer: {}", err), + } let index = Arc::new(MemoryIndex::new()); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); let core = match Core::new(index.clone(), scanner) { - Flow::Ok(core) => Arc::new(RwLock::new(core)), - Flow::Err(error) => panic!("error: {}", error), - Flow::Fatal(error) => panic!("fatal: {}", error), + Ok(core) => Arc::new(RwLock::new(core)), + Err(error) => panic!("core failed to initialize: {}", error), }; - /* - let core_handle = tokio::spawn(move || core.main_loop()); - */ - - // let _handle = thread::spawn(move || core.start()); - println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); let root = warp::path!().and(warp::get()).map({ @@ -101,7 +118,13 @@ pub async fn main() { let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({ let index = index.clone(); - move || warp::reply::json(&tracks(&index)) + move || { + warp::reply::json(&Response::from( + index + .list_tracks() + .map_err(|db_err| Error::DatabaseError(db_err)), + )) + } }); let play_track = warp::path!("api" / "v1" / "play") @@ -110,18 +133,15 @@ pub async fn main() { .map({ let core = core.clone(); move |body: TrackRequest| { - let result = core.write().unwrap().play_track(TrackId::from(body.id)); - println!("Play result: {:?}", result); - warp::reply::json(&("ok".to_owned())) + warp::reply::json(&Response::from( + core.write().unwrap().play_track(TrackId::from(body.id)), + )) } }); let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({ let core = core.clone(); - move || { - core.write().unwrap().stop_playback(); - warp::reply::json(&("ok".to_owned())) - } + move || warp::reply::json(&Response::from(core.write().unwrap().stop_playback())) }); /* diff --git a/music-player/server/src/core.rs b/music-player/server/src/core.rs index 590c3a3..a62eb9b 100644 --- a/music-player/server/src/core.rs +++ b/music-player/server/src/core.rs @@ -43,11 +43,8 @@ pub enum PlaybackMsg { pub struct Core { db: Arc, - playback_controller: Option, - next_scan: Arc>, - scanner_handle: thread::JoinHandle<()>, } @@ -55,7 +52,7 @@ impl Core { pub fn new( db: Arc, scanner: Arc, - ) -> Flow { + ) -> Result { let db = db; let next_scan = Arc::new(RwLock::new(Instant::now())); @@ -65,7 +62,7 @@ impl Core { thread::spawn(move || scan_loop(scanner, db, next_scan)) }; - ok(Core { + Ok(Core { db, playback_controller: None, next_scan, @@ -73,63 +70,26 @@ impl Core { }) } - /* - pub fn main_loop(&self) -> Flow<(), FatalError, Error> { - // let (scanner_tx, _scanner_rx) = channel(); - loop { - println!("loop"); - /* - match self.control_rx.recv_timeout(Duration::from_millis(1000)) { - Ok(ControlMsg::PlayTrack(id)) => { - let _ = self.play_track(id); - } - Ok(ControlMsg::Exit) => return ok(()), - Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => return ok(()), - } - */ - thread::sleep(Duration::from_secs(1)); - } - } - */ - pub fn list_tracks<'a>(&'a self) -> Flow, FatalError, Error> { self.db.list_tracks().map_err(Error::DatabaseError) } - pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> { - self.stop_playback(); - self.playback_controller = Some(return_error!(Playback::new(id))); - ok(()) + pub fn play_track<'a>(&'a mut self, id: TrackId) -> Result<(), Error> { + self.stop_playback()?; + self.playback_controller = Some(Playback::new(id)?); + Ok(()) } - pub fn stop_playback<'a>(&'a mut self) { + pub fn stop_playback<'a>(&'a mut self) -> Result<(), Error> { match self.playback_controller { - Some(ref controller) => controller.stop(), + Some(ref controller) => controller.stop()?, None => (), } self.playback_controller = None; + Ok(()) } } -/* -#[derive(Clone)] -pub struct CoreAPI { - control_tx: Arc>>, -} - -impl CoreAPI { - pub async fn play_track(&self, id: TrackId) -> () { - self.control_tx - .lock() - .unwrap() - .send(ControlMsg::PlayTrack(id)) - .await - .unwrap() - } -} -*/ - pub fn scan_loop( scanner: Arc, db: Arc, @@ -163,7 +123,6 @@ mod test { use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use std::collections::HashSet; - /* fn with_example_index(f: F) where F: Fn(Core), @@ -171,12 +130,11 @@ mod test { let index = MemoryIndex::new(); let scanner = MockScanner::new(); match Core::new(Arc::new(index), Arc::new(scanner)) { - Flow::Ok((core, api)) => { + Ok(core) => { thread::sleep(Duration::from_millis(10)); f(core) } - Flow::Err(error) => panic!("{:?}", error), - Flow::Fatal(error) => panic!("{:?}", error), + Err(error) => panic!("{:?}", error), } } @@ -204,5 +162,4 @@ mod test { Flow::Err(err) => panic!("error: {:?}", err), }) } - */ } diff --git a/music-player/server/src/database.rs b/music-player/server/src/database.rs index 4407c9d..7d8d660 100644 --- a/music-player/server/src/database.rs +++ b/music-player/server/src/database.rs @@ -4,6 +4,7 @@ use crate::{ }; use flow::{error, ok, Flow}; use rusqlite::Connection; +use serde::Serialize; use std::collections::HashMap; use std::{ path::PathBuf, @@ -11,12 +12,12 @@ use std::{ }; use thiserror::Error; -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Error, PartialEq, Serialize)] pub enum DatabaseError { #[error("database is unreadable")] DatabaseUnreadable, #[error("unhandled database problem: {0}")] - UnhandledError(rusqlite::Error), + UnhandledError(String), } pub trait MusicIndex: Sync + Send { @@ -94,7 +95,7 @@ impl Database { pub fn new(path: PathBuf) -> Flow { let connection = match Connection::open(path.clone()) { Ok(connection) => connection, - Err(err) => return error(DatabaseError::UnhandledError(err)), + Err(err) => return error(DatabaseError::UnhandledError(err.to_string())), }; ok(Database { path, diff --git a/music-player/server/src/playback.rs b/music-player/server/src/playback.rs index 57df72d..0efb3f3 100644 --- a/music-player/server/src/playback.rs +++ b/music-player/server/src/playback.rs @@ -20,35 +20,47 @@ pub enum PlaybackStatus { } pub struct Playback { - handle: tokio::task::JoinHandle<()>, + events_handle: tokio::task::JoinHandle<()>, pipeline: gstreamer::Element, } impl Playback { - pub fn new(id: TrackId) -> Flow { + pub fn new(id: TrackId) -> Result { let pb = PathBuf::from(id.as_ref()); let path = pb .iter() .skip(1) .map(|component| encode(&component.to_string_lossy()).into_owned()) .collect::(); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) - .map_err(|err| Error::GlibError(err)) - )); + let pipeline = gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display())) + .map_err(|err| Error::GlibError(err))?; - pipeline.set_state(gstreamer::State::Playing); - let handle = tokio::task::spawn_blocking({ + pipeline + .set_state(gstreamer::State::Playing) + .map_err(|err| Error::CannotPlay(err.to_string()))?; + let events_handle = tokio::task::spawn_blocking({ let pipeline = pipeline.clone(); - move || pipeline_status(pipeline) + move || pipeline_events(pipeline) }); - ok(Self { handle, pipeline }) + Ok(Self { + events_handle, + pipeline, + }) } - pub fn stop(&self) { - self.handle.abort(); - self.pipeline.set_state(gstreamer::State::Paused); + pub fn stop(&self) -> Result<(), Error> { + self.events_handle.abort(); + self.pipeline + .set_state(gstreamer::State::Paused) + .map_err(|_| Error::CannotStop)?; + Ok(()) + } + + pub fn position(&self) -> (Option, Option) { + let position = self.pipeline.query_position(); + let duration = self.pipeline.query_duration(); + (position, duration) } } @@ -58,7 +70,7 @@ impl Drop for Playback { } } -fn pipeline_status(pipeline: gstreamer::Element) { +fn pipeline_events(pipeline: gstreamer::Element) { let bus = pipeline.bus().unwrap(); for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { match msg.view() { @@ -75,64 +87,3 @@ fn pipeline_status(pipeline: gstreamer::Element) { } } } - -/* -fn play_track(id: TrackId) -> Flow { - println!("setting up to play {}", playbin); - println!("ready to play"); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|err| Error::CannotPlay(err.to_string())) - )); - println!("playing started"); - ok(pipeline) -} -*/ - -/* -fn play_track(id: TrackId) -> Flow<(), FatalError, Error> { - let playbin = format!("playbin uri=file://{}", id.as_ref()); - let pipeline = return_error!(Flow::from( - gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err)) - )); - return_error!(Flow::from( - pipeline - .set_state(gstreamer::State::Playing) - .map_err(|_| Error::CannotPlay) - )); - - let message_handler = { - let pipeline = pipeline.clone(); - thread::spawn(move || { - let bus = pipeline.bus().unwrap(); - for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { - match msg.view() { - MessageView::Eos(_) => (), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.src().map(|s| s.path_string()), - err.error(), - err.debug() - ); - } - msg => println!("{:?}", msg), - } - } - }) - }; - - let query_handler = { - let pipeline = pipeline.clone(); - thread::spawn(move || loop { - let position: Option = pipeline.query_position(); - let duration: Option = pipeline.query_duration(); - println!("Position {:?} {:?}", position, duration); - thread::sleep(Duration::from_millis(100)); - }) - }; - - ok(()) -} -*/ -- 2.44.1 From 40cce7ce00ad08d360cbc9eadc41c4293d18e428 Mon Sep 17 00:00:00 2001 From: Savanni D'Gerinel Date: Sat, 11 Mar 2023 15:08:39 -0500 Subject: [PATCH 3/3] Add a playback stop button --- music-player/client/src/client.ts | 9 +++++++++ music-player/client/src/components/NowPlaying.ts | 10 ++++++++++ music-player/client/src/main.ts | 3 ++- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/music-player/client/src/client.ts b/music-player/client/src/client.ts index 315d320..94b5d92 100644 --- a/music-player/client/src/client.ts +++ b/music-player/client/src/client.ts @@ -38,3 +38,12 @@ export const playTrack = (id: string): Promise => .then((result: Response) => { console.log("result: ", result); }); + +export const stopPlayback = (): Promise => + fetch("api/v1/stop", { + method: "POST", + }) + .then((r) => r.json()) + .then((result: Response) => { + console.log("result: ", result); + }); diff --git a/music-player/client/src/components/NowPlaying.ts b/music-player/client/src/components/NowPlaying.ts index e256a2f..34dbdbb 100644 --- a/music-player/client/src/components/NowPlaying.ts +++ b/music-player/client/src/components/NowPlaying.ts @@ -1,6 +1,7 @@ import { TextField } from "./TextField"; export class NowPlaying extends HTMLElement { + onStop: () => void; nameContainer: TextField; albumContainer: TextField; artistContainer: TextField; @@ -20,6 +21,8 @@ export class NowPlaying extends HTMLElement { this.artistContainer = document.createElement("text-field"); this.artistContainer.classList.add("now-playing__artist"); + + this.onStop = () => {}; } get name(): string | null { @@ -74,6 +77,13 @@ export class NowPlaying extends HTMLElement { container.appendChild(this.albumContainer); container.appendChild(this.artistContainer); + const stopButton = document.createElement("button"); + stopButton.innerHTML = "Stop"; + stopButton.addEventListener("click", (_) => { + this.onStop(); + }); + this.appendChild(container); + this.appendChild(stopButton); } } diff --git a/music-player/client/src/main.ts b/music-player/client/src/main.ts index 8ded565..1f9d751 100644 --- a/music-player/client/src/main.ts +++ b/music-player/client/src/main.ts @@ -1,5 +1,5 @@ import * as _ from "lodash"; -import { TrackInfo, getTracks, playTrack } from "./client"; +import { TrackInfo, getTracks, playTrack, stopPlayback } from "./client"; import { DataCard } from "./components/DataCard"; import { NowPlaying } from "./components/NowPlaying"; import { TextField } from "./components/TextField"; @@ -55,6 +55,7 @@ const updateNowPlaying = (track: TrackInfo) => { card.album = track.album || null; card.artist = track.artist || null; track_list.appendChild(card); + card.onStop = () => stopPlayback(); } }; -- 2.44.1