From 98881eda9df4b25370bec8ce48c4586b340d2315 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 2 May 2022 22:49:41 +0200 Subject: [PATCH] add live ingest switch to hls mode --- assets/ffplayout.yml | 1 + src/filter/a_loudnorm.rs | 11 +++ src/filter/ingest_filter.rs | 47 +++++++++++++ src/filter/mod.rs | 48 ++++++-------- src/filter/v_overlay.rs | 27 ++++++++ src/input/ingest.rs | 70 +------------------ src/output/hls.rs | 129 +++++++++++++++++++++++++++++++++--- src/utils/config.rs | 1 + 8 files changed, 230 insertions(+), 104 deletions(-) create mode 100644 src/filter/a_loudnorm.rs create mode 100644 src/filter/ingest_filter.rs create mode 100644 src/filter/v_overlay.rs diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 44697edb..e121438a 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -62,6 +62,7 @@ processing: logo_opacity: 0.7 logo_filter: overlay=W-w-12:12 add_loudnorm: false + loudnorm_ingest: false loud_i: -18 loud_tp: -1.5 loud_lra: 11 diff --git a/src/filter/a_loudnorm.rs b/src/filter/a_loudnorm.rs new file mode 100644 index 00000000..2d503372 --- /dev/null +++ b/src/filter/a_loudnorm.rs @@ -0,0 +1,11 @@ +use crate::utils::GlobalConfig; + +/// Loudnorm Audio Filter +/// +/// Add loudness normalization. +pub fn filter_node(config: &GlobalConfig) -> String { + format!( + ",loudnorm=I={}:TP={}:LRA={}", + config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra + ) +} diff --git a/src/filter/ingest_filter.rs b/src/filter/ingest_filter.rs new file mode 100644 index 00000000..048391d9 --- /dev/null +++ b/src/filter/ingest_filter.rs @@ -0,0 +1,47 @@ +use crate::filter::{a_loudnorm, v_overlay}; +use crate::utils::GlobalConfig; + +/// Audio Filter +/// +/// If needed we add audio filters to the server instance. +fn audio_filter(config: &GlobalConfig) -> String { + let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); + + if config.processing.loudnorm_ingest { + audio_chain.push_str(&a_loudnorm::filter_node(config)); + } + + if config.processing.volume != 1.0 { + audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str()); + } + + audio_chain.push_str("[aout1]"); + + audio_chain +} + +/// Create filter nodes for ingest live stream. +pub fn filter_cmd() -> Vec { + let config = GlobalConfig::global(); + + let mut filter = format!( + "[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5", + config.processing.fps, + config.processing.width, + config.processing.height, + config.processing.aspect + ); + + filter.push_str(&v_overlay::filter_node(config, true)); + filter.push_str("[vout1]"); + filter.push_str(audio_filter(config).as_str()); + + vec![ + "-filter_complex".to_string(), + filter, + "-map".to_string(), + "[vout1]".to_string(), + "-map".to_string(), + "[aout1]".to_string(), + ] +} diff --git a/src/filter/mod.rs b/src/filter/mod.rs index b18b54ea..9f64c76c 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -2,7 +2,10 @@ use std::path::Path; use simplelog::*; +pub mod a_loudnorm; +pub mod ingest_filter; pub mod v_drawtext; +pub mod v_overlay; use crate::utils::{get_delta, is_close, GlobalConfig, Media}; @@ -62,9 +65,9 @@ impl Filters { } } -fn deinterlace(field_order: Option, chain: &mut Filters) { +fn deinterlace(field_order: &Option, chain: &mut Filters) { if let Some(order) = field_order { - if &order != "progressive" { + if order != "progressive" { chain.add_filter("yadif=0:-1:0", "video") } } @@ -138,15 +141,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { && Path::new(&config.processing.logo).is_file() && &node.category.clone().unwrap_or_default() != "advertisement" { - let opacity = format!( - "format=rgba,colorchannelmixer=aa={}", - config.processing.logo_opacity - ); - let logo_loop = "loop=loop=-1:size=1:start=0"; - let mut logo_chain = format!( - "null[v];movie={},{logo_loop},{opacity}", - config.processing.logo - ); + let mut logo_chain = v_overlay::filter_node(config, false); if node.last_ad.unwrap() { logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1") @@ -228,9 +223,8 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) { } } +/// Add single pass loudnorm filter to audio line. fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { - // add single pass loudnorm filter to audio line - if node.probe.is_some() && !node .probe @@ -241,11 +235,7 @@ fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { .is_empty() && config.processing.add_loudnorm { - let loud_filter = format!( - "loudnorm=I={}:TP={}:LRA={}", - config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra - ); - + let loud_filter = a_loudnorm::filter_node(config); chain.add_filter(&loud_filter, "audio"); } } @@ -256,16 +246,20 @@ fn audio_volume(chain: &mut Filters, config: &GlobalConfig) { } } -fn aspect_calc(aspect_string: String) -> f64 { - let aspect_vec: Vec<&str> = aspect_string.split(':').collect(); - let w: f64 = aspect_vec[0].parse().unwrap(); - let h: f64 = aspect_vec[1].parse().unwrap(); - let source_aspect: f64 = w as f64 / h as f64; +fn aspect_calc(aspect_string: &Option, config: &GlobalConfig) -> f64 { + let mut source_aspect = config.processing.aspect; + + if let Some(aspect) = aspect_string { + let aspect_vec: Vec<&str> = aspect.split(':').collect(); + let w: f64 = aspect_vec[0].parse().unwrap(); + let h: f64 = aspect_vec[1].parse().unwrap(); + source_aspect = w as f64 / h as f64; + } source_aspect } -fn fps_calc(r_frame_rate: String) -> f64 { +fn fps_calc(r_frame_rate: &str) -> f64 { let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect(); let rate: f64 = frame_rate_vec[0].parse().unwrap(); let factor: f64 = frame_rate_vec[1].parse().unwrap(); @@ -314,10 +308,10 @@ pub fn filter_chains(node: &mut Media) -> Vec { } let v_stream = &probe.video_streams.unwrap()[0]; - let aspect = aspect_calc(v_stream.display_aspect_ratio.clone().unwrap()); - let frame_per_sec = fps_calc(v_stream.r_frame_rate.clone()); + let aspect = aspect_calc(&v_stream.display_aspect_ratio, config); + let frame_per_sec = fps_calc(&v_stream.r_frame_rate); - deinterlace(v_stream.field_order.clone(), &mut filters); + deinterlace(&v_stream.field_order, &mut filters); pad(aspect, &mut filters, config); fps(frame_per_sec, &mut filters, config); scale( diff --git a/src/filter/v_overlay.rs b/src/filter/v_overlay.rs new file mode 100644 index 00000000..b3f2d62a --- /dev/null +++ b/src/filter/v_overlay.rs @@ -0,0 +1,27 @@ +use std::path::Path; + +use crate::utils::GlobalConfig; + +/// Overlay Filter +/// +/// When a logo is set, we create here the filter for the server. +pub fn filter_node(config: &GlobalConfig, add_tail: bool) -> String { + let mut logo_chain = String::new(); + + if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { + let opacity = format!( + "format=rgba,colorchannelmixer=aa={}", + config.processing.logo_opacity + ); + let logo_loop = "loop=loop=-1:size=1:start=0"; + logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo); + + if add_tail { + logo_chain.push_str( + format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str(), + ); + } + } + + logo_chain +} diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 5138944b..05961723 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -1,6 +1,5 @@ use std::{ io::{BufReader, Error, Read}, - path::Path, process::{Command, Stdio}, sync::atomic::Ordering, thread, @@ -9,54 +8,9 @@ use std::{ use crossbeam_channel::Sender; use simplelog::*; +use crate::filter::ingest_filter::filter_cmd; use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; -/// Overlay Filter -/// -/// When a logo is set, we create here the filter for the server. -fn overlay(config: &GlobalConfig) -> String { - let mut logo_chain = String::new(); - - if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { - let opacity = format!( - "format=rgba,colorchannelmixer=aa={}", - config.processing.logo_opacity - ); - let logo_loop = "loop=loop=-1:size=1:start=0"; - logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo); - - logo_chain - .push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str()); - } - - logo_chain -} - -/// Audio Filter -/// -/// If needed we add audio filters to the server instance. -fn audio_filter(config: &GlobalConfig) -> String { - let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); - - if config.processing.add_loudnorm { - audio_chain.push_str( - format!( - ",loudnorm=I={}:TP={}:LRA={}", - config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra - ) - .as_str(), - ); - } - - if config.processing.volume != 1.0 { - audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str()); - } - - audio_chain.push_str("[aout1]"); - - audio_chain -} - /// ffmpeg Ingest Server /// /// Start ffmpeg in listen mode, and wait for input. @@ -67,32 +21,14 @@ pub fn ingest_server( ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; - let mut filter = format!( - "[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5", - config.processing.fps, - config.processing.width, - config.processing.height, - config.processing.aspect - ); - - filter.push_str(&overlay(config)); - filter.push_str("[vout1]"); - filter.push_str(audio_filter(config).as_str()); - let mut filter_list = vec![ - "-filter_complex", - &filter, - "-map", - "[vout1]", - "-map", - "[aout1]", - ]; + let filter_list = filter_cmd(); let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; let stream_input = config.ingest.input_cmd.clone().unwrap(); let stream_settings = config.processing.settings.clone().unwrap(); server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); - server_cmd.append(&mut filter_list); + server_cmd.append(&mut filter_list.iter().map(String::as_str).collect()); server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); let mut is_running; diff --git a/src/output/hls.rs b/src/output/hls.rs index 1dfb92cf..deabfbf2 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -18,29 +18,129 @@ out: */ use std::{ - io::BufReader, + io::{BufRead, BufReader, Error}, process::{Command, Stdio}, - thread, + sync::atomic::Ordering, + thread::{self, sleep}, + time::Duration, }; use simplelog::*; +use crate::filter::ingest_filter::filter_cmd; use crate::input::source_generator; use crate::utils::{ - sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, + sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, + ProcessControl, }; +fn format_line(line: String, level: &str) -> String { + line.replace(&format!("[{level: >5}] "), "") +} + +/// Ingest Server for HLS +fn ingest_to_hls_server( + playout_stat: PlayoutStatus, + mut proc_control: ProcessControl, +) -> Result<(), Error> { + let config = GlobalConfig::global(); + let dec_settings = config.out.clone().output_cmd.unwrap(); + let playlist_init = playout_stat.list_init; + let filter_list = filter_cmd(); + + let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+info"]; + let stream_input = config.ingest.input_cmd.clone().unwrap(); + + server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); + server_cmd.append(&mut filter_list.iter().map(String::as_str).collect()); + server_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); + + let mut is_running; + + info!( + "Start ingest server, listening on: {}", + stream_input.last().unwrap() + ); + + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); + + loop { + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn ingest server: {e}"); + panic!("couldn't spawn ingest server: {e}") + } + Ok(proc) => proc, + }; + + let server_err = BufReader::new(server_proc.stderr.take().unwrap()); + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; + + for line in server_err.lines() { + let line = line?; + + if !is_running { + proc_control.server_is_running.store(true, Ordering::SeqCst); + playlist_init.store(true, Ordering::SeqCst); + is_running = true; + + info!("Switch from {} to live ingest", config.processing.mode); + + if let Err(e) = proc_control.kill(Decoder) { + error!("{e}"); + } + } + + if line.contains("[error]") + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!( + "[server] {}", + format_line(line.clone(), "error") + ); + } + } + + info!("Switch from live ingest to {}", config.processing.mode); + + proc_control + .server_is_running + .store(false, Ordering::SeqCst); + + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } + + if proc_control.is_terminated.load(Ordering::SeqCst) { + break; + } + } + + Ok(()) +} + /// HLS Writer /// /// Write with single ffmpeg instance directly to a HLS playlist. pub fn write_hls( play_control: PlayerControl, playout_stat: PlayoutStatus, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); + let play_stat = playout_stat.clone(); + let proc_control_c = proc_control.clone(); let get_source = source_generator( config.clone(), @@ -50,6 +150,11 @@ pub fn write_hls( proc_control.is_terminated.clone(), ); + // spawn a thread for ffmpeg ingest server and create a channel for package sending + if config.ingest.enable { + thread::spawn(move || ingest_to_hls_server(play_stat, proc_control_c)); + } + for node in get_source { *play_control.current_media.lock().unwrap() = Some(node.clone()); @@ -96,14 +201,18 @@ pub fn write_hls( }; let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); - let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer")); + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); - if let Err(e) = dec_proc.wait() { - error!("Writer: {e}") + if let Err(e) = stderr_reader(dec_err, "Writer") { + error!("{e:?}") }; - if let Err(e) = error_decoder_thread.join() { - error!("{e:?}"); - }; + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}"); + } + + while proc_control.server_is_running.load(Ordering::SeqCst) { + sleep(Duration::from_secs(1)); + } } } diff --git a/src/utils/config.rs b/src/utils/config.rs index e5039a56..c9321a38 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -81,6 +81,7 @@ pub struct Processing { pub logo_opacity: f32, pub logo_filter: String, pub add_loudnorm: bool, + pub loudnorm_ingest: bool, pub loud_i: f32, pub loud_tp: f32, pub loud_lra: f32,