diff --git a/Cargo.lock b/Cargo.lock index 60da8ec6..8cd55bb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,6 +267,55 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "actix-web-lab" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7675c1a84eec1b179c844cdea8488e3e409d8e4984026e92fa96c87dd86f33c6" +dependencies = [ + "actix-http", + "actix-router", + "actix-service", + "actix-utils", + "actix-web", + "actix-web-lab-derive", + "ahash", + "arc-swap", + "async-trait", + "bytes", + "bytestring", + "csv", + "derive_more", + "futures-core", + "futures-util", + "http 0.2.12", + "impl-more", + "itertools", + "local-channel", + "mediatype", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_html_form", + "serde_json", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "actix-web-lab-derive" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aa0b287c8de4a76b691f29dbb5451e8dd5b79d777eaf87350c9b0cbfdb5e968" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "actix-web-static-files" version = "4.0.1" @@ -406,6 +455,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70033777eb8b5124a81a1889416543dddef2de240019b674c81285a2635a7e1e" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "argon2" version = "0.5.3" @@ -981,6 +1036,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "darling" version = "0.20.8" @@ -1246,6 +1322,7 @@ dependencies = [ "actix-web", "actix-web-grants", "actix-web-httpauth", + "actix-web-lab", "actix-web-static-files", "argon2", "chrono", @@ -1259,6 +1336,7 @@ dependencies = [ "lexical-sort", "local-ip-address", "once_cell", + "parking_lot", "path-clean", "rand", "regex", @@ -1274,6 +1352,7 @@ dependencies = [ "static-files", "sysinfo", "tokio", + "tokio-stream", "uuid", ] @@ -1822,6 +1901,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "impl-more" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" + [[package]] name = "indexmap" version = "2.2.6" @@ -2097,6 +2182,12 @@ dependencies = [ "digest", ] +[[package]] +name = "mediatype" +version = "0.19.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8878cd8d1b3c8c8ae4b2ba0a36652b7cf192f618a599a7fbdfa25cffd4ea72dd" + [[package]] name = "memchr" version = "2.7.2" @@ -2944,6 +3035,19 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "serde_html_form" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" +dependencies = [ + "form_urlencoded", + "indexmap", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_json" version = "1.0.116" diff --git a/ffplayout-api/Cargo.toml b/ffplayout-api/Cargo.toml index e6658834..10328234 100644 --- a/ffplayout-api/Cargo.toml +++ b/ffplayout-api/Cargo.toml @@ -19,6 +19,7 @@ actix-multipart = "0.6" actix-web = "4" actix-web-grants = "4" actix-web-httpauth = "0.8" +actix-web-lab = "0.20" actix-web-static-files = "4.0" argon2 = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } @@ -31,6 +32,7 @@ lazy_static = "1.4" lexical-sort = "0.3" local-ip-address = "0.6" once_cell = "1.18" +parking_lot = "0.12" path-clean = "1.0" rand = "0.8" regex = "1" @@ -46,6 +48,7 @@ static-files = "0.2" sysinfo ={ version = "0.30", features = ["linux-netdevs"] } sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] } tokio = { version = "1.29", features = ["full"] } +tokio-stream = "0.1" uuid = "1.8" [build-dependencies] diff --git a/ffplayout-api/examples/index.html b/ffplayout-api/examples/index.html new file mode 100644 index 00000000..ef87ab39 --- /dev/null +++ b/ffplayout-api/examples/index.html @@ -0,0 +1,28 @@ + + + + + + + Server-sent events + + + +
+ + + diff --git a/ffplayout-api/examples/sse.rs b/ffplayout-api/examples/sse.rs new file mode 100644 index 00000000..573bdce8 --- /dev/null +++ b/ffplayout-api/examples/sse.rs @@ -0,0 +1,57 @@ +/// https://github.com/actix/examples/tree/master/server-sent-events +/// +use std::{io, sync::Arc}; + +use actix_web::{get, middleware::Logger, post, web, App, HttpResponse, HttpServer, Responder}; +use actix_web_lab::{extract::Path, respond::Html}; + +use simplelog::*; + +use ffplayout_api::sse::broadcast::Broadcaster; + +use ffplayout_lib::utils::{init_logging, PlayoutConfig}; + +#[actix_web::main] +async fn main() -> io::Result<()> { + let mut config = PlayoutConfig::new(None, None); + config.mail.recipient = String::new(); + config.logging.log_to_file = false; + config.logging.timestamp = false; + + let logging = init_logging(&config, None, None); + CombinedLogger::init(logging).unwrap(); + + let data = Broadcaster::create(); + + HttpServer::new(move || { + App::new() + .app_data(web::Data::from(Arc::clone(&data))) + .service(index) + .service(event_stream) + .service(broadcast_msg) + .wrap(Logger::default()) + }) + .bind(("127.0.0.1", 8080))? + .workers(2) + .run() + .await +} + +#[get("/")] +async fn index() -> impl Responder { + Html(include_str!("index.html").to_owned()) +} + +#[get("/events")] +async fn event_stream(broadcaster: web::Data) -> impl Responder { + broadcaster.new_client().await +} + +#[post("/broadcast/{msg}")] +async fn broadcast_msg( + broadcaster: web::Data, + Path((msg,)): Path<(String,)>, +) -> impl Responder { + broadcaster.broadcast(&msg).await; + HttpResponse::Ok().body("msg sent") +} diff --git a/ffplayout-api/src/api/routes.rs b/ffplayout-api/src/api/routes.rs index 91fbea4a..570d6fcd 100644 --- a/ffplayout-api/src/api/routes.rs +++ b/ffplayout-api/src/api/routes.rs @@ -246,7 +246,7 @@ async fn get_user( /// ``` #[get("/user/{name}")] #[protect("Role::Admin", ty = "Role")] -async fn get_user_by_name( +async fn get_by_name( pool: web::Data>, name: web::Path, ) -> Result { @@ -326,7 +326,7 @@ async fn update_user( return Err(ServiceError::InternalServerError); } - Err(ServiceError::Unauthorized) + Err(ServiceError::Unauthorized("No Permission".to_string())) } /// **Add User** diff --git a/ffplayout-api/src/lib.rs b/ffplayout-api/src/lib.rs new file mode 100644 index 00000000..3809055f --- /dev/null +++ b/ffplayout-api/src/lib.rs @@ -0,0 +1,21 @@ +use std::sync::{Arc, Mutex}; + +use clap::Parser; +use lazy_static::lazy_static; +use sysinfo::{Disks, Networks, System}; + +pub mod api; +pub mod db; +pub mod sse; +pub mod utils; + +use utils::args_parse::Args; + +lazy_static! { + pub static ref ARGS: Args = Args::parse(); + pub static ref DISKS: Arc> = + Arc::new(Mutex::new(Disks::new_with_refreshed_list())); + pub static ref NETWORKS: Arc> = + Arc::new(Mutex::new(Networks::new_with_refreshed_list())); + pub static ref SYS: Arc> = Arc::new(Mutex::new(System::new_all())); +} diff --git a/ffplayout-api/src/main.rs b/ffplayout-api/src/main.rs index 769cdffc..89fb70b5 100644 --- a/ffplayout-api/src/main.rs +++ b/ffplayout-api/src/main.rs @@ -1,8 +1,4 @@ -use std::{ - env, - process::exit, - sync::{Arc, Mutex}, -}; +use std::{collections::HashSet, env, process::exit, sync::Mutex}; use actix_files::Files; use actix_web::{ @@ -14,37 +10,25 @@ use actix_web_httpauth::{extractors::bearer::BearerAuth, middleware::HttpAuthent #[cfg(all(not(debug_assertions), feature = "embed_frontend"))] use actix_web_static_files::ResourceFiles; -use clap::Parser; -use lazy_static::lazy_static; use path_clean::PathClean; use simplelog::*; -use sysinfo::{Disks, Networks, System}; -pub mod api; -pub mod db; -pub mod utils; - -use api::{auth, routes::*}; -use db::{db_pool, models::LoginUser}; -use utils::{args_parse::Args, control::ProcessControl, db_path, init_config, run_args}; +use ffplayout_api::{ + api::{auth, routes::*}, + db::{db_pool, models::LoginUser}, + sse::{routes::*, AuthState}, + utils::{control::ProcessControl, db_path, init_config, run_args}, + ARGS, +}; #[cfg(any(debug_assertions, not(feature = "embed_frontend")))] -use utils::public_path; +use ffplayout_api::utils::public_path; use ffplayout_lib::utils::{init_logging, PlayoutConfig}; #[cfg(all(not(debug_assertions), feature = "embed_frontend"))] include!(concat!(env!("OUT_DIR"), "/generated.rs")); -lazy_static! { - pub static ref ARGS: Args = Args::parse(); - pub static ref DISKS: Arc> = - Arc::new(Mutex::new(Disks::new_with_refreshed_list())); - pub static ref NETWORKS: Arc> = - Arc::new(Mutex::new(Networks::new_with_refreshed_list())); - pub static ref SYS: Arc> = Arc::new(Mutex::new(System::new_all())); -} - async fn validator( req: ServiceRequest, credentials: BearerAuth, @@ -95,6 +79,9 @@ async fn main() -> std::io::Result<()> { let addr = ip_port[0]; let port = ip_port[1].parse::().unwrap(); let engine_process = web::Data::new(ProcessControl::new()); + let auth_state = web::Data::new(AuthState { + uuids: Mutex::new(HashSet::new()), + }); info!("running ffplayout API, listen on http://{conn}"); @@ -109,14 +96,15 @@ async fn main() -> std::io::Result<()> { let mut web_app = App::new() .app_data(db_pool) .app_data(engine_process.clone()) + .app_data(auth_state.clone()) .wrap(logger) .service(login) .service( web::scope("/api") - .wrap(auth) + .wrap(auth.clone()) .service(add_user) .service(get_user) - .service(get_user_by_name) + .service(get_by_name) .service(get_users) .service(remove_user) .service(get_playout_config) @@ -149,8 +137,10 @@ async fn main() -> std::io::Result<()> { .service(save_file) .service(import_playlist) .service(get_program) - .service(get_system_stat), + .service(get_system_stat) + .service(generate_uuid), ) + .service(web::scope("/data").service(validate_uuid)) .service(get_file); if let Some(public) = &ARGS.public { diff --git a/ffplayout-api/src/sse/broadcast.rs b/ffplayout-api/src/sse/broadcast.rs new file mode 100644 index 00000000..25f216f2 --- /dev/null +++ b/ffplayout-api/src/sse/broadcast.rs @@ -0,0 +1,89 @@ +use std::{sync::Arc, time::Duration}; + +use actix_web::rt::time::interval; +use actix_web_lab::{ + sse::{self, Sse}, + util::InfallibleStream, +}; +use futures_util::future; +use parking_lot::Mutex; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +pub struct Broadcaster { + inner: Mutex, +} + +#[derive(Debug, Clone, Default)] +struct BroadcasterInner { + clients: Vec>, +} + +impl Broadcaster { + /// Constructs new broadcaster and spawns ping loop. + pub fn create() -> Arc { + let this = Arc::new(Broadcaster { + inner: Mutex::new(BroadcasterInner::default()), + }); + + Broadcaster::spawn_ping(Arc::clone(&this)); + + this + } + + /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast + /// list if not. + fn spawn_ping(this: Arc) { + actix_web::rt::spawn(async move { + let mut interval = interval(Duration::from_secs(10)); + + loop { + interval.tick().await; + this.remove_stale_clients().await; + } + }); + } + + /// Removes all non-responsive clients from broadcast list. + async fn remove_stale_clients(&self) { + let clients = self.inner.lock().clients.clone(); + + let mut ok_clients = Vec::new(); + + for client in clients { + if client + .send(sse::Event::Comment("ping".into())) + .await + .is_ok() + { + ok_clients.push(client.clone()); + } + } + + self.inner.lock().clients = ok_clients; + } + + /// Registers client with broadcaster, returning an SSE response body. + pub async fn new_client(&self) -> Sse>> { + let (tx, rx) = mpsc::channel(10); + + tx.send(sse::Data::new("connected").into()).await.unwrap(); + + self.inner.lock().clients.push(tx); + + Sse::from_infallible_receiver(rx) + } + + /// Broadcasts `msg` to all clients. + pub async fn broadcast(&self, msg: &str) { + let clients = self.inner.lock().clients.clone(); + + let send_futures = clients + .iter() + .map(|client| client.send(sse::Data::new(msg).into())); + + // try to send to all clients, ignoring failures + // disconnected clients will get swept up by `remove_stale_clients` + let _ = future::join_all(send_futures).await; + } +} diff --git a/ffplayout-api/src/sse/mod.rs b/ffplayout-api/src/sse/mod.rs new file mode 100644 index 00000000..d908d058 --- /dev/null +++ b/ffplayout-api/src/sse/mod.rs @@ -0,0 +1,48 @@ +use std::{ + collections::HashSet, + sync::Mutex, + time::{Duration, SystemTime}, +}; + +use uuid::Uuid; + +use crate::utils::errors::ServiceError; + +pub mod broadcast; +pub mod routes; + +#[derive(Debug, Eq, Hash, PartialEq, Clone, Copy)] +pub struct UuidData { + pub uuid: Uuid, + pub expiration: SystemTime, +} + +impl UuidData { + pub fn new() -> Self { + Self { + uuid: Uuid::new_v4(), + expiration: SystemTime::now() + Duration::from_secs(12 * 3600), // 12 hours + } + } +} + +pub struct AuthState { + pub uuids: Mutex>, +} + +pub fn prune_uuids(uuids: &mut HashSet) { + uuids.retain(|entry| entry.expiration > SystemTime::now()); +} + +pub fn check_uuid(uuids: &mut HashSet, uuid: &str) -> Result<&'static str, ServiceError> { + let client_uuid = Uuid::parse_str(uuid)?; + + prune_uuids(uuids); + + match uuids.iter().find(|entry| entry.uuid == client_uuid) { + Some(_) => Ok("UUID is valid"), + None => Err(ServiceError::Unauthorized( + "Invalid or expired UUID".to_string(), + )), + } +} diff --git a/ffplayout-api/src/sse/routes.rs b/ffplayout-api/src/sse/routes.rs new file mode 100644 index 00000000..59cda62c --- /dev/null +++ b/ffplayout-api/src/sse/routes.rs @@ -0,0 +1,54 @@ +use actix_web::{get, post, web, Responder}; +use actix_web_grants::proc_macro::protect; +use serde::{Deserialize, Serialize}; + +use super::{check_uuid, prune_uuids, AuthState, UuidData}; +use crate::utils::{errors::ServiceError, Role}; + +#[derive(Deserialize, Serialize)] +struct User { + uuid: String, +} + +impl User { + fn new(uuid: String) -> Self { + Self { uuid } + } +} + +/// **Get generated UUID** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/api/generate-uuid' -H 'Authorization: Bearer ' +/// ``` +#[post("/generate-uuid")] +#[protect(any("Role::Admin", "Role::User"), ty = "Role")] +async fn generate_uuid(data: web::Data) -> Result { + let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?; + let new_uuid = UuidData::new(); + let user_auth = User::new(new_uuid.uuid.to_string()); + + prune_uuids(&mut uuids); + + uuids.insert(new_uuid); + + Ok(web::Json(user_auth)) +} + +/// **Validate UUID** +/// +/// ```BASH +/// curl -X GET 'http://127.0.0.1:8787/data/validate?uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a' +/// ``` +#[get("/validate")] +async fn validate_uuid( + data: web::Data, + user: web::Query, +) -> Result { + let mut uuids = data.uuids.lock().map_err(|e| e.to_string())?; + + match check_uuid(&mut uuids, user.uuid.as_str()) { + Ok(s) => Ok(web::Json(s)), + Err(e) => Err(e), + } +} diff --git a/ffplayout-api/src/utils/errors.rs b/ffplayout-api/src/utils/errors.rs index 8aa5a628..72def236 100644 --- a/ffplayout-api/src/utils/errors.rs +++ b/ffplayout-api/src/utils/errors.rs @@ -12,8 +12,8 @@ pub enum ServiceError { #[display(fmt = "Conflict: {_0}")] Conflict(String), - #[display(fmt = "Unauthorized")] - Unauthorized, + #[display(fmt = "Unauthorized: {_0}")] + Unauthorized(String), #[display(fmt = "NoContent: {_0}")] NoContent(String), @@ -31,7 +31,7 @@ impl ResponseError for ServiceError { } ServiceError::BadRequest(ref message) => HttpResponse::BadRequest().json(message), ServiceError::Conflict(ref message) => HttpResponse::Conflict().json(message), - ServiceError::Unauthorized => HttpResponse::Unauthorized().json("No Permission!"), + ServiceError::Unauthorized(ref message) => HttpResponse::Unauthorized().json(message), ServiceError::NoContent(ref message) => HttpResponse::NoContent().json(message), ServiceError::ServiceUnavailable(ref message) => { HttpResponse::ServiceUnavailable().json(message) @@ -87,3 +87,9 @@ impl From for ServiceError { ServiceError::BadRequest(err.to_string()) } } + +impl From for ServiceError { + fn from(err: uuid::Error) -> ServiceError { + ServiceError::BadRequest(err.to_string()) + } +}