Overhaul the threading model to use more async tasks and less threads

This commit is contained in:
Savanni D'Gerinel 2023-03-11 13:02:52 -05:00
parent c296c742ca
commit 8260b4e2f3
5 changed files with 157 additions and 130 deletions

View File

@ -20,4 +20,7 @@ uuid = { version = "1", features = ["v4"] }
warp = { version = "0.3" } warp = { version = "0.3" }
urlencoding = { version = "2.1" } urlencoding = { version = "2.1" }
[target.armv7-unknown-linux-gnueabi]
linker = "arm-linux-gnueabi-gcc"
[lib] [lib]

View File

@ -9,7 +9,7 @@ use serde::Deserialize;
use std::{ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf, path::PathBuf,
sync::Arc, sync::{Arc, RwLock},
thread, thread,
}; };
use warp::Filter; use warp::Filter;
@ -33,8 +33,7 @@ impl Static {
fn read(self, root: PathBuf) -> String { fn read(self, root: PathBuf) -> String {
let mut path = root; let mut path = root;
path.push(self.0); path.push(self.0);
println!("path: {:?}", path); std::fs::read_to_string(path.clone()).expect(&format!("to find {:?}", path))
std::fs::read_to_string(path).expect("to find the file")
} }
} }
@ -51,15 +50,21 @@ pub async fn main() {
.map(|b| PathBuf::from(b)) .map(|b| PathBuf::from(b))
.unwrap(); .unwrap();
gstreamer::init();
let index = Arc::new(MemoryIndex::new()); let index = Arc::new(MemoryIndex::new());
let scanner = Arc::new(FileScanner::new(vec![music_root.clone()])); let scanner = Arc::new(FileScanner::new(vec![music_root.clone()]));
let (core, api) = match Core::new(index.clone(), scanner) { let core = match Core::new(index.clone(), scanner) {
Flow::Ok((core, api)) => (core, api), Flow::Ok(core) => Arc::new(RwLock::new(core)),
Flow::Err(error) => panic!("error: {}", error), Flow::Err(error) => panic!("error: {}", error),
Flow::Fatal(error) => panic!("fatal: {}", error), Flow::Fatal(error) => panic!("fatal: {}", error),
}; };
let _handle = thread::spawn(move || core.start()); /*
let core_handle = tokio::spawn(move || core.main_loop());
*/
// let _handle = thread::spawn(move || core.start());
println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root); println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root);
@ -79,8 +84,6 @@ pub async fn main() {
.map(|m| m.essence_str().to_owned()) .map(|m| m.essence_str().to_owned())
.unwrap_or("text/plain".to_owned()); .unwrap_or("text/plain".to_owned());
println!("mime_type: {:?}", mime_type); println!("mime_type: {:?}", mime_type);
// let mut path = PathBuf::from("assets");
// path.push(filename);
warp::http::Response::builder() warp::http::Response::builder()
.header("content-type", mime_type) .header("content-type", mime_type)
.body(Static(PathBuf::from(filename)).read(bundle_root.clone())) .body(Static(PathBuf::from(filename)).read(bundle_root.clone()))
@ -105,14 +108,22 @@ pub async fn main() {
.and(warp::post()) .and(warp::post())
.and(warp::body::json()) .and(warp::body::json())
.map({ .map({
let api = api.clone(); let core = core.clone();
move |body: TrackRequest| { move |body: TrackRequest| {
let result = api.play_track(TrackId::from(body.id)); let result = core.write().unwrap().play_track(TrackId::from(body.id));
println!("Play result: {:?}", result); println!("Play result: {:?}", result);
warp::reply::json(&("ok".to_owned())) warp::reply::json(&("ok".to_owned()))
} }
}); });
let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({
let core = core.clone();
move || {
core.write().unwrap().stop_playback();
warp::reply::json(&("ok".to_owned()))
}
});
/* /*
let tracks_for_artist = warp::path!("api" / "v1" / "artist" / String) let tracks_for_artist = warp::path!("api" / "v1" / "artist" / String)
.and(warp::get()) .and(warp::get())
@ -139,7 +150,11 @@ pub async fn main() {
.or(queue) .or(queue)
.or(playing_status); .or(playing_status);
*/ */
let routes = root.or(assets).or(track_list).or(play_track); let routes = root
.or(assets)
.or(track_list)
.or(play_track)
.or(stop_playback);
let server = warp::serve(routes); let server = warp::serve(routes);
server server
.run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8002)) .run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8002))

View File

@ -8,30 +8,32 @@ use crate::{
use flow::{ok, return_error, Flow}; use flow::{ok, return_error, Flow};
use gstreamer::{format::ClockTime, prelude::*, MessageView}; use gstreamer::{format::ClockTime, prelude::*, MessageView};
use std::{ use std::{
sync::{ sync::{Arc, Mutex, RwLock},
mpsc::{channel, Receiver, RecvTimeoutError, Sender},
Arc, Mutex,
},
thread, thread,
thread::JoinHandle,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task::AbortHandle,
};
fn scan_frequency() -> Duration { fn scan_frequency() -> Duration {
Duration::from_secs(60) Duration::from_secs(60)
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub enum ControlMsg { pub enum ControlMsg {
PlayTrack(TrackId), PlayTrack(TrackId),
Exit, Exit,
} }
#[derive(Clone, Debug)]
pub enum TrackMsg { pub enum TrackMsg {
UpdateInProgress, UpdateInProgress,
UpdateComplete, UpdateComplete,
} }
#[derive(Clone, Debug)]
pub enum PlaybackMsg { pub enum PlaybackMsg {
PositionUpdate, PositionUpdate,
Playing, Playing,
@ -41,53 +43,42 @@ pub enum PlaybackMsg {
pub struct Core { pub struct Core {
db: Arc<dyn MusicIndex>, db: Arc<dyn MusicIndex>,
scanner: Arc<dyn MusicScanner>,
control_rx: Receiver<ControlMsg>,
playback_controller: Playback, playback_controller: Option<Playback>,
next_scan: Arc<RwLock<Instant>>,
scanner_handle: thread::JoinHandle<()>,
} }
impl Core { impl Core {
pub fn new( pub fn new(
db: Arc<dyn MusicIndex>, db: Arc<dyn MusicIndex>,
scanner: Arc<dyn MusicScanner>, scanner: Arc<dyn MusicScanner>,
) -> Flow<(Core, CoreAPI), FatalError, Error> { ) -> Flow<Core, FatalError, Error> {
let (control_tx, control_rx) = channel::<ControlMsg>();
let db = db; let db = db;
let playback_controller = Playback::new(); let next_scan = Arc::new(RwLock::new(Instant::now()));
let scanner_handle = {
ok(( let db = db.clone();
Core { let next_scan = next_scan.clone();
db, thread::spawn(move || scan_loop(scanner, db, next_scan))
scanner,
control_rx,
playback_controller,
},
CoreAPI {
control_tx: Arc::new(Mutex::new(control_tx)),
},
))
}
pub fn start(&self) -> Flow<(), FatalError, Error> {
gstreamer::init();
let (scanner_tx, _scanner_rx) = channel();
let mut next_scan = Instant::now();
loop {
if Instant::now() >= next_scan {
let scan_start = Instant::now();
let _ = scanner_tx.send(TrackMsg::UpdateInProgress);
for track in self.scanner.scan() {
match track {
Ok(track) => self.db.add_track(track),
Err(_) => ok(()),
}; };
ok(Core {
db,
playback_controller: None,
next_scan,
scanner_handle,
})
} }
let _ = scanner_tx.send(TrackMsg::UpdateComplete);
next_scan = Instant::now() + scan_frequency(); /*
println!("scan duration: {:?}", Instant::now() - scan_start); pub fn main_loop(&self) -> Flow<(), FatalError, Error> {
} // let (scanner_tx, _scanner_rx) = channel();
loop {
println!("loop");
/*
match self.control_rx.recv_timeout(Duration::from_millis(1000)) { match self.control_rx.recv_timeout(Duration::from_millis(1000)) {
Ok(ControlMsg::PlayTrack(id)) => { Ok(ControlMsg::PlayTrack(id)) => {
let _ = self.play_track(id); let _ = self.play_track(id);
@ -96,66 +87,75 @@ impl Core {
Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => return ok(()), Err(RecvTimeoutError::Disconnected) => return ok(()),
} }
*/
thread::sleep(Duration::from_secs(1));
} }
} }
*/
pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> { pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> {
self.db.list_tracks().map_err(Error::DatabaseError) self.db.list_tracks().map_err(Error::DatabaseError)
} }
pub fn play_track<'a>(&'a self, id: TrackId) -> Flow<(), FatalError, Error> { pub fn play_track<'a>(&'a mut self, id: TrackId) -> Flow<(), FatalError, Error> {
/* self.stop_playback();
println!("play_track: {}", id.as_ref()); self.playback_controller = Some(return_error!(Playback::new(id)));
let pipeline = return_error!(Flow::from(
gstreamer::parse_launch(&format!("playbin uri={}", id.as_str()))
.map_err(|err| Error::CannotPlay(err.to_string()),)
));
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Playing)
.map_err(|err| Error::CannotPlay(err.to_string()))
));
{
let pipeline = pipeline.clone();
thread::spawn(move || {
println!("starting");
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() {
MessageView::Eos(_) => (),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
msg => println!("{:?}", msg),
}
}
});
}
*/
self.playback_controller.play_track(id);
ok(()) ok(())
} }
pub fn stop_playback<'a>(&'a mut self) {
match self.playback_controller {
Some(ref controller) => controller.stop(),
None => (),
}
self.playback_controller = None;
}
} }
/*
#[derive(Clone)] #[derive(Clone)]
pub struct CoreAPI { pub struct CoreAPI {
control_tx: Arc<Mutex<Sender<ControlMsg>>>, control_tx: Arc<Mutex<Sender<ControlMsg>>>,
} }
impl CoreAPI { impl CoreAPI {
pub fn play_track(&self, id: TrackId) -> () { pub async fn play_track(&self, id: TrackId) -> () {
self.control_tx self.control_tx
.lock() .lock()
.unwrap() .unwrap()
.send(ControlMsg::PlayTrack(id)) .send(ControlMsg::PlayTrack(id))
.await
.unwrap() .unwrap()
} }
} }
*/
pub fn scan_loop(
scanner: Arc<dyn MusicScanner>,
db: Arc<dyn MusicIndex>,
next_scan: Arc<RwLock<Instant>>,
) {
loop {
if Instant::now() >= *next_scan.read().unwrap() {
let scan_start = Instant::now();
let mut counter = 0;
for track in scanner.scan() {
counter += 1;
match track {
Ok(track) => db.add_track(track),
Err(_) => ok(()),
};
}
*next_scan.write().unwrap() = Instant::now() + scan_frequency();
println!(
"scanned {} files in {:?}",
counter,
Instant::now() - scan_start
);
}
thread::sleep(Duration::from_secs(1));
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
@ -163,6 +163,7 @@ mod test {
use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner};
use std::collections::HashSet; use std::collections::HashSet;
/*
fn with_example_index<F>(f: F) fn with_example_index<F>(f: F)
where where
F: Fn(Core), F: Fn(Core),
@ -203,4 +204,5 @@ mod test {
Flow::Err(err) => panic!("error: {:?}", err), Flow::Err(err) => panic!("error: {:?}", err),
}) })
} }
*/
} }

View File

@ -12,7 +12,7 @@ pub enum Error {
DatabaseError(DatabaseError), DatabaseError(DatabaseError),
#[error("Cannot play track")] #[error("Cannot play track")]
CannotPlay, CannotPlay(String),
#[error("Cannot stop playback")] #[error("Cannot stop playback")]
CannotStop, CannotStop,

View File

@ -3,10 +3,10 @@ use flow::{ok, return_error, Flow};
use gstreamer::{format::ClockTime, prelude::*, MessageView, StateChangeError}; use gstreamer::{format::ClockTime, prelude::*, MessageView, StateChangeError};
use std::{ use std::{
path::PathBuf, path::PathBuf,
sync::mpsc::{channel, Receiver, Sender},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::Duration, time::Duration,
}; };
use tokio::sync::mpsc::{channel, Receiver, Sender};
use urlencoding::encode; use urlencoding::encode;
pub enum PlaybackControl { pub enum PlaybackControl {
@ -20,68 +20,75 @@ pub enum PlaybackStatus {
} }
pub struct Playback { pub struct Playback {
handle: JoinHandle<Flow<(), FatalError, Error>>, handle: tokio::task::JoinHandle<()>,
control_tx: Sender<PlaybackControl>, pipeline: gstreamer::Element,
} }
impl Playback { impl Playback {
pub fn new() -> Playback { pub fn new(id: TrackId) -> Flow<Self, FatalError, Error> {
let (control_tx, control_rx) = channel::<PlaybackControl>();
let handle = thread::spawn(move || {
let mut pipeline = None;
loop {
match control_rx.recv().unwrap() {
PlaybackControl::PlayTrack(id) => match play_track(id) {
Flow::Ok(pipeline_) => pipeline = Some(pipeline_),
Flow::Fatal(err) => panic!("fatal error: {:?}", err),
Flow::Err(err) => panic!("playback error: {:?}", err),
},
PlaybackControl::Stop => {
if let Some(ref pipeline) = pipeline {
return_error!(Flow::from(
pipeline
.set_state(gstreamer::State::Paused)
.map_err(|_| Error::CannotStop)
));
}
}
PlaybackControl::Exit => return ok(()),
}
}
});
Self { handle, control_tx }
}
pub fn play_track(&self, id: TrackId) {
self.control_tx
.send(PlaybackControl::PlayTrack(id))
.unwrap();
}
}
fn play_track(id: TrackId) -> Flow<gstreamer::Element, FatalError, Error> {
let pb = PathBuf::from(id.as_ref()); let pb = PathBuf::from(id.as_ref());
let path = pb let path = pb
.iter() .iter()
.skip(1) .skip(1)
.map(|component| encode(&component.to_string_lossy()).into_owned()) .map(|component| encode(&component.to_string_lossy()).into_owned())
.collect::<PathBuf>(); .collect::<PathBuf>();
let playbin = format!("playbin uri=file:///{}", path.display());
println!("setting up to play {}", playbin);
let pipeline = return_error!(Flow::from( let pipeline = return_error!(Flow::from(
gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err)) gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display()))
.map_err(|err| Error::GlibError(err))
)); ));
pipeline.set_state(gstreamer::State::Playing);
let handle = tokio::task::spawn_blocking({
let pipeline = pipeline.clone();
move || pipeline_status(pipeline)
});
ok(Self { handle, pipeline })
}
pub fn stop(&self) {
self.handle.abort();
self.pipeline.set_state(gstreamer::State::Paused);
}
}
impl Drop for Playback {
fn drop(&mut self) {
self.stop();
}
}
fn pipeline_status(pipeline: gstreamer::Element) {
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
match msg.view() {
MessageView::Eos(_) => (),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
msg => println!("{:?}", msg),
}
}
}
/*
fn play_track(id: TrackId) -> Flow<gstreamer::Element, FatalError, Error> {
println!("setting up to play {}", playbin);
println!("ready to play"); println!("ready to play");
return_error!(Flow::from( return_error!(Flow::from(
pipeline pipeline
.set_state(gstreamer::State::Playing) .set_state(gstreamer::State::Playing)
.map_err(|_| Error::CannotPlay) .map_err(|err| Error::CannotPlay(err.to_string()))
)); ));
println!("playing started"); println!("playing started");
ok(pipeline) ok(pipeline)
} }
*/
/* /*
fn play_track(id: TrackId) -> Flow<(), FatalError, Error> { fn play_track(id: TrackId) -> Flow<(), FatalError, Error> {