diff --git a/.vscode/settings.json b/.vscode/settings.json index 766d5939..629bf9a2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,5 +16,10 @@ }, "[yaml]": { "editor.defaultFormatter": "esbenp.prettier-vscode" - } + }, + "cSpell.words": [ + "actix", + "tokio", + "uuids" + ] } diff --git a/ffplayout-api/examples/index.html b/ffplayout-api/examples/index.html index ef87ab39..a9c6d4e6 100644 --- a/ffplayout-api/examples/index.html +++ b/ffplayout-api/examples/index.html @@ -1,28 +1,35 @@ - - - - - Server-sent events - - - -
- - + + + + + Server-sent events + + + +
+
+ + diff --git a/ffplayout-api/examples/sse.rs b/ffplayout-api/examples/sse.rs index 573bdce8..033df68f 100644 --- a/ffplayout-api/examples/sse.rs +++ b/ffplayout-api/examples/sse.rs @@ -44,7 +44,9 @@ async fn index() -> impl Responder { #[get("/events")] async fn event_stream(broadcaster: web::Data) -> impl Responder { - broadcaster.new_client().await + broadcaster + .new_client(1, PlayoutConfig::default(), "ping".to_string()) + .await } #[post("/broadcast/{msg}")] diff --git a/ffplayout-api/src/main.rs b/ffplayout-api/src/main.rs index 89fb70b5..981c460f 100644 --- a/ffplayout-api/src/main.rs +++ b/ffplayout-api/src/main.rs @@ -1,4 +1,9 @@ -use std::{collections::HashSet, env, process::exit, sync::Mutex}; +use std::{ + collections::HashSet, + env, + process::exit, + sync::{Arc, Mutex}, +}; use actix_files::Files; use actix_web::{ @@ -16,7 +21,7 @@ use simplelog::*; use ffplayout_api::{ api::{auth, routes::*}, db::{db_pool, models::LoginUser}, - sse::{routes::*, AuthState}, + sse::{broadcast::Broadcaster, routes::*, AuthState}, utils::{control::ProcessControl, db_path, init_config, run_args}, ARGS, }; @@ -82,6 +87,7 @@ async fn main() -> std::io::Result<()> { let auth_state = web::Data::new(AuthState { uuids: Mutex::new(HashSet::new()), }); + let broadcast_data = Broadcaster::create(); info!("running ffplayout API, listen on http://{conn}"); @@ -97,6 +103,7 @@ async fn main() -> std::io::Result<()> { .app_data(db_pool) .app_data(engine_process.clone()) .app_data(auth_state.clone()) + .app_data(web::Data::from(Arc::clone(&broadcast_data))) .wrap(logger) .service(login) .service( @@ -140,7 +147,11 @@ async fn main() -> std::io::Result<()> { .service(get_system_stat) .service(generate_uuid), ) - .service(web::scope("/data").service(validate_uuid)) + .service( + web::scope("/data") + .service(validate_uuid) + .service(event_stream), + ) .service(get_file); if let Some(public) = &ARGS.public { diff --git a/ffplayout-api/src/sse/broadcast.rs b/ffplayout-api/src/sse/broadcast.rs index 25f216f2..7286f638 100644 --- a/ffplayout-api/src/sse/broadcast.rs +++ b/ffplayout-api/src/sse/broadcast.rs @@ -1,22 +1,50 @@ use std::{sync::Arc, time::Duration}; -use actix_web::rt::time::interval; +use actix_web::{rt::time::interval, web}; use actix_web_lab::{ sse::{self, Sse}, util::InfallibleStream, }; + +use ffplayout_lib::utils::PlayoutConfig; use futures_util::future; use parking_lot::Mutex; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use crate::utils::system; + +#[derive(Debug, Clone)] +struct Client { + _channel: i32, + config: PlayoutConfig, + endpoint: String, + sender: mpsc::Sender, +} + +impl Client { + fn new( + _channel: i32, + config: PlayoutConfig, + endpoint: String, + sender: mpsc::Sender, + ) -> Self { + Self { + _channel, + config, + endpoint, + sender, + } + } +} + pub struct Broadcaster { inner: Mutex, } #[derive(Debug, Clone, Default)] struct BroadcasterInner { - clients: Vec>, + clients: Vec, } impl Broadcaster { @@ -35,11 +63,24 @@ impl Broadcaster { /// list if not. fn spawn_ping(this: Arc) { actix_web::rt::spawn(async move { - let mut interval = interval(Duration::from_secs(10)); + let mut interval = interval(Duration::from_secs(1)); + let mut counter = 0; loop { interval.tick().await; - this.remove_stale_clients().await; + + if counter % 10 == 0 { + this.remove_stale_clients().await; + } + + if counter % 30 == 0 { + // TODO: implement playout status + this.broadcast("ping").await; + } + + this.broadcast_system().await; + + counter = (counter + 1) % 61; } }); } @@ -52,6 +93,7 @@ impl Broadcaster { for client in clients { if client + .sender .send(sse::Event::Comment("ping".into())) .await .is_ok() @@ -64,12 +106,20 @@ impl Broadcaster { } /// Registers client with broadcaster, returning an SSE response body. - pub async fn new_client(&self) -> Sse>> { + pub async fn new_client( + &self, + channel: i32, + config: PlayoutConfig, + endpoint: String, + ) -> Sse>> { let (tx, rx) = mpsc::channel(10); tx.send(sse::Data::new("connected").into()).await.unwrap(); - self.inner.lock().clients.push(tx); + self.inner + .lock() + .clients + .push(Client::new(channel, config, endpoint, tx)); Sse::from_infallible_receiver(rx) } @@ -80,10 +130,24 @@ impl Broadcaster { let send_futures = clients .iter() - .map(|client| client.send(sse::Data::new(msg).into())); + .map(|client| client.sender.send(sse::Data::new(msg).into())); // try to send to all clients, ignoring failures // disconnected clients will get swept up by `remove_stale_clients` let _ = future::join_all(send_futures).await; } + + /// Broadcasts `msg` to all clients. + pub async fn broadcast_system(&self) { + let clients = self.inner.lock().clients.clone(); + + for client in clients { + if &client.endpoint == "system" { + if let Ok(stat) = web::block(move || system::stat(client.config.clone())).await { + let stat_string = stat.to_string(); + let _ = client.sender.send(sse::Data::new(stat_string).into()).await; + }; + } + } + } } diff --git a/ffplayout-api/src/sse/mod.rs b/ffplayout-api/src/sse/mod.rs index d908d058..e2545c8b 100644 --- a/ffplayout-api/src/sse/mod.rs +++ b/ffplayout-api/src/sse/mod.rs @@ -21,7 +21,7 @@ impl UuidData { pub fn new() -> Self { Self { uuid: Uuid::new_v4(), - expiration: SystemTime::now() + Duration::from_secs(12 * 3600), // 12 hours + expiration: SystemTime::now() + Duration::from_secs(2 * 3600), // 2 hours } } } @@ -30,6 +30,7 @@ pub struct AuthState { pub uuids: Mutex>, } +/// Remove all UUIDs from HashSet which are older the expiration time. pub fn prune_uuids(uuids: &mut HashSet) { uuids.retain(|entry| entry.expiration > SystemTime::now()); } diff --git a/ffplayout-api/src/sse/routes.rs b/ffplayout-api/src/sse/routes.rs index 59cda62c..665ca9b1 100644 --- a/ffplayout-api/src/sse/routes.rs +++ b/ffplayout-api/src/sse/routes.rs @@ -1,18 +1,22 @@ use actix_web::{get, post, web, Responder}; use actix_web_grants::proc_macro::protect; use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Sqlite}; use super::{check_uuid, prune_uuids, AuthState, UuidData}; -use crate::utils::{errors::ServiceError, Role}; +use crate::sse::broadcast::Broadcaster; +use crate::utils::{errors::ServiceError, playout_config, Role}; #[derive(Deserialize, Serialize)] struct User { + #[serde(default, skip_serializing)] + endpoint: String, uuid: String, } impl User { - fn new(uuid: String) -> Self { - Self { uuid } + fn new(endpoint: String, uuid: String) -> Self { + Self { endpoint, uuid } } } @@ -26,7 +30,7 @@ impl User { async fn generate_uuid(data: web::Data) -> Result { let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?; let new_uuid = UuidData::new(); - let user_auth = User::new(new_uuid.uuid.to_string()); + let user_auth = User::new(String::new(), new_uuid.uuid.to_string()); prune_uuids(&mut uuids); @@ -52,3 +56,29 @@ async fn validate_uuid( Err(e) => Err(e), } } + +/// **Connect to event handler** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/data/event/1?endpoint=system&uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a' +/// ``` +#[get("/event/{channel}")] +async fn event_stream( + pool: web::Data>, + broadcaster: web::Data, + data: web::Data, + id: web::Path, + user: web::Query, +) -> Result { + let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?; + + if let Err(e) = check_uuid(&mut uuids, user.uuid.as_str()) { + return Err(e); + } + + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + Ok(broadcaster + .new_client(*id, config, user.endpoint.clone()) + .await) +} diff --git a/ffplayout-api/src/utils/system.rs b/ffplayout-api/src/utils/system.rs index 9cd47e15..3feb2e47 100644 --- a/ffplayout-api/src/utils/system.rs +++ b/ffplayout-api/src/utils/system.rs @@ -71,6 +71,12 @@ pub struct SystemStat { pub system: MySystem, } +impl SystemStat { + pub fn to_string(&self) -> String { + serde_json::to_string(&self).unwrap() + } +} + pub fn stat(config: PlayoutConfig) -> SystemStat { let mut disks = DISKS.lock().unwrap(); let mut networks = NETWORKS.lock().unwrap();