support multiple outputs for streaming output and hls mode

This commit is contained in:
jb-alvarado 2022-05-31 12:09:08 +02:00
parent f1b77f5c3e
commit a9dae4ee5b
6 changed files with 238 additions and 100 deletions

View File

@ -42,6 +42,7 @@ The main purpose of ffplayout is to provide a 24/7 broadcasting solution that pl
- **HLS**
- JSON RPC server, for getting infos about current playing and controlling
- [live ingest](/docs/live_ingest.md)
- [multiple outputs](/docs/multiple_outputs.md)
Requirements
-----

46
docs/multiple_outputs.md Normal file
View File

@ -0,0 +1,46 @@
### Multiple Outputs
ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings.
For example you want to stream different resolutions, you could apply this output parameters:
```YAML
...
output_param: >-
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 1300k
-bufsize 2600k
-preset faster
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://example.org/live/stream-high
-s 960x540
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 1000k
-bufsize 1800k
-preset faster
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://example.org/live/stream-low
```
When you are using the text overlay filter, it will apply to all outputs.
The same works to for HLS output.
If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`.

View File

@ -30,8 +30,8 @@ use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::input::{ingest::log_line, source_generator};
use crate::utils::{
sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus,
ProcessControl,
prepare_output_cmd, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl,
PlayoutStatus, ProcessControl,
};
use crate::vec_strings;
@ -44,12 +44,17 @@ fn ingest_to_hls_server(
let playlist_init = playout_stat.list_init;
let level = config.logging.ffmpeg_level.clone();
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let mut stream_input = config.ingest.input_cmd.clone().unwrap();
server_prefix.append(&mut stream_input);
let server_filter = filter_cmd(&config);
server_cmd.append(&mut stream_input.clone());
server_cmd.append(&mut filter_cmd(&config));
server_cmd.append(&mut config.out.clone().output_cmd.unwrap());
let server_cmd = prepare_output_cmd(
server_prefix,
server_filter,
config.out.clone().output_cmd.unwrap(),
"hls",
);
let mut is_running;
@ -167,23 +172,23 @@ pub fn write_hls(
node.source
);
let mut filter = node.filter.unwrap();
let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
dec_cmd.append(&mut cmd);
if filter.len() > 1 {
dec_cmd.append(&mut filter);
}
dec_cmd.append(&mut config.out.clone().output_cmd.unwrap());
let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
enc_prefix.append(&mut cmd);
let enc_filter = node.filter.unwrap();
let enc_cmd = prepare_output_cmd(
enc_prefix,
enc_filter,
config.out.clone().output_cmd.unwrap(),
&config.out.mode,
);
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
enc_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
let mut enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.stderr(Stdio::piped())
.spawn()
{
@ -194,8 +199,8 @@ pub fn write_hls(
Ok(proc) => proc,
};
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
let dec_err = BufReader::new(enc_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
if let Err(e) = stderr_reader(dec_err, "Writer") {
error!("{e:?}")
@ -209,4 +214,8 @@ pub fn write_hls(
sleep(Duration::from_secs(1));
}
}
sleep(Duration::from_secs(1));
proc_control.kill_all();
}

View File

@ -3,23 +3,19 @@ use std::process::{self, Command, Stdio};
use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
use crate::utils::{prepare_output_cmd, GlobalConfig, Media};
use crate::vec_strings;
/// Streaming Output
///
/// Prepare the ffmpeg command for streaming output
pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child {
let mut enc_cmd = vec![];
let mut enc_filter = vec![];
let mut preview = vec![];
let mut preview_cmd = config.out.preview_cmd.as_ref().unwrap().clone();
let output_cmd = config.out.output_cmd.as_ref().unwrap().clone();
let params_len = output_cmd.len();
let mut output_count = 1;
let mut output_v_map = "[v_out1]".to_string();
let mut output_params = output_cmd.clone();
let mut output_cmd = config.out.output_cmd.as_ref().unwrap().clone();
let mut enc_cmd = vec_strings![
let enc_prefix = vec_strings![
"-hide_banner",
"-nostats",
"-v",
@ -41,63 +37,18 @@ pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child {
v_drawtext::filter_node(config, &mut Media::new(0, String::new(), false)).as_str(),
);
if config.out.preview {
output_count += 1;
output_v_map.push_str(format!("[v_out{output_count}]").as_str());
preview = vec_strings!["-map", "[v_out1]", "-map", "0:a"];
preview.append(&mut preview_cmd);
preview.append(&mut vec_strings!["-map", "[v_out2]", "-map", "0:a"]);
}
output_params.clear();
// check for multiple outputs and add mapping to it
for (i, param) in output_cmd.iter().enumerate() {
output_params.push(param.clone());
if i > 0
&& !param.starts_with('-')
&& !output_cmd[i - 1].starts_with('-')
&& i < params_len - 1
{
output_count += 1;
let v_map = format!("[v_out{output_count}]");
output_v_map.push_str(v_map.as_str());
let mut map = vec![
"-map".to_string(),
v_map,
"-map".to_string(),
"0:a".to_string(),
];
output_params.append(&mut map);
}
}
if output_count > 1 {
if !filter.is_empty() {
filter.push(',');
}
filter.push_str(format!("split={output_count}{output_v_map}").as_str());
if preview.is_empty() {
output_params.insert(0, "-map".to_string());
output_params.insert(1, "[v_out1]".to_string());
output_params.insert(2, "-map".to_string());
output_params.insert(3, "0:a".to_string());
}
}
enc_filter = vec!["-filter_complex".to_string(), filter];
} else if config.out.preview {
preview = preview_cmd;
}
enc_cmd.append(&mut enc_filter);
enc_cmd.append(&mut preview);
enc_cmd.append(&mut output_params);
if config.out.preview {
enc_cmd.append(&mut preview_cmd);
}
println!("{enc_filter:?}");
enc_cmd.append(&mut output_cmd);
let enc_cmd = prepare_output_cmd(enc_prefix, enc_filter, enc_cmd, &config.out.mode);
debug!(
"Encoder CMD: <bright-blue>\"ffmpeg {}\"</>",

View File

@ -3,6 +3,7 @@ use chrono::prelude::*;
#[cfg(test)]
use crate::utils::*;
use crate::vec_strings;
#[test]
fn mock_date_time() {
@ -50,3 +51,66 @@ fn test_delta() {
assert!(delta < 2.0);
}
#[test]
fn test_prepare_output_cmd() {
let enc_prefix = vec_strings![
"-hide_banner",
"-nostats",
"-v",
"level+error",
"-re",
"-i",
"pipe:0"
];
let filter = vec_strings![
"-filter_complex",
"[0:v]null,zmq=b=tcp\\\\://'127.0.0.1\\:5555',drawtext=text=''"
];
let params = vec_strings![
"-c:v",
"libx264",
"-flags",
"+global_header",
"-f",
"flv",
"rtmp://localhost/live/stream",
"-s",
"512x288",
"-c:v",
"libx264",
"-flags",
"+global_header",
"-f",
"flv",
"rtmp://localhost:1937/live/stream"
];
let mut t1_params = enc_prefix.clone();
t1_params.append(&mut params.clone());
let cmd_two_outs =
prepare_output_cmd(enc_prefix.clone(), vec_strings![], params.clone(), "stream");
assert_eq!(cmd_two_outs, t1_params);
let mut test_cmd = enc_prefix.clone();
let mut test_params = params.clone();
let mut t2_filter = filter.clone();
t2_filter[1].push_str(",split=2[v_out1][v_out2]");
test_cmd.append(&mut t2_filter);
test_params.insert(0, "-map".to_string());
test_params.insert(1, "[v_out1]".to_string());
test_params.insert(2, "-map".to_string());
test_params.insert(3, "0:a".to_string());
test_params.insert(11, "-map".to_string());
test_params.insert(12, "[v_out2]".to_string());
test_params.insert(13, "-map".to_string());
test_params.insert(14, "0:a".to_string());
test_cmd.append(&mut test_params);
let cmd_two_outs_with_filter = prepare_output_cmd(enc_prefix, filter, params, "stream");
assert_eq!(cmd_two_outs_with_filter, test_cmd);
}

View File

@ -29,7 +29,7 @@ pub use json_serializer::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::{init_logging, send_mail};
use crate::filter::filter_chains;
use crate::{filter::filter_chains, vec_strings};
/// Video clip struct to hold some important states and comments for current media.
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -365,6 +365,89 @@ pub fn format_log_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
/// Prepare output parameters
///
/// seek for multiple outputs and add mapping for it
pub fn prepare_output_cmd(
prefix: Vec<String>,
mut filter: Vec<String>,
params: Vec<String>,
mode: &str,
) -> Vec<String> {
let params_len = params.len();
let mut output_params = params.clone();
let mut output_a_map = "[a_out1]".to_string();
let mut output_v_map = "[v_out1]".to_string();
let mut output_count = 1;
let mut cmd = prefix;
if !filter.is_empty() {
output_params.clear();
for (i, param) in params.iter().enumerate() {
output_params.push(param.clone());
if i > 0
&& !param.starts_with('-')
&& !params[i - 1].starts_with('-')
&& i < params_len - 1
{
output_count += 1;
let mut a_map = "0:a".to_string();
let v_map = format!("[v_out{output_count}]");
output_v_map.push_str(v_map.as_str());
if mode == "hls" {
a_map = format!("[a_out{output_count}]");
}
output_a_map.push_str(a_map.as_str());
let mut map = vec!["-map".to_string(), v_map, "-map".to_string(), a_map];
output_params.append(&mut map);
}
}
if output_count > 1 && mode == "hls" {
filter[1].push_str(format!(";[vout1]split={output_count}{output_v_map}").as_str());
filter[1].push_str(format!(";[aout1]asplit={output_count}{output_a_map}").as_str());
filter.drain(2..);
cmd.append(&mut filter);
cmd.append(&mut vec_strings!["-map", "[v_out1]", "-map", "[a_out1]"]);
} else if output_count > 1 && mode == "stream" {
filter[1].push_str(format!(",split={output_count}{output_v_map}").as_str());
cmd.append(&mut filter);
cmd.append(&mut vec_strings!["-map", "[v_out1]", "-map", "0:a"]);
} else {
cmd.append(&mut filter);
}
}
cmd.append(&mut output_params);
cmd
}
/// Validate input
///
/// Check if input is a remote source, or from storage and see if it exists.
pub fn validate_source(source: &str) -> bool {
let re = Regex::new(r"^https?://.*").unwrap();
if re.is_match(source) {
match MediaProbe::new(source).video_streams {
Some(_) => return true,
None => {
error!("Remote file not exist: {source}");
return false;
}
}
}
Path::new(&source).is_file()
}
/// Read ffmpeg stderr decoder and encoder instance
/// and log the output.
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
@ -489,22 +572,6 @@ pub fn validate_ffmpeg(config: &GlobalConfig) {
}
}
pub fn validate_source(source: &str) -> bool {
let re = Regex::new(r"^https?://.*").unwrap();
if re.is_match(source) {
match MediaProbe::new(source).video_streams {
Some(_) => return true,
None => {
error!("Remote file not exist: {source}");
return false;
}
}
}
Path::new(&source).is_file()
}
/// Get system time, in non test case.
#[cfg(not(test))]
pub fn time_now() -> DateTime<Local> {