diff --git a/Cargo.lock b/Cargo.lock index ddfb37e9..a2a73c5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -971,7 +971,7 @@ dependencies = [ [[package]] name = "ffplayout" -version = "0.17.0-beta4" +version = "0.17.0-beta5" dependencies = [ "chrono", "clap", @@ -991,7 +991,7 @@ dependencies = [ [[package]] name = "ffplayout-api" -version = "0.17.0-beta4" +version = "0.17.0-beta5" dependencies = [ "actix-files", "actix-multipart", @@ -1023,7 +1023,7 @@ dependencies = [ [[package]] name = "ffplayout-lib" -version = "0.17.0-beta4" +version = "0.17.0-beta5" dependencies = [ "chrono", "crossbeam-channel", @@ -2549,8 +2549,7 @@ dependencies = [ [[package]] name = "serial_test" version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "538c30747ae860d6fb88330addbbd3e0ddbe46d662d032855596d8a8ca260611" +source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe" dependencies = [ "dashmap", "futures", @@ -2563,8 +2562,7 @@ dependencies = [ [[package]] name = "serial_test_derive" version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69" +source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe" dependencies = [ "proc-macro2", "quote", @@ -2825,7 +2823,7 @@ dependencies = [ [[package]] name = "tests" -version = "0.17.0-beta4" +version = "0.17.0-beta5" dependencies = [ "chrono", "crossbeam-channel", diff --git a/Cargo.toml b/Cargo.toml index e2f30b35..20ba2dde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["ffplayout-api", "ffplayout-engine", "lib", "tests"] default-members = ["ffplayout-api", "ffplayout-engine", "tests"] [workspace.package] -version = "0.17.0-beta4" +version = "0.17.0-beta5" license = "GPL-3.0" repository = "https://github.com/ffplayout/ffplayout" authors = ["Jonathan Baecker "] diff --git a/ffplayout-api/src/api/routes.rs b/ffplayout-api/src/api/routes.rs index bd6967b3..9215c710 100644 --- a/ffplayout-api/src/api/routes.rs +++ b/ffplayout-api/src/api/routes.rs @@ -23,14 +23,13 @@ use serde::{Deserialize, Serialize}; use simplelog::*; use sqlx::{Pool, Sqlite}; -use crate::auth::{create_jwt, Claims}; use crate::db::{ handles, models::{Channel, LoginUser, TextPreset, User}, }; use crate::utils::{ channels::{create_channel, delete_channel}, - control::{control_service, control_state, media_info, send_message, Process}, + control::{control_service, control_state, media_info, send_message, ControlParams, Process}, errors::ServiceError, files::{ browser, create_directory, norm_abs_path, remove_file_or_folder, rename_file, upload, @@ -40,6 +39,10 @@ use crate::utils::{ playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist}, playout_config, read_log_file, read_playout_config, Role, }; +use crate::{ + auth::{create_jwt, Claims}, + utils::control::ProcessControl, +}; use ffplayout_lib::{ utils::{ get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, PlayoutConfig, @@ -586,9 +589,9 @@ pub async fn send_text_message( pub async fn control_playout( pool: web::Data>, id: web::Path, - control: web::Json, + control: web::Json, ) -> Result { - match control_state(&pool.into_inner(), *id, control.command.clone()).await { + match control_state(&pool.into_inner(), *id, &control.control).await { Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), Err(e) => Err(e), } @@ -690,8 +693,9 @@ pub async fn process_control( pool: web::Data>, id: web::Path, proc: web::Json, + engine_process: web::Data, ) -> Result { - control_service(&pool.into_inner(), *id, &proc.command).await + control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await } /// #### ffplayout Playlist Operations diff --git a/ffplayout-api/src/main.rs b/ffplayout-api/src/main.rs index 0fc9775f..0cfd71d5 100644 --- a/ffplayout-api/src/main.rs +++ b/ffplayout-api/src/main.rs @@ -25,7 +25,7 @@ use api::{ }, }; use db::{db_pool, models::LoginUser}; -use utils::{args_parse::Args, db_path, init_config, run_args, Role}; +use utils::{args_parse::Args, control::ProcessControl, db_path, init_config, run_args, Role}; use ffplayout_lib::utils::{init_logging, PlayoutConfig}; @@ -94,9 +94,11 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { let auth = HttpAuthentication::bearer(validator); let db_pool = web::Data::new(pool.clone()); + let engine_process = web::Data::new(ProcessControl::new()); App::new() .app_data(db_pool) + .app_data(engine_process) .wrap(middleware::Logger::default()) .service(login) .service( diff --git a/ffplayout-api/src/utils/channels.rs b/ffplayout-api/src/utils/channels.rs index 3165fcc7..99c72e4c 100644 --- a/ffplayout-api/src/utils/channels.rs +++ b/ffplayout-api/src/utils/channels.rs @@ -3,7 +3,10 @@ use std::fs; use simplelog::*; use sqlx::{Pool, Sqlite}; -use crate::utils::{control::control_service, errors::ServiceError}; +use crate::utils::{ + control::{control_service, ServiceCmd}, + errors::ServiceError, +}; use crate::db::{handles, models::Channel}; @@ -25,15 +28,15 @@ pub async fn create_channel( )?; let new_channel = handles::insert_channel(conn, target_channel).await?; - control_service(conn, new_channel.id, "enable").await?; + control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?; Ok(new_channel) } pub async fn delete_channel(conn: &Pool, id: i32) -> Result<(), ServiceError> { let channel = handles::select_channel(conn, &id).await?; - control_service(conn, channel.id, "stop").await?; - control_service(conn, channel.id, "disable").await?; + control_service(conn, channel.id, &ServiceCmd::Stop, None).await?; + control_service(conn, channel.id, &ServiceCmd::Disable, None).await?; if let Err(e) = fs::remove_file(channel.config_path) { error!("{e}"); diff --git a/ffplayout-api/src/utils/control.rs b/ffplayout-api/src/utils/control.rs index 1d4bdd9e..0803fac0 100644 --- a/ffplayout-api/src/utils/control.rs +++ b/ffplayout-api/src/utils/control.rs @@ -1,5 +1,16 @@ -use std::{collections::HashMap, process::Command}; +use std::{ + collections::HashMap, + env, fmt, + process::Child, + process::Command, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, + }, +}; +use actix_web::web; use reqwest::{ header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE}, Client, Response, @@ -26,8 +37,8 @@ struct TextParams { } #[derive(Debug, Deserialize, Serialize, Clone)] -struct ControlParams { - control: String, +pub struct ControlParams { + pub control: String, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -46,9 +57,132 @@ impl RpcObj { } } +/// ffplayout engine process +/// +/// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`, +/// the engine get startet and conntrolled from ffpapi +pub struct ProcessControl { + pub engine_child: Mutex>, + pub is_running: AtomicBool, +} + +impl ProcessControl { + pub fn new() -> Self { + Self { + engine_child: Mutex::new(None), + is_running: AtomicBool::new(false), + } + } +} + +impl ProcessControl { + pub fn start(&self) -> Result { + match Command::new("ffplayout").spawn() { + Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc), + Err(_) => return Err(ServiceError::InternalServerError), + }; + + self.is_running.store(true, Ordering::SeqCst); + + Ok("Success".to_string()) + } + + pub fn kill(&self) -> Result { + if let Some(proc) = self.engine_child.lock().unwrap().as_mut() { + if proc.kill().is_err() { + return Err(ServiceError::InternalServerError); + }; + } + + self.wait()?; + self.is_running.store(false, Ordering::SeqCst); + + Ok("Success".to_string()) + } + + pub fn restart(&self) -> Result { + self.kill()?; + + match Command::new("ffplayout").spawn() { + Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc), + Err(_) => return Err(ServiceError::InternalServerError), + }; + + self.is_running.store(true, Ordering::SeqCst); + + Ok("Success".to_string()) + } + + /// Wait for process to proper close. + /// This prevents orphaned/zombi processes in system + pub fn wait(&self) -> Result { + if let Some(proc) = self.engine_child.lock().unwrap().as_mut() { + if proc.wait().is_err() { + return Err(ServiceError::InternalServerError); + }; + } + + Ok("Success".to_string()) + } + + pub fn status(&self) -> Result { + if self.is_running.load(Ordering::SeqCst) { + Ok("active".to_string()) + } else { + Ok("not running".to_string()) + } + } +} + +impl Default for ProcessControl { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum ServiceCmd { + Enable, + Disable, + Start, + Stop, + Restart, + Status, +} + +impl FromStr for ServiceCmd { + type Err = String; + + fn from_str(input: &str) -> Result { + match input.to_lowercase().as_str() { + "enable" => Ok(Self::Enable), + "disable" => Ok(Self::Disable), + "start" => Ok(Self::Start), + "stop" => Ok(Self::Stop), + "restart" => Ok(Self::Restart), + "status" => Ok(Self::Status), + _ => Err(format!("Command '{input}' not found!")), + } + } +} + +impl fmt::Display for ServiceCmd { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Enable => write!(f, "enable"), + Self::Disable => write!(f, "disable"), + Self::Start => write!(f, "start"), + Self::Stop => write!(f, "stop"), + Self::Restart => write!(f, "restart"), + Self::Status => write!(f, "status"), + } + } +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Process { - pub command: String, + pub command: ServiceCmd, } struct SystemD { @@ -175,9 +309,15 @@ pub async fn send_message( pub async fn control_state( conn: &Pool, id: i32, - command: String, + command: &str, ) -> Result { - let json_obj = RpcObj::new(id, "player".into(), ControlParams { control: command }); + let json_obj = RpcObj::new( + id, + "player".into(), + ControlParams { + control: command.to_owned(), + }, + ); post_request(conn, id, json_obj).await } @@ -195,17 +335,32 @@ pub async fn media_info( pub async fn control_service( conn: &Pool, id: i32, - command: &str, + command: &ServiceCmd, + engine: Option>, ) -> Result { - let system_d = SystemD::new(conn, id).await?; + let piggyback = env::var("PIGGYBACK_MODE"); - match command { - "enable" => system_d.enable(), - "disable" => system_d.disable(), - "start" => system_d.start(), - "stop" => system_d.stop(), - "restart" => system_d.restart(), - "status" => system_d.status(), - _ => Err(ServiceError::BadRequest("Command not found!".to_string())), + if (env::consts::OS != "linux" || piggyback.is_ok()) && engine.is_some() { + match command { + ServiceCmd::Start => engine.unwrap().start(), + ServiceCmd::Stop => engine.unwrap().kill(), + ServiceCmd::Restart => engine.unwrap().restart(), + ServiceCmd::Status => engine.unwrap().status(), + _ => Err(ServiceError::Conflict( + "Engine runs in piggyback mode, in this mode this command is not allowed." + .to_string(), + )), + } + } else { + let system_d = SystemD::new(conn, id).await?; + + match command { + ServiceCmd::Enable => system_d.enable(), + ServiceCmd::Disable => system_d.disable(), + ServiceCmd::Start => system_d.start(), + ServiceCmd::Stop => system_d.stop(), + ServiceCmd::Restart => system_d.restart(), + ServiceCmd::Status => system_d.status(), + } } } diff --git a/ffplayout-frontend b/ffplayout-frontend index 59002611..bd7d2a21 160000 --- a/ffplayout-frontend +++ b/ffplayout-frontend @@ -1 +1 @@ -Subproject commit 590026119cbeb1c7b166f7f45b6dfa44628f6597 +Subproject commit bd7d2a21e3450b2489717b3e37e016864698e8a0 diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 253d1b7d..84c9dba2 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -26,7 +26,7 @@ reqwest = { version = "0.11", features = ["blocking", "json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" -serial_test = "1.0" +serial_test = { git = "https://github.com/palfrey/serial_test.git", branch = "remove-ignore-ignore" } shlex = "1.1" simplelog = { version = "^0.12", features = ["paris"] } time = { version = "0.3", features = ["formatting", "macros"] }