From 492085c9fdc95e0142ff08011c7dcd051280a006 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 25 May 2022 18:00:33 +0200 Subject: [PATCH] very basic ingest rtmp auth --- src/input/ingest.rs | 54 ++++++++++++++++++++++++++++++++++++++++----- src/output/hls.rs | 17 ++++++++------ src/output/mod.rs | 5 +---- src/utils/mod.rs | 25 +++++++++------------ 4 files changed, 69 insertions(+), 32 deletions(-) diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 03934b71..5e782c42 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -1,6 +1,6 @@ use std::{ - io::{BufReader, Error, Read}, - process::{Command, Stdio}, + io::{BufRead, BufReader, Error, Read}, + process::{ChildStderr, Command, Stdio}, sync::atomic::Ordering, thread, }; @@ -9,20 +9,59 @@ use crossbeam_channel::Sender; use simplelog::*; use crate::filter::ingest_filter::filter_cmd; -use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; +use crate::utils::{format_log_line, GlobalConfig, Ingest, ProcessControl}; use crate::vec_strings; +fn server_monitor( + level: &str, + buffer: BufReader, + mut proc_ctl: ProcessControl, +) -> Result<(), Error> { + for line in buffer.lines() { + let line = line?; + + if line.contains("[info]") && level.to_lowercase() == "info" { + info!( + "[Server] {}", + format_log_line(line.clone(), "info") + ) + } else if line.contains("[warning]") + && (level.to_lowercase() == "warning" || level.to_lowercase() == "info") + { + warn!( + "[Server] {}", + format_log_line(line.clone(), "warning") + ) + } else if line.contains("[error]") + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!( + "[Server] {}", + format_log_line(line.clone(), "error") + ); + } + + if line.contains("rtmp") && line.contains("Unexpected stream") { + if let Err(e) = proc_ctl.kill(Ingest) { + error!("{e}"); + }; + } + } + + Ok(()) +} + /// ffmpeg Ingest Server /// /// Start ffmpeg in listen mode, and wait for input. pub fn ingest_server( config: GlobalConfig, - log_format: String, ingest_sender: Sender<(usize, [u8; 65088])>, mut proc_control: ProcessControl, ) -> Result<(), Error> { let mut buffer: [u8; 65088] = [0; 65088]; - let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; + let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let stream_input = config.ingest.input_cmd.clone().unwrap(); server_cmd.append(&mut stream_input.clone()); @@ -41,6 +80,8 @@ pub fn ingest_server( ); while !proc_control.is_terminated.load(Ordering::SeqCst) { + let proc_ctl = proc_control.clone(); + let level = config.logging.ffmpeg_level.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) .stdout(Stdio::piped()) @@ -55,7 +96,8 @@ pub fn ingest_server( }; let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); let server_err = BufReader::new(server_proc.stderr.take().unwrap()); - let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); + let error_reader_thread = + thread::spawn(move || server_monitor(&level, server_err, proc_ctl)); *proc_control.server_term.lock().unwrap() = Some(server_proc); is_running = false; diff --git a/src/output/hls.rs b/src/output/hls.rs index ccfa2b98..7e61ca93 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -30,15 +30,11 @@ use simplelog::*; use crate::filter::ingest_filter::filter_cmd; use crate::input::source_generator; use crate::utils::{ - sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, - ProcessControl, + format_log_line, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, + PlayoutStatus, ProcessControl, }; use crate::vec_strings; -fn format_line(line: String, level: &str) -> String { - line.replace(&format!("[{level: >5}] "), "") -} - /// Ingest Server for HLS fn ingest_to_hls_server( config: GlobalConfig, @@ -66,6 +62,7 @@ fn ingest_to_hls_server( ); loop { + let mut proc_ctl = proc_control.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) .stderr(Stdio::piped()) @@ -103,9 +100,15 @@ fn ingest_to_hls_server( { error!( "[server] {}", - format_line(line.clone(), "error") + format_log_line(line.clone(), "error") ); } + + if line.contains("rtmp") && line.contains("Unexpected stream") { + if let Err(e) = proc_ctl.kill(Ingest) { + error!("{e}"); + }; + } } info!("Switch from live ingest to {}", config.processing.mode); diff --git a/src/output/mod.rs b/src/output/mod.rs index 7ad54712..adf07a5c 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -66,7 +66,6 @@ pub fn player( *proc_control.encoder_term.lock().unwrap() = Some(enc_proc); - let ff_log_format_c = ff_log_format.clone(); let proc_control_c = proc_control.clone(); let mut ingest_receiver = None; @@ -74,9 +73,7 @@ pub fn player( if config.ingest.enable { let (ingest_sender, rx) = bounded(96); ingest_receiver = Some(rx); - thread::spawn(move || { - ingest_server(config_clone, ff_log_format_c, ingest_sender, proc_control_c) - }); + thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c)); } 'source_iter: for node in get_source { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ea007ad8..6664fae6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -361,31 +361,26 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec String { + line.replace(&format!("[{level: >5}] "), "") +} + +/// Read ffmpeg stderr decoder and encoder instance /// and log the output. pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), Error> { - fn format_line(line: String, level: &str) -> String { - line.replace(&format!("[{level: >5}] "), "") - } - for line in buffer.lines() { let line = line?; if line.contains("[info]") { - info!("[{suffix}] {}", format_line(line, "info")) + info!( + "[{suffix}] {}", + format_log_line(line, "info") + ) } else if line.contains("[warning]") { warn!( "[{suffix}] {}", - format_line(line, "warning") + format_log_line(line, "warning") ) - } else if suffix != "server" - && !line.contains("Input/output error") - && !line.contains("Broken pipe") - { - error!( - "[{suffix}] {}", - format_line(line.clone(), "error") - ); } }