use crate::{ database::MusicIndex, media::{TrackId, TrackInfo}, playback::{Playback, PlaybackControl, PlaybackStatus}, scanner::MusicScanner, Error, FatalError, }; use flow::{ok, return_error, Flow}; use gstreamer::{format::ClockTime, prelude::*, MessageView}; use std::{ sync::{ mpsc::{channel, Receiver, RecvTimeoutError, Sender}, Arc, Mutex, }, thread, thread::JoinHandle, time::{Duration, Instant}, }; fn scan_frequency() -> Duration { Duration::from_secs(60) } #[derive(Clone)] pub enum ControlMsg { PlayTrack(TrackId), Exit, } pub enum TrackMsg { UpdateInProgress, UpdateComplete, } pub enum PlaybackMsg { PositionUpdate, Playing, Pausing, Stopping, } pub struct Core { db: Arc, scanner: Arc, control_rx: Receiver, playback_controller: Playback, } impl Core { pub fn new( db: Arc, scanner: Arc, ) -> Flow<(Core, CoreAPI), FatalError, Error> { let (control_tx, control_rx) = channel::(); let db = db; let playback_controller = Playback::new(); ok(( Core { db, scanner, control_rx, playback_controller, }, CoreAPI { control_tx: Arc::new(Mutex::new(control_tx)), }, )) } pub fn start(&self) -> Flow<(), FatalError, Error> { gstreamer::init(); let (scanner_tx, _scanner_rx) = channel(); let mut next_scan = Instant::now(); loop { if Instant::now() >= next_scan { let scan_start = Instant::now(); let _ = scanner_tx.send(TrackMsg::UpdateInProgress); for track in self.scanner.scan() { match track { Ok(track) => self.db.add_track(track), Err(_) => ok(()), }; } let _ = scanner_tx.send(TrackMsg::UpdateComplete); next_scan = Instant::now() + scan_frequency(); println!("scan duration: {:?}", Instant::now() - scan_start); } match self.control_rx.recv_timeout(Duration::from_millis(1000)) { Ok(ControlMsg::PlayTrack(id)) => { let _ = self.play_track(id); } Ok(ControlMsg::Exit) => return ok(()), Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => return ok(()), } } } pub fn list_tracks<'a>(&'a self) -> Flow, FatalError, Error> { self.db.list_tracks().map_err(Error::DatabaseError) } pub fn play_track<'a>(&'a self, id: TrackId) -> Flow<(), FatalError, Error> { /* println!("play_track: {}", id.as_ref()); let pipeline = return_error!(Flow::from( gstreamer::parse_launch(&format!("playbin uri={}", id.as_str())) .map_err(|err| Error::CannotPlay(err.to_string()),) )); return_error!(Flow::from( pipeline .set_state(gstreamer::State::Playing) .map_err(|err| Error::CannotPlay(err.to_string())) )); { let pipeline = pipeline.clone(); thread::spawn(move || { println!("starting"); let bus = pipeline.bus().unwrap(); for msg in bus.iter_timed(gstreamer::ClockTime::NONE) { match msg.view() { MessageView::Eos(_) => (), MessageView::Error(err) => { println!( "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), err.error(), err.debug() ); } msg => println!("{:?}", msg), } } }); } */ self.playback_controller.play_track(id); ok(()) } } #[derive(Clone)] pub struct CoreAPI { control_tx: Arc>>, } impl CoreAPI { pub fn play_track(&self, id: TrackId) -> () { self.control_tx .lock() .unwrap() .send(ControlMsg::PlayTrack(id)) .unwrap() } } #[cfg(test)] mod test { use super::*; use crate::{database::MemoryIndex, media::TrackId, scanner::factories::MockScanner}; use std::collections::HashSet; fn with_example_index(f: F) where F: Fn(Core), { let index = MemoryIndex::new(); let scanner = MockScanner::new(); match Core::new(Arc::new(index), Arc::new(scanner)) { Flow::Ok((core, api)) => { thread::sleep(Duration::from_millis(10)); f(core) } Flow::Err(error) => panic!("{:?}", error), Flow::Fatal(error) => panic!("{:?}", error), } } #[test] fn it_lists_tracks() { with_example_index(|core| match core.list_tracks() { Flow::Ok(tracks) => { let track_ids = tracks .iter() .map(|t| t.id.clone()) .collect::>(); assert_eq!(track_ids.len(), 5); assert_eq!( track_ids, HashSet::from([ TrackId::from("/home/savanni/Track 1.mp3".to_owned()), TrackId::from("/home/savanni/Track 2.mp3".to_owned()), TrackId::from("/home/savanni/Track 3.mp3".to_owned()), TrackId::from("/home/savanni/Track 4.mp3".to_owned()), TrackId::from("/home/savanni/Track 5.mp3".to_owned()), ]) ); } Flow::Fatal(err) => panic!("fatal error: {:?}", err), Flow::Err(err) => panic!("error: {:?}", err), }) } }