Be able to add tracks, start, stop, and pause playback from the web api

This commit is contained in:
Savanni D'Gerinel 2024-09-05 00:16:28 -04:00
parent cbe1a90fcb
commit 7da7ffcaa5
4 changed files with 165 additions and 82 deletions

View File

@ -11,7 +11,9 @@ use tokio::{
use crate::{
audio_control::AudioControl,
types::{AppError, AudioControlMessage, AudioStatusMessage, TrackInfo, TrackSpec, Volume},
types::{
AppError, AudioControlMessage, AudioState, AudioStatusMessage, TrackInfo, TrackSpec, Volume,
},
};
#[cfg(test)]
@ -58,18 +60,21 @@ impl App {
let state = state.clone();
async move {
println!("listener started");
loop {
match audio_status.recv().await {
Some(AudioStatusMessage::Playing) => {
while let Some(msg) = audio_status.recv().await {
match msg {
AudioStatusMessage::Playing => {
state.write().unwrap().playing = true;
}
Some(AudioStatusMessage::Status(track_status)) => {
println!("status: {:?}", track_status);
state.write().unwrap().track_status = track_status;
AudioStatusMessage::Status(AudioState {
playing: _playing,
tracks,
}) => {
state.write().unwrap().track_status = tracks;
}
msg => println!("message received from audio controller: {:?}", msg),
}
}
println!("listener exiting");
}
});
@ -105,6 +110,7 @@ impl App {
}
pub async fn enable_track(&self, path: PathBuf) -> Result<(), AppError> {
println!("enabling track: {}", path.display());
self.audio_control
.send(AudioControlMessage::EnableTrack(TrackSpec {
path,
@ -132,18 +138,26 @@ impl App {
}
pub async fn play(&self) -> Result<(), AppError> {
/*
let st = self.internal.write().unwrap();
st.audio_control.play()?;
*/
self.audio_control
.send(AudioControlMessage::Play)
.await
.expect("audio control send to succeed");
Ok(())
}
pub async fn stop(&self) -> Result<(), AppError> {
/*
let st = self.internal.write().unwrap();
st.audio_control.stop()?;
*/
self.audio_control
.send(AudioControlMessage::Stop)
.await
.expect("audio control send to succeed");
Ok(())
}
pub async fn pause(&self) -> Result<(), AppError> {
self.audio_control
.send(AudioControlMessage::Pause)
.await
.expect("audio control send to succeed");
Ok(())
}
}

View File

@ -9,7 +9,7 @@ use gstreamer::{prelude::*, ClockTime, MessageType, MessageView};
use thiserror::Error;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::types::{AudioControlMessage, AudioStatusMessage, TrackSpec};
use crate::types::{AudioControlMessage, AudioState, AudioStatusMessage, Progress, TrackInfo, TrackSpec};
#[derive(Debug, Error, PartialEq)]
pub enum AudioError {
@ -20,33 +20,32 @@ pub enum AudioError {
InvalidState,
}
pub struct AudioControl {}
/*
impl Default for AudioControl {
fn default() -> Self {
Self::new(GStreamerBackend::default())
}
pub struct AudioControl {
backend: Box<dyn AudioControlBackend>,
}
*/
impl AudioControl {
pub fn new() -> Self {
Self {}
pub fn new(backend: impl AudioControlBackend + 'static) -> Self {
Self {
backend: Box::new(backend),
}
}
pub async fn listen(&self, mut control_rx: Receiver<AudioControlMessage>) {
println!("waiting for control message");
while let Some(msg) = control_rx.recv().await {
println!("control message: {:?}", msg);
match msg {
AudioControlMessage::Play => {
unimplemented!()
self.backend.play().unwrap();
}
AudioControlMessage::Stop => {
self.backend.stop().unwrap();
}
AudioControlMessage::Pause => {
unimplemented!()
self.backend.pause().unwrap();
}
AudioControlMessage::EnableTrack(_) => {
unimplemented!()
AudioControlMessage::EnableTrack(spec) => {
self.backend.add_track(spec).unwrap();
}
AudioControlMessage::DisableTrack(_) => {
unimplemented!()
@ -60,9 +59,11 @@ impl AudioControl {
pub async fn report(&self, status_tx: Sender<AudioStatusMessage>) {
loop {
println!("sending status message");
status_tx
.send(AudioStatusMessage::Status(vec![]))
.send(AudioStatusMessage::Status(AudioState {
playing: self.backend.playing(),
tracks: self.backend.tracks()
}))
.await
.expect("to successfully send a message");
let _ = tokio::time::sleep(Duration::from_secs(1)).await;
@ -91,21 +92,25 @@ impl AudioControl {
self.backend.write().unwrap().add_track(track)
}
}
*/
pub trait AudioControlBackend: Send + Sync {
fn playing(&self) -> bool;
fn tracks(&self) -> Vec<TrackSpec>;
fn tracks(&self) -> Vec<TrackInfo>;
fn play(&self) -> Result<(), AudioError>;
fn stop(&self) -> Result<(), AudioError>;
fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError>;
fn pause(&self) -> Result<(), AudioError>;
fn remove_track(&mut self, track: TrackSpec) -> Result<(), AudioError>;
fn add_track(&self, track: TrackSpec) -> Result<(), AudioError>;
fn remove_track(&self, track: TrackSpec) -> Result<(), AudioError>;
}
/*
pub struct MemoryBackend {
playing: Arc<RwLock<bool>>,
tracks: HashMap<PathBuf, TrackSpec>,
@ -170,6 +175,21 @@ impl AudioControlBackend for MemoryBackend {
Ok(())
}
}
*/
struct GStreamerBackendState {
playing: bool,
tracks: HashMap<PathBuf, TrackInfo>,
}
impl Default for GStreamerBackendState {
fn default() -> Self {
Self {
playing: false,
tracks: HashMap::new(),
}
}
}
pub struct GStreamerBackend {
bus: gstreamer::Bus,
@ -178,7 +198,7 @@ pub struct GStreamerBackend {
audio_sink: gstreamer::Element,
monitor: std::thread::JoinHandle<()>,
playing: Arc<RwLock<bool>>,
state: Arc<RwLock<GStreamerBackendState>>,
}
impl Default for GStreamerBackend {
@ -205,11 +225,11 @@ impl Default for GStreamerBackend {
pipeline.add(&audio_sink).unwrap();
mixer.link(&audio_sink).unwrap();
let playing = Arc::new(RwLock::new(false));
let state = Arc::new(RwLock::new(GStreamerBackendState::default()));
let monitor = std::thread::spawn({
let pipeline_object = pipeline.clone().upcast::<gstreamer::Object>();
let playing = playing.clone();
let state = state.clone();
let bus = bus.clone();
move || loop {
if let Some(msg) = bus.timed_pop_filtered(
@ -223,7 +243,7 @@ impl Default for GStreamerBackend {
match msg.view() {
MessageView::StateChanged(st) => {
if msg.src() == Some(&pipeline_object) {
*playing.write().unwrap() =
state.write().unwrap().playing =
st.current() == gstreamer::State::Playing;
}
}
@ -249,25 +269,23 @@ impl Default for GStreamerBackend {
monitor,
playing: Arc::new(RwLock::new(false)),
state,
}
}
}
impl AudioControlBackend for GStreamerBackend {
fn playing(&self) -> bool {
*self.playing.read().unwrap()
self.state.read().unwrap().playing
}
fn tracks(&self) -> Vec<TrackSpec> {
fn tracks(&self) -> Vec<TrackInfo> {
vec![]
}
fn play(&self) -> Result<(), AudioError> {
let mut playing = self.playing.write().unwrap();
if !*playing {
// self.pipeline.set_state(gstreamer::State::Playing).unwrap();
*playing = true;
if !self.playing() {
self.pipeline.set_state(gstreamer::State::Playing).unwrap();
Ok(())
} else {
Err(AudioError::InvalidState)
@ -275,17 +293,34 @@ impl AudioControlBackend for GStreamerBackend {
}
fn stop(&self) -> Result<(), AudioError> {
let mut playing = self.playing.write().unwrap();
if *playing {
// self.pipeline.set_state(gstreamer::State::Paused).unwrap();
*playing = false;
if self.playing() {
self.pipeline.set_state(gstreamer::State::Ready).unwrap();
Ok(())
} else {
Err(AudioError::InvalidState)
}
}
fn add_track(&mut self, track: TrackSpec) -> Result<(), AudioError> {
fn pause(&self) -> Result<(), AudioError> {
if self.playing() {
self.pipeline.set_state(gstreamer::State::Paused).unwrap();
Ok(())
} else {
Err(AudioError::InvalidState)
}
}
fn add_track(&self, track: TrackSpec) -> Result<(), AudioError> {
let mut st = self.state.write().unwrap();
st.tracks.insert(track.path.clone(), TrackInfo {
path: track.path.clone(),
volume: track.volume,
progress: Progress {
current: Duration::from_secs(0),
length: Duration::from_secs(1),
},
});
let source = gstreamer::ElementFactory::find("filesrc")
.unwrap()
.load()
@ -312,7 +347,7 @@ impl AudioControlBackend for GStreamerBackend {
.unwrap()
.create()
.property("mute", false)
.property("volume", 0.75)
.property("volume", track.volume.as_f64())
.build()
.unwrap();
self.pipeline.add(&volume).unwrap();
@ -326,9 +361,8 @@ impl AudioControlBackend for GStreamerBackend {
Ok(())
}
fn remove_track(&mut self, _path: TrackSpec) -> Result<(), AudioError> {
fn remove_track(&self, _path: TrackSpec) -> Result<(), AudioError> {
unimplemented!()
/* Need to run EOS through to a probe on the trailing end of the volume element */
}
}
*/

View File

@ -1,11 +1,12 @@
use std::{
convert::Infallible,
net::{Ipv6Addr, SocketAddrV6},
path::PathBuf,
sync::Arc,
};
use app::App;
use audio_control::AudioControl;
use audio_control::{AudioControl, GStreamerBackend};
use pipewire::{context::Context, main_loop::MainLoop};
use serde::Deserialize;
use tokio::task::spawn_blocking;
@ -20,6 +21,10 @@ struct PlayTrackParams {
track_name: String,
}
fn with_app(app: Arc<App>) -> impl Filter<Extract = (Arc<App>,), Error = Infallible> + Clone {
warp::any().map(move || app.clone())
}
async fn server_main(app: Arc<App>) {
let localhost: Ipv6Addr = "::1".parse().unwrap();
let server_addr = SocketAddrV6::new(localhost, 3001, 0, 0);
@ -43,40 +48,52 @@ async fn server_main(app: Arc<App>) {
let enable_track = warp::put()
.and(warp::path!("playing"))
.and(warp::body::json())
.map({
let app = app.clone();
move |params: PlayTrackParams| {
let _ = app.enable_track(PathBuf::from(params.track_name));
"".to_owned()
}
.and(with_app(app.clone()))
.then(|params: PlayTrackParams, app: Arc<App>| async move {
println!("enable track");
let _ = app.enable_track(PathBuf::from(params.track_name)).await;
"".to_owned()
});
let disable_track = warp::delete()
.and(warp::path!("playing"))
.and(warp::body::json())
.map({
let app = app.clone();
move |params: PlayTrackParams| {
let _ = app.disable_track(&params.track_name);
.and(with_app(app.clone()))
.then(|params: PlayTrackParams, app: Arc<App>| async move {
let _ = app.disable_track(&params.track_name);
"".to_owned()
});
let play_all = warp::post()
.and(warp::path!("play"))
.and(with_app(app.clone()))
.then({
|app: Arc<App>| async move {
println!("play_all");
let _ = app.play().await;
"".to_owned()
}
});
let play_all = warp::put().and(warp::path!("playing")).map({
let app = app.clone();
move || {
let _ = app.play();
"".to_owned()
}
});
let stop_all = warp::post()
.and(warp::path!("stop"))
.and(with_app(app.clone()))
.then({
|app: Arc<App>| async move {
let _ = app.stop().await;
"".to_owned()
}
});
let stop_all = warp::delete().and(warp::path!("playing")).map({
let app = app.clone();
move || {
let _ = app.stop();
"".to_owned()
}
});
let pause = warp::post()
.and(warp::path!("pause"))
.and(with_app(app.clone()))
.then({
|app: Arc<App>| async move {
let _ = app.pause().await;
"".to_owned()
}
});
let now_playing = warp::path!("playing").map({
let app = app.clone();
@ -90,6 +107,7 @@ async fn server_main(app: Arc<App>) {
.or(disable_track)
.or(play_all)
.or(stop_all)
.or(pause)
.or(now_playing);
serve(routes).run(server_addr).await;
@ -142,7 +160,7 @@ async fn main() {
let (audio_status_tx, audio_status_rx) = tokio::sync::mpsc::channel(5);
let app = Arc::new(App::new(audio_control_tx, audio_status_rx));
let audio_controller = Arc::new(AudioControl::new());
let audio_controller = Arc::new(AudioControl::new(GStreamerBackend::default()));
tokio::spawn({
let audio_controller = audio_controller.clone();
async move { audio_controller.listen(audio_control_rx).await }

View File

@ -43,6 +43,16 @@ pub enum VolumeError {
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
pub struct Volume(f32);
impl Volume {
pub fn as_f32(&self) -> f32 {
self.0
}
pub fn as_f64(&self) -> f64 {
self.0.into()
}
}
impl TryFrom<f32> for Volume {
type Error = VolumeError;
fn try_from(val: f32) -> Result<Self, Self::Error> {
@ -64,6 +74,7 @@ impl From<Volume> for f32 {
pub enum AudioControlMessage {
Play,
Pause,
Stop,
EnableTrack(TrackSpec),
DisableTrack(PathBuf),
ReportStatus,
@ -76,10 +87,16 @@ pub struct TrackInfo {
pub progress: Progress,
}
#[derive(Clone, Debug)]
pub struct AudioState {
pub playing: bool,
pub tracks: Vec<TrackInfo>,
}
#[derive(Debug)]
pub enum AudioStatusMessage {
Playing,
Pausing,
Status(Vec<TrackInfo>),
Status(AudioState),
AudioError(AudioError),
}