From a9dae4ee5b160bb32d01911978c6e77f05ef90b8 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 31 May 2022 12:09:08 +0200 Subject: [PATCH] support multiple outputs for streaming output and hls mode --- README.md | 1 + docs/multiple_outputs.md | 46 ++++++++++++++++++ src/output/hls.rs | 51 ++++++++++++-------- src/output/stream.rs | 75 +++++------------------------ src/tests/utils/mod.rs | 64 +++++++++++++++++++++++++ src/utils/mod.rs | 101 ++++++++++++++++++++++++++++++++------- 6 files changed, 238 insertions(+), 100 deletions(-) create mode 100644 docs/multiple_outputs.md diff --git a/README.md b/README.md index 0add514b..92c215f3 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,7 @@ The main purpose of ffplayout is to provide a 24/7 broadcasting solution that pl - **HLS** - JSON RPC server, for getting infos about current playing and controlling - [live ingest](/docs/live_ingest.md) +- [multiple outputs](/docs/multiple_outputs.md) Requirements ----- diff --git a/docs/multiple_outputs.md b/docs/multiple_outputs.md new file mode 100644 index 00000000..f63dbc5d --- /dev/null +++ b/docs/multiple_outputs.md @@ -0,0 +1,46 @@ +### Multiple Outputs + +ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings. + +For example you want to stream different resolutions, you could apply this output parameters: + +```YAML + ... + + output_param: >- + -c:v libx264 + -crf 23 + -x264-params keyint=50:min-keyint=25:scenecut=-1 + -maxrate 1300k + -bufsize 2600k + -preset faster + -tune zerolatency + -profile:v Main + -level 3.1 + -c:a aac + -ar 44100 + -b:a 128k + -flags +global_header + -f flv rtmp://example.org/live/stream-high + -s 960x540 + -c:v libx264 + -crf 23 + -x264-params keyint=50:min-keyint=25:scenecut=-1 + -maxrate 1000k + -bufsize 1800k + -preset faster + -tune zerolatency + -profile:v Main + -level 3.1 + -c:a aac + -ar 44100 + -b:a 128k + -flags +global_header + -f flv rtmp://example.org/live/stream-low +``` + +When you are using the text overlay filter, it will apply to all outputs. + +The same works to for HLS output. + +If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`. diff --git a/src/output/hls.rs b/src/output/hls.rs index b8841476..e326fab0 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -30,8 +30,8 @@ use simplelog::*; use crate::filter::ingest_filter::filter_cmd; use crate::input::{ingest::log_line, source_generator}; use crate::utils::{ - sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, - ProcessControl, + prepare_output_cmd, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, + PlayoutStatus, ProcessControl, }; use crate::vec_strings; @@ -44,12 +44,17 @@ fn ingest_to_hls_server( let playlist_init = playout_stat.list_init; let level = config.logging.ffmpeg_level.clone(); - let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; - let stream_input = config.ingest.input_cmd.clone().unwrap(); + let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; + let mut stream_input = config.ingest.input_cmd.clone().unwrap(); + server_prefix.append(&mut stream_input); + let server_filter = filter_cmd(&config); - server_cmd.append(&mut stream_input.clone()); - server_cmd.append(&mut filter_cmd(&config)); - server_cmd.append(&mut config.out.clone().output_cmd.unwrap()); + let server_cmd = prepare_output_cmd( + server_prefix, + server_filter, + config.out.clone().output_cmd.unwrap(), + "hls", + ); let mut is_running; @@ -167,23 +172,23 @@ pub fn write_hls( node.source ); - let mut filter = node.filter.unwrap(); - let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; - dec_cmd.append(&mut cmd); - - if filter.len() > 1 { - dec_cmd.append(&mut filter); - } - - dec_cmd.append(&mut config.out.clone().output_cmd.unwrap()); + let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; + enc_prefix.append(&mut cmd); + let enc_filter = node.filter.unwrap(); + let enc_cmd = prepare_output_cmd( + enc_prefix, + enc_filter, + config.out.clone().output_cmd.unwrap(), + &config.out.mode, + ); debug!( "HLS writer CMD: \"ffmpeg {}\"", - dec_cmd.join(" ") + enc_cmd.join(" ") ); - let mut dec_proc = match Command::new("ffmpeg") - .args(dec_cmd) + let mut enc_proc = match Command::new("ffmpeg") + .args(enc_cmd) .stderr(Stdio::piped()) .spawn() { @@ -194,8 +199,8 @@ pub fn write_hls( Ok(proc) => proc, }; - let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); - *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); + let dec_err = BufReader::new(enc_proc.stderr.take().unwrap()); + *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); if let Err(e) = stderr_reader(dec_err, "Writer") { error!("{e:?}") @@ -209,4 +214,8 @@ pub fn write_hls( sleep(Duration::from_secs(1)); } } + + sleep(Duration::from_secs(1)); + + proc_control.kill_all(); } diff --git a/src/output/stream.rs b/src/output/stream.rs index 6f922ac6..0559e5d5 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -3,23 +3,19 @@ use std::process::{self, Command, Stdio}; use simplelog::*; use crate::filter::v_drawtext; -use crate::utils::{GlobalConfig, Media}; +use crate::utils::{prepare_output_cmd, GlobalConfig, Media}; use crate::vec_strings; /// Streaming Output /// /// Prepare the ffmpeg command for streaming output pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child { + let mut enc_cmd = vec![]; let mut enc_filter = vec![]; - let mut preview = vec![]; let mut preview_cmd = config.out.preview_cmd.as_ref().unwrap().clone(); - let output_cmd = config.out.output_cmd.as_ref().unwrap().clone(); - let params_len = output_cmd.len(); - let mut output_count = 1; - let mut output_v_map = "[v_out1]".to_string(); - let mut output_params = output_cmd.clone(); + let mut output_cmd = config.out.output_cmd.as_ref().unwrap().clone(); - let mut enc_cmd = vec_strings![ + let enc_prefix = vec_strings![ "-hide_banner", "-nostats", "-v", @@ -41,63 +37,18 @@ pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child { v_drawtext::filter_node(config, &mut Media::new(0, String::new(), false)).as_str(), ); - if config.out.preview { - output_count += 1; - output_v_map.push_str(format!("[v_out{output_count}]").as_str()); - - preview = vec_strings!["-map", "[v_out1]", "-map", "0:a"]; - preview.append(&mut preview_cmd); - preview.append(&mut vec_strings!["-map", "[v_out2]", "-map", "0:a"]); - } - - output_params.clear(); - - // check for multiple outputs and add mapping to it - for (i, param) in output_cmd.iter().enumerate() { - output_params.push(param.clone()); - - if i > 0 - && !param.starts_with('-') - && !output_cmd[i - 1].starts_with('-') - && i < params_len - 1 - { - output_count += 1; - let v_map = format!("[v_out{output_count}]"); - output_v_map.push_str(v_map.as_str()); - - let mut map = vec![ - "-map".to_string(), - v_map, - "-map".to_string(), - "0:a".to_string(), - ]; - output_params.append(&mut map); - } - } - - if output_count > 1 { - if !filter.is_empty() { - filter.push(','); - } - - filter.push_str(format!("split={output_count}{output_v_map}").as_str()); - - if preview.is_empty() { - output_params.insert(0, "-map".to_string()); - output_params.insert(1, "[v_out1]".to_string()); - output_params.insert(2, "-map".to_string()); - output_params.insert(3, "0:a".to_string()); - } - } - enc_filter = vec!["-filter_complex".to_string(), filter]; - } else if config.out.preview { - preview = preview_cmd; } - enc_cmd.append(&mut enc_filter); - enc_cmd.append(&mut preview); - enc_cmd.append(&mut output_params); + if config.out.preview { + enc_cmd.append(&mut preview_cmd); + } + + println!("{enc_filter:?}"); + + enc_cmd.append(&mut output_cmd); + + let enc_cmd = prepare_output_cmd(enc_prefix, enc_filter, enc_cmd, &config.out.mode); debug!( "Encoder CMD: \"ffmpeg {}\"", diff --git a/src/tests/utils/mod.rs b/src/tests/utils/mod.rs index edd71f49..2267bdfe 100644 --- a/src/tests/utils/mod.rs +++ b/src/tests/utils/mod.rs @@ -3,6 +3,7 @@ use chrono::prelude::*; #[cfg(test)] use crate::utils::*; +use crate::vec_strings; #[test] fn mock_date_time() { @@ -50,3 +51,66 @@ fn test_delta() { assert!(delta < 2.0); } + +#[test] +fn test_prepare_output_cmd() { + let enc_prefix = vec_strings![ + "-hide_banner", + "-nostats", + "-v", + "level+error", + "-re", + "-i", + "pipe:0" + ]; + let filter = vec_strings![ + "-filter_complex", + "[0:v]null,zmq=b=tcp\\\\://'127.0.0.1\\:5555',drawtext=text=''" + ]; + let params = vec_strings![ + "-c:v", + "libx264", + "-flags", + "+global_header", + "-f", + "flv", + "rtmp://localhost/live/stream", + "-s", + "512x288", + "-c:v", + "libx264", + "-flags", + "+global_header", + "-f", + "flv", + "rtmp://localhost:1937/live/stream" + ]; + + let mut t1_params = enc_prefix.clone(); + t1_params.append(&mut params.clone()); + let cmd_two_outs = + prepare_output_cmd(enc_prefix.clone(), vec_strings![], params.clone(), "stream"); + + assert_eq!(cmd_two_outs, t1_params); + + let mut test_cmd = enc_prefix.clone(); + let mut test_params = params.clone(); + let mut t2_filter = filter.clone(); + t2_filter[1].push_str(",split=2[v_out1][v_out2]"); + test_cmd.append(&mut t2_filter); + + test_params.insert(0, "-map".to_string()); + test_params.insert(1, "[v_out1]".to_string()); + test_params.insert(2, "-map".to_string()); + test_params.insert(3, "0:a".to_string()); + + test_params.insert(11, "-map".to_string()); + test_params.insert(12, "[v_out2]".to_string()); + test_params.insert(13, "-map".to_string()); + test_params.insert(14, "0:a".to_string()); + + test_cmd.append(&mut test_params); + let cmd_two_outs_with_filter = prepare_output_cmd(enc_prefix, filter, params, "stream"); + + assert_eq!(cmd_two_outs_with_filter, test_cmd); +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 49b0b2c2..949a6258 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -29,7 +29,7 @@ pub use json_serializer::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::{init_logging, send_mail}; -use crate::filter::filter_chains; +use crate::{filter::filter_chains, vec_strings}; /// Video clip struct to hold some important states and comments for current media. #[derive(Debug, Serialize, Deserialize, Clone)] @@ -365,6 +365,89 @@ pub fn format_log_line(line: String, level: &str) -> String { line.replace(&format!("[{level: >5}] "), "") } +/// Prepare output parameters +/// +/// seek for multiple outputs and add mapping for it +pub fn prepare_output_cmd( + prefix: Vec, + mut filter: Vec, + params: Vec, + mode: &str, +) -> Vec { + let params_len = params.len(); + let mut output_params = params.clone(); + let mut output_a_map = "[a_out1]".to_string(); + let mut output_v_map = "[v_out1]".to_string(); + let mut output_count = 1; + let mut cmd = prefix; + + if !filter.is_empty() { + output_params.clear(); + + for (i, param) in params.iter().enumerate() { + output_params.push(param.clone()); + + if i > 0 + && !param.starts_with('-') + && !params[i - 1].starts_with('-') + && i < params_len - 1 + { + output_count += 1; + let mut a_map = "0:a".to_string(); + let v_map = format!("[v_out{output_count}]"); + output_v_map.push_str(v_map.as_str()); + + if mode == "hls" { + a_map = format!("[a_out{output_count}]"); + } + + output_a_map.push_str(a_map.as_str()); + + let mut map = vec!["-map".to_string(), v_map, "-map".to_string(), a_map]; + + output_params.append(&mut map); + } + } + + if output_count > 1 && mode == "hls" { + filter[1].push_str(format!(";[vout1]split={output_count}{output_v_map}").as_str()); + filter[1].push_str(format!(";[aout1]asplit={output_count}{output_a_map}").as_str()); + filter.drain(2..); + cmd.append(&mut filter); + cmd.append(&mut vec_strings!["-map", "[v_out1]", "-map", "[a_out1]"]); + } else if output_count > 1 && mode == "stream" { + filter[1].push_str(format!(",split={output_count}{output_v_map}").as_str()); + cmd.append(&mut filter); + cmd.append(&mut vec_strings!["-map", "[v_out1]", "-map", "0:a"]); + } else { + cmd.append(&mut filter); + } + } + + cmd.append(&mut output_params); + + cmd +} + +/// Validate input +/// +/// Check if input is a remote source, or from storage and see if it exists. +pub fn validate_source(source: &str) -> bool { + let re = Regex::new(r"^https?://.*").unwrap(); + + if re.is_match(source) { + match MediaProbe::new(source).video_streams { + Some(_) => return true, + None => { + error!("Remote file not exist: {source}"); + return false; + } + } + } + + Path::new(&source).is_file() +} + /// Read ffmpeg stderr decoder and encoder instance /// and log the output. pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), Error> { @@ -489,22 +572,6 @@ pub fn validate_ffmpeg(config: &GlobalConfig) { } } -pub fn validate_source(source: &str) -> bool { - let re = Regex::new(r"^https?://.*").unwrap(); - - if re.is_match(source) { - match MediaProbe::new(source).video_streams { - Some(_) => return true, - None => { - error!("Remote file not exist: {source}"); - return false; - } - } - } - - Path::new(&source).is_file() -} - /// Get system time, in non test case. #[cfg(not(test))] pub fn time_now() -> DateTime {