add live ingest switch to hls mode

This commit is contained in:
jb-alvarado 2022-05-02 22:49:41 +02:00
parent 42ec3ef9f9
commit 98881eda9d
8 changed files with 230 additions and 104 deletions

View File

@ -62,6 +62,7 @@ processing:
logo_opacity: 0.7 logo_opacity: 0.7
logo_filter: overlay=W-w-12:12 logo_filter: overlay=W-w-12:12
add_loudnorm: false add_loudnorm: false
loudnorm_ingest: false
loud_i: -18 loud_i: -18
loud_tp: -1.5 loud_tp: -1.5
loud_lra: 11 loud_lra: 11

11
src/filter/a_loudnorm.rs Normal file
View File

@ -0,0 +1,11 @@
use crate::utils::GlobalConfig;
/// Loudnorm Audio Filter
///
/// Add loudness normalization.
pub fn filter_node(config: &GlobalConfig) -> String {
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
}

View File

@ -0,0 +1,47 @@
use crate::filter::{a_loudnorm, v_overlay};
use crate::utils::GlobalConfig;
/// Audio Filter
///
/// If needed we add audio filters to the server instance.
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.loudnorm_ingest {
audio_chain.push_str(&a_loudnorm::filter_node(config));
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
/// Create filter nodes for ingest live stream.
pub fn filter_cmd() -> Vec<String> {
let config = GlobalConfig::global();
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&v_overlay::filter_node(config, true));
filter.push_str("[vout1]");
filter.push_str(audio_filter(config).as_str());
vec![
"-filter_complex".to_string(),
filter,
"-map".to_string(),
"[vout1]".to_string(),
"-map".to_string(),
"[aout1]".to_string(),
]
}

View File

@ -2,7 +2,10 @@ use std::path::Path;
use simplelog::*; use simplelog::*;
pub mod a_loudnorm;
pub mod ingest_filter;
pub mod v_drawtext; pub mod v_drawtext;
pub mod v_overlay;
use crate::utils::{get_delta, is_close, GlobalConfig, Media}; use crate::utils::{get_delta, is_close, GlobalConfig, Media};
@ -62,9 +65,9 @@ impl Filters {
} }
} }
fn deinterlace(field_order: Option<String>, chain: &mut Filters) { fn deinterlace(field_order: &Option<String>, chain: &mut Filters) {
if let Some(order) = field_order { if let Some(order) = field_order {
if &order != "progressive" { if order != "progressive" {
chain.add_filter("yadif=0:-1:0", "video") chain.add_filter("yadif=0:-1:0", "video")
} }
} }
@ -138,15 +141,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
&& Path::new(&config.processing.logo).is_file() && Path::new(&config.processing.logo).is_file()
&& &node.category.clone().unwrap_or_default() != "advertisement" && &node.category.clone().unwrap_or_default() != "advertisement"
{ {
let opacity = format!( let mut logo_chain = v_overlay::filter_node(config, false);
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
let mut logo_chain = format!(
"null[v];movie={},{logo_loop},{opacity}",
config.processing.logo
);
if node.last_ad.unwrap() { if node.last_ad.unwrap() {
logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1") logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1")
@ -228,9 +223,8 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
} }
} }
/// Add single pass loudnorm filter to audio line.
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add single pass loudnorm filter to audio line
if node.probe.is_some() if node.probe.is_some()
&& !node && !node
.probe .probe
@ -241,11 +235,7 @@ fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
.is_empty() .is_empty()
&& config.processing.add_loudnorm && config.processing.add_loudnorm
{ {
let loud_filter = format!( let loud_filter = a_loudnorm::filter_node(config);
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
);
chain.add_filter(&loud_filter, "audio"); chain.add_filter(&loud_filter, "audio");
} }
} }
@ -256,16 +246,20 @@ fn audio_volume(chain: &mut Filters, config: &GlobalConfig) {
} }
} }
fn aspect_calc(aspect_string: String) -> f64 { fn aspect_calc(aspect_string: &Option<String>, config: &GlobalConfig) -> f64 {
let aspect_vec: Vec<&str> = aspect_string.split(':').collect(); let mut source_aspect = config.processing.aspect;
let w: f64 = aspect_vec[0].parse().unwrap();
let h: f64 = aspect_vec[1].parse().unwrap(); if let Some(aspect) = aspect_string {
let source_aspect: f64 = w as f64 / h as f64; let aspect_vec: Vec<&str> = aspect.split(':').collect();
let w: f64 = aspect_vec[0].parse().unwrap();
let h: f64 = aspect_vec[1].parse().unwrap();
source_aspect = w as f64 / h as f64;
}
source_aspect source_aspect
} }
fn fps_calc(r_frame_rate: String) -> f64 { fn fps_calc(r_frame_rate: &str) -> f64 {
let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect(); let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect();
let rate: f64 = frame_rate_vec[0].parse().unwrap(); let rate: f64 = frame_rate_vec[0].parse().unwrap();
let factor: f64 = frame_rate_vec[1].parse().unwrap(); let factor: f64 = frame_rate_vec[1].parse().unwrap();
@ -314,10 +308,10 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
} }
let v_stream = &probe.video_streams.unwrap()[0]; let v_stream = &probe.video_streams.unwrap()[0];
let aspect = aspect_calc(v_stream.display_aspect_ratio.clone().unwrap()); let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
let frame_per_sec = fps_calc(v_stream.r_frame_rate.clone()); let frame_per_sec = fps_calc(&v_stream.r_frame_rate);
deinterlace(v_stream.field_order.clone(), &mut filters); deinterlace(&v_stream.field_order, &mut filters);
pad(aspect, &mut filters, config); pad(aspect, &mut filters, config);
fps(frame_per_sec, &mut filters, config); fps(frame_per_sec, &mut filters, config);
scale( scale(

27
src/filter/v_overlay.rs Normal file
View File

@ -0,0 +1,27 @@
use std::path::Path;
use crate::utils::GlobalConfig;
/// Overlay Filter
///
/// When a logo is set, we create here the filter for the server.
pub fn filter_node(config: &GlobalConfig, add_tail: bool) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
if add_tail {
logo_chain.push_str(
format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str(),
);
}
}
logo_chain
}

View File

@ -1,6 +1,5 @@
use std::{ use std::{
io::{BufReader, Error, Read}, io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio}, process::{Command, Stdio},
sync::atomic::Ordering, sync::atomic::Ordering,
thread, thread,
@ -9,54 +8,9 @@ use std::{
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use simplelog::*; use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
/// Overlay Filter
///
/// When a logo is set, we create here the filter for the server.
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
/// Audio Filter
///
/// If needed we add audio filters to the server instance.
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.add_loudnorm {
audio_chain.push_str(
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
.as_str(),
);
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
/// ffmpeg Ingest Server /// ffmpeg Ingest Server
/// ///
/// Start ffmpeg in listen mode, and wait for input. /// Start ffmpeg in listen mode, and wait for input.
@ -67,32 +21,14 @@ pub fn ingest_server(
) -> Result<(), Error> { ) -> Result<(), Error> {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088]; let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!( let filter_list = filter_cmd();
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(config));
filter.push_str("[vout1]");
filter.push_str(audio_filter(config).as_str());
let mut filter_list = vec![
"-filter_complex",
&filter,
"-map",
"[vout1]",
"-map",
"[aout1]",
];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.input_cmd.clone().unwrap(); let stream_input = config.ingest.input_cmd.clone().unwrap();
let stream_settings = config.processing.settings.clone().unwrap(); let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list); server_cmd.append(&mut filter_list.iter().map(String::as_str).collect());
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
let mut is_running; let mut is_running;

View File

@ -18,29 +18,129 @@ out:
*/ */
use std::{ use std::{
io::BufReader, io::{BufRead, BufReader, Error},
process::{Command, Stdio}, process::{Command, Stdio},
thread, sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
}; };
use simplelog::*; use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::input::source_generator; use crate::input::source_generator;
use crate::utils::{ use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus,
ProcessControl,
}; };
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
/// Ingest Server for HLS
fn ingest_to_hls_server(
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let playlist_init = playout_stat.list_init;
let filter_list = filter_cmd();
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list.iter().map(String::as_str).collect());
server_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
let mut is_running;
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
debug!(
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
);
loop {
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}")
}
Ok(proc) => proc,
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
let line = line?;
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
playlist_init.store(true, Ordering::SeqCst);
is_running = true;
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.kill(Decoder) {
error!("{e}");
}
}
if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[server]</> {}",
format_line(line.clone(), "error")
);
}
}
info!("Switch from live ingest to {}", config.processing.mode);
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")
}
if proc_control.is_terminated.load(Ordering::SeqCst) {
break;
}
}
Ok(())
}
/// HLS Writer /// HLS Writer
/// ///
/// Write with single ffmpeg instance directly to a HLS playlist. /// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls( pub fn write_hls(
play_control: PlayerControl, play_control: PlayerControl,
playout_stat: PlayoutStatus, playout_stat: PlayoutStatus,
proc_control: ProcessControl, mut proc_control: ProcessControl,
) { ) {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap(); let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let proc_control_c = proc_control.clone();
let get_source = source_generator( let get_source = source_generator(
config.clone(), config.clone(),
@ -50,6 +150,11 @@ pub fn write_hls(
proc_control.is_terminated.clone(), proc_control.is_terminated.clone(),
); );
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(play_stat, proc_control_c));
}
for node in get_source { for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone()); *play_control.current_media.lock().unwrap() = Some(node.clone());
@ -96,14 +201,18 @@ pub fn write_hls(
}; };
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer")); *proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
if let Err(e) = dec_proc.wait() { if let Err(e) = stderr_reader(dec_err, "Writer") {
error!("Writer: {e}") error!("{e:?}")
}; };
if let Err(e) = error_decoder_thread.join() { if let Err(e) = proc_control.wait(Decoder) {
error!("{e:?}"); error!("{e}");
}; }
while proc_control.server_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
}
} }
} }

View File

@ -81,6 +81,7 @@ pub struct Processing {
pub logo_opacity: f32, pub logo_opacity: f32,
pub logo_filter: String, pub logo_filter: String,
pub add_loudnorm: bool, pub add_loudnorm: bool,
pub loudnorm_ingest: bool,
pub loud_i: f32, pub loud_i: f32,
pub loud_tp: f32, pub loud_tp: f32,
pub loud_lra: f32, pub loud_lra: f32,