Thread model and error handling cleanup #30
|
@ -1,3 +1,8 @@
|
|||
export type Response<A> =
|
||||
| { type: "Success"; content: A }
|
||||
| { type: "Failure"; content: string }
|
||||
| { type: "Fatal"; content: string };
|
||||
|
||||
export interface TrackInfo {
|
||||
id: string;
|
||||
track_number?: number;
|
||||
|
@ -8,11 +13,37 @@ export interface TrackInfo {
|
|||
}
|
||||
|
||||
export const getTracks = (): Promise<TrackInfo[]> =>
|
||||
fetch("/api/v1/tracks").then((r) => r.json());
|
||||
fetch("/api/v1/tracks")
|
||||
.then((r) => r.json())
|
||||
.then((result: Response<TrackInfo[]>) => {
|
||||
switch (result.type) {
|
||||
case "Success":
|
||||
return result.content || [];
|
||||
case "Failure":
|
||||
console.log("failed: ", result.content);
|
||||
return [];
|
||||
case "Fatal":
|
||||
console.log("fatal: ", result.content);
|
||||
return [];
|
||||
}
|
||||
});
|
||||
|
||||
export const playTrack = (id: string): Promise<Response> =>
|
||||
export const playTrack = (id: string): Promise<void> =>
|
||||
fetch("/api/v1/play", {
|
||||
method: "POST",
|
||||
headers: { "content-type": "application/json" },
|
||||
body: JSON.stringify({ id: id }),
|
||||
});
|
||||
})
|
||||
.then((r) => r.json())
|
||||
.then((result: Response<null>) => {
|
||||
console.log("result: ", result);
|
||||
});
|
||||
|
||||
export const stopPlayback = (): Promise<void> =>
|
||||
fetch("api/v1/stop", {
|
||||
method: "POST",
|
||||
})
|
||||
.then((r) => r.json())
|
||||
.then((result: Response<null>) => {
|
||||
console.log("result: ", result);
|
||||
});
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { TextField } from "./TextField";
|
||||
|
||||
export class NowPlaying extends HTMLElement {
|
||||
onStop: () => void;
|
||||
nameContainer: TextField;
|
||||
albumContainer: TextField;
|
||||
artistContainer: TextField;
|
||||
|
@ -20,6 +21,8 @@ export class NowPlaying extends HTMLElement {
|
|||
|
||||
this.artistContainer = document.createElement("text-field");
|
||||
this.artistContainer.classList.add("now-playing__artist");
|
||||
|
||||
this.onStop = () => {};
|
||||
}
|
||||
|
||||
get name(): string | null {
|
||||
|
@ -74,6 +77,13 @@ export class NowPlaying extends HTMLElement {
|
|||
container.appendChild(this.albumContainer);
|
||||
container.appendChild(this.artistContainer);
|
||||
|
||||
const stopButton = document.createElement("button");
|
||||
stopButton.innerHTML = "Stop";
|
||||
stopButton.addEventListener("click", (_) => {
|
||||
this.onStop();
|
||||
});
|
||||
|
||||
this.appendChild(container);
|
||||
this.appendChild(stopButton);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import * as _ from "lodash";
|
||||
import { TrackInfo, getTracks, playTrack } from "./client";
|
||||
import { TrackInfo, getTracks, playTrack, stopPlayback } from "./client";
|
||||
import { DataCard } from "./components/DataCard";
|
||||
import { NowPlaying } from "./components/NowPlaying";
|
||||
import { TextField } from "./components/TextField";
|
||||
|
@ -55,6 +55,7 @@ const updateNowPlaying = (track: TrackInfo) => {
|
|||
card.album = track.album || null;
|
||||
card.artist = track.artist || null;
|
||||
track_list.appendChild(card);
|
||||
card.onStop = () => stopPlayback();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -664,6 +664,7 @@ dependencies = [
|
|||
"mime_guess",
|
||||
"rusqlite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"url",
|
||||
|
|
|
@ -13,6 +13,7 @@ mime_guess = { version = "2.0" }
|
|||
mime = { version = "0.3" }
|
||||
rusqlite = { version = "0.28" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0" }
|
||||
thiserror = { version = "1.0" }
|
||||
tokio = { version = "1.24", features = ["full"] }
|
||||
url = { version = "2.3" }
|
||||
|
@ -20,4 +21,7 @@ uuid = { version = "1", features = ["v4"] }
|
|||
warp = { version = "0.3" }
|
||||
urlencoding = { version = "2.1" }
|
||||
|
||||
[target.armv7-unknown-linux-gnueabi]
|
||||
linker = "arm-linux-gnueabi-gcc"
|
||||
|
||||
[lib]
|
||||
|
|
|
@ -4,26 +4,47 @@ use music_player::{
|
|||
database::{MemoryIndex, MusicIndex},
|
||||
media::{TrackId, TrackInfo},
|
||||
scanner::FileScanner,
|
||||
Error, FatalError,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
sync::{Arc, RwLock},
|
||||
thread,
|
||||
};
|
||||
use warp::Filter;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct TrackRequest {
|
||||
id: String,
|
||||
}
|
||||
|
||||
fn tracks(index: &Arc<impl MusicIndex>) -> Vec<TrackInfo> {
|
||||
match index.list_tracks() {
|
||||
Flow::Ok(tracks) => tracks,
|
||||
Flow::Err(err) => panic!("error: {}", err),
|
||||
Flow::Fatal(err) => panic!("fatal: {}", err),
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", content = "content")]
|
||||
enum Response<A: Serialize> {
|
||||
Success(A),
|
||||
Failure(String),
|
||||
Fatal(String),
|
||||
}
|
||||
|
||||
impl<A: Serialize> From<Result<A, Error>> for Response<A> {
|
||||
fn from(res: Result<A, Error>) -> Self {
|
||||
match res {
|
||||
Ok(val) => Self::Success(val),
|
||||
Err(err) => Self::Failure(format!("{}", err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Serialize> From<Flow<A, FatalError, Error>> for Response<A> {
|
||||
fn from(res: Flow<A, FatalError, Error>) -> Self {
|
||||
match res {
|
||||
Flow::Ok(val) => Self::Success(val),
|
||||
Flow::Fatal(fatal) => Self::Fatal(format!("{}", fatal)),
|
||||
Flow::Err(err) => Self::Failure(format!("{}", err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,8 +54,7 @@ impl Static {
|
|||
fn read(self, root: PathBuf) -> String {
|
||||
let mut path = root;
|
||||
path.push(self.0);
|
||||
println!("path: {:?}", path);
|
||||
std::fs::read_to_string(path).expect("to find the file")
|
||||
std::fs::read_to_string(path.clone()).expect(&format!("to find {:?}", path))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,16 +71,18 @@ pub async fn main() {
|
|||
.map(|b| PathBuf::from(b))
|
||||
.unwrap();
|
||||
|
||||
match gstreamer::init() {
|
||||
Ok(()) => (),
|
||||
Err(err) => panic!("failed to initialize gstreamer: {}", err),
|
||||
}
|
||||
|
||||
let index = Arc::new(MemoryIndex::new());
|
||||
let scanner = Arc::new(FileScanner::new(vec![music_root.clone()]));
|
||||
let (core, api) = match Core::new(index.clone(), scanner) {
|
||||
Flow::Ok((core, api)) => (core, api),
|
||||
Flow::Err(error) => panic!("error: {}", error),
|
||||
Flow::Fatal(error) => panic!("fatal: {}", error),
|
||||
let core = match Core::new(index.clone(), scanner) {
|
||||
Ok(core) => Arc::new(RwLock::new(core)),
|
||||
Err(error) => panic!("core failed to initialize: {}", error),
|
||||
};
|
||||
|
||||
let _handle = thread::spawn(move || core.start());
|
||||
|
||||
println!("config: {:?} {:?} {:?}", dev, bundle_root, music_root);
|
||||
|
||||
let root = warp::path!().and(warp::get()).map({
|
||||
|
@ -79,8 +101,6 @@ pub async fn main() {
|
|||
.map(|m| m.essence_str().to_owned())
|
||||
.unwrap_or("text/plain".to_owned());
|
||||
println!("mime_type: {:?}", mime_type);
|
||||
// let mut path = PathBuf::from("assets");
|
||||
// path.push(filename);
|
||||
warp::http::Response::builder()
|
||||
.header("content-type", mime_type)
|
||||
.body(Static(PathBuf::from(filename)).read(bundle_root.clone()))
|
||||
|
@ -98,21 +118,32 @@ pub async fn main() {
|
|||
|
||||
let track_list = warp::path!("api" / "v1" / "tracks").and(warp::get()).map({
|
||||
let index = index.clone();
|
||||
move || warp::reply::json(&tracks(&index))
|
||||
move || {
|
||||
warp::reply::json(&Response::from(
|
||||
index
|
||||
.list_tracks()
|
||||
.map_err(|db_err| Error::DatabaseError(db_err)),
|
||||
))
|
||||
}
|
||||
});
|
||||
|
||||
let play_track = warp::path!("api" / "v1" / "play")
|
||||
.and(warp::post())
|
||||
.and(warp::body::json())
|
||||
.map({
|
||||
let api = api.clone();
|
||||
let core = core.clone();
|
||||
move |body: TrackRequest| {
|
||||
let result = api.play_track(TrackId::from(body.id));
|
||||
println!("Play result: {:?}", result);
|
||||
warp::reply::json(&("ok".to_owned()))
|
||||
warp::reply::json(&Response::from(
|
||||
core.write().unwrap().play_track(TrackId::from(body.id)),
|
||||
))
|
||||
}
|
||||
});
|
||||
|
||||
let stop_playback = warp::path!("api" / "v1" / "stop").and(warp::post()).map({
|
||||
let core = core.clone();
|
||||
move || warp::reply::json(&Response::from(core.write().unwrap().stop_playback()))
|
||||
});
|
||||
|
||||
/*
|
||||
let tracks_for_artist = warp::path!("api" / "v1" / "artist" / String)
|
||||
.and(warp::get())
|
||||
|
@ -139,7 +170,11 @@ pub async fn main() {
|
|||
.or(queue)
|
||||
.or(playing_status);
|
||||
*/
|
||||
let routes = root.or(assets).or(track_list).or(play_track);
|
||||
let routes = root
|
||||
.or(assets)
|
||||
.or(track_list)
|
||||
.or(play_track)
|
||||
.or(stop_playback);
|
||||
let server = warp::serve(routes);
|
||||
server
|
||||
.run(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8002))
|
||||
|
|
|
@ -8,30 +8,32 @@ use crate::{
|
|||
use flow::{ok, return_error, Flow};
|
||||
use gstreamer::{format::ClockTime, prelude::*, MessageView};
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, RecvTimeoutError, Sender},
|
||||
Arc, Mutex,
|
||||
},
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
thread,
|
||||
thread::JoinHandle,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::{
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
task::AbortHandle,
|
||||
};
|
||||
|
||||
fn scan_frequency() -> Duration {
|
||||
Duration::from_secs(60)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ControlMsg {
|
||||
PlayTrack(TrackId),
|
||||
Exit,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum TrackMsg {
|
||||
UpdateInProgress,
|
||||
UpdateComplete,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum PlaybackMsg {
|
||||
PositionUpdate,
|
||||
Playing,
|
||||
|
@ -41,119 +43,77 @@ pub enum PlaybackMsg {
|
|||
|
||||
pub struct Core {
|
||||
db: Arc<dyn MusicIndex>,
|
||||
scanner: Arc<dyn MusicScanner>,
|
||||
control_rx: Receiver<ControlMsg>,
|
||||
|
||||
playback_controller: Playback,
|
||||
playback_controller: Option<Playback>,
|
||||
next_scan: Arc<RwLock<Instant>>,
|
||||
scanner_handle: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Core {
|
||||
pub fn new(
|
||||
db: Arc<dyn MusicIndex>,
|
||||
scanner: Arc<dyn MusicScanner>,
|
||||
) -> Flow<(Core, CoreAPI), FatalError, Error> {
|
||||
let (control_tx, control_rx) = channel::<ControlMsg>();
|
||||
) -> Result<Core, FatalError> {
|
||||
let db = db;
|
||||
|
||||
let playback_controller = Playback::new();
|
||||
let next_scan = Arc::new(RwLock::new(Instant::now()));
|
||||
let scanner_handle = {
|
||||
let db = db.clone();
|
||||
let next_scan = next_scan.clone();
|
||||
thread::spawn(move || scan_loop(scanner, db, next_scan))
|
||||
};
|
||||
|
||||
ok((
|
||||
Core {
|
||||
db,
|
||||
scanner,
|
||||
control_rx,
|
||||
playback_controller,
|
||||
},
|
||||
CoreAPI {
|
||||
control_tx: Arc::new(Mutex::new(control_tx)),
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Flow<(), FatalError, Error> {
|
||||
gstreamer::init();
|
||||
let (scanner_tx, _scanner_rx) = channel();
|
||||
let mut next_scan = Instant::now();
|
||||
loop {
|
||||
if Instant::now() >= next_scan {
|
||||
let scan_start = Instant::now();
|
||||
let _ = scanner_tx.send(TrackMsg::UpdateInProgress);
|
||||
for track in self.scanner.scan() {
|
||||
match track {
|
||||
Ok(track) => self.db.add_track(track),
|
||||
Err(_) => ok(()),
|
||||
};
|
||||
}
|
||||
let _ = scanner_tx.send(TrackMsg::UpdateComplete);
|
||||
next_scan = Instant::now() + scan_frequency();
|
||||
println!("scan duration: {:?}", Instant::now() - scan_start);
|
||||
}
|
||||
match self.control_rx.recv_timeout(Duration::from_millis(1000)) {
|
||||
Ok(ControlMsg::PlayTrack(id)) => {
|
||||
let _ = self.play_track(id);
|
||||
}
|
||||
Ok(ControlMsg::Exit) => return ok(()),
|
||||
Err(RecvTimeoutError::Timeout) => (),
|
||||
Err(RecvTimeoutError::Disconnected) => return ok(()),
|
||||
}
|
||||
}
|
||||
Ok(Core {
|
||||
db,
|
||||
playback_controller: None,
|
||||
next_scan,
|
||||
scanner_handle,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn list_tracks<'a>(&'a self) -> Flow<Vec<TrackInfo>, FatalError, Error> {
|
||||
self.db.list_tracks().map_err(Error::DatabaseError)
|
||||
}
|
||||
|
||||
pub fn play_track<'a>(&'a self, id: TrackId) -> Flow<(), FatalError, Error> {
|
||||
/*
|
||||
println!("play_track: {}", id.as_ref());
|
||||
let pipeline = return_error!(Flow::from(
|
||||
gstreamer::parse_launch(&format!("playbin uri={}", id.as_str()))
|
||||
.map_err(|err| Error::CannotPlay(err.to_string()),)
|
||||
));
|
||||
return_error!(Flow::from(
|
||||
pipeline
|
||||
.set_state(gstreamer::State::Playing)
|
||||
.map_err(|err| Error::CannotPlay(err.to_string()))
|
||||
));
|
||||
{
|
||||
let pipeline = pipeline.clone();
|
||||
thread::spawn(move || {
|
||||
println!("starting");
|
||||
let bus = pipeline.bus().unwrap();
|
||||
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
|
||||
match msg.view() {
|
||||
MessageView::Eos(_) => (),
|
||||
MessageView::Error(err) => {
|
||||
println!(
|
||||
"Error from {:?}: {} ({:?})",
|
||||
err.src().map(|s| s.path_string()),
|
||||
err.error(),
|
||||
err.debug()
|
||||
);
|
||||
}
|
||||
msg => println!("{:?}", msg),
|
||||
}
|
||||
}
|
||||
});
|
||||
pub fn play_track<'a>(&'a mut self, id: TrackId) -> Result<(), Error> {
|
||||
self.stop_playback()?;
|
||||
self.playback_controller = Some(Playback::new(id)?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn stop_playback<'a>(&'a mut self) -> Result<(), Error> {
|
||||
match self.playback_controller {
|
||||
Some(ref controller) => controller.stop()?,
|
||||
None => (),
|
||||
}
|
||||
*/
|
||||
self.playback_controller.play_track(id);
|
||||
ok(())
|
||||
self.playback_controller = None;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CoreAPI {
|
||||
control_tx: Arc<Mutex<Sender<ControlMsg>>>,
|
||||
}
|
||||
|
||||
impl CoreAPI {
|
||||
pub fn play_track(&self, id: TrackId) -> () {
|
||||
self.control_tx
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(ControlMsg::PlayTrack(id))
|
||||
.unwrap()
|
||||
pub fn scan_loop(
|
||||
scanner: Arc<dyn MusicScanner>,
|
||||
db: Arc<dyn MusicIndex>,
|
||||
next_scan: Arc<RwLock<Instant>>,
|
||||
) {
|
||||
loop {
|
||||
if Instant::now() >= *next_scan.read().unwrap() {
|
||||
let scan_start = Instant::now();
|
||||
let mut counter = 0;
|
||||
for track in scanner.scan() {
|
||||
counter += 1;
|
||||
match track {
|
||||
Ok(track) => db.add_track(track),
|
||||
Err(_) => ok(()),
|
||||
};
|
||||
}
|
||||
*next_scan.write().unwrap() = Instant::now() + scan_frequency();
|
||||
println!(
|
||||
"scanned {} files in {:?}",
|
||||
counter,
|
||||
Instant::now() - scan_start
|
||||
);
|
||||
}
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,12 +130,11 @@ mod test {
|
|||
let index = MemoryIndex::new();
|
||||
let scanner = MockScanner::new();
|
||||
match Core::new(Arc::new(index), Arc::new(scanner)) {
|
||||
Flow::Ok((core, api)) => {
|
||||
Ok(core) => {
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
f(core)
|
||||
}
|
||||
Flow::Err(error) => panic!("{:?}", error),
|
||||
Flow::Fatal(error) => panic!("{:?}", error),
|
||||
Err(error) => panic!("{:?}", error),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::{
|
|||
};
|
||||
use flow::{error, ok, Flow};
|
||||
use rusqlite::Connection;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
|
@ -11,12 +12,12 @@ use std::{
|
|||
};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error, PartialEq)]
|
||||
#[derive(Debug, Error, PartialEq, Serialize)]
|
||||
pub enum DatabaseError {
|
||||
#[error("database is unreadable")]
|
||||
DatabaseUnreadable,
|
||||
#[error("unhandled database problem: {0}")]
|
||||
UnhandledError(rusqlite::Error),
|
||||
UnhandledError(String),
|
||||
}
|
||||
|
||||
pub trait MusicIndex: Sync + Send {
|
||||
|
@ -94,7 +95,7 @@ impl Database {
|
|||
pub fn new(path: PathBuf) -> Flow<Database, FatalError, DatabaseError> {
|
||||
let connection = match Connection::open(path.clone()) {
|
||||
Ok(connection) => connection,
|
||||
Err(err) => return error(DatabaseError::UnhandledError(err)),
|
||||
Err(err) => return error(DatabaseError::UnhandledError(err.to_string())),
|
||||
};
|
||||
ok(Database {
|
||||
path,
|
||||
|
|
|
@ -12,7 +12,7 @@ pub enum Error {
|
|||
DatabaseError(DatabaseError),
|
||||
|
||||
#[error("Cannot play track")]
|
||||
CannotPlay,
|
||||
CannotPlay(String),
|
||||
|
||||
#[error("Cannot stop playback")]
|
||||
CannotStop,
|
||||
|
|
|
@ -3,10 +3,10 @@ use flow::{ok, return_error, Flow};
|
|||
use gstreamer::{format::ClockTime, prelude::*, MessageView, StateChangeError};
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
sync::mpsc::{channel, Receiver, Sender},
|
||||
thread::{self, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
use urlencoding::encode;
|
||||
|
||||
pub enum PlaybackControl {
|
||||
|
@ -20,112 +20,70 @@ pub enum PlaybackStatus {
|
|||
}
|
||||
|
||||
pub struct Playback {
|
||||
handle: JoinHandle<Flow<(), FatalError, Error>>,
|
||||
control_tx: Sender<PlaybackControl>,
|
||||
events_handle: tokio::task::JoinHandle<()>,
|
||||
pipeline: gstreamer::Element,
|
||||
}
|
||||
|
||||
impl Playback {
|
||||
pub fn new() -> Playback {
|
||||
let (control_tx, control_rx) = channel::<PlaybackControl>();
|
||||
pub fn new(id: TrackId) -> Result<Self, Error> {
|
||||
let pb = PathBuf::from(id.as_ref());
|
||||
let path = pb
|
||||
.iter()
|
||||
.skip(1)
|
||||
.map(|component| encode(&component.to_string_lossy()).into_owned())
|
||||
.collect::<PathBuf>();
|
||||
let pipeline = gstreamer::parse_launch(&format!("playbin uri=file:///{}", path.display()))
|
||||
.map_err(|err| Error::GlibError(err))?;
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
let mut pipeline = None;
|
||||
loop {
|
||||
match control_rx.recv().unwrap() {
|
||||
PlaybackControl::PlayTrack(id) => match play_track(id) {
|
||||
Flow::Ok(pipeline_) => pipeline = Some(pipeline_),
|
||||
Flow::Fatal(err) => panic!("fatal error: {:?}", err),
|
||||
Flow::Err(err) => panic!("playback error: {:?}", err),
|
||||
},
|
||||
PlaybackControl::Stop => {
|
||||
if let Some(ref pipeline) = pipeline {
|
||||
return_error!(Flow::from(
|
||||
pipeline
|
||||
.set_state(gstreamer::State::Paused)
|
||||
.map_err(|_| Error::CannotStop)
|
||||
));
|
||||
}
|
||||
}
|
||||
PlaybackControl::Exit => return ok(()),
|
||||
}
|
||||
}
|
||||
pipeline
|
||||
.set_state(gstreamer::State::Playing)
|
||||
.map_err(|err| Error::CannotPlay(err.to_string()))?;
|
||||
let events_handle = tokio::task::spawn_blocking({
|
||||
let pipeline = pipeline.clone();
|
||||
move || pipeline_events(pipeline)
|
||||
});
|
||||
|
||||
Self { handle, control_tx }
|
||||
Ok(Self {
|
||||
events_handle,
|
||||
pipeline,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn play_track(&self, id: TrackId) {
|
||||
self.control_tx
|
||||
.send(PlaybackControl::PlayTrack(id))
|
||||
.unwrap();
|
||||
pub fn stop(&self) -> Result<(), Error> {
|
||||
self.events_handle.abort();
|
||||
self.pipeline
|
||||
.set_state(gstreamer::State::Paused)
|
||||
.map_err(|_| Error::CannotStop)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn position(&self) -> (Option<ClockTime>, Option<ClockTime>) {
|
||||
let position = self.pipeline.query_position();
|
||||
let duration = self.pipeline.query_duration();
|
||||
(position, duration)
|
||||
}
|
||||
}
|
||||
|
||||
fn play_track(id: TrackId) -> Flow<gstreamer::Element, FatalError, Error> {
|
||||
let pb = PathBuf::from(id.as_ref());
|
||||
let path = pb
|
||||
.iter()
|
||||
.skip(1)
|
||||
.map(|component| encode(&component.to_string_lossy()).into_owned())
|
||||
.collect::<PathBuf>();
|
||||
let playbin = format!("playbin uri=file:///{}", path.display());
|
||||
println!("setting up to play {}", playbin);
|
||||
let pipeline = return_error!(Flow::from(
|
||||
gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err))
|
||||
));
|
||||
println!("ready to play");
|
||||
return_error!(Flow::from(
|
||||
pipeline
|
||||
.set_state(gstreamer::State::Playing)
|
||||
.map_err(|_| Error::CannotPlay)
|
||||
));
|
||||
println!("playing started");
|
||||
ok(pipeline)
|
||||
impl Drop for Playback {
|
||||
fn drop(&mut self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
fn play_track(id: TrackId) -> Flow<(), FatalError, Error> {
|
||||
let playbin = format!("playbin uri=file://{}", id.as_ref());
|
||||
let pipeline = return_error!(Flow::from(
|
||||
gstreamer::parse_launch(&playbin).map_err(|err| Error::GlibError(err))
|
||||
));
|
||||
return_error!(Flow::from(
|
||||
pipeline
|
||||
.set_state(gstreamer::State::Playing)
|
||||
.map_err(|_| Error::CannotPlay)
|
||||
));
|
||||
|
||||
let message_handler = {
|
||||
let pipeline = pipeline.clone();
|
||||
thread::spawn(move || {
|
||||
let bus = pipeline.bus().unwrap();
|
||||
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
|
||||
match msg.view() {
|
||||
MessageView::Eos(_) => (),
|
||||
MessageView::Error(err) => {
|
||||
println!(
|
||||
"Error from {:?}: {} ({:?})",
|
||||
err.src().map(|s| s.path_string()),
|
||||
err.error(),
|
||||
err.debug()
|
||||
);
|
||||
}
|
||||
msg => println!("{:?}", msg),
|
||||
}
|
||||
fn pipeline_events(pipeline: gstreamer::Element) {
|
||||
let bus = pipeline.bus().unwrap();
|
||||
for msg in bus.iter_timed(gstreamer::ClockTime::NONE) {
|
||||
match msg.view() {
|
||||
MessageView::Eos(_) => (),
|
||||
MessageView::Error(err) => {
|
||||
println!(
|
||||
"Error from {:?}: {} ({:?})",
|
||||
err.src().map(|s| s.path_string()),
|
||||
err.error(),
|
||||
err.debug()
|
||||
);
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let query_handler = {
|
||||
let pipeline = pipeline.clone();
|
||||
thread::spawn(move || loop {
|
||||
let position: Option<ClockTime> = pipeline.query_position();
|
||||
let duration: Option<ClockTime> = pipeline.query_duration();
|
||||
println!("Position {:?} {:?}", position, duration);
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
})
|
||||
};
|
||||
|
||||
ok(())
|
||||
msg => println!("{:?}", msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue