From 1b6b9a19aab10bb6a59c9a6ea2ec867355337b56 Mon Sep 17 00:00:00 2001 From: Jonathan Baecker Date: Sun, 27 Oct 2024 20:40:57 +0100 Subject: [PATCH] cleanup and fix child wait, check if ingest got input, console log with timestamp --- engine/src/player/controller.rs | 57 +++++++++++++-------------------- engine/src/player/output/hls.rs | 2 +- engine/src/utils/args_parse.rs | 5 ++- engine/src/utils/logging.rs | 54 ++++++++++++++----------------- 4 files changed, 51 insertions(+), 67 deletions(-) diff --git a/engine/src/player/controller.rs b/engine/src/player/controller.rs index d06b650b..f3e7b860 100644 --- a/engine/src/player/controller.rs +++ b/engine/src/player/controller.rs @@ -224,39 +224,31 @@ impl ChannelManager { Ok(()) } + fn run_wait( + &self, + unit: ProcessUnit, + child: &Arc>>, + ) -> Result<(), ProcessError> { + if let Some(proc) = child.lock().unwrap().as_mut() { + loop { + match proc.try_wait() { + Ok(Some(_)) => break, + Ok(None) => thread::sleep(Duration::from_millis(10)), + Err(e) => return Err(ProcessError::Custom(format!("{unit}: {e}"))), + } + } + } + + Ok(()) + } + /// Wait for process to proper close. /// This prevents orphaned/zombi processes in system pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> { - loop { - match unit { - Decoder => { - if let Some(proc) = self.decoder.lock().unwrap().as_mut() { - match proc.try_wait() { - Ok(Some(_)) => break, - Ok(None) => thread::sleep(Duration::from_millis(10)), - Err(e) => return Err(ProcessError::Custom(format!("Decoder: {e}"))), - } - } - } - Encoder => { - if let Some(proc) = self.encoder.lock().unwrap().as_mut() { - match proc.try_wait() { - Ok(Some(_)) => break, - Ok(None) => thread::sleep(Duration::from_millis(10)), - Err(e) => return Err(ProcessError::Custom(format!("Encoder: {e}"))), - } - } - } - Ingest => { - if let Some(proc) = self.ingest.lock().unwrap().as_mut() { - match proc.try_wait() { - Ok(Some(_)) => break, - Ok(None) => thread::sleep(Duration::from_millis(10)), - Err(e) => return Err(ProcessError::Custom(format!("Ingest: {e}"))), - } - } - } - } + match unit { + Decoder => self.run_wait(unit, &self.decoder)?, + Encoder => self.run_wait(unit, &self.encoder)?, + Ingest => self.run_wait(unit, &self.ingest)?, } thread::sleep(Duration::from_millis(50)); @@ -313,11 +305,6 @@ impl ChannelManager { error!(target: Target::all(), channel = channel_id; "{e}") } } - if let Err(e) = self.wait(unit) { - if !e.to_string().contains("exited process") { - error!(target: Target::all(), channel = channel_id; "{e}") - } - } } } } diff --git a/engine/src/player/output/hls.rs b/engine/src/player/output/hls.rs index 3fdd5a6e..18766872 100644 --- a/engine/src/player/output/hls.rs +++ b/engine/src/player/output/hls.rs @@ -126,7 +126,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { }; } - if !is_running { + if !is_running && line.contains("Input #0") { ingest_is_running.store(true, Ordering::SeqCst); playlist_init.store(true, Ordering::SeqCst); is_running = true; diff --git a/engine/src/utils/args_parse.rs b/engine/src/utils/args_parse.rs index d15607d6..ad3278a9 100644 --- a/engine/src/utils/args_parse.rs +++ b/engine/src/utils/args_parse.rs @@ -167,7 +167,7 @@ pub struct Args { #[clap(long, help_heading = Some("Playlist"), help = "Only validate given playlist")] pub validate: bool, - #[clap(long, env, help_heading = Some("Playout"), help = "Run playout without webserver and frontend.")] + #[clap(long, env, help_heading = Some("Playout"), help = "Run playout without webserver and frontend")] pub foreground: bool, #[clap(short, long, help_heading = Some("Playout"), help = "Play folder content")] @@ -176,6 +176,9 @@ pub struct Args { #[clap(long, env, help_heading = Some("Playout"), help = "Keep log file for given days")] pub log_backup_count: Option, + #[clap(long, env, help_heading = Some("Playout"), help = "Add timestamp to log line")] + pub log_timestamp: bool, + #[clap(short, long, help_heading = Some("Playout"), help = "Set output mode: desktop, hls, null, stream")] pub output: Option, diff --git a/engine/src/utils/logging.rs b/engine/src/utils/logging.rs index 61ebd5ba..4676c882 100644 --- a/engine/src/utils/logging.rs +++ b/engine/src/utils/logging.rs @@ -248,38 +248,32 @@ fn strip_tags(input: &str) -> String { re.replace_all(input, "").to_string() } -fn console_formatter(w: &mut dyn Write, _now: &mut DeferredNow, record: &Record) -> io::Result<()> { - match record.level() { - Level::Debug => write!( +fn console_formatter(w: &mut dyn Write, now: &mut DeferredNow, record: &Record) -> io::Result<()> { + let log_line = match record.level() { + Level::Debug => colorize_string(format!("[DEBUG] {}", record.args())), + Level::Error => colorize_string(format!("[ERROR] {}", record.args())), + Level::Info => colorize_string(format!("[ INFO] {}", record.args())), + Level::Trace => colorize_string(format!( + "[TRACE] {}:{} {}", + record.file().unwrap_or_default(), + record.line().unwrap_or_default(), + record.args() + )), + Level::Warn => colorize_string(format!("[ WARN] {}", record.args())), + }; + + if ARGS.log_timestamp { + write!( w, - "{}", - colorize_string(format!("[DEBUG] {}", record.args())) - ), - Level::Error => write!( - w, - "{}", - colorize_string(format!("[ERROR] {}", record.args())) - ), - Level::Info => write!( - w, - "{}", - colorize_string(format!("[ INFO] {}", record.args())) - ), - Level::Trace => write!( - w, - "{}", + "{} {}", colorize_string(format!( - "[TRACE] {}:{} {}", - record.file().unwrap_or_default(), - record.line().unwrap_or_default(), - record.args() - )) - ), - Level::Warn => write!( - w, - "{}", - colorize_string(format!("[ WARN] {}", record.args())) - ), + "[{}]", + now.now().format("%Y-%m-%d %H:%M:%S%.6f") + )), + log_line + ) + } else { + write!(w, "{}", log_line) } }