unify filter applying
- use same filter function for all Ingest, HLS and local/remote clips - fix preserve drawtext filter in HLS mode, when switching to and from ingest - stop playout on some known unrecoverable ffmpeg errors - error save fps calc with fallback
This commit is contained in:
parent
226d43aed3
commit
05f87472d4
@ -9,8 +9,10 @@ use std::{
|
||||
use crossbeam_channel::Sender;
|
||||
use simplelog::*;
|
||||
|
||||
use ffplayout_lib::filter::ingest_filter::filter_cmd;
|
||||
use ffplayout_lib::utils::{format_log_line, test_tcp_port, Ingest, PlayoutConfig, ProcessControl};
|
||||
use ffplayout_lib::filter::filter_chains;
|
||||
use ffplayout_lib::utils::{
|
||||
format_log_line, test_tcp_port, Ingest, Media, PlayoutConfig, ProcessControl,
|
||||
};
|
||||
use ffplayout_lib::vec_strings;
|
||||
|
||||
pub fn log_line(line: String, level: &str) {
|
||||
@ -82,9 +84,12 @@ pub fn ingest_server(
|
||||
let mut buffer: [u8; 65088] = [0; 65088];
|
||||
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
|
||||
let stream_input = config.ingest.input_cmd.clone().unwrap();
|
||||
let mut dummy_media = Media::new(0, "Live Stream".to_string(), false);
|
||||
dummy_media.is_live = Some(true);
|
||||
let mut filters = filter_chains(&config, &mut dummy_media, &Arc::new(Mutex::new(vec![])));
|
||||
|
||||
server_cmd.append(&mut stream_input.clone());
|
||||
server_cmd.append(&mut filter_cmd(&config, &Arc::new(Mutex::new(vec![]))));
|
||||
server_cmd.append(&mut filters);
|
||||
server_cmd.append(&mut config.processing.settings.unwrap());
|
||||
|
||||
let mut is_running;
|
||||
|
@ -26,8 +26,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
|
||||
|
||||
let mut filter: String = "null,".to_string();
|
||||
filter.push_str(
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
|
||||
.as_str(),
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(),
|
||||
);
|
||||
enc_filter = vec!["-vf".to_string(), filter];
|
||||
}
|
||||
|
@ -28,10 +28,10 @@ use std::{
|
||||
use simplelog::*;
|
||||
|
||||
use crate::input::{ingest::log_line, source_generator};
|
||||
use ffplayout_lib::filter::ingest_filter::filter_cmd;
|
||||
use ffplayout_lib::filter::filter_chains;
|
||||
use ffplayout_lib::utils::{
|
||||
prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, Decoder, Ingest, PlayerControl,
|
||||
PlayoutConfig, PlayoutStatus, ProcessControl,
|
||||
prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, Decoder, Ingest, Media,
|
||||
PlayerControl, PlayoutConfig, PlayoutStatus, ProcessControl,
|
||||
};
|
||||
use ffplayout_lib::vec_strings;
|
||||
|
||||
@ -47,28 +47,8 @@ fn ingest_to_hls_server(
|
||||
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
|
||||
let stream_input = config.ingest.input_cmd.clone().unwrap();
|
||||
server_prefix.append(&mut stream_input.clone());
|
||||
let server_filter = filter_cmd(&config, &playout_stat.chain);
|
||||
|
||||
if server_filter.len() > 1 {
|
||||
let filter_chain = server_filter[1]
|
||||
.split_terminator([',', ';'])
|
||||
.collect::<Vec<&str>>();
|
||||
|
||||
for (i, link) in filter_chain.iter().enumerate() {
|
||||
if link.contains("drawtext") {
|
||||
playout_stat
|
||||
.drawtext_server_index
|
||||
.store(i, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let server_cmd = prepare_output_cmd(
|
||||
server_prefix,
|
||||
server_filter,
|
||||
config.out.clone().output_cmd.unwrap(),
|
||||
"hls",
|
||||
);
|
||||
let mut dummy_media = Media::new(0, "Live Stream".to_string(), false);
|
||||
dummy_media.is_live = Some(true);
|
||||
|
||||
let mut is_running;
|
||||
|
||||
@ -81,13 +61,37 @@ fn ingest_to_hls_server(
|
||||
info!("Start ingest server, listening on: <b><magenta>{url}</></b>");
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
|
||||
server_cmd.join(" ")
|
||||
);
|
||||
|
||||
loop {
|
||||
let mut proc_ctl = proc_control.clone();
|
||||
let filters = filter_chains(&config, &mut dummy_media, &playout_stat.chain);
|
||||
|
||||
if filters.len() > 1 {
|
||||
// get correct filter index from drawtext node for zmq
|
||||
let filter_chain = filters[1]
|
||||
.split_terminator([',', ';'])
|
||||
.collect::<Vec<&str>>();
|
||||
|
||||
for (i, link) in filter_chain.iter().enumerate() {
|
||||
if link.contains("drawtext") {
|
||||
playout_stat
|
||||
.drawtext_server_index
|
||||
.store(i, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let server_cmd = prepare_output_cmd(
|
||||
server_prefix.clone(),
|
||||
filters,
|
||||
config.out.clone().output_cmd.unwrap(),
|
||||
"hls",
|
||||
);
|
||||
|
||||
debug!(
|
||||
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
|
||||
server_cmd.join(" ")
|
||||
);
|
||||
|
||||
let proc_ctl = proc_control.clone();
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stderr(Stdio::piped())
|
||||
@ -128,6 +132,10 @@ fn ingest_to_hls_server(
|
||||
log_line(line, &level);
|
||||
}
|
||||
|
||||
if proc_control.server_is_running.load(Ordering::SeqCst) {
|
||||
info!("Switch from live ingest to {}", config.processing.mode);
|
||||
}
|
||||
|
||||
proc_control
|
||||
.server_is_running
|
||||
.store(false, Ordering::SeqCst);
|
||||
@ -139,8 +147,6 @@ fn ingest_to_hls_server(
|
||||
if proc_control.is_terminated.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
||||
info!("Switch from live ingest to {}", config.processing.mode);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -134,7 +134,8 @@ pub fn player(
|
||||
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
||||
let dec_p_ctl = proc_control.clone();
|
||||
|
||||
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Decoder", dec_p_ctl));
|
||||
let error_decoder_thread =
|
||||
thread::spawn(move || stderr_reader(dec_err, "Decoder", dec_p_ctl));
|
||||
|
||||
loop {
|
||||
// when server is running, read from channel
|
||||
|
@ -35,8 +35,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
|
||||
|
||||
let mut filter: String = "null,".to_string();
|
||||
filter.push_str(
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
|
||||
.as_str(),
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(),
|
||||
);
|
||||
enc_filter = vec!["-vf".to_string(), filter];
|
||||
}
|
||||
|
@ -37,8 +37,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
|
||||
let mut filter = "[0:v]null,".to_string();
|
||||
|
||||
filter.push_str(
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
|
||||
.as_str(),
|
||||
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(),
|
||||
);
|
||||
|
||||
enc_filter = vec!["-filter_complex".to_string(), filter];
|
||||
|
@ -12,8 +12,8 @@ use serde_json::{json, Map};
|
||||
use simplelog::*;
|
||||
|
||||
use ffplayout_lib::utils::{
|
||||
get_delta, get_filter_from_json, get_sec, sec_to_time, write_status, Media, PlayerControl,
|
||||
PlayoutConfig, PlayoutStatus, ProcessControl,
|
||||
get_delta, get_filter_from_json, get_sec, sec_to_time, write_status, Ingest, Media,
|
||||
PlayerControl, PlayoutConfig, PlayoutStatus, ProcessControl,
|
||||
};
|
||||
|
||||
use zmq_cmd::zmq_send;
|
||||
@ -90,6 +90,24 @@ pub fn json_rpc_server(
|
||||
let mut clips_filter = playout_stat.chain.lock().unwrap();
|
||||
*clips_filter = vec![filter.clone()];
|
||||
|
||||
if config.out.mode == "hls" {
|
||||
if proc.server_is_running.load(Ordering::SeqCst) {
|
||||
let filter_server = format!(
|
||||
"Parsed_drawtext_{} reinit {filter}",
|
||||
playout_stat.drawtext_server_index.load(Ordering::SeqCst)
|
||||
);
|
||||
|
||||
if let Ok(reply) = block_on(zmq_send(
|
||||
&filter_server,
|
||||
&config.text.zmq_server_socket.clone().unwrap(),
|
||||
)) {
|
||||
return Ok(Value::String(reply));
|
||||
};
|
||||
} else if let Err(e) = proc.kill(Ingest) {
|
||||
error!("Ingest {e:?}")
|
||||
}
|
||||
}
|
||||
|
||||
if config.out.mode != "hls" || !proc.server_is_running.load(Ordering::SeqCst) {
|
||||
let filter_stream = format!(
|
||||
"Parsed_drawtext_{} reinit {filter}",
|
||||
@ -103,20 +121,6 @@ pub fn json_rpc_server(
|
||||
return Ok(Value::String(reply));
|
||||
};
|
||||
}
|
||||
|
||||
if config.out.mode == "hls" && proc.server_is_running.load(Ordering::SeqCst) {
|
||||
let filter_server = format!(
|
||||
"Parsed_drawtext_{} reinit {filter}",
|
||||
playout_stat.drawtext_server_index.load(Ordering::SeqCst)
|
||||
);
|
||||
|
||||
if let Ok(reply) = block_on(zmq_send(
|
||||
&filter_server,
|
||||
&config.text.zmq_server_socket.clone().unwrap(),
|
||||
)) {
|
||||
return Ok(Value::String(reply));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(Value::String("Last clip can not be skipped".to_string()));
|
||||
|
46
lib/src/filter/custom_filter.rs
Normal file
46
lib/src/filter/custom_filter.rs
Normal file
@ -0,0 +1,46 @@
|
||||
use simplelog::*;
|
||||
|
||||
fn strip_str(mut input: &str) -> String {
|
||||
input = input.strip_prefix(';').unwrap_or(input);
|
||||
input = input.strip_prefix("[0:v]").unwrap_or(input);
|
||||
input = input.strip_prefix("[0:a]").unwrap_or(input);
|
||||
input = input.strip_suffix(';').unwrap_or(input);
|
||||
input = input.strip_suffix("[c_v_out]").unwrap_or(input);
|
||||
input = input.strip_suffix("[c_a_out]").unwrap_or(input);
|
||||
|
||||
input.to_string()
|
||||
}
|
||||
|
||||
/// Apply custom filters
|
||||
pub fn custom_filter(filter: &str) -> (String, String) {
|
||||
let mut video_filter = String::new();
|
||||
let mut audio_filter = String::new();
|
||||
|
||||
if filter.contains("[c_v_out]") && filter.contains("[c_a_out]") {
|
||||
let v_pos = filter.find("[c_v_out]").unwrap();
|
||||
let a_pos = filter.find("[c_a_out]").unwrap();
|
||||
let mut delimiter = "[c_v_out]";
|
||||
|
||||
if v_pos > a_pos {
|
||||
delimiter = "[c_a_out]";
|
||||
}
|
||||
|
||||
if let Some((f_1, f_2)) = filter.split_once(delimiter) {
|
||||
if f_2.contains("[c_a_out]") {
|
||||
video_filter = strip_str(f_1);
|
||||
audio_filter = strip_str(f_2);
|
||||
} else {
|
||||
video_filter = strip_str(f_2);
|
||||
audio_filter = strip_str(f_1);
|
||||
}
|
||||
}
|
||||
} else if filter.contains("[c_v_out]") {
|
||||
video_filter = strip_str(filter);
|
||||
} else if filter.contains("[c_a_out]") {
|
||||
audio_filter = strip_str(filter);
|
||||
} else if !filter.is_empty() && filter != "~" {
|
||||
error!("Custom filter is not well formatted, use correct out link names (\"[c_v_out]\" and/or \"[c_a_out]\"). Filter skipped!")
|
||||
}
|
||||
|
||||
(video_filter, audio_filter)
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::filter::{a_loudnorm, v_drawtext, v_overlay};
|
||||
use crate::utils::PlayoutConfig;
|
||||
|
||||
/// Audio Filter
|
||||
///
|
||||
/// If needed we add audio filters to the server instance.
|
||||
fn audio_filter(config: &PlayoutConfig) -> String {
|
||||
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
|
||||
|
||||
if config.processing.loudnorm_ingest {
|
||||
audio_chain.push(',');
|
||||
audio_chain.push_str(&a_loudnorm::filter_node(config));
|
||||
}
|
||||
|
||||
if config.processing.volume != 1.0 {
|
||||
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
|
||||
}
|
||||
|
||||
audio_chain.push_str("[aout1]");
|
||||
|
||||
audio_chain
|
||||
}
|
||||
|
||||
/// Create filter nodes for ingest live stream.
|
||||
pub fn filter_cmd(config: &PlayoutConfig, filter_chain: &Arc<Mutex<Vec<String>>>) -> Vec<String> {
|
||||
let mut filter = format!(
|
||||
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
|
||||
config.processing.fps,
|
||||
config.processing.width,
|
||||
config.processing.height,
|
||||
config.processing.aspect
|
||||
);
|
||||
|
||||
let overlay = v_overlay::filter_node(config, true);
|
||||
let drawtext = v_drawtext::filter_node(config, None, filter_chain, true);
|
||||
|
||||
if !overlay.is_empty() {
|
||||
filter.push(',');
|
||||
filter.push_str(&overlay);
|
||||
}
|
||||
|
||||
if config.out.mode == "hls" && !drawtext.is_empty() {
|
||||
filter.push(',');
|
||||
filter.push_str(&drawtext);
|
||||
}
|
||||
|
||||
filter.push_str("[vout1]");
|
||||
filter.push_str(audio_filter(config).as_str());
|
||||
|
||||
vec![
|
||||
"-filter_complex".to_string(),
|
||||
filter,
|
||||
"-map".to_string(),
|
||||
"[vout1]".to_string(),
|
||||
"-map".to_string(),
|
||||
"[aout1]".to_string(),
|
||||
]
|
||||
}
|
@ -6,11 +6,11 @@ use std::{
|
||||
use simplelog::*;
|
||||
|
||||
pub mod a_loudnorm;
|
||||
pub mod ingest_filter;
|
||||
pub mod custom_filter;
|
||||
pub mod v_drawtext;
|
||||
pub mod v_overlay;
|
||||
|
||||
use crate::utils::{get_delta, is_close, Media, MediaProbe, PlayoutConfig};
|
||||
use crate::utils::{fps_calc, get_delta, is_close, Media, MediaProbe, PlayoutConfig};
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
enum FilterType {
|
||||
@ -20,6 +20,8 @@ enum FilterType {
|
||||
|
||||
use FilterType::*;
|
||||
|
||||
use self::custom_filter::custom_filter;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Filters {
|
||||
audio_chain: Option<String>,
|
||||
@ -33,7 +35,7 @@ impl Filters {
|
||||
Filters {
|
||||
audio_chain: None,
|
||||
video_chain: None,
|
||||
audio_map: "1:a".to_string(),
|
||||
audio_map: "0:a".to_string(),
|
||||
video_map: "0:v".to_string(),
|
||||
}
|
||||
}
|
||||
@ -109,9 +111,15 @@ fn fps(fps: f64, chain: &mut Filters, config: &PlayoutConfig) {
|
||||
}
|
||||
}
|
||||
|
||||
fn scale(v_stream: &ffprobe::Stream, aspect: f64, chain: &mut Filters, config: &PlayoutConfig) {
|
||||
fn scale(
|
||||
width: Option<i64>,
|
||||
height: Option<i64>,
|
||||
aspect: f64,
|
||||
chain: &mut Filters,
|
||||
config: &PlayoutConfig,
|
||||
) {
|
||||
// width: i64, height: i64
|
||||
if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) {
|
||||
if let (Some(w), Some(h)) = (width, height) {
|
||||
if w != config.processing.width || h != config.processing.height {
|
||||
chain.add_filter(
|
||||
&format!(
|
||||
@ -146,7 +154,7 @@ fn fade(node: &mut Media, chain: &mut Filters, codec_type: FilterType) {
|
||||
t = "a"
|
||||
}
|
||||
|
||||
if node.seek > 0.0 {
|
||||
if node.seek > 0.0 || node.is_live == Some(true) {
|
||||
chain.add_filter(&format!("{t}fade=in:st=0:d=0.5"), codec_type)
|
||||
}
|
||||
|
||||
@ -212,7 +220,7 @@ fn add_text(
|
||||
if config.text.add_text
|
||||
&& (config.text.text_from_filename || config.out.mode.to_lowercase() == "hls")
|
||||
{
|
||||
let filter = v_drawtext::filter_node(config, Some(node), filter_chain, false);
|
||||
let filter = v_drawtext::filter_node(config, Some(node), filter_chain);
|
||||
|
||||
chain.add_filter(&filter, Video);
|
||||
}
|
||||
@ -255,14 +263,8 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
|
||||
}
|
||||
|
||||
/// Add single pass loudnorm filter to audio line.
|
||||
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
|
||||
if config.processing.add_loudnorm
|
||||
&& node
|
||||
.probe
|
||||
.as_ref()
|
||||
.and_then(|p| p.audio_streams.get(0))
|
||||
.is_none()
|
||||
{
|
||||
fn add_loudnorm(chain: &mut Filters, config: &PlayoutConfig) {
|
||||
if config.processing.add_loudnorm {
|
||||
let loud_filter = a_loudnorm::filter_node(config);
|
||||
chain.add_filter(&loud_filter, Audio);
|
||||
}
|
||||
@ -287,15 +289,6 @@ fn aspect_calc(aspect_string: &Option<String>, config: &PlayoutConfig) -> f64 {
|
||||
source_aspect
|
||||
}
|
||||
|
||||
fn fps_calc(r_frame_rate: &str) -> f64 {
|
||||
let frame_rate_vec = r_frame_rate.split('/').collect::<Vec<&str>>();
|
||||
let rate: f64 = frame_rate_vec[0].parse().unwrap();
|
||||
let factor: f64 = frame_rate_vec[1].parse().unwrap();
|
||||
let fps: f64 = rate / factor;
|
||||
|
||||
fps
|
||||
}
|
||||
|
||||
/// This realtime filter is important for HLS output to stay in sync.
|
||||
fn realtime_filter(
|
||||
node: &mut Media,
|
||||
@ -329,47 +322,8 @@ fn realtime_filter(
|
||||
}
|
||||
}
|
||||
|
||||
fn strip_str(mut input: &str) -> String {
|
||||
input = input.strip_prefix(';').unwrap_or(input);
|
||||
input = input.strip_prefix("[0:v]").unwrap_or(input);
|
||||
input = input.strip_prefix("[0:a]").unwrap_or(input);
|
||||
input = input.strip_suffix(';').unwrap_or(input);
|
||||
input = input.strip_suffix("[c_v_out]").unwrap_or(input);
|
||||
input = input.strip_suffix("[c_a_out]").unwrap_or(input);
|
||||
|
||||
input.to_string()
|
||||
}
|
||||
|
||||
/// Apply custom filters
|
||||
fn custom_filter(filter: &str, chain: &mut Filters) {
|
||||
let mut video_filter = String::new();
|
||||
let mut audio_filter = String::new();
|
||||
|
||||
if filter.contains("[c_v_out]") && filter.contains("[c_a_out]") {
|
||||
let v_pos = filter.find("[c_v_out]").unwrap();
|
||||
let a_pos = filter.find("[c_a_out]").unwrap();
|
||||
let mut delimiter = "[c_v_out]";
|
||||
|
||||
if v_pos > a_pos {
|
||||
delimiter = "[c_a_out]";
|
||||
}
|
||||
|
||||
if let Some((f_1, f_2)) = filter.split_once(delimiter) {
|
||||
if f_2.contains("[c_a_out]") {
|
||||
video_filter = strip_str(f_1);
|
||||
audio_filter = strip_str(f_2);
|
||||
} else {
|
||||
video_filter = strip_str(f_2);
|
||||
audio_filter = strip_str(f_1);
|
||||
}
|
||||
}
|
||||
} else if filter.contains("[c_v_out]") {
|
||||
video_filter = strip_str(filter);
|
||||
} else if filter.contains("[c_a_out]") {
|
||||
audio_filter = strip_str(filter);
|
||||
} else if !filter.is_empty() && filter != "~" {
|
||||
error!("Custom filter is not well formatted, use correct out link names (\"[c_v_out]\" and/or \"[c_a_out]\"). Filter skipped!")
|
||||
}
|
||||
fn custom(filter: &str, chain: &mut Filters) {
|
||||
let (video_filter, audio_filter) = custom_filter(filter);
|
||||
|
||||
if !video_filter.is_empty() {
|
||||
chain.add_filter(&video_filter, Video);
|
||||
@ -388,24 +342,33 @@ pub fn filter_chains(
|
||||
let mut filters = Filters::new();
|
||||
|
||||
if let Some(probe) = node.probe.as_ref() {
|
||||
if probe.audio_streams.get(0).is_some() && !Path::new(&node.audio).is_file() {
|
||||
filters.audio_map = "0:a".to_string();
|
||||
if probe.audio_streams.get(0).is_none() || Path::new(&node.audio).is_file() {
|
||||
filters.audio_map = "1:a".to_string();
|
||||
}
|
||||
|
||||
if let Some(v_stream) = &probe.video_streams.get(0) {
|
||||
let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
|
||||
let frame_per_sec = fps_calc(&v_stream.r_frame_rate);
|
||||
let frame_per_sec = fps_calc(&v_stream.r_frame_rate, 1.0);
|
||||
|
||||
deinterlace(&v_stream.field_order, &mut filters);
|
||||
pad(aspect, &mut filters, v_stream, config);
|
||||
fps(frame_per_sec, &mut filters, config);
|
||||
scale(v_stream, aspect, &mut filters, config);
|
||||
scale(
|
||||
v_stream.width,
|
||||
v_stream.height,
|
||||
aspect,
|
||||
&mut filters,
|
||||
config,
|
||||
);
|
||||
}
|
||||
|
||||
extend_video(node, &mut filters);
|
||||
|
||||
add_audio(node, &mut filters);
|
||||
extend_audio(node, &mut filters);
|
||||
} else {
|
||||
fps(0.0, &mut filters, config);
|
||||
scale(None, None, 1.0, &mut filters, config);
|
||||
}
|
||||
|
||||
add_text(node, &mut filters, config, filter_chain);
|
||||
@ -413,12 +376,12 @@ pub fn filter_chains(
|
||||
overlay(node, &mut filters, config);
|
||||
realtime_filter(node, &mut filters, config, Video);
|
||||
|
||||
add_loudnorm(node, &mut filters, config);
|
||||
add_loudnorm(&mut filters, config);
|
||||
fade(node, &mut filters, Audio);
|
||||
audio_volume(&mut filters, config);
|
||||
realtime_filter(node, &mut filters, config, Audio);
|
||||
|
||||
custom_filter(&config.processing.custom_filter, &mut filters);
|
||||
custom(&config.processing.custom_filter, &mut filters);
|
||||
|
||||
let mut filter_cmd = vec![];
|
||||
let mut filter_str: String = String::new();
|
||||
|
@ -11,7 +11,6 @@ pub fn filter_node(
|
||||
config: &PlayoutConfig,
|
||||
node: Option<&Media>,
|
||||
filter_chain: &Arc<Mutex<Vec<String>>>,
|
||||
is_server: bool,
|
||||
) -> String {
|
||||
let mut filter = String::new();
|
||||
let mut font = String::new();
|
||||
@ -21,9 +20,10 @@ pub fn filter_node(
|
||||
font = format!(":fontfile='{}'", config.text.fontfile)
|
||||
}
|
||||
|
||||
let zmq_socket = match is_server {
|
||||
true => config.text.zmq_server_socket.clone(),
|
||||
false => config.text.zmq_stream_socket.clone(),
|
||||
let zmq_socket = match node.and_then(|n| n.is_live) {
|
||||
Some(true) => config.text.zmq_server_socket.clone(),
|
||||
Some(false) => config.text.zmq_stream_socket.clone(),
|
||||
None => config.text.zmq_stream_socket.clone(),
|
||||
};
|
||||
|
||||
// TODO: in Rust 1.64 use let_chains instead
|
||||
|
@ -8,7 +8,7 @@ use std::{
|
||||
use serde::{Deserialize, Serialize};
|
||||
use shlex::split;
|
||||
|
||||
use crate::utils::{free_tcp_socket, home_dir, time_to_sec, MediaProbe};
|
||||
use crate::utils::{fps_calc, free_tcp_socket, home_dir, time_to_sec, MediaProbe};
|
||||
use crate::vec_strings;
|
||||
|
||||
pub const DUMMY_LEN: f64 = 60.0;
|
||||
@ -230,16 +230,12 @@ impl PlayoutConfig {
|
||||
config.processing.logo_fps = None;
|
||||
|
||||
if Path::new(&config.processing.logo).is_file() {
|
||||
if let Some(fps) = MediaProbe::new(&config.processing.logo)
|
||||
if let Some(v_stream) = MediaProbe::new(&config.processing.logo)
|
||||
.video_streams
|
||||
.get(0)
|
||||
.and_then(|v| v.r_frame_rate.split_once('/'))
|
||||
.and_then(|f| {
|
||||
f.0.parse::<f64>()
|
||||
.ok()
|
||||
.and_then(|x_num| f.1.parse::<f64>().ok().map(|y_num| x_num / y_num))
|
||||
})
|
||||
{
|
||||
let fps = fps_calc(&v_stream.r_frame_rate, config.processing.fps);
|
||||
|
||||
config.processing.logo_fps = Some(fps);
|
||||
};
|
||||
}
|
||||
|
@ -67,8 +67,8 @@ impl Default for ProcessControl {
|
||||
}
|
||||
|
||||
impl ProcessControl {
|
||||
pub fn kill(&mut self, proc: ProcessUnit) -> Result<(), String> {
|
||||
match proc {
|
||||
pub fn kill(&self, unit: ProcessUnit) -> Result<(), String> {
|
||||
match unit {
|
||||
Decoder => {
|
||||
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
|
||||
if let Err(e) = proc.kill() {
|
||||
@ -92,7 +92,7 @@ impl ProcessControl {
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = self.wait(proc) {
|
||||
if let Err(e) = self.wait(unit) {
|
||||
return Err(e);
|
||||
};
|
||||
|
||||
@ -101,8 +101,8 @@ impl ProcessControl {
|
||||
|
||||
/// Wait for process to proper close.
|
||||
/// This prevents orphaned/zombi processes in system
|
||||
pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> {
|
||||
match proc {
|
||||
pub fn wait(&self, unit: ProcessUnit) -> Result<(), String> {
|
||||
match unit {
|
||||
Decoder => {
|
||||
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
|
||||
if let Err(e) = proc.wait() {
|
||||
|
@ -112,6 +112,7 @@ fn loop_playlist(
|
||||
cmd: item.cmd.clone(),
|
||||
probe: item.probe.clone(),
|
||||
process: Some(true),
|
||||
is_live: Some(false),
|
||||
last_ad: Some(false),
|
||||
next_ad: Some(false),
|
||||
filter: Some(vec![]),
|
||||
|
@ -4,7 +4,7 @@ use std::{
|
||||
io::{BufRead, BufReader, Error},
|
||||
net::TcpListener,
|
||||
path::{Path, PathBuf},
|
||||
process::{ChildStderr, Command, Stdio, exit},
|
||||
process::{exit, ChildStderr, Command, Stdio},
|
||||
sync::{Arc, Mutex},
|
||||
time::{self, UNIX_EPOCH},
|
||||
};
|
||||
@ -80,6 +80,9 @@ pub struct Media {
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub process: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub is_live: Option<bool>,
|
||||
}
|
||||
|
||||
impl Media {
|
||||
@ -114,6 +117,7 @@ impl Media {
|
||||
last_ad: Some(false),
|
||||
next_ad: Some(false),
|
||||
process: Some(true),
|
||||
is_live: Some(false),
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,6 +217,17 @@ impl MediaProbe {
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate fps from rae/factor string
|
||||
pub fn fps_calc(r_frame_rate: &str, default: f64) -> f64 {
|
||||
let mut fps = default;
|
||||
|
||||
if let Some((r, f)) = r_frame_rate.split_once('/') {
|
||||
fps = r.parse::<f64>().unwrap_or(1.0) / f.parse::<f64>().unwrap_or(1.0);
|
||||
}
|
||||
|
||||
fps
|
||||
}
|
||||
|
||||
/// Covert JSON string to ffmpeg filter command.
|
||||
pub fn get_filter_from_json(raw_text: String) -> String {
|
||||
let re1 = Regex::new(r#""|}|\{"#).unwrap();
|
||||
@ -645,7 +660,11 @@ pub fn format_log_line(line: String, level: &str) -> String {
|
||||
|
||||
/// Read ffmpeg stderr decoder and encoder instance
|
||||
/// and log the output.
|
||||
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str, mut proc_control: ProcessControl) -> Result<(), Error> {
|
||||
pub fn stderr_reader(
|
||||
buffer: BufReader<ChildStderr>,
|
||||
suffix: &str,
|
||||
mut proc_control: ProcessControl,
|
||||
) -> Result<(), Error> {
|
||||
for line in buffer.lines() {
|
||||
let line = line?;
|
||||
|
||||
@ -659,18 +678,17 @@ pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str, mut proc_cont
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_log_line(line, "warning")
|
||||
)
|
||||
} else if line.contains("[error]") {
|
||||
} else if line.contains("[error]") || line.contains("[fatal]") {
|
||||
error!(
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_log_line(line, "error")
|
||||
)
|
||||
} else if line.contains("[fatal]") {
|
||||
error!(
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_log_line(line.clone(), "fatal")
|
||||
line.replace("[error]", "").replace("[fatal]", "")
|
||||
);
|
||||
|
||||
if line.contains("Invalid argument") || line.contains("Numerical result") {
|
||||
if line.contains("Invalid argument")
|
||||
|| line.contains("Numerical result")
|
||||
|| line.contains("No such file or directory")
|
||||
|| line.contains("Error initializing complex filters")
|
||||
{
|
||||
proc_control.kill_all();
|
||||
exit(1);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user