From b4004d7d8b335871fa2dfe78161ae38eaad8e4de Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 20 Feb 2023 22:12:33 +0100 Subject: [PATCH] use async process::Command --- Cargo.lock | 80 +++++++++++++++++++++++++++++- ffplayout-api/Cargo.toml | 3 +- ffplayout-api/src/main.rs | 4 +- ffplayout-api/src/utils/control.rs | 73 +++++++++++++++------------ ffplayout-engine/src/main.rs | 9 ++-- lib/src/utils/controller.rs | 17 ++++--- 6 files changed, 139 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2a73c5b..9df30370 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1019,6 +1019,7 @@ dependencies = [ "serde_yaml", "simplelog", "sqlx", + "tokio", ] [[package]] @@ -2415,6 +2416,27 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" +dependencies = [ + "base64 0.21.0", +] + [[package]] name = "ryu" version = "1.0.12" @@ -2461,6 +2483,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.8.2" @@ -2724,6 +2756,8 @@ dependencies = [ "once_cell", "paste", "percent-encoding", + "rustls", + "rustls-pemfile", "sha2", "smallvec", "sqlformat", @@ -2732,6 +2766,7 @@ dependencies = [ "thiserror", "tokio-stream", "url", + "webpki-roots", ] [[package]] @@ -2759,10 +2794,9 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396" dependencies = [ - "native-tls", "once_cell", "tokio", - "tokio-native-tls", + "tokio-rustls", ] [[package]] @@ -2945,9 +2979,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys 0.42.0", ] +[[package]] +name = "tokio-macros" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" @@ -2958,6 +3004,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-stream" version = "0.1.11" @@ -3253,6 +3310,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" +dependencies = [ + "webpki", +] + [[package]] name = "wepoll-ffi" version = "0.1.2" diff --git a/ffplayout-api/Cargo.toml b/ffplayout-api/Cargo.toml index 365e7ba7..d068f67f 100644 --- a/ffplayout-api/Cargo.toml +++ b/ffplayout-api/Cargo.toml @@ -34,7 +34,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" simplelog = { version = "^0.12", features = ["paris"] } -sqlx = { version = "0.6", features = ["runtime-actix-native-tls", "sqlite"] } +sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] } +tokio = { version = "1.25", features = ["full"] } [[bin]] name = "ffpapi" diff --git a/ffplayout-api/src/main.rs b/ffplayout-api/src/main.rs index 0cfd71d5..a0dab153 100644 --- a/ffplayout-api/src/main.rs +++ b/ffplayout-api/src/main.rs @@ -87,6 +87,7 @@ async fn main() -> std::io::Result<()> { let ip_port = conn.split(':').collect::>(); let addr = ip_port[0]; let port = ip_port[1].parse::().unwrap(); + let engine_process = web::Data::new(ProcessControl::new()); info!("running ffplayout API, listen on {conn}"); @@ -94,11 +95,10 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { let auth = HttpAuthentication::bearer(validator); let db_pool = web::Data::new(pool.clone()); - let engine_process = web::Data::new(ProcessControl::new()); App::new() .app_data(db_pool) - .app_data(engine_process) + .app_data(engine_process.clone()) .wrap(middleware::Logger::default()) .service(login) .service( diff --git a/ffplayout-api/src/utils/control.rs b/ffplayout-api/src/utils/control.rs index 0803fac0..63fcac05 100644 --- a/ffplayout-api/src/utils/control.rs +++ b/ffplayout-api/src/utils/control.rs @@ -1,13 +1,8 @@ use std::{ collections::HashMap, env, fmt, - process::Child, - process::Command, str::FromStr, - sync::{ - atomic::{AtomicBool, Ordering}, - Mutex, - }, + sync::atomic::{AtomicBool, Ordering}, }; use actix_web::web; @@ -17,6 +12,10 @@ use reqwest::{ }; use serde::{Deserialize, Serialize}; use sqlx::{Pool, Sqlite}; +use tokio::{ + process::{Child, Command}, + sync::Mutex, +}; use crate::db::handles::select_channel; use crate::utils::{errors::ServiceError, playout_config}; @@ -60,25 +59,39 @@ impl RpcObj { /// ffplayout engine process /// /// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`, -/// the engine get startet and conntrolled from ffpapi +/// the engine get startet and controlled from ffpapi pub struct ProcessControl { pub engine_child: Mutex>, pub is_running: AtomicBool, + pub piggyback: AtomicBool, } impl ProcessControl { pub fn new() -> Self { + let piggyback = if env::consts::OS != "linux" || env::var("PIGGYBACK_MODE").is_ok() { + AtomicBool::new(true) + } else { + AtomicBool::new(false) + }; + Self { engine_child: Mutex::new(None), is_running: AtomicBool::new(false), + piggyback, } } } impl ProcessControl { - pub fn start(&self) -> Result { - match Command::new("ffplayout").spawn() { - Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc), + pub async fn start(&self) -> Result { + #[cfg(not(debug_assertions))] + let engine_path = "ffplayout"; + + #[cfg(debug_assertions)] + let engine_path = "./target/debug/ffplayout"; + + match Command::new(engine_path).kill_on_drop(true).spawn() { + Ok(proc) => *self.engine_child.lock().await = Some(proc), Err(_) => return Err(ServiceError::InternalServerError), }; @@ -87,26 +100,22 @@ impl ProcessControl { Ok("Success".to_string()) } - pub fn kill(&self) -> Result { - if let Some(proc) = self.engine_child.lock().unwrap().as_mut() { - if proc.kill().is_err() { + pub async fn kill(&self) -> Result { + if let Some(proc) = self.engine_child.lock().await.as_mut() { + if proc.kill().await.is_err() { return Err(ServiceError::InternalServerError); }; } - self.wait()?; + self.wait().await?; self.is_running.store(false, Ordering::SeqCst); Ok("Success".to_string()) } - pub fn restart(&self) -> Result { - self.kill()?; - - match Command::new("ffplayout").spawn() { - Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc), - Err(_) => return Err(ServiceError::InternalServerError), - }; + pub async fn restart(&self) -> Result { + self.kill().await?; + self.start().await?; self.is_running.store(true, Ordering::SeqCst); @@ -115,9 +124,9 @@ impl ProcessControl { /// Wait for process to proper close. /// This prevents orphaned/zombi processes in system - pub fn wait(&self) -> Result { - if let Some(proc) = self.engine_child.lock().unwrap().as_mut() { - if proc.wait().is_err() { + pub async fn wait(&self) -> Result { + if let Some(proc) = self.engine_child.lock().await.as_mut() { + if proc.wait().await.is_err() { return Err(ServiceError::InternalServerError); }; } @@ -244,11 +253,11 @@ impl SystemD { Ok("Success".to_string()) } - fn status(mut self) -> Result { + async fn status(mut self) -> Result { self.cmd .append(&mut vec!["is-active".to_string(), self.service]); - let output = Command::new("sudo").args(self.cmd).output()?; + let output = Command::new("sudo").args(self.cmd).output().await?; Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) } @@ -338,13 +347,11 @@ pub async fn control_service( command: &ServiceCmd, engine: Option>, ) -> Result { - let piggyback = env::var("PIGGYBACK_MODE"); - - if (env::consts::OS != "linux" || piggyback.is_ok()) && engine.is_some() { + if engine.is_some() && engine.as_ref().unwrap().piggyback.load(Ordering::SeqCst) { match command { - ServiceCmd::Start => engine.unwrap().start(), - ServiceCmd::Stop => engine.unwrap().kill(), - ServiceCmd::Restart => engine.unwrap().restart(), + ServiceCmd::Start => engine.unwrap().start().await, + ServiceCmd::Stop => engine.unwrap().kill().await, + ServiceCmd::Restart => engine.unwrap().restart().await, ServiceCmd::Status => engine.unwrap().status(), _ => Err(ServiceError::Conflict( "Engine runs in piggyback mode, in this mode this command is not allowed." @@ -360,7 +367,7 @@ pub async fn control_service( ServiceCmd::Start => system_d.start(), ServiceCmd::Stop => system_d.stop(), ServiceCmd::Restart => system_d.restart(), - ServiceCmd::Status => system_d.status(), + ServiceCmd::Status => system_d.status().await, } } } diff --git a/ffplayout-engine/src/main.rs b/ffplayout-engine/src/main.rs index 48922a66..567eb408 100644 --- a/ffplayout-engine/src/main.rs +++ b/ffplayout-engine/src/main.rs @@ -81,7 +81,7 @@ fn fake_time(args: &Args) { } } -fn main() { +fn main() -> Result<(), ()> { let args = get_args(); // use fake time function only in debugging mode @@ -153,9 +153,9 @@ fn main() { match config.out.mode { // write files/playlist to HLS m3u8 playlist - HLS => write_hls(&config, play_control, playout_stat, proc_control), + HLS => write_hls(&config, play_control, playout_stat, proc_control.clone()), // play on desktop or stream to a remote target - _ => player(&config, play_control, playout_stat, proc_control), + _ => player(&config, play_control, playout_stat, proc_control.clone()), } info!("Playout done..."); @@ -167,4 +167,7 @@ fn main() { } drop(msg); + drop(proc_control); + + Ok(()) } diff --git a/lib/src/utils/controller.rs b/lib/src/utils/controller.rs index 19315f10..20278452 100644 --- a/lib/src/utils/controller.rs +++ b/lib/src/utils/controller.rs @@ -146,22 +146,27 @@ impl ProcessControl { rpc.clone().close() }; - for unit in [Decoder, Encoder, Ingest] { + for unit in [Encoder, Decoder, Ingest] { if let Err(e) = self.kill(unit) { if !e.contains("exited process") { error!("{e}") } } + if let Err(e) = self.wait(unit) { + if !e.contains("exited process") { + error!("{e}") + } + } } } } } -// impl Drop for ProcessControl { -// fn drop(&mut self) { -// self.kill_all() -// } -// } +impl Drop for ProcessControl { + fn drop(&mut self) { + self.kill_all() + } +} /// Global player control, to get infos about current clip etc. #[derive(Clone)]