playout status

This commit is contained in:
jb-alvarado 2024-04-28 21:41:17 +02:00
parent 47b88d90ca
commit 1d8307015f
4 changed files with 68 additions and 42 deletions

View File

@ -651,7 +651,9 @@ pub async fn send_text_message(
id: web::Path<i32>, id: web::Path<i32>,
data: web::Json<HashMap<String, String>>, data: web::Json<HashMap<String, String>>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
match send_message(&pool.into_inner(), *id, data.into_inner()).await { let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
match send_message(&config, data.into_inner()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -674,7 +676,9 @@ pub async fn control_playout(
id: web::Path<i32>, id: web::Path<i32>,
control: web::Json<ControlParams>, control: web::Json<ControlParams>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
match control_state(&pool.into_inner(), *id, &control.control).await { let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
match control_state(&config, &control.control).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -716,7 +720,9 @@ pub async fn media_current(
pool: web::Data<Pool<Sqlite>>, pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>, id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "current".into()).await { let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
match media_info(&config, "current".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -733,7 +739,9 @@ pub async fn media_next(
pool: web::Data<Pool<Sqlite>>, pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>, id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "next".into()).await { let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
match media_info(&config, "next".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -751,7 +759,9 @@ pub async fn media_last(
pool: web::Data<Pool<Sqlite>>, pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>, id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "last".into()).await { let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
match media_info(&config, "last".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -778,7 +788,16 @@ pub async fn process_control(
proc: web::Json<Process>, proc: web::Json<Process>,
engine_process: web::Data<ProcessControl>, engine_process: web::Data<ProcessControl>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
control_service(
&pool.into_inner(),
&config,
*id,
&proc.command,
Some(engine_process),
)
.await
} }
/// #### ffplayout Playlist Operations /// #### ffplayout Playlist Operations

View File

@ -7,12 +7,11 @@ use actix_web_lab::{
}; };
use ffplayout_lib::utils::PlayoutConfig; use ffplayout_lib::utils::PlayoutConfig;
use futures_util::future;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use crate::utils::system; use crate::utils::{control::media_info, system};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Client { struct Client {
@ -73,9 +72,8 @@ impl Broadcaster {
this.remove_stale_clients().await; this.remove_stale_clients().await;
} }
if counter % 30 == 0 { if counter % 5 == 0 {
// TODO: implement playout status this.broadcast_playout().await;
this.broadcast("ping").await;
} }
this.broadcast_system().await; this.broadcast_system().await;
@ -124,20 +122,32 @@ impl Broadcaster {
Sse::from_infallible_receiver(rx) Sse::from_infallible_receiver(rx)
} }
/// Broadcasts `msg` to all clients. /// Broadcasts playout status to clients.
pub async fn broadcast(&self, msg: &str) { pub async fn broadcast_playout(&self) {
let clients = self.inner.lock().clients.clone(); let clients = self.inner.lock().clients.clone();
let send_futures = clients for client in clients.iter().filter(|client| client.endpoint == "playout") {
.iter() match media_info(&client.config, "current".into()).await {
.map(|client| client.sender.send(sse::Data::new(msg).into())); Ok(res) => {
let _ = client
// try to send to all clients, ignoring failures .sender
// disconnected clients will get swept up by `remove_stale_clients` .send(
let _ = future::join_all(send_futures).await; sse::Data::new(res.text().await.unwrap_or_else(|_| "Success".into()))
.into(),
)
.await;
}
Err(_) => {
let _ = client
.sender
.send(sse::Data::new("not running").into())
.await;
}
};
}
} }
/// Broadcasts `msg` to all clients. /// Broadcasts system status to clients.
pub async fn broadcast_system(&self) { pub async fn broadcast_system(&self) {
let clients = self.inner.lock().clients.clone(); let clients = self.inner.lock().clients.clone();

View File

@ -12,6 +12,7 @@ use crate::utils::{
use ffplayout_lib::utils::PlayoutConfig; use ffplayout_lib::utils::PlayoutConfig;
use crate::db::{handles, models::Channel}; use crate::db::{handles, models::Channel};
use crate::utils::playout_config;
pub async fn create_channel( pub async fn create_channel(
conn: &Pool<Sqlite>, conn: &Pool<Sqlite>,
@ -51,15 +52,17 @@ pub async fn create_channel(
serde_yaml::to_writer(file, &config).unwrap(); serde_yaml::to_writer(file, &config).unwrap();
let new_channel = handles::insert_channel(conn, target_channel).await?; let new_channel = handles::insert_channel(conn, target_channel).await?;
control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?; control_service(conn, &config, new_channel.id, &ServiceCmd::Enable, None).await?;
Ok(new_channel) Ok(new_channel)
} }
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> { pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?; let channel = handles::select_channel(conn, &id).await?;
control_service(conn, channel.id, &ServiceCmd::Stop, None).await?; let (config, _) = playout_config(conn, &id).await?;
control_service(conn, channel.id, &ServiceCmd::Disable, None).await?;
control_service(conn, &config, channel.id, &ServiceCmd::Stop, None).await?;
control_service(conn, &config, channel.id, &ServiceCmd::Disable, None).await?;
if let Err(e) = fs::remove_file(channel.config_path) { if let Err(e) = fs::remove_file(channel.config_path) {
error!("{e}"); error!("{e}");

View File

@ -15,8 +15,8 @@ use tokio::{
}; };
use crate::db::handles::select_channel; use crate::db::handles::select_channel;
use crate::utils::{errors::ServiceError, playout_config}; use crate::utils::errors::ServiceError;
use ffplayout_lib::vec_strings; use ffplayout_lib::{utils::PlayoutConfig, vec_strings};
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
struct TextParams { struct TextParams {
@ -241,11 +241,10 @@ impl SystemD {
} }
} }
async fn post_request<T>(conn: &Pool<Sqlite>, id: i32, obj: T) -> Result<Response, ServiceError> async fn post_request<T>(config: &PlayoutConfig, obj: T) -> Result<Response, ServiceError>
where where
T: Serialize, T: Serialize,
{ {
let (config, _) = playout_config(conn, &id).await?;
let url = format!("http://{}", config.rpc_server.address); let url = format!("http://{}", config.rpc_server.address);
let client = Client::new(); let client = Client::new();
@ -262,8 +261,7 @@ where
} }
pub async fn send_message( pub async fn send_message(
conn: &Pool<Sqlite>, config: &PlayoutConfig,
id: i32,
message: HashMap<String, String>, message: HashMap<String, String>,
) -> Result<Response, ServiceError> { ) -> Result<Response, ServiceError> {
let json_obj = TextParams { let json_obj = TextParams {
@ -271,33 +269,29 @@ pub async fn send_message(
message, message,
}; };
post_request(conn, id, json_obj).await post_request(config, json_obj).await
} }
pub async fn control_state( pub async fn control_state(
conn: &Pool<Sqlite>, config: &PlayoutConfig,
id: i32,
command: &str, command: &str,
) -> Result<Response, ServiceError> { ) -> Result<Response, ServiceError> {
let json_obj = ControlParams { let json_obj = ControlParams {
control: command.to_owned(), control: command.to_owned(),
}; };
post_request(conn, id, json_obj).await post_request(config, json_obj).await
} }
pub async fn media_info( pub async fn media_info(config: &PlayoutConfig, command: String) -> Result<Response, ServiceError> {
conn: &Pool<Sqlite>,
id: i32,
command: String,
) -> Result<Response, ServiceError> {
let json_obj = MediaParams { media: command }; let json_obj = MediaParams { media: command };
post_request(conn, id, json_obj).await post_request(config, json_obj).await
} }
pub async fn control_service( pub async fn control_service(
conn: &Pool<Sqlite>, conn: &Pool<Sqlite>,
config: &PlayoutConfig,
id: i32, id: i32,
command: &ServiceCmd, command: &ServiceCmd,
engine: Option<web::Data<ProcessControl>>, engine: Option<web::Data<ProcessControl>>,
@ -307,14 +301,14 @@ pub async fn control_service(
match command { match command {
ServiceCmd::Start => en.start().await, ServiceCmd::Start => en.start().await,
ServiceCmd::Stop => { ServiceCmd::Stop => {
if control_state(conn, id, "stop_all").await.is_ok() { if control_state(config, "stop_all").await.is_ok() {
en.stop().await en.stop().await
} else { } else {
Err(ServiceError::NoContent("Nothing to stop".to_string())) Err(ServiceError::NoContent("Nothing to stop".to_string()))
} }
} }
ServiceCmd::Restart => { ServiceCmd::Restart => {
if control_state(conn, id, "stop_all").await.is_ok() { if control_state(config, "stop_all").await.is_ok() {
en.restart().await en.restart().await
} else { } else {
Err(ServiceError::NoContent("Nothing to restart".to_string())) Err(ServiceError::NoContent("Nothing to restart".to_string()))