From 1d8307015f91137759a9e35b10c75be6a48c9f61 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 28 Apr 2024 21:41:17 +0200 Subject: [PATCH] playout status --- ffplayout-api/src/api/routes.rs | 31 +++++++++++++++++----- ffplayout-api/src/sse/broadcast.rs | 40 ++++++++++++++++++----------- ffplayout-api/src/utils/channels.rs | 9 ++++--- ffplayout-api/src/utils/control.rs | 30 +++++++++------------- 4 files changed, 68 insertions(+), 42 deletions(-) diff --git a/ffplayout-api/src/api/routes.rs b/ffplayout-api/src/api/routes.rs index 570d6fcd..586877e2 100644 --- a/ffplayout-api/src/api/routes.rs +++ b/ffplayout-api/src/api/routes.rs @@ -651,7 +651,9 @@ pub async fn send_text_message( id: web::Path, data: web::Json>, ) -> Result { - match send_message(&pool.into_inner(), *id, data.into_inner()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match send_message(&config, data.into_inner()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -674,7 +676,9 @@ pub async fn control_playout( id: web::Path, control: web::Json, ) -> Result { - match control_state(&pool.into_inner(), *id, &control.control).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match control_state(&config, &control.control).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -716,7 +720,9 @@ pub async fn media_current( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "current".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "current".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -733,7 +739,9 @@ pub async fn media_next( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "next".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "next".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -751,7 +759,9 @@ pub async fn media_last( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "last".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "last".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -778,7 +788,16 @@ pub async fn process_control( proc: web::Json, engine_process: web::Data, ) -> Result { - control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + control_service( + &pool.into_inner(), + &config, + *id, + &proc.command, + Some(engine_process), + ) + .await } /// #### ffplayout Playlist Operations diff --git a/ffplayout-api/src/sse/broadcast.rs b/ffplayout-api/src/sse/broadcast.rs index 7286f638..3492e6ef 100644 --- a/ffplayout-api/src/sse/broadcast.rs +++ b/ffplayout-api/src/sse/broadcast.rs @@ -7,12 +7,11 @@ use actix_web_lab::{ }; use ffplayout_lib::utils::PlayoutConfig; -use futures_util::future; use parking_lot::Mutex; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use crate::utils::system; +use crate::utils::{control::media_info, system}; #[derive(Debug, Clone)] struct Client { @@ -73,9 +72,8 @@ impl Broadcaster { this.remove_stale_clients().await; } - if counter % 30 == 0 { - // TODO: implement playout status - this.broadcast("ping").await; + if counter % 5 == 0 { + this.broadcast_playout().await; } this.broadcast_system().await; @@ -124,20 +122,32 @@ impl Broadcaster { Sse::from_infallible_receiver(rx) } - /// Broadcasts `msg` to all clients. - pub async fn broadcast(&self, msg: &str) { + /// Broadcasts playout status to clients. + pub async fn broadcast_playout(&self) { let clients = self.inner.lock().clients.clone(); - let send_futures = clients - .iter() - .map(|client| client.sender.send(sse::Data::new(msg).into())); - - // try to send to all clients, ignoring failures - // disconnected clients will get swept up by `remove_stale_clients` - let _ = future::join_all(send_futures).await; + for client in clients.iter().filter(|client| client.endpoint == "playout") { + match media_info(&client.config, "current".into()).await { + Ok(res) => { + let _ = client + .sender + .send( + sse::Data::new(res.text().await.unwrap_or_else(|_| "Success".into())) + .into(), + ) + .await; + } + Err(_) => { + let _ = client + .sender + .send(sse::Data::new("not running").into()) + .await; + } + }; + } } - /// Broadcasts `msg` to all clients. + /// Broadcasts system status to clients. pub async fn broadcast_system(&self) { let clients = self.inner.lock().clients.clone(); diff --git a/ffplayout-api/src/utils/channels.rs b/ffplayout-api/src/utils/channels.rs index e3ae52c0..3d289c3e 100644 --- a/ffplayout-api/src/utils/channels.rs +++ b/ffplayout-api/src/utils/channels.rs @@ -12,6 +12,7 @@ use crate::utils::{ use ffplayout_lib::utils::PlayoutConfig; use crate::db::{handles, models::Channel}; +use crate::utils::playout_config; pub async fn create_channel( conn: &Pool, @@ -51,15 +52,17 @@ pub async fn create_channel( serde_yaml::to_writer(file, &config).unwrap(); let new_channel = handles::insert_channel(conn, target_channel).await?; - control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?; + control_service(conn, &config, new_channel.id, &ServiceCmd::Enable, None).await?; Ok(new_channel) } pub async fn delete_channel(conn: &Pool, id: i32) -> Result<(), ServiceError> { let channel = handles::select_channel(conn, &id).await?; - control_service(conn, channel.id, &ServiceCmd::Stop, None).await?; - control_service(conn, channel.id, &ServiceCmd::Disable, None).await?; + let (config, _) = playout_config(conn, &id).await?; + + control_service(conn, &config, channel.id, &ServiceCmd::Stop, None).await?; + control_service(conn, &config, channel.id, &ServiceCmd::Disable, None).await?; if let Err(e) = fs::remove_file(channel.config_path) { error!("{e}"); diff --git a/ffplayout-api/src/utils/control.rs b/ffplayout-api/src/utils/control.rs index cef70a3d..19e62d31 100644 --- a/ffplayout-api/src/utils/control.rs +++ b/ffplayout-api/src/utils/control.rs @@ -15,8 +15,8 @@ use tokio::{ }; use crate::db::handles::select_channel; -use crate::utils::{errors::ServiceError, playout_config}; -use ffplayout_lib::vec_strings; +use crate::utils::errors::ServiceError; +use ffplayout_lib::{utils::PlayoutConfig, vec_strings}; #[derive(Debug, Deserialize, Serialize, Clone)] struct TextParams { @@ -241,11 +241,10 @@ impl SystemD { } } -async fn post_request(conn: &Pool, id: i32, obj: T) -> Result +async fn post_request(config: &PlayoutConfig, obj: T) -> Result where T: Serialize, { - let (config, _) = playout_config(conn, &id).await?; let url = format!("http://{}", config.rpc_server.address); let client = Client::new(); @@ -262,8 +261,7 @@ where } pub async fn send_message( - conn: &Pool, - id: i32, + config: &PlayoutConfig, message: HashMap, ) -> Result { let json_obj = TextParams { @@ -271,33 +269,29 @@ pub async fn send_message( message, }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } pub async fn control_state( - conn: &Pool, - id: i32, + config: &PlayoutConfig, command: &str, ) -> Result { let json_obj = ControlParams { control: command.to_owned(), }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } -pub async fn media_info( - conn: &Pool, - id: i32, - command: String, -) -> Result { +pub async fn media_info(config: &PlayoutConfig, command: String) -> Result { let json_obj = MediaParams { media: command }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } pub async fn control_service( conn: &Pool, + config: &PlayoutConfig, id: i32, command: &ServiceCmd, engine: Option>, @@ -307,14 +301,14 @@ pub async fn control_service( match command { ServiceCmd::Start => en.start().await, ServiceCmd::Stop => { - if control_state(conn, id, "stop_all").await.is_ok() { + if control_state(config, "stop_all").await.is_ok() { en.stop().await } else { Err(ServiceError::NoContent("Nothing to stop".to_string())) } } ServiceCmd::Restart => { - if control_state(conn, id, "stop_all").await.is_ok() { + if control_state(config, "stop_all").await.is_ok() { en.restart().await } else { Err(ServiceError::NoContent("Nothing to restart".to_string()))