cleanup and fix child wait, check if ingest got input, console log with timestamp

This commit is contained in:
Jonathan Baecker 2024-10-27 20:40:57 +01:00
parent da2fdd2d75
commit 1b6b9a19aa
4 changed files with 51 additions and 67 deletions

View File

@ -224,39 +224,31 @@ impl ChannelManager {
Ok(()) Ok(())
} }
fn run_wait(
&self,
unit: ProcessUnit,
child: &Arc<Mutex<Option<Child>>>,
) -> Result<(), ProcessError> {
if let Some(proc) = child.lock().unwrap().as_mut() {
loop {
match proc.try_wait() {
Ok(Some(_)) => break,
Ok(None) => thread::sleep(Duration::from_millis(10)),
Err(e) => return Err(ProcessError::Custom(format!("{unit}: {e}"))),
}
}
}
Ok(())
}
/// Wait for process to proper close. /// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system /// This prevents orphaned/zombi processes in system
pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> { pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> {
loop {
match unit { match unit {
Decoder => { Decoder => self.run_wait(unit, &self.decoder)?,
if let Some(proc) = self.decoder.lock().unwrap().as_mut() { Encoder => self.run_wait(unit, &self.encoder)?,
match proc.try_wait() { Ingest => self.run_wait(unit, &self.ingest)?,
Ok(Some(_)) => break,
Ok(None) => thread::sleep(Duration::from_millis(10)),
Err(e) => return Err(ProcessError::Custom(format!("Decoder: {e}"))),
}
}
}
Encoder => {
if let Some(proc) = self.encoder.lock().unwrap().as_mut() {
match proc.try_wait() {
Ok(Some(_)) => break,
Ok(None) => thread::sleep(Duration::from_millis(10)),
Err(e) => return Err(ProcessError::Custom(format!("Encoder: {e}"))),
}
}
}
Ingest => {
if let Some(proc) = self.ingest.lock().unwrap().as_mut() {
match proc.try_wait() {
Ok(Some(_)) => break,
Ok(None) => thread::sleep(Duration::from_millis(10)),
Err(e) => return Err(ProcessError::Custom(format!("Ingest: {e}"))),
}
}
}
}
} }
thread::sleep(Duration::from_millis(50)); thread::sleep(Duration::from_millis(50));
@ -313,11 +305,6 @@ impl ChannelManager {
error!(target: Target::all(), channel = channel_id; "{e}") error!(target: Target::all(), channel = channel_id; "{e}")
} }
} }
if let Err(e) = self.wait(unit) {
if !e.to_string().contains("exited process") {
error!(target: Target::all(), channel = channel_id; "{e}")
}
}
} }
} }
} }

View File

@ -126,7 +126,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> {
}; };
} }
if !is_running { if !is_running && line.contains("Input #0") {
ingest_is_running.store(true, Ordering::SeqCst); ingest_is_running.store(true, Ordering::SeqCst);
playlist_init.store(true, Ordering::SeqCst); playlist_init.store(true, Ordering::SeqCst);
is_running = true; is_running = true;

View File

@ -167,7 +167,7 @@ pub struct Args {
#[clap(long, help_heading = Some("Playlist"), help = "Only validate given playlist")] #[clap(long, help_heading = Some("Playlist"), help = "Only validate given playlist")]
pub validate: bool, pub validate: bool,
#[clap(long, env, help_heading = Some("Playout"), help = "Run playout without webserver and frontend.")] #[clap(long, env, help_heading = Some("Playout"), help = "Run playout without webserver and frontend")]
pub foreground: bool, pub foreground: bool,
#[clap(short, long, help_heading = Some("Playout"), help = "Play folder content")] #[clap(short, long, help_heading = Some("Playout"), help = "Play folder content")]
@ -176,6 +176,9 @@ pub struct Args {
#[clap(long, env, help_heading = Some("Playout"), help = "Keep log file for given days")] #[clap(long, env, help_heading = Some("Playout"), help = "Keep log file for given days")]
pub log_backup_count: Option<usize>, pub log_backup_count: Option<usize>,
#[clap(long, env, help_heading = Some("Playout"), help = "Add timestamp to log line")]
pub log_timestamp: bool,
#[clap(short, long, help_heading = Some("Playout"), help = "Set output mode: desktop, hls, null, stream")] #[clap(short, long, help_heading = Some("Playout"), help = "Set output mode: desktop, hls, null, stream")]
pub output: Option<OutputMode>, pub output: Option<OutputMode>,

View File

@ -248,38 +248,32 @@ fn strip_tags(input: &str) -> String {
re.replace_all(input, "").to_string() re.replace_all(input, "").to_string()
} }
fn console_formatter(w: &mut dyn Write, _now: &mut DeferredNow, record: &Record) -> io::Result<()> { fn console_formatter(w: &mut dyn Write, now: &mut DeferredNow, record: &Record) -> io::Result<()> {
match record.level() { let log_line = match record.level() {
Level::Debug => write!( Level::Debug => colorize_string(format!("<bright-blue>[DEBUG]</> {}", record.args())),
w, Level::Error => colorize_string(format!("<bright-red>[ERROR]</> {}", record.args())),
"{}", Level::Info => colorize_string(format!("<bright-green>[ INFO]</> {}", record.args())),
colorize_string(format!("<bright-blue>[DEBUG]</> {}", record.args())) Level::Trace => colorize_string(format!(
),
Level::Error => write!(
w,
"{}",
colorize_string(format!("<bright-red>[ERROR]</> {}", record.args()))
),
Level::Info => write!(
w,
"{}",
colorize_string(format!("<bright-green>[ INFO]</> {}", record.args()))
),
Level::Trace => write!(
w,
"{}",
colorize_string(format!(
"<bright-yellow>[TRACE]</> {}:{} {}", "<bright-yellow>[TRACE]</> {}:{} {}",
record.file().unwrap_or_default(), record.file().unwrap_or_default(),
record.line().unwrap_or_default(), record.line().unwrap_or_default(),
record.args() record.args()
)) )),
), Level::Warn => colorize_string(format!("<yellow>[ WARN]</> {}", record.args())),
Level::Warn => write!( };
if ARGS.log_timestamp {
write!(
w, w,
"{}", "{} {}",
colorize_string(format!("<yellow>[ WARN]</> {}", record.args())) colorize_string(format!(
), "<bright black>[{}]</>",
now.now().format("%Y-%m-%d %H:%M:%S%.6f")
)),
log_line
)
} else {
write!(w, "{}", log_line)
} }
} }