diff --git a/Cargo.lock b/Cargo.lock index 24f249c0..90738ca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ dependencies = [ [[package]] name = "ffplayout-engine" -version = "0.9.6" +version = "0.9.7" dependencies = [ "chrono", "clap", @@ -251,13 +251,11 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" dependencies = [ - "cfg-if 1.0.0", "crc32fast", - "libc", "miniz_oxide", ] @@ -498,9 +496,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "hyper" -version = "0.14.18" +version = "0.14.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +checksum = "42dc3c131584288d375f2d07f822b0cb012d8c6fb899a5b9fdb3cb7eb9b6004f" dependencies = [ "bytes", "futures-channel", @@ -532,9 +530,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" +checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a" dependencies = [ "autocfg", "hashbrown", @@ -959,9 +957,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" +checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" [[package]] name = "paris" diff --git a/Cargo.toml b/Cargo.toml index a81a4e7f..7becd4d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" authors = ["Jonathan Baecker jonbae77@gmail.com"] readme = "README.md" -version = "0.9.6" +version = "0.9.7" edition = "2021" default-run = "ffplayout" diff --git a/docs/live_ingest.md b/docs/live_ingest.md index 779a970f..f33d14a1 100644 --- a/docs/live_ingest.md +++ b/docs/live_ingest.md @@ -14,8 +14,8 @@ When it notice a incoming stream, it will stop the current playing and continue In rare cases it can happen, that for a short moment after switching the image freezes, but then it will continue. Also a short frame flickering can happen. -You need to know, that **ffmpeg in current version has no authentication mechanism and it just listen to the protocol and port (no path or app name).** +You need to know, that **ffmpeg in current version has no authentication mechanism and it just listen to the protocol and port (no app and stream name).** -For security you should not expose the ingest to the world. You localhost only, with an relay/reverse proxy where you can make your authentication. You could also use a [patch](https://gist.github.com/jb-alvarado/f8ee1e7a3cf5e482e818338f2b62c95f) for ffmpeg, but there is no guarantee if this really works. +ffplayout catches this problem with monitoring the output from ffmpeg. When the input is **rtmp** and the app or stream name differs to the config it stops the ingest process. So in a way we have a bit control, which stream we let come in and which not. In theory you can use every [protocol](https://ffmpeg.org/ffmpeg-protocols.html) from ffmpeg which support a **listen** mode. diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 03934b71..e65b72c2 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -1,6 +1,6 @@ use std::{ - io::{BufReader, Error, Read}, - process::{Command, Stdio}, + io::{BufRead, BufReader, Error, Read}, + process::{ChildStderr, Command, Stdio}, sync::atomic::Ordering, thread, }; @@ -9,20 +9,68 @@ use crossbeam_channel::Sender; use simplelog::*; use crate::filter::ingest_filter::filter_cmd; -use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; +use crate::utils::{format_log_line, GlobalConfig, Ingest, ProcessControl}; use crate::vec_strings; +pub fn log_line(line: String, level: &str) { + if line.contains("[info]") && level.to_lowercase() == "info" { + info!( + "[Server] {}", + format_log_line(line, "info") + ) + } else if line.contains("[warning]") + && (level.to_lowercase() == "warning" || level.to_lowercase() == "info") + { + warn!( + "[Server] {}", + format_log_line(line, "warning") + ) + } else if line.contains("[error]") + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!( + "[Server] {}", + format_log_line(line, "error") + ); + } +} + +fn server_monitor( + level: &str, + buffer: BufReader, + mut proc_ctl: ProcessControl, +) -> Result<(), Error> { + for line in buffer.lines() { + let line = line?; + + if line.contains("rtmp") && line.contains("Unexpected stream") { + if let Err(e) = proc_ctl.kill(Ingest) { + error!("{e}"); + }; + + warn!( + "[Server] {}", + format_log_line(line.clone(), "error") + ); + } + + log_line(line, level); + } + + Ok(()) +} + /// ffmpeg Ingest Server /// /// Start ffmpeg in listen mode, and wait for input. pub fn ingest_server( config: GlobalConfig, - log_format: String, ingest_sender: Sender<(usize, [u8; 65088])>, mut proc_control: ProcessControl, ) -> Result<(), Error> { let mut buffer: [u8; 65088] = [0; 65088]; - let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; + let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let stream_input = config.ingest.input_cmd.clone().unwrap(); server_cmd.append(&mut stream_input.clone()); @@ -41,6 +89,8 @@ pub fn ingest_server( ); while !proc_control.is_terminated.load(Ordering::SeqCst) { + let proc_ctl = proc_control.clone(); + let level = config.logging.ffmpeg_level.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) .stdout(Stdio::piped()) @@ -55,7 +105,8 @@ pub fn ingest_server( }; let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); let server_err = BufReader::new(server_proc.stderr.take().unwrap()); - let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); + let error_reader_thread = + thread::spawn(move || server_monitor(&level, server_err, proc_ctl)); *proc_control.server_term.lock().unwrap() = Some(server_proc); is_running = false; diff --git a/src/output/hls.rs b/src/output/hls.rs index ccfa2b98..b8841476 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -28,17 +28,13 @@ use std::{ use simplelog::*; use crate::filter::ingest_filter::filter_cmd; -use crate::input::source_generator; +use crate::input::{ingest::log_line, source_generator}; use crate::utils::{ sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, ProcessControl, }; use crate::vec_strings; -fn format_line(line: String, level: &str) -> String { - line.replace(&format!("[{level: >5}] "), "") -} - /// Ingest Server for HLS fn ingest_to_hls_server( config: GlobalConfig, @@ -46,6 +42,7 @@ fn ingest_to_hls_server( mut proc_control: ProcessControl, ) -> Result<(), Error> { let playlist_init = playout_stat.list_init; + let level = config.logging.ffmpeg_level.clone(); let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let stream_input = config.ingest.input_cmd.clone().unwrap(); @@ -66,6 +63,7 @@ fn ingest_to_hls_server( ); loop { + let mut proc_ctl = proc_control.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) .stderr(Stdio::piped()) @@ -73,7 +71,7 @@ fn ingest_to_hls_server( { Err(e) => { error!("couldn't spawn ingest server: {e}"); - panic!("couldn't spawn ingest server: {e}") + panic!("couldn't spawn ingest server: {e}"); } Ok(proc) => proc, }; @@ -85,6 +83,12 @@ fn ingest_to_hls_server( for line in server_err.lines() { let line = line?; + if line.contains("rtmp") && line.contains("Unexpected stream") { + if let Err(e) = proc_ctl.kill(Ingest) { + error!("{e}"); + }; + } + if !is_running { proc_control.server_is_running.store(true, Ordering::SeqCst); playlist_init.store(true, Ordering::SeqCst); @@ -97,15 +101,7 @@ fn ingest_to_hls_server( } } - if line.contains("[error]") - && !line.contains("Input/output error") - && !line.contains("Broken pipe") - { - error!( - "[server] {}", - format_line(line.clone(), "error") - ); - } + log_line(line, &level); } info!("Switch from live ingest to {}", config.processing.mode); diff --git a/src/output/mod.rs b/src/output/mod.rs index 7ad54712..adf07a5c 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -66,7 +66,6 @@ pub fn player( *proc_control.encoder_term.lock().unwrap() = Some(enc_proc); - let ff_log_format_c = ff_log_format.clone(); let proc_control_c = proc_control.clone(); let mut ingest_receiver = None; @@ -74,9 +73,7 @@ pub fn player( if config.ingest.enable { let (ingest_sender, rx) = bounded(96); ingest_receiver = Some(rx); - thread::spawn(move || { - ingest_server(config_clone, ff_log_format_c, ingest_sender, proc_control_c) - }); + thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c)); } 'source_iter: for node in get_source { diff --git a/src/utils/controller.rs b/src/utils/controller.rs index 510a7a99..f6baa2a8 100644 --- a/src/utils/controller.rs +++ b/src/utils/controller.rs @@ -151,12 +151,6 @@ impl ProcessControl { } } -impl Drop for ProcessControl { - fn drop(&mut self) { - self.kill_all() - } -} - /// Global player control, to get infos about current clip etc. #[derive(Clone)] pub struct PlayerControl { diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ea007ad8..6664fae6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -361,31 +361,26 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec String { + line.replace(&format!("[{level: >5}] "), "") +} + +/// Read ffmpeg stderr decoder and encoder instance /// and log the output. pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), Error> { - fn format_line(line: String, level: &str) -> String { - line.replace(&format!("[{level: >5}] "), "") - } - for line in buffer.lines() { let line = line?; if line.contains("[info]") { - info!("[{suffix}] {}", format_line(line, "info")) + info!( + "[{suffix}] {}", + format_log_line(line, "info") + ) } else if line.contains("[warning]") { warn!( "[{suffix}] {}", - format_line(line, "warning") + format_log_line(line, "warning") ) - } else if suffix != "server" - && !line.contains("Input/output error") - && !line.contains("Broken pipe") - { - error!( - "[{suffix}] {}", - format_line(line.clone(), "error") - ); } }