From 754b69cf99e751c8c42e343ccf5ef5793c26bb4a Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 27 Mar 2022 18:37:18 +0200 Subject: [PATCH] close processes on panic --- src/output/mod.rs | 74 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/src/output/mod.rs b/src/output/mod.rs index 6c9e8017..95b524bc 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -3,7 +3,7 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, path::Path, process, - process::{Command, Stdio}, + process::{Child, Command, Stdio}, sync::{ mpsc::{channel, sync_channel, Receiver, SyncSender}, Arc, Mutex, @@ -22,6 +22,61 @@ mod stream; use crate::input::{ingest_server, watch_folder, CurrentProgram, Source}; use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media}; +#[derive(Debug)] +struct ProcessCleanup { + server_term: Arc>>, + is_terminated: Arc>, + enc_proc: Child, + is_alive: bool, +} + +impl ProcessCleanup { + fn new( + server_term: Arc>>, + is_terminated: Arc>, + enc_proc: Child, + ) -> Self { + Self { + server_term, + is_terminated, + enc_proc, + is_alive: true, + } + } +} + +impl ProcessCleanup { + fn kill(&mut self) { + *self.is_terminated.lock().unwrap() = true; + + if self.is_alive { + if let Some(server) = &*self.server_term.lock().unwrap() { + unsafe { + if let Ok(_) = server.terminate() { + info!("Terminate ingest server done"); + } + } + }; + + self.is_alive = false; + } + + if let Ok(_) = self.enc_proc.kill() { + info!("Playout done...") + } + + if let Err(e) = self.enc_proc.wait() { + error!("Encoder: {e}") + }; + } +} + +impl Drop for ProcessCleanup { + fn drop(&mut self) { + self.kill() + } +} + pub fn play(rt_handle: &Handle) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); @@ -100,6 +155,9 @@ pub fn play(rt_handle: &Handle) { )); } + let mut proc_cleanup = + ProcessCleanup::new(server_term.clone(), is_terminated.clone(), enc_proc); + 'source_iter: for node in get_source { let cmd = match node.cmd { Some(cmd) => cmd, @@ -127,7 +185,10 @@ pub fn play(rt_handle: &Handle) { dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); - debug!("Decoder CMD: \"ffmpeg {}\"", dec_cmd.join(" ")); + debug!( + "Decoder CMD: \"ffmpeg {}\"", + dec_cmd.join(" ") + ); let mut dec_proc = match Command::new("ffmpeg") .args(dec_cmd) @@ -231,12 +292,5 @@ pub fn play(rt_handle: &Handle) { sleep(Duration::from_secs(1)); - match enc_proc.kill() { - Ok(_) => info!("Playout done..."), - Err(e) => panic!("Encoder error: {:?}", e), - } - - if let Err(e) = enc_proc.wait() { - error!("Encoder: {e}") - }; + proc_cleanup.kill(); }