From ca52f8bf948049a924f3285697cacf67c699d7e9 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 11 Apr 2022 21:37:41 +0200 Subject: [PATCH] remove process_control and replace it with std Child --- Cargo.lock | 54 ------------ Cargo.toml | 1 - src/input/ingest.rs | 35 ++++---- src/output/mod.rs | 40 ++++----- src/rpc/mod.rs | 46 +++++----- src/utils/controller.rs | 181 ++++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 110 +----------------------- 7 files changed, 239 insertions(+), 228 deletions(-) create mode 100644 src/utils/controller.rs diff --git a/Cargo.lock b/Cargo.lock index 37152d7d..250b7828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,7 +179,6 @@ dependencies = [ "notify", "once_cell", "openssl", - "process_control", "rand", "regex", "serde", @@ -1028,16 +1027,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "process_control" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f" -dependencies = [ - "libc", - "windows-sys", -] - [[package]] name = "quote" version = "1.0.18" @@ -1517,49 +1506,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-sys" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" - -[[package]] -name = "windows_i686_gnu" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" - -[[package]] -name = "windows_i686_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa" - [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index c7b88abf..d6dc10ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ lettre = "0.10.0-rc.5" log = "0.4" notify = "4.0" once_cell = "1.10" -process_control = "3.3" rand = "0.8" regex = "1" serde = { version = "1.0", features = ["derive"] } diff --git a/src/input/ingest.rs b/src/input/ingest.rs index dcb3adf7..8dbbedb2 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,16 +2,15 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::{mpsc::SyncSender}, + sync::mpsc::SyncSender, thread::sleep, time::Duration, }; -use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; -use crate::utils::{stderr_reader, GlobalConfig, ProcessControl}; +use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; fn overlay(config: &GlobalConfig) -> String { let mut logo_chain = String::new(); @@ -57,7 +56,7 @@ pub async fn ingest_server( log_format: String, ingest_sender: SyncSender<(usize, [u8; 65088])>, rt_handle: Handle, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; @@ -96,7 +95,10 @@ pub async fn ingest_server( stream_input.last().unwrap() ); - debug!("Server CMD: \"ffmpeg {}\"", server_cmd.join(" ")); + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); loop { if *proc_control.is_terminated.lock().unwrap() { @@ -115,15 +117,11 @@ pub async fn ingest_server( Ok(proc) => proc, }; - let serv_terminator = server_proc.terminator()?; - *proc_control.server_term.lock().unwrap() = Some(serv_terminator); - - rt_handle.spawn(stderr_reader( - server_proc.stderr.take().unwrap(), - "Server", - )); + rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server")); let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; loop { @@ -131,7 +129,6 @@ pub async fn ingest_server( Ok(length) => length, Err(e) => { debug!("Ingest server read {e:?}"); - break; } }; @@ -153,17 +150,15 @@ pub async fn ingest_server( } } + drop(ingest_reader); + *proc_control.server_is_running.lock().unwrap() = false; sleep(Duration::from_secs(1)); - if let Err(e) = server_proc.kill() { - error!("Ingest server {e:?}") - }; - - if let Err(e) = server_proc.wait() { - error!("Ingest server {e:?}") - }; + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } } Ok(()) diff --git a/src/output/mod.rs b/src/output/mod.rs index 454ab10a..ea922923 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -6,7 +6,6 @@ use std::{ time::Duration, }; -use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; @@ -18,14 +17,15 @@ pub use hls::write_hls; use crate::input::{ingest_server, source_generator}; use crate::utils::{ - sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, + sec_to_time, stderr_reader, Decoder, Encoder, GlobalConfig, PlayerControl, PlayoutStatus, + ProcessControl, }; pub fn player( rt_handle: &Handle, play_control: PlayerControl, playout_stat: PlayoutStatus, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); @@ -50,8 +50,8 @@ pub fn player( }; let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); - rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); + *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); let (ingest_sender, ingest_receiver): ( SyncSender<(usize, [u8; 65088])>, @@ -114,12 +114,8 @@ pub fn player( }; let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder")); - - if let Ok(dec_terminator) = dec_proc.terminator() { - *proc_control.decoder_term.lock().unwrap() = Some(dec_terminator); - }; + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { if *proc_control.server_is_running.lock().unwrap() { @@ -130,13 +126,9 @@ pub fn player( error!("Encoder error: {e}") } - if let Err(e) = dec_proc.kill() { - error!("Decoder error: {e}") - }; - - if let Err(e) = dec_proc.wait() { - error!("Decoder error: {e}") - }; + if let Err(e) = proc_control.kill(Decoder) { + error!("{e}") + } live_on = true; @@ -182,18 +174,14 @@ pub fn player( } } - if let Err(e) = dec_proc.wait() { - panic!("Decoder error: {e:?}") - }; + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}") + } } sleep(Duration::from_secs(1)); - if let Err(e) = enc_proc.kill() { - panic!("Encoder error: {e:?}") - }; - - if let Err(e) = enc_proc.wait() { - panic!("Encoder error: {e:?}") - }; + if let Err(e) = proc_control.kill(Encoder) { + error!("{e}") + } } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 295a3753..7957a648 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,10 +1,7 @@ -use std::sync::{Arc, Mutex}; - use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; use jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, }; -use process_control::Terminator; use serde_json::{json, Map}; use simplelog::*; @@ -45,19 +42,6 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } -fn kill_decoder(terminator: Arc>>) -> Result<(), String> { - match &*terminator.lock().unwrap() { - Some(decoder) => unsafe { - if let Err(e) = decoder.terminate() { - return Err(format!("Terminate decoder: {e}")); - } - }, - None => return Err("No decoder terminator found".to_string()), - } - - Ok(()) -} - pub async fn json_rpc_server( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -78,7 +62,15 @@ pub async fn json_rpc_server( let index = *play.index.lock().unwrap(); if index < play.current_list.lock().unwrap().len() { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Move to next clip"); let mut data_map = Map::new(); @@ -107,7 +99,15 @@ pub async fn json_rpc_server( let index = *play.index.lock().unwrap(); if index > 1 && play.current_list.lock().unwrap().len() > 1 { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Move to last clip"); let mut data_map = Map::new(); let mut media = play.current_list.lock().unwrap()[index - 2].clone(); @@ -133,7 +133,15 @@ pub async fn json_rpc_server( } if map.contains_key("control") && &map["control"] == "reset" { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Reset playout to original state"); let mut data_map = Map::new(); *time_shift = 0.0; diff --git a/src/utils/controller.rs b/src/utils/controller.rs new file mode 100644 index 00000000..e251eebf --- /dev/null +++ b/src/utils/controller.rs @@ -0,0 +1,181 @@ +use std::{ + fmt, + process::Child, + sync::{Arc, Mutex, RwLock}, + +}; + +use jsonrpc_http_server::CloseHandle; +use simplelog::*; + +use crate::utils::Media; + +pub enum ProcessUnit { + Decoder, + Encoder, + Ingest, +} + +impl fmt::Display for ProcessUnit { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + ProcessUnit::Decoder => write!(f, "Decoder"), + ProcessUnit::Encoder => write!(f, "Encoder"), + ProcessUnit::Ingest => write!(f, "Ingest"), + } + } +} + +use ProcessUnit::*; + +#[derive(Clone)] +pub struct ProcessControl { + pub decoder_term: Arc>>, + pub encoder_term: Arc>>, + pub server_term: Arc>>, + pub server_is_running: Arc>, + pub rpc_handle: Arc>>, + pub is_terminated: Arc>, + pub is_alive: Arc>, +} + +impl ProcessControl { + pub fn new() -> Self { + Self { + decoder_term: Arc::new(Mutex::new(None)), + encoder_term: Arc::new(Mutex::new(None)), + server_term: Arc::new(Mutex::new(None)), + server_is_running: Arc::new(Mutex::new(false)), + rpc_handle: Arc::new(Mutex::new(None)), + is_terminated: Arc::new(Mutex::new(false)), + is_alive: Arc::new(RwLock::new(true)), + } + } +} + +impl ProcessControl { + pub fn kill(&mut self, proc: ProcessUnit) -> Result<(), String> { + match proc { + Decoder => { + if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Decoder {e:?}")); + }; + } + } + Encoder => { + if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Encoder {e:?}")); + }; + } + } + Ingest => { + if let Some(proc) = self.server_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Ingest server {e:?}")); + }; + } + } + } + + if let Err(e) = self.wait(proc) { + return Err(e); + }; + + Ok(()) + } + + pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> { + match proc { + Decoder => { + if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Decoder {e:?}")); + }; + } + } + Encoder => { + if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Encoder {e:?}")); + }; + } + } + Ingest => { + if let Some(proc) = self.server_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Ingest server {e:?}")); + }; + } + } + } + + Ok(()) + } + + pub fn kill_all(&mut self) { + *self.is_terminated.lock().unwrap() = true; + + if *self.is_alive.read().unwrap() { + *self.is_alive.write().unwrap() = false; + + if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { + rpc.clone().close() + }; + + for unit in [ + Decoder, + Encoder, + Ingest, + ] { + if let Err(e) = self.kill(unit) { + error!("{e}") + } + } + } + } +} + +impl Drop for ProcessControl { + fn drop(&mut self) { + self.kill_all() + } +} + +#[derive(Clone)] +pub struct PlayerControl { + pub current_media: Arc>>, + pub current_list: Arc>>, + pub index: Arc>, +} + +impl PlayerControl { + pub fn new() -> Self { + Self { + current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), + index: Arc::new(Mutex::new(0)), + } + } +} + +#[derive(Clone, Debug)] +pub struct PlayoutStatus { + pub time_shift: Arc>, + pub date: Arc>, + pub current_date: Arc>, + pub list_init: Arc>, +} + +impl PlayoutStatus { + pub fn new() -> Self { + Self { + time_shift: Arc::new(Mutex::new(0.0)), + date: Arc::new(Mutex::new(String::new())), + current_date: Arc::new(Mutex::new(String::new())), + list_init: Arc::new(Mutex::new(true)), + } + } +} + diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 5608602d..e1f450be 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,13 +8,10 @@ use std::{ path::Path, process::exit, process::{ChildStderr, Command, Stdio}, - sync::{Arc, Mutex, RwLock}, time, time::UNIX_EPOCH, }; -use jsonrpc_http_server::CloseHandle; -use process_control::Terminator; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -22,123 +19,20 @@ use simplelog::*; mod arg_parse; mod config; +pub mod controller; pub mod json_reader; mod json_validate; mod logging; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; +pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*}; pub use json_reader::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::init_logging; use crate::filter::filter_chains; -#[derive(Clone)] -pub struct ProcessControl { - pub decoder_term: Arc>>, - pub encoder_term: Arc>>, - pub server_term: Arc>>, - pub server_is_running: Arc>, - pub rpc_handle: Arc>>, - pub is_terminated: Arc>, - pub is_alive: Arc>, -} - -impl ProcessControl { - pub fn new() -> Self { - Self { - decoder_term: Arc::new(Mutex::new(None)), - encoder_term: Arc::new(Mutex::new(None)), - server_term: Arc::new(Mutex::new(None)), - server_is_running: Arc::new(Mutex::new(false)), - rpc_handle: Arc::new(Mutex::new(None)), - is_terminated: Arc::new(Mutex::new(false)), - is_alive: Arc::new(RwLock::new(true)), - } - } -} - -impl ProcessControl { - pub fn kill_all(&mut self) { - *self.is_terminated.lock().unwrap() = true; - - if *self.is_alive.read().unwrap() { - *self.is_alive.write().unwrap() = false; - - if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { - rpc.clone().close() - }; - - if let Some(server) = &*self.server_term.lock().unwrap() { - unsafe { - if let Err(e) = server.terminate() { - error!("Ingest server: {e:?}"); - } - } - }; - - if let Some(decoder) = &*self.decoder_term.lock().unwrap() { - unsafe { - if let Err(e) = decoder.terminate() { - error!("Decoder: {e:?}"); - } - } - }; - - if let Some(encoder) = &*self.encoder_term.lock().unwrap() { - unsafe { - if let Err(e) = encoder.terminate() { - error!("Encoder: {e:?}"); - } - } - }; - } - } -} - -impl Drop for ProcessControl { - fn drop(&mut self) { - self.kill_all() - } -} - -#[derive(Clone, Debug)] -pub struct PlayoutStatus { - pub time_shift: Arc>, - pub date: Arc>, - pub current_date: Arc>, - pub list_init: Arc>, -} - -impl PlayoutStatus { - pub fn new() -> Self { - Self { - time_shift: Arc::new(Mutex::new(0.0)), - date: Arc::new(Mutex::new(String::new())), - current_date: Arc::new(Mutex::new(String::new())), - list_init: Arc::new(Mutex::new(true)), - } - } -} - -#[derive(Clone)] -pub struct PlayerControl { - pub current_media: Arc>>, - pub current_list: Arc>>, - pub index: Arc>, -} - -impl PlayerControl { - pub fn new() -> Self { - Self { - current_media: Arc::new(Mutex::new(None)), - current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), - index: Arc::new(Mutex::new(0)), - } - } -} - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { pub begin: Option,