send system status over sse

This commit is contained in:
jb-alvarado 2024-04-25 12:41:26 +02:00
parent 1fbfda2e85
commit d0244da05e
8 changed files with 168 additions and 42 deletions

View File

@ -16,5 +16,10 @@
}, },
"[yaml]": { "[yaml]": {
"editor.defaultFormatter": "esbenp.prettier-vscode" "editor.defaultFormatter": "esbenp.prettier-vscode"
} },
"cSpell.words": [
"actix",
"tokio",
"uuids"
]
} }

View File

@ -1,28 +1,35 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"> <meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta http-equiv="X-UA-Compatible" content="ie=edge"> <meta http-equiv="X-UA-Compatible" content="ie=edge" />
<title>Server-sent events</title> <title>Server-sent events</title>
<style> <style>
p { p {
margin-top: 0.5em; margin-top: 0.5em;
margin-bottom: 0.5em; margin-bottom: 0.5em;
} }
</style> </style>
</head> </head>
<body> <body>
<div id="root"></div> <div id="root"></div>
<script> <div id="ping"></div>
let root = document.getElementById("root"); <script>
let events = new EventSource("/events"); let root = document.getElementById('root')
events.onmessage = (event) => { const ping = document.getElementById('ping')
let data = document.createElement("p"); let events = new EventSource('/events')
let time = new Date().toLocaleTimeString(); events.onmessage = (event) => {
data.innerText = time + ": " + event.data; let data = document.createElement('p')
root.appendChild(data); let time = new Date().toLocaleTimeString()
} data.innerText = time + ': ' + event.data
</script>
</body> if (event.data.includes('ping')) {
ping.innerHTML = time + ': ' + event.data
} else {
root.appendChild(data)
}
}
</script>
</body>
</html> </html>

View File

@ -44,7 +44,9 @@ async fn index() -> impl Responder {
#[get("/events")] #[get("/events")]
async fn event_stream(broadcaster: web::Data<Broadcaster>) -> impl Responder { async fn event_stream(broadcaster: web::Data<Broadcaster>) -> impl Responder {
broadcaster.new_client().await broadcaster
.new_client(1, PlayoutConfig::default(), "ping".to_string())
.await
} }
#[post("/broadcast/{msg}")] #[post("/broadcast/{msg}")]

View File

@ -1,4 +1,9 @@
use std::{collections::HashSet, env, process::exit, sync::Mutex}; use std::{
collections::HashSet,
env,
process::exit,
sync::{Arc, Mutex},
};
use actix_files::Files; use actix_files::Files;
use actix_web::{ use actix_web::{
@ -16,7 +21,7 @@ use simplelog::*;
use ffplayout_api::{ use ffplayout_api::{
api::{auth, routes::*}, api::{auth, routes::*},
db::{db_pool, models::LoginUser}, db::{db_pool, models::LoginUser},
sse::{routes::*, AuthState}, sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{control::ProcessControl, db_path, init_config, run_args}, utils::{control::ProcessControl, db_path, init_config, run_args},
ARGS, ARGS,
}; };
@ -82,6 +87,7 @@ async fn main() -> std::io::Result<()> {
let auth_state = web::Data::new(AuthState { let auth_state = web::Data::new(AuthState {
uuids: Mutex::new(HashSet::new()), uuids: Mutex::new(HashSet::new()),
}); });
let broadcast_data = Broadcaster::create();
info!("running ffplayout API, listen on http://{conn}"); info!("running ffplayout API, listen on http://{conn}");
@ -97,6 +103,7 @@ async fn main() -> std::io::Result<()> {
.app_data(db_pool) .app_data(db_pool)
.app_data(engine_process.clone()) .app_data(engine_process.clone())
.app_data(auth_state.clone()) .app_data(auth_state.clone())
.app_data(web::Data::from(Arc::clone(&broadcast_data)))
.wrap(logger) .wrap(logger)
.service(login) .service(login)
.service( .service(
@ -140,7 +147,11 @@ async fn main() -> std::io::Result<()> {
.service(get_system_stat) .service(get_system_stat)
.service(generate_uuid), .service(generate_uuid),
) )
.service(web::scope("/data").service(validate_uuid)) .service(
web::scope("/data")
.service(validate_uuid)
.service(event_stream),
)
.service(get_file); .service(get_file);
if let Some(public) = &ARGS.public { if let Some(public) = &ARGS.public {

View File

@ -1,22 +1,50 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use actix_web::rt::time::interval; use actix_web::{rt::time::interval, web};
use actix_web_lab::{ use actix_web_lab::{
sse::{self, Sse}, sse::{self, Sse},
util::InfallibleStream, util::InfallibleStream,
}; };
use ffplayout_lib::utils::PlayoutConfig;
use futures_util::future; use futures_util::future;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use crate::utils::system;
#[derive(Debug, Clone)]
struct Client {
_channel: i32,
config: PlayoutConfig,
endpoint: String,
sender: mpsc::Sender<sse::Event>,
}
impl Client {
fn new(
_channel: i32,
config: PlayoutConfig,
endpoint: String,
sender: mpsc::Sender<sse::Event>,
) -> Self {
Self {
_channel,
config,
endpoint,
sender,
}
}
}
pub struct Broadcaster { pub struct Broadcaster {
inner: Mutex<BroadcasterInner>, inner: Mutex<BroadcasterInner>,
} }
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
struct BroadcasterInner { struct BroadcasterInner {
clients: Vec<mpsc::Sender<sse::Event>>, clients: Vec<Client>,
} }
impl Broadcaster { impl Broadcaster {
@ -35,11 +63,24 @@ impl Broadcaster {
/// list if not. /// list if not.
fn spawn_ping(this: Arc<Self>) { fn spawn_ping(this: Arc<Self>) {
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
let mut interval = interval(Duration::from_secs(10)); let mut interval = interval(Duration::from_secs(1));
let mut counter = 0;
loop { loop {
interval.tick().await; interval.tick().await;
this.remove_stale_clients().await;
if counter % 10 == 0 {
this.remove_stale_clients().await;
}
if counter % 30 == 0 {
// TODO: implement playout status
this.broadcast("ping").await;
}
this.broadcast_system().await;
counter = (counter + 1) % 61;
} }
}); });
} }
@ -52,6 +93,7 @@ impl Broadcaster {
for client in clients { for client in clients {
if client if client
.sender
.send(sse::Event::Comment("ping".into())) .send(sse::Event::Comment("ping".into()))
.await .await
.is_ok() .is_ok()
@ -64,12 +106,20 @@ impl Broadcaster {
} }
/// Registers client with broadcaster, returning an SSE response body. /// Registers client with broadcaster, returning an SSE response body.
pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> { pub async fn new_client(
&self,
channel: i32,
config: PlayoutConfig,
endpoint: String,
) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10); let (tx, rx) = mpsc::channel(10);
tx.send(sse::Data::new("connected").into()).await.unwrap(); tx.send(sse::Data::new("connected").into()).await.unwrap();
self.inner.lock().clients.push(tx); self.inner
.lock()
.clients
.push(Client::new(channel, config, endpoint, tx));
Sse::from_infallible_receiver(rx) Sse::from_infallible_receiver(rx)
} }
@ -80,10 +130,24 @@ impl Broadcaster {
let send_futures = clients let send_futures = clients
.iter() .iter()
.map(|client| client.send(sse::Data::new(msg).into())); .map(|client| client.sender.send(sse::Data::new(msg).into()));
// try to send to all clients, ignoring failures // try to send to all clients, ignoring failures
// disconnected clients will get swept up by `remove_stale_clients` // disconnected clients will get swept up by `remove_stale_clients`
let _ = future::join_all(send_futures).await; let _ = future::join_all(send_futures).await;
} }
/// Broadcasts `msg` to all clients.
pub async fn broadcast_system(&self) {
let clients = self.inner.lock().clients.clone();
for client in clients {
if &client.endpoint == "system" {
if let Ok(stat) = web::block(move || system::stat(client.config.clone())).await {
let stat_string = stat.to_string();
let _ = client.sender.send(sse::Data::new(stat_string).into()).await;
};
}
}
}
} }

View File

@ -21,7 +21,7 @@ impl UuidData {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
uuid: Uuid::new_v4(), uuid: Uuid::new_v4(),
expiration: SystemTime::now() + Duration::from_secs(12 * 3600), // 12 hours expiration: SystemTime::now() + Duration::from_secs(2 * 3600), // 2 hours
} }
} }
} }
@ -30,6 +30,7 @@ pub struct AuthState {
pub uuids: Mutex<HashSet<UuidData>>, pub uuids: Mutex<HashSet<UuidData>>,
} }
/// Remove all UUIDs from HashSet which are older the expiration time.
pub fn prune_uuids(uuids: &mut HashSet<UuidData>) { pub fn prune_uuids(uuids: &mut HashSet<UuidData>) {
uuids.retain(|entry| entry.expiration > SystemTime::now()); uuids.retain(|entry| entry.expiration > SystemTime::now());
} }

View File

@ -1,18 +1,22 @@
use actix_web::{get, post, web, Responder}; use actix_web::{get, post, web, Responder};
use actix_web_grants::proc_macro::protect; use actix_web_grants::proc_macro::protect;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use super::{check_uuid, prune_uuids, AuthState, UuidData}; use super::{check_uuid, prune_uuids, AuthState, UuidData};
use crate::utils::{errors::ServiceError, Role}; use crate::sse::broadcast::Broadcaster;
use crate::utils::{errors::ServiceError, playout_config, Role};
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
struct User { struct User {
#[serde(default, skip_serializing)]
endpoint: String,
uuid: String, uuid: String,
} }
impl User { impl User {
fn new(uuid: String) -> Self { fn new(endpoint: String, uuid: String) -> Self {
Self { uuid } Self { endpoint, uuid }
} }
} }
@ -26,7 +30,7 @@ impl User {
async fn generate_uuid(data: web::Data<AuthState>) -> Result<impl Responder, ServiceError> { async fn generate_uuid(data: web::Data<AuthState>) -> Result<impl Responder, ServiceError> {
let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?; let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?;
let new_uuid = UuidData::new(); let new_uuid = UuidData::new();
let user_auth = User::new(new_uuid.uuid.to_string()); let user_auth = User::new(String::new(), new_uuid.uuid.to_string());
prune_uuids(&mut uuids); prune_uuids(&mut uuids);
@ -52,3 +56,29 @@ async fn validate_uuid(
Err(e) => Err(e), Err(e) => Err(e),
} }
} }
/// **Connect to event handler**
///
/// ```BASH
/// curl -X GET 'http://127.0.0.1:8787/data/event/1?endpoint=system&uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a'
/// ```
#[get("/event/{channel}")]
async fn event_stream(
pool: web::Data<Pool<Sqlite>>,
broadcaster: web::Data<Broadcaster>,
data: web::Data<AuthState>,
id: web::Path<i32>,
user: web::Query<User>,
) -> Result<impl Responder, ServiceError> {
let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?;
if let Err(e) = check_uuid(&mut uuids, user.uuid.as_str()) {
return Err(e);
}
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
Ok(broadcaster
.new_client(*id, config, user.endpoint.clone())
.await)
}

View File

@ -71,6 +71,12 @@ pub struct SystemStat {
pub system: MySystem, pub system: MySystem,
} }
impl SystemStat {
pub fn to_string(&self) -> String {
serde_json::to_string(&self).unwrap()
}
}
pub fn stat(config: PlayoutConfig) -> SystemStat { pub fn stat(config: PlayoutConfig) -> SystemStat {
let mut disks = DISKS.lock().unwrap(); let mut disks = DISKS.lock().unwrap();
let mut networks = NETWORKS.lock().unwrap(); let mut networks = NETWORKS.lock().unwrap();