use two ports for zmq for HLS mode, fix filter chain from ingest filter, fix drawtext position from hls ingest server

This commit is contained in:
jb-alvarado 2022-08-09 18:04:25 +02:00
parent 1f42fce994
commit fb12d98020
12 changed files with 105 additions and 27 deletions

View File

@ -34,6 +34,11 @@ pub fn log_line(line: String, level: &str) {
"<bright black>[Server]</> {}", "<bright black>[Server]</> {}",
format_log_line(line, "error") format_log_line(line, "error")
); );
} else if line.contains("[fatal]") {
error!(
"<bright black>[Server]</> {}",
format_log_line(line, "fatal")
)
} }
} }

View File

@ -18,7 +18,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"]; let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"];
if config.text.add_text && !config.text.text_from_filename { if config.text.add_text && !config.text.text_from_filename {
if let Some(socket) = config.text.bind_address.clone() { if let Some(socket) = config.text.zmq_stream_socket.clone() {
debug!( debug!(
"Using drawtext filter, listening on address: <yellow>{}</>", "Using drawtext filter, listening on address: <yellow>{}</>",
socket socket
@ -26,7 +26,8 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut filter: String = "null,".to_string(); let mut filter: String = "null,".to_string();
filter.push_str( filter.push_str(
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(), v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
.as_str(),
); );
enc_filter = vec!["-vf".to_string(), filter]; enc_filter = vec!["-vf".to_string(), filter];
} }

View File

@ -49,6 +49,20 @@ fn ingest_to_hls_server(
server_prefix.append(&mut stream_input); server_prefix.append(&mut stream_input);
let server_filter = filter_cmd(&config, &playout_stat.chain); let server_filter = filter_cmd(&config, &playout_stat.chain);
if server_filter.len() > 1 {
let filter_chain = server_filter[1]
.split_terminator([',', ';'])
.collect::<Vec<&str>>();
for (i, link) in filter_chain.iter().enumerate() {
if link.contains("drawtext") {
playout_stat
.drawtext_server_index
.store(i, Ordering::SeqCst);
}
}
}
let server_cmd = prepare_output_cmd( let server_cmd = prepare_output_cmd(
server_prefix, server_prefix,
server_filter, server_filter,

View File

@ -27,7 +27,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
]; ];
if config.text.add_text && !config.text.text_from_filename { if config.text.add_text && !config.text.text_from_filename {
if let Some(socket) = config.text.bind_address.clone() { if let Some(socket) = config.text.zmq_stream_socket.clone() {
debug!( debug!(
"Using drawtext filter, listening on address: <yellow>{}</>", "Using drawtext filter, listening on address: <yellow>{}</>",
socket socket
@ -35,7 +35,8 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut filter: String = "null,".to_string(); let mut filter: String = "null,".to_string();
filter.push_str( filter.push_str(
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(), v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
.as_str(),
); );
enc_filter = vec!["-vf".to_string(), filter]; enc_filter = vec!["-vf".to_string(), filter];
} }

View File

@ -28,7 +28,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
]; ];
if config.text.add_text && !config.text.text_from_filename { if config.text.add_text && !config.text.text_from_filename {
if let Some(socket) = config.text.bind_address.clone() { if let Some(socket) = config.text.zmq_stream_socket.clone() {
debug!( debug!(
"Using drawtext filter, listening on address: <yellow>{}</>", "Using drawtext filter, listening on address: <yellow>{}</>",
socket socket
@ -37,7 +37,8 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut filter = "[0:v]null,".to_string(); let mut filter = "[0:v]null,".to_string();
filter.push_str( filter.push_str(
v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![]))).as_str(), v_drawtext::filter_node(config, None, &Arc::new(Mutex::new(vec![])), false)
.as_str(),
); );
enc_filter = vec!["-filter_complex".to_string(), filter]; enc_filter = vec!["-filter_complex".to_string(), filter];

View File

@ -1,8 +1,8 @@
use futures::executor;
use std::{process::exit, sync::atomic::Ordering}; use std::{process::exit, sync::atomic::Ordering};
mod zmq_cmd; mod zmq_cmd;
use futures::executor;
use jsonrpc_http_server::{ use jsonrpc_http_server::{
hyper, hyper,
jsonrpc_core::{IoHandler, Params, Value}, jsonrpc_core::{IoHandler, Params, Value},
@ -83,20 +83,59 @@ pub fn json_rpc_server(
&& &map["control"] == "text" && &map["control"] == "text"
&& map.contains_key("message") && map.contains_key("message")
{ {
let mut filter = get_filter_from_json(map["message"].to_string()); let filter = get_filter_from_json(map["message"].to_string());
let socket = config.text.bind_address.clone();
// TODO: in Rust 1.64 use let_chains instead // TODO: in Rust 1.64 use let_chains instead
if !filter.is_empty() && config.text.bind_address.is_some() { if !filter.is_empty() && config.text.zmq_stream_socket.is_some() {
let mut clips_filter = playout_stat.chain.lock().unwrap(); let mut clips_filter = playout_stat.chain.lock().unwrap();
*clips_filter = vec![filter.clone()]; *clips_filter = vec![filter.clone()];
filter = format!("Parsed_drawtext_2 reinit {filter}");
if let Ok(reply) = executor::block_on(zmq_send(&filter, &socket.unwrap())) { let reply = executor::block_on(async {
return Ok(Value::String(reply)); let mut reply_text = String::new();
if config.out.mode != "hls"
|| !proc.server_is_running.load(Ordering::SeqCst)
{
let filter_stream = format!(
"Parsed_drawtext_{} reinit {filter}",
playout_stat.drawtext_stream_index.load(Ordering::SeqCst)
);
if let Ok(reply) = zmq_send(
&filter_stream,
&config.text.zmq_stream_socket.clone().unwrap(),
)
.await
{
reply_text = reply;
}; };
} }
if config.out.mode == "hls" && proc.server_is_running.load(Ordering::SeqCst)
{
let filter_server = format!(
"Parsed_drawtext_{} reinit {filter}",
playout_stat.drawtext_server_index.load(Ordering::SeqCst)
);
if let Ok(reply) = zmq_send(
&filter_server,
&config.text.zmq_server_socket.clone().unwrap(),
)
.await
{
reply_text = reply;
};
}
reply_text
});
if !reply.is_empty() {
return Ok(Value::String(reply));
}
}
return Ok(Value::String("Last clip can not be skipped".to_string())); return Ok(Value::String("Last clip can not be skipped".to_string()));
} }

View File

@ -34,18 +34,18 @@ pub fn filter_cmd(config: &PlayoutConfig, filter_chain: &Arc<Mutex<Vec<String>>>
); );
let overlay = v_overlay::filter_node(config, true); let overlay = v_overlay::filter_node(config, true);
let drawtext = v_drawtext::filter_node(config, None, filter_chain); let drawtext = v_drawtext::filter_node(config, None, filter_chain, true);
if !overlay.is_empty() { if !overlay.is_empty() {
filter.push(','); filter.push(',');
}
if !drawtext.is_empty() {
filter.push(',');
}
filter.push_str(&overlay); filter.push_str(&overlay);
}
if config.out.mode == "hls" && !drawtext.is_empty() {
filter.push(',');
filter.push_str(&drawtext); filter.push_str(&drawtext);
}
filter.push_str("[vout1]"); filter.push_str("[vout1]");
filter.push_str(audio_filter(config).as_str()); filter.push_str(audio_filter(config).as_str());

View File

@ -205,7 +205,7 @@ fn add_text(
if config.text.add_text if config.text.add_text
&& (config.text.text_from_filename || config.out.mode.to_lowercase() == "hls") && (config.text.text_from_filename || config.out.mode.to_lowercase() == "hls")
{ {
let filter = v_drawtext::filter_node(config, Some(node), filter_chain); let filter = v_drawtext::filter_node(config, Some(node), filter_chain, false);
chain.add_filter(&filter, "video"); chain.add_filter(&filter, "video");
} }

View File

@ -11,6 +11,7 @@ pub fn filter_node(
config: &PlayoutConfig, config: &PlayoutConfig,
node: Option<&Media>, node: Option<&Media>,
filter_chain: &Arc<Mutex<Vec<String>>>, filter_chain: &Arc<Mutex<Vec<String>>>,
is_server: bool,
) -> String { ) -> String {
let mut filter = String::new(); let mut filter = String::new();
let mut font = String::new(); let mut font = String::new();
@ -20,6 +21,11 @@ pub fn filter_node(
font = format!(":fontfile='{}'", config.text.fontfile) font = format!(":fontfile='{}'", config.text.fontfile)
} }
let zmq_socket = match is_server {
true => config.text.zmq_server_socket.clone(),
false => config.text.zmq_stream_socket.clone(),
};
// TODO: in Rust 1.64 use let_chains instead // TODO: in Rust 1.64 use let_chains instead
if config.text.text_from_filename && node.is_some() { if config.text.text_from_filename && node.is_some() {
let source = node let source = node
@ -38,7 +44,7 @@ pub fn filter_node(
.replace('%', "\\\\\\%") .replace('%', "\\\\\\%")
.replace(':', "\\:"); .replace(':', "\\:");
filter = format!("drawtext=text='{escape}':{}{font}", config.text.style) filter = format!("drawtext=text='{escape}':{}{font}", config.text.style)
} else if let Some(socket) = config.text.bind_address.clone() { } else if let Some(socket) = zmq_socket {
let chain = filter_chain.lock().unwrap(); let chain = filter_chain.lock().unwrap();
let mut filter_cmd = format!("text=''{font}"); let mut filter_cmd = format!("text=''{font}");

View File

@ -145,10 +145,13 @@ pub struct Text {
pub add_text: bool, pub add_text: bool,
#[serde(skip_serializing, skip_deserializing)] #[serde(skip_serializing, skip_deserializing)]
pub bind_address: Option<String>, pub node_pos: Option<usize>,
#[serde(skip_serializing, skip_deserializing)] #[serde(skip_serializing, skip_deserializing)]
pub node_pos: Option<usize>, pub zmq_stream_socket: Option<String>,
#[serde(skip_serializing, skip_deserializing)]
pub zmq_server_socket: Option<String>,
pub fontfile: String, pub fontfile: String,
pub text_from_filename: bool, pub text_from_filename: bool,
@ -251,10 +254,13 @@ impl PlayoutConfig {
// to get text messages from it // to get text messages from it
if config.text.add_text && !config.text.text_from_filename { if config.text.add_text && !config.text.text_from_filename {
config.rpc_server.enable = true; config.rpc_server.enable = true;
config.text.bind_address = free_tcp_socket(); config.text.zmq_stream_socket = free_tcp_socket(String::new());
config.text.zmq_server_socket =
free_tcp_socket(config.text.zmq_stream_socket.clone().unwrap_or_default());
config.text.node_pos = Some(2); config.text.node_pos = Some(2);
} else { } else {
config.text.bind_address = None; config.text.zmq_stream_socket = None;
config.text.zmq_server_socket = None;
config.text.node_pos = None; config.text.node_pos = None;
} }
@ -272,7 +278,7 @@ impl Default for PlayoutConfig {
/// s302m has higher quality, but is experimental /// s302m has higher quality, but is experimental
/// and works not well together with the loudnorm filter. /// and works not well together with the loudnorm filter.
fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> { fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> {
let mut codec = vec_strings!["-c:a", "s302m", "-strict", "-2"]; let mut codec = vec_strings!["-c:a", "s302m", "-strict", "-2", "-sample_fmt", "s16"];
if add_loudnorm { if add_loudnorm {
codec = vec_strings!["-c:a", "mp2", "-b:a", "384k"]; codec = vec_strings!["-c:a", "mp2", "-b:a", "384k"];

View File

@ -183,6 +183,8 @@ pub struct PlayoutStatus {
pub date: Arc<Mutex<String>>, pub date: Arc<Mutex<String>>,
pub list_init: Arc<AtomicBool>, pub list_init: Arc<AtomicBool>,
pub time_shift: Arc<Mutex<f64>>, pub time_shift: Arc<Mutex<f64>>,
pub drawtext_server_index: Arc<AtomicUsize>,
pub drawtext_stream_index: Arc<AtomicUsize>,
} }
impl PlayoutStatus { impl PlayoutStatus {
@ -193,6 +195,8 @@ impl PlayoutStatus {
date: Arc::new(Mutex::new(String::new())), date: Arc::new(Mutex::new(String::new())),
list_init: Arc::new(AtomicBool::new(true)), list_init: Arc::new(AtomicBool::new(true)),
time_shift: Arc::new(Mutex::new(0.0)), time_shift: Arc::new(Mutex::new(0.0)),
drawtext_server_index: Arc::new(AtomicUsize::new(2)),
drawtext_stream_index: Arc::new(AtomicUsize::new(2)),
} }
} }
} }

View File

@ -712,12 +712,13 @@ pub fn validate_ffmpeg(config: &PlayoutConfig) -> Result<(), String> {
} }
/// get a free tcp socket /// get a free tcp socket
pub fn free_tcp_socket() -> Option<String> { pub fn free_tcp_socket(exclude_socket: String) -> Option<String> {
for _ in 0..100 { for _ in 0..100 {
let port = rand::thread_rng().gen_range(45321..54268); let port = rand::thread_rng().gen_range(45321..54268);
let socket = format!("127.0.0.1:{port}");
if TcpListener::bind(("127.0.0.1", port)).is_ok() { if socket != exclude_socket && TcpListener::bind(("127.0.0.1", port)).is_ok() {
return Some(format!("127.0.0.1:{port}")); return Some(socket);
} }
} }