diff --git a/.vscode/settings.json b/.vscode/settings.json index 766d5939..d89a1f00 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,5 +16,11 @@ }, "[yaml]": { "editor.defaultFormatter": "esbenp.prettier-vscode" - } + }, + "cSpell.words": [ + "actix", + "rsplit", + "tokio", + "uuids" + ] } diff --git a/Cargo.lock b/Cargo.lock index 158182cc..ac231a67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,6 +267,55 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "actix-web-lab" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7675c1a84eec1b179c844cdea8488e3e409d8e4984026e92fa96c87dd86f33c6" +dependencies = [ + "actix-http", + "actix-router", + "actix-service", + "actix-utils", + "actix-web", + "actix-web-lab-derive", + "ahash", + "arc-swap", + "async-trait", + "bytes", + "bytestring", + "csv", + "derive_more", + "futures-core", + "futures-util", + "http 0.2.12", + "impl-more", + "itertools", + "local-channel", + "mediatype", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_html_form", + "serde_json", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "actix-web-lab-derive" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aa0b287c8de4a76b691f29dbb5451e8dd5b79d777eaf87350c9b0cbfdb5e968" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "actix-web-static-files" version = "4.0.1" @@ -406,6 +455,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70033777eb8b5124a81a1889416543dddef2de240019b674c81285a2635a7e1e" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "argon2" version = "0.5.3" @@ -981,6 +1036,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "darling" version = "0.20.8" @@ -1217,7 +1293,7 @@ checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" [[package]] name = "ffplayout" -version = "0.21.4" +version = "0.22.0" dependencies = [ "chrono", "clap", @@ -1239,13 +1315,14 @@ dependencies = [ [[package]] name = "ffplayout-api" -version = "0.21.4" +version = "0.22.0" dependencies = [ "actix-files", "actix-multipart", "actix-web", "actix-web-grants", "actix-web-httpauth", + "actix-web-lab", "actix-web-static-files", "argon2", "chrono", @@ -1259,6 +1336,7 @@ dependencies = [ "lexical-sort", "local-ip-address", "once_cell", + "parking_lot", "path-clean", "rand", "regex", @@ -1274,11 +1352,13 @@ dependencies = [ "static-files", "sysinfo", "tokio", + "tokio-stream", + "uuid", ] [[package]] name = "ffplayout-lib" -version = "0.21.4" +version = "0.22.0" dependencies = [ "chrono", "crossbeam-channel", @@ -1821,6 +1901,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "impl-more" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" + [[package]] name = "indexmap" version = "2.2.6" @@ -2096,6 +2182,12 @@ dependencies = [ "digest", ] +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.2" @@ -2943,6 +3035,19 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.116" @@ -3473,7 +3578,7 @@ dependencies = [ [[package]] name = "tests" -version = "0.21.4" +version = "0.22.0" dependencies = [ "chrono", "crossbeam-channel", diff --git a/Cargo.toml b/Cargo.toml index c73282ae..00dbb1c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ default-members = ["ffplayout-api", "ffplayout-engine", "tests"] resolver = "2" [workspace.package] -version = "0.21.4" +version = "0.22.0" license = "GPL-3.0" repository = "https://github.com/ffplayout/ffplayout" authors = ["Jonathan Baecker "] diff --git a/README.md b/README.md index f1633b4f..4be5d9be 100644 --- a/README.md +++ b/README.md @@ -176,19 +176,17 @@ Output from `{"media":"current"}` show: ```JSON { - "current_media": { + "media": { "category": "", "duration": 154.2, "out": 154.2, - "seek": 0.0, + "in": 0.0, "source": "/opt/tv-media/clip.mp4" }, "index": 39, - "play_mode": "playlist", - "played_sec": 67.80771999300123, - "remaining_sec": 86.39228000699876, - "start_sec": 24713.631999999998, - "start_time": "06:51:53.631" + "mode": "playlist", + "ingest": false, + "played": 67.80771999300123, } ``` diff --git a/assets/ffplayout.conf b/assets/ffplayout.conf index 3cbea865..a9f13a00 100644 --- a/assets/ffplayout.conf +++ b/assets/ffplayout.conf @@ -22,7 +22,7 @@ server { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_read_timeout 36000s; - proxy_connect_timeout 36000s; + proxy_connect_timeout 36000s; proxy_send_timeout 36000s; proxy_buffer_size 128k; proxy_buffers 4 256k; @@ -31,6 +31,16 @@ server { proxy_pass http://127.0.0.1:8787; } + location /data { + proxy_set_header Host $http_host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header Connection ""; + proxy_http_version 1.1; + proxy_pass http://127.0.0.1:8787/data; + } + location /live/ { alias /usr/share/ffplayout/public/live/; } diff --git a/ffplayout-api/Cargo.toml b/ffplayout-api/Cargo.toml index 50978ed2..10328234 100644 --- a/ffplayout-api/Cargo.toml +++ b/ffplayout-api/Cargo.toml @@ -19,6 +19,7 @@ actix-multipart = "0.6" actix-web = "4" actix-web-grants = "4" actix-web-httpauth = "0.8" +actix-web-lab = "0.20" actix-web-static-files = "4.0" argon2 = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } @@ -31,6 +32,7 @@ lazy_static = "1.4" lexical-sort = "0.3" local-ip-address = "0.6" once_cell = "1.18" +parking_lot = "0.12" path-clean = "1.0" rand = "0.8" regex = "1" @@ -46,6 +48,8 @@ static-files = "0.2" sysinfo ={ version = "0.30", features = ["linux-netdevs"] } sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] } tokio = { version = "1.29", features = ["full"] } +tokio-stream = "0.1" +uuid = "1.8" [build-dependencies] static-files = "0.2" diff --git a/ffplayout-api/examples/uuid_auth.rs b/ffplayout-api/examples/uuid_auth.rs new file mode 100644 index 00000000..13739d20 --- /dev/null +++ b/ffplayout-api/examples/uuid_auth.rs @@ -0,0 +1,80 @@ +/// Example for a simple auth mechanism in SSE. +/// +/// get new UUID: curl -X GET http://127.0.0.1:8080/generate +/// use UUID: curl --header "UUID: f2f8c29b-712a-48c5-8919-b535d3a05a3a" -X GET http://127.0.0.1:8080/check +/// +use std::{collections::HashSet, sync::Mutex, time::Duration, time::SystemTime}; + +use actix_web::{middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer}; +use simplelog::*; +use uuid::Uuid; + +use ffplayout_lib::utils::{init_logging, PlayoutConfig}; + +#[derive(Debug, Eq, Hash, PartialEq)] +struct UuidData { + uuid: Uuid, + expiration_time: SystemTime, +} + +struct AppState { + uuids: Mutex>, +} + +fn prune_uuids(uuids: &mut HashSet) { + uuids.retain(|entry| entry.expiration_time > SystemTime::now()); +} + +async fn generate_uuid(data: web::Data) -> HttpResponse { + let uuid = Uuid::new_v4(); + let expiration_time = SystemTime::now() + Duration::from_secs(30); // 24 * 3600 -> for 24 hours + let mut uuids = data.uuids.lock().unwrap(); + + prune_uuids(&mut uuids); + + uuids.insert(UuidData { + uuid, + expiration_time, + }); + + HttpResponse::Ok().body(uuid.to_string()) +} + +async fn check_uuid(data: web::Data, req: HttpRequest) -> HttpResponse { + let uuid = req.headers().get("uuid").unwrap().to_str().unwrap(); + let uuid_from_client = Uuid::parse_str(uuid).unwrap(); + let mut uuids = data.uuids.lock().unwrap(); + + prune_uuids(&mut uuids); + + match uuids.iter().find(|entry| entry.uuid == uuid_from_client) { + Some(_) => HttpResponse::Ok().body("UUID is valid"), + None => HttpResponse::Unauthorized().body("Invalid or expired UUID"), + } +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + let mut config = PlayoutConfig::new(None, None); + config.mail.recipient = String::new(); + config.logging.log_to_file = false; + config.logging.timestamp = false; + + let logging = init_logging(&config, None, None); + CombinedLogger::init(logging).unwrap(); + + let state = web::Data::new(AppState { + uuids: Mutex::new(HashSet::new()), + }); + + HttpServer::new(move || { + App::new() + .app_data(state.clone()) + .wrap(Logger::default()) + .route("/generate", web::get().to(generate_uuid)) + .route("/check", web::get().to(check_uuid)) + }) + .bind("127.0.0.1:8080")? + .run() + .await +} diff --git a/ffplayout-api/src/api/routes.rs b/ffplayout-api/src/api/routes.rs index 91fbea4a..17806f3f 100644 --- a/ffplayout-api/src/api/routes.rs +++ b/ffplayout-api/src/api/routes.rs @@ -246,7 +246,7 @@ async fn get_user( /// ``` #[get("/user/{name}")] #[protect("Role::Admin", ty = "Role")] -async fn get_user_by_name( +async fn get_by_name( pool: web::Data>, name: web::Path, ) -> Result { @@ -326,7 +326,7 @@ async fn update_user( return Err(ServiceError::InternalServerError); } - Err(ServiceError::Unauthorized) + Err(ServiceError::Unauthorized("No Permission".to_string())) } /// **Add User** @@ -651,7 +651,9 @@ pub async fn send_text_message( id: web::Path, data: web::Json>, ) -> Result { - match send_message(&pool.into_inner(), *id, data.into_inner()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match send_message(&config, data.into_inner()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -674,7 +676,9 @@ pub async fn control_playout( id: web::Path, control: web::Json, ) -> Result { - match control_state(&pool.into_inner(), *id, &control.control).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match control_state(&config, &control.control).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -690,25 +694,19 @@ pub async fn control_playout( /// **Response:** /// /// ```JSON -/// { -/// "jsonrpc": "2.0", -/// "result": { -/// "current_media": { +/// { +/// "media": { /// "category": "", /// "duration": 154.2, /// "out": 154.2, -/// "seek": 0.0, +/// "in": 0.0, /// "source": "/opt/tv-media/clip.mp4" /// }, /// "index": 39, -/// "play_mode": "playlist", -/// "played_sec": 67.80771999300123, -/// "remaining_sec": 86.39228000699876, -/// "start_sec": 24713.631999999998, -/// "start_time": "06:51:53.631" -/// }, -/// "id": 1 -/// } +/// "ingest": false, +/// "mode": "playlist", +/// "played": 67.808 +/// } /// ``` #[get("/control/{id}/media/current")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] @@ -716,7 +714,9 @@ pub async fn media_current( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "current".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "current".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -733,7 +733,9 @@ pub async fn media_next( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "next".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "next".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -751,7 +753,9 @@ pub async fn media_last( pool: web::Data>, id: web::Path, ) -> Result { - match media_info(&pool.into_inner(), *id, "last".into()).await { + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + match media_info(&config, "last".into()).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -778,7 +782,16 @@ pub async fn process_control( proc: web::Json, engine_process: web::Data, ) -> Result { - control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + control_service( + &pool.into_inner(), + &config, + *id, + &proc.command, + Some(engine_process), + ) + .await } /// #### ffplayout Playlist Operations diff --git a/ffplayout-api/src/lib.rs b/ffplayout-api/src/lib.rs new file mode 100644 index 00000000..3809055f --- /dev/null +++ b/ffplayout-api/src/lib.rs @@ -0,0 +1,21 @@ +use std::sync::{Arc, Mutex}; + +use clap::Parser; +use lazy_static::lazy_static; +use sysinfo::{Disks, Networks, System}; + +pub mod api; +pub mod db; +pub mod sse; +pub mod utils; + +use utils::args_parse::Args; + +lazy_static! { + pub static ref ARGS: Args = Args::parse(); + pub static ref DISKS: Arc> = + Arc::new(Mutex::new(Disks::new_with_refreshed_list())); + pub static ref NETWORKS: Arc> = + Arc::new(Mutex::new(Networks::new_with_refreshed_list())); + pub static ref SYS: Arc> = Arc::new(Mutex::new(System::new_all())); +} diff --git a/ffplayout-api/src/main.rs b/ffplayout-api/src/main.rs index 769cdffc..c3e52737 100644 --- a/ffplayout-api/src/main.rs +++ b/ffplayout-api/src/main.rs @@ -1,8 +1,4 @@ -use std::{ - env, - process::exit, - sync::{Arc, Mutex}, -}; +use std::{collections::HashSet, env, process::exit, sync::Arc}; use actix_files::Files; use actix_web::{ @@ -14,37 +10,26 @@ use actix_web_httpauth::{extractors::bearer::BearerAuth, middleware::HttpAuthent #[cfg(all(not(debug_assertions), feature = "embed_frontend"))] use actix_web_static_files::ResourceFiles; -use clap::Parser; -use lazy_static::lazy_static; use path_clean::PathClean; use simplelog::*; -use sysinfo::{Disks, Networks, System}; +use tokio::sync::Mutex; -pub mod api; -pub mod db; -pub mod utils; - -use api::{auth, routes::*}; -use db::{db_pool, models::LoginUser}; -use utils::{args_parse::Args, control::ProcessControl, db_path, init_config, run_args}; +use ffplayout_api::{ + api::{auth, routes::*}, + db::{db_pool, models::LoginUser}, + sse::{broadcast::Broadcaster, routes::*, AuthState}, + utils::{control::ProcessControl, db_path, init_config, run_args}, + ARGS, +}; #[cfg(any(debug_assertions, not(feature = "embed_frontend")))] -use utils::public_path; +use ffplayout_api::utils::public_path; use ffplayout_lib::utils::{init_logging, PlayoutConfig}; #[cfg(all(not(debug_assertions), feature = "embed_frontend"))] include!(concat!(env!("OUT_DIR"), "/generated.rs")); -lazy_static! { - pub static ref ARGS: Args = Args::parse(); - pub static ref DISKS: Arc> = - Arc::new(Mutex::new(Disks::new_with_refreshed_list())); - pub static ref NETWORKS: Arc> = - Arc::new(Mutex::new(Networks::new_with_refreshed_list())); - pub static ref SYS: Arc> = Arc::new(Mutex::new(System::new_all())); -} - async fn validator( req: ServiceRequest, credentials: BearerAuth, @@ -95,6 +80,10 @@ async fn main() -> std::io::Result<()> { let addr = ip_port[0]; let port = ip_port[1].parse::().unwrap(); let engine_process = web::Data::new(ProcessControl::new()); + let auth_state = web::Data::new(AuthState { + uuids: Mutex::new(HashSet::new()), + }); + let broadcast_data = Broadcaster::create(); info!("running ffplayout API, listen on http://{conn}"); @@ -109,14 +98,16 @@ async fn main() -> std::io::Result<()> { let mut web_app = App::new() .app_data(db_pool) .app_data(engine_process.clone()) + .app_data(auth_state.clone()) + .app_data(web::Data::from(Arc::clone(&broadcast_data))) .wrap(logger) .service(login) .service( web::scope("/api") - .wrap(auth) + .wrap(auth.clone()) .service(add_user) .service(get_user) - .service(get_user_by_name) + .service(get_by_name) .service(get_users) .service(remove_user) .service(get_playout_config) @@ -149,7 +140,13 @@ async fn main() -> std::io::Result<()> { .service(save_file) .service(import_playlist) .service(get_program) - .service(get_system_stat), + .service(get_system_stat) + .service(generate_uuid), + ) + .service( + web::scope("/data") + .service(validate_uuid) + .service(event_stream), ) .service(get_file); diff --git a/ffplayout-api/src/sse/broadcast.rs b/ffplayout-api/src/sse/broadcast.rs new file mode 100644 index 00000000..5ec9e853 --- /dev/null +++ b/ffplayout-api/src/sse/broadcast.rs @@ -0,0 +1,160 @@ +use std::{sync::Arc, time::Duration}; + +use actix_web::{rt::time::interval, web}; +use actix_web_lab::{ + sse::{self, Sse}, + util::InfallibleStream, +}; + +use ffplayout_lib::utils::PlayoutConfig; +use parking_lot::Mutex; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use crate::utils::{control::media_info, system}; + +#[derive(Debug, Clone)] +struct Client { + _channel: i32, + config: PlayoutConfig, + endpoint: String, + sender: mpsc::Sender, +} + +impl Client { + fn new( + _channel: i32, + config: PlayoutConfig, + endpoint: String, + sender: mpsc::Sender, + ) -> Self { + Self { + _channel, + config, + endpoint, + sender, + } + } +} + +pub struct Broadcaster { + inner: Mutex, +} + +#[derive(Debug, Clone, Default)] +struct BroadcasterInner { + clients: Vec, +} + +impl Broadcaster { + /// Constructs new broadcaster and spawns ping loop. + pub fn create() -> Arc { + let this = Arc::new(Broadcaster { + inner: Mutex::new(BroadcasterInner::default()), + }); + + Broadcaster::spawn_ping(Arc::clone(&this)); + + this + } + + /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast + /// list if not. + fn spawn_ping(this: Arc) { + actix_web::rt::spawn(async move { + let mut interval = interval(Duration::from_secs(1)); + let mut counter = 0; + + loop { + interval.tick().await; + + if counter % 10 == 0 { + this.remove_stale_clients().await; + } + + this.broadcast_playout().await; + this.broadcast_system().await; + + counter = (counter + 1) % 61; + } + }); + } + + /// Removes all non-responsive clients from broadcast list. + async fn remove_stale_clients(&self) { + let clients = self.inner.lock().clients.clone(); + + let mut ok_clients = Vec::new(); + + for client in clients { + if client + .sender + .send(sse::Event::Comment("ping".into())) + .await + .is_ok() + { + ok_clients.push(client.clone()); + } + } + + self.inner.lock().clients = ok_clients; + } + + /// Registers client with broadcaster, returning an SSE response body. + pub async fn new_client( + &self, + channel: i32, + config: PlayoutConfig, + endpoint: String, + ) -> Sse>> { + let (tx, rx) = mpsc::channel(10); + + tx.send(sse::Data::new("connected").into()).await.unwrap(); + + self.inner + .lock() + .clients + .push(Client::new(channel, config, endpoint, tx)); + + Sse::from_infallible_receiver(rx) + } + + /// Broadcasts playout status to clients. + pub async fn broadcast_playout(&self) { + let clients = self.inner.lock().clients.clone(); + + for client in clients.iter().filter(|client| client.endpoint == "playout") { + match media_info(&client.config, "current".into()).await { + Ok(res) => { + let _ = client + .sender + .send( + sse::Data::new(res.text().await.unwrap_or_else(|_| "Success".into())) + .into(), + ) + .await; + } + Err(_) => { + let _ = client + .sender + .send(sse::Data::new("not running").into()) + .await; + } + }; + } + } + + /// Broadcasts system status to clients. + pub async fn broadcast_system(&self) { + let clients = self.inner.lock().clients.clone(); + + for client in clients { + if &client.endpoint == "system" { + if let Ok(stat) = web::block(move || system::stat(client.config.clone())).await { + let stat_string = stat.to_string(); + let _ = client.sender.send(sse::Data::new(stat_string).into()).await; + }; + } + } + } +} diff --git a/ffplayout-api/src/sse/mod.rs b/ffplayout-api/src/sse/mod.rs new file mode 100644 index 00000000..834573ea --- /dev/null +++ b/ffplayout-api/src/sse/mod.rs @@ -0,0 +1,55 @@ +use std::{ + collections::HashSet, + time::{Duration, SystemTime}, +}; + +use tokio::sync::Mutex; +use uuid::Uuid; + +use crate::utils::errors::ServiceError; + +pub mod broadcast; +pub mod routes; + +#[derive(Debug, Eq, Hash, PartialEq, Clone, Copy)] +pub struct UuidData { + pub uuid: Uuid, + pub expiration: SystemTime, +} + +impl UuidData { + pub fn new() -> Self { + Self { + uuid: Uuid::new_v4(), + expiration: SystemTime::now() + Duration::from_secs(2 * 3600), // 2 hours + } + } +} + +impl Default for UuidData { + fn default() -> Self { + Self::new() + } +} + +pub struct AuthState { + pub uuids: Mutex>, +} + +/// Remove all UUIDs from HashSet which are older the expiration time. +pub fn prune_uuids(uuids: &mut HashSet) { + uuids.retain(|entry| entry.expiration > SystemTime::now()); +} + +pub fn check_uuid(uuids: &mut HashSet, uuid: &str) -> Result<&'static str, ServiceError> { + let client_uuid = Uuid::parse_str(uuid)?; + + prune_uuids(uuids); + + match uuids.iter().find(|entry| entry.uuid == client_uuid) { + Some(_) => Ok("UUID is valid"), + None => Err(ServiceError::Unauthorized( + "Invalid or expired UUID".to_string(), + )), + } +} diff --git a/ffplayout-api/src/sse/routes.rs b/ffplayout-api/src/sse/routes.rs new file mode 100644 index 00000000..a33bf02b --- /dev/null +++ b/ffplayout-api/src/sse/routes.rs @@ -0,0 +1,82 @@ +use actix_web::{get, post, web, Responder}; +use actix_web_grants::proc_macro::protect; +use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Sqlite}; + +use super::{check_uuid, prune_uuids, AuthState, UuidData}; +use crate::sse::broadcast::Broadcaster; +use crate::utils::{errors::ServiceError, playout_config, Role}; + +#[derive(Deserialize, Serialize)] +struct User { + #[serde(default, skip_serializing)] + endpoint: String, + uuid: String, +} + +impl User { + fn new(endpoint: String, uuid: String) -> Self { + Self { endpoint, uuid } + } +} + +/// **Get generated UUID** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/api/generate-uuid' -H 'Authorization: Bearer ' +/// ``` +#[post("/generate-uuid")] +#[protect(any("Role::Admin", "Role::User"), ty = "Role")] +async fn generate_uuid(data: web::Data) -> Result { + let mut uuids = data.uuids.lock().await; + let new_uuid = UuidData::new(); + let user_auth = User::new(String::new(), new_uuid.uuid.to_string()); + + prune_uuids(&mut uuids); + + uuids.insert(new_uuid); + + Ok(web::Json(user_auth)) +} + +/// **Validate UUID** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/data/validate?uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a' +/// ``` +#[get("/validate")] +async fn validate_uuid( + data: web::Data, + user: web::Query, +) -> Result { + let mut uuids = data.uuids.lock().await; + + match check_uuid(&mut uuids, user.uuid.as_str()) { + Ok(s) => Ok(web::Json(s)), + Err(e) => Err(e), + } +} + +/// **Connect to event handler** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/data/event/1?endpoint=system&uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a' +/// ``` +#[get("/event/{channel}")] +async fn event_stream( + pool: web::Data>, + broadcaster: web::Data, + data: web::Data, + id: web::Path, + user: web::Query, +) -> Result { + let mut uuids = data.uuids.lock().await; + + check_uuid(&mut uuids, user.uuid.as_str())?; + + let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + + Ok(broadcaster + .new_client(*id, config, user.endpoint.clone()) + .await) +} diff --git a/ffplayout-api/src/utils/channels.rs b/ffplayout-api/src/utils/channels.rs index e3ae52c0..3d289c3e 100644 --- a/ffplayout-api/src/utils/channels.rs +++ b/ffplayout-api/src/utils/channels.rs @@ -12,6 +12,7 @@ use crate::utils::{ use ffplayout_lib::utils::PlayoutConfig; use crate::db::{handles, models::Channel}; +use crate::utils::playout_config; pub async fn create_channel( conn: &Pool, @@ -51,15 +52,17 @@ pub async fn create_channel( serde_yaml::to_writer(file, &config).unwrap(); let new_channel = handles::insert_channel(conn, target_channel).await?; - control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?; + control_service(conn, &config, new_channel.id, &ServiceCmd::Enable, None).await?; Ok(new_channel) } pub async fn delete_channel(conn: &Pool, id: i32) -> Result<(), ServiceError> { let channel = handles::select_channel(conn, &id).await?; - control_service(conn, channel.id, &ServiceCmd::Stop, None).await?; - control_service(conn, channel.id, &ServiceCmd::Disable, None).await?; + let (config, _) = playout_config(conn, &id).await?; + + control_service(conn, &config, channel.id, &ServiceCmd::Stop, None).await?; + control_service(conn, &config, channel.id, &ServiceCmd::Disable, None).await?; if let Err(e) = fs::remove_file(channel.config_path) { error!("{e}"); diff --git a/ffplayout-api/src/utils/control.rs b/ffplayout-api/src/utils/control.rs index cef70a3d..19e62d31 100644 --- a/ffplayout-api/src/utils/control.rs +++ b/ffplayout-api/src/utils/control.rs @@ -15,8 +15,8 @@ use tokio::{ }; use crate::db::handles::select_channel; -use crate::utils::{errors::ServiceError, playout_config}; -use ffplayout_lib::vec_strings; +use crate::utils::errors::ServiceError; +use ffplayout_lib::{utils::PlayoutConfig, vec_strings}; #[derive(Debug, Deserialize, Serialize, Clone)] struct TextParams { @@ -241,11 +241,10 @@ impl SystemD { } } -async fn post_request(conn: &Pool, id: i32, obj: T) -> Result +async fn post_request(config: &PlayoutConfig, obj: T) -> Result where T: Serialize, { - let (config, _) = playout_config(conn, &id).await?; let url = format!("http://{}", config.rpc_server.address); let client = Client::new(); @@ -262,8 +261,7 @@ where } pub async fn send_message( - conn: &Pool, - id: i32, + config: &PlayoutConfig, message: HashMap, ) -> Result { let json_obj = TextParams { @@ -271,33 +269,29 @@ pub async fn send_message( message, }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } pub async fn control_state( - conn: &Pool, - id: i32, + config: &PlayoutConfig, command: &str, ) -> Result { let json_obj = ControlParams { control: command.to_owned(), }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } -pub async fn media_info( - conn: &Pool, - id: i32, - command: String, -) -> Result { +pub async fn media_info(config: &PlayoutConfig, command: String) -> Result { let json_obj = MediaParams { media: command }; - post_request(conn, id, json_obj).await + post_request(config, json_obj).await } pub async fn control_service( conn: &Pool, + config: &PlayoutConfig, id: i32, command: &ServiceCmd, engine: Option>, @@ -307,14 +301,14 @@ pub async fn control_service( match command { ServiceCmd::Start => en.start().await, ServiceCmd::Stop => { - if control_state(conn, id, "stop_all").await.is_ok() { + if control_state(config, "stop_all").await.is_ok() { en.stop().await } else { Err(ServiceError::NoContent("Nothing to stop".to_string())) } } ServiceCmd::Restart => { - if control_state(conn, id, "stop_all").await.is_ok() { + if control_state(config, "stop_all").await.is_ok() { en.restart().await } else { Err(ServiceError::NoContent("Nothing to restart".to_string())) diff --git a/ffplayout-api/src/utils/errors.rs b/ffplayout-api/src/utils/errors.rs index 8aa5a628..72def236 100644 --- a/ffplayout-api/src/utils/errors.rs +++ b/ffplayout-api/src/utils/errors.rs @@ -12,8 +12,8 @@ pub enum ServiceError { #[display(fmt = "Conflict: {_0}")] Conflict(String), - #[display(fmt = "Unauthorized")] - Unauthorized, + #[display(fmt = "Unauthorized: {_0}")] + Unauthorized(String), #[display(fmt = "NoContent: {_0}")] NoContent(String), @@ -31,7 +31,7 @@ impl ResponseError for ServiceError { } ServiceError::BadRequest(ref message) => HttpResponse::BadRequest().json(message), ServiceError::Conflict(ref message) => HttpResponse::Conflict().json(message), - ServiceError::Unauthorized => HttpResponse::Unauthorized().json("No Permission!"), + ServiceError::Unauthorized(ref message) => HttpResponse::Unauthorized().json(message), ServiceError::NoContent(ref message) => HttpResponse::NoContent().json(message), ServiceError::ServiceUnavailable(ref message) => { HttpResponse::ServiceUnavailable().json(message) @@ -87,3 +87,9 @@ impl From for ServiceError { ServiceError::BadRequest(err.to_string()) } } + +impl From for ServiceError { + fn from(err: uuid::Error) -> ServiceError { + ServiceError::BadRequest(err.to_string()) + } +} diff --git a/ffplayout-api/src/utils/system.rs b/ffplayout-api/src/utils/system.rs index 9cd47e15..f52f35e2 100644 --- a/ffplayout-api/src/utils/system.rs +++ b/ffplayout-api/src/utils/system.rs @@ -1,4 +1,4 @@ -// use std::cmp; +use std::fmt; use local_ip_address::list_afinet_netifas; use serde::Serialize; @@ -71,6 +71,12 @@ pub struct SystemStat { pub system: MySystem, } +impl fmt::Display for SystemStat { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap()) + } +} + pub fn stat(config: PlayoutConfig) -> SystemStat { let mut disks = DISKS.lock().unwrap(); let mut networks = NETWORKS.lock().unwrap(); diff --git a/ffplayout-engine/src/input/playlist.rs b/ffplayout-engine/src/input/playlist.rs index 7256d8db..6d5dcda2 100644 --- a/ffplayout-engine/src/input/playlist.rs +++ b/ffplayout-engine/src/input/playlist.rs @@ -121,8 +121,6 @@ impl CurrentProgram { let node_index = self.current_node.index.unwrap_or_default(); - trace!("delta: {delta}, total_delta: {total_delta}, current index: {node_index}",); - let mut next_start = self.current_node.begin.unwrap_or_default() - self.start_sec + duration + delta; @@ -133,7 +131,7 @@ impl CurrentProgram { } trace!( - "next_start: {next_start} | end_sec: {} | source {}", + "delta: {delta} | total_delta: {total_delta}, index: {node_index} \nnext_start: {next_start} | end_sec: {} | source {}", self.end_sec, self.current_node.source ); diff --git a/ffplayout-engine/src/utils/mod.rs b/ffplayout-engine/src/utils/mod.rs index 5a464d72..678e1d8e 100644 --- a/ffplayout-engine/src/utils/mod.rs +++ b/ffplayout-engine/src/utils/mod.rs @@ -15,8 +15,8 @@ pub use arg_parse::Args; use ffplayout_lib::{ filter::Filters, utils::{ - config::Template, errors::ProcError, parse_log_level_filter, sec_to_time, time_in_seconds, - time_to_sec, Media, OutputMode::*, PlayoutConfig, PlayoutStatus, ProcessMode::*, + config::Template, errors::ProcError, parse_log_level_filter, time_in_seconds, time_to_sec, + Media, OutputMode::*, PlayoutConfig, PlayoutStatus, ProcessMode::*, }, vec_strings, }; @@ -252,7 +252,7 @@ pub fn prepare_output_cmd( /// map media struct to json object pub fn get_media_map(media: Media) -> Value { json!({ - "seek": media.seek, + "in": media.seek, "out": media.out, "duration": media.duration, "category": media.category, @@ -271,22 +271,20 @@ pub fn get_data_map( let current_time = time_in_seconds(); let shift = *playout_stat.time_shift.lock().unwrap(); let begin = media.begin.unwrap_or(0.0) - shift; + let played_time = current_time - begin; - data_map.insert("play_mode".to_string(), json!(config.processing.mode)); - data_map.insert("ingest_runs".to_string(), json!(server_is_running)); data_map.insert("index".to_string(), json!(media.index)); - data_map.insert("start_sec".to_string(), json!(begin)); - - if begin > 0.0 { - let played_time = current_time - begin; - let remaining_time = media.out - played_time; - - data_map.insert("start_time".to_string(), json!(sec_to_time(begin))); - data_map.insert("played_sec".to_string(), json!(played_time)); - data_map.insert("remaining_sec".to_string(), json!(remaining_time)); - } - - data_map.insert("current_media".to_string(), get_media_map(media)); + data_map.insert("ingest".to_string(), json!(server_is_running)); + data_map.insert("mode".to_string(), json!(config.processing.mode)); + data_map.insert( + "shift".to_string(), + json!((shift * 1000.0).round() / 1000.0), + ); + data_map.insert( + "elapsed".to_string(), + json!((played_time * 1000.0).round() / 1000.0), + ); + data_map.insert("media".to_string(), get_media_map(media)); data_map } diff --git a/ffplayout-frontend b/ffplayout-frontend index 52411f61..6111c268 160000 --- a/ffplayout-frontend +++ b/ffplayout-frontend @@ -1 +1 @@ -Subproject commit 52411f61ef25a1b11129a77d26d987f44c6ea543 +Subproject commit 6111c2686d14b3bf33a4c0b29c85672f7e4f4399