cleanup, add audio filter to ingest,
This commit is contained in:
parent
e4dd9db22d
commit
f3eb038a4a
@ -201,17 +201,13 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
|
|||||||
|
|
||||||
chain.add_filter(filter, "video".into());
|
chain.add_filter(filter, "video".into());
|
||||||
|
|
||||||
match &chain.video_chain {
|
if let Some(filters) = &chain.video_chain {
|
||||||
Some(filters) => {
|
for (i, f) in filters.split(",").enumerate() {
|
||||||
for (i, f) in filters.split(",").enumerate() {
|
if f.contains("drawtext") && !config.text.text_from_filename {
|
||||||
if f.contains("drawtext") && !config.text.text_from_filename {
|
debug!("drawtext node is on index: <yellow>{i}</>");
|
||||||
debug!("drawtext node is on index: <yellow>{i}</>");
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -249,9 +245,10 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
|
|||||||
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
|
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
|
||||||
// add single pass loudnorm filter to audio line
|
// add single pass loudnorm filter to audio line
|
||||||
|
|
||||||
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
|
if node.probe.is_some()
|
||||||
|
&& node.probe.clone().unwrap().audio_streams.unwrap().len() > 0
|
||||||
if audio_streams.len() > 0 && config.processing.add_loudnorm {
|
&& config.processing.add_loudnorm
|
||||||
|
{
|
||||||
let loud_filter = format!(
|
let loud_filter = format!(
|
||||||
"loudnorm=I={}:TP={}:LRA={}",
|
"loudnorm=I={}:TP={}:LRA={}",
|
||||||
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
|
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
|
||||||
@ -318,15 +315,16 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
|
|||||||
&config,
|
&config,
|
||||||
);
|
);
|
||||||
extend_video(node, &mut filters);
|
extend_video(node, &mut filters);
|
||||||
add_text(node, &mut filters, &config);
|
|
||||||
add_audio(node, &mut filters);
|
add_audio(node, &mut filters);
|
||||||
extend_audio(node, &mut filters);
|
extend_audio(node, &mut filters);
|
||||||
add_loudnorm(node, &mut filters, &config);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
add_text(node, &mut filters, &config);
|
||||||
fade(node, &mut filters, "video".into());
|
fade(node, &mut filters, "video".into());
|
||||||
overlay(node, &mut filters, &config);
|
overlay(node, &mut filters, &config);
|
||||||
|
|
||||||
|
add_loudnorm(node, &mut filters, &config);
|
||||||
fade(node, &mut filters, "audio".into());
|
fade(node, &mut filters, "audio".into());
|
||||||
audio_volume(&mut filters, &config);
|
audio_volume(&mut filters, &config);
|
||||||
|
|
||||||
|
@ -31,15 +31,34 @@ fn overlay(config: &GlobalConfig) -> String {
|
|||||||
logo_chain
|
logo_chain
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn audio_filter(config: &GlobalConfig) -> String {
|
||||||
|
let mut audio_chain = ";[0:a]anull".to_string();
|
||||||
|
|
||||||
|
if config.processing.add_loudnorm {
|
||||||
|
audio_chain.push_str(format!(
|
||||||
|
",loudnorm=I={}:TP={}:LRA={}",
|
||||||
|
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
|
||||||
|
).as_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.processing.volume != 1.0 {
|
||||||
|
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
audio_chain.push_str("[aout1]");
|
||||||
|
|
||||||
|
audio_chain
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn ingest_server(
|
pub async fn ingest_server(
|
||||||
log_format: String,
|
log_format: String,
|
||||||
ingest_sender: Sender<[u8; 32256]>,
|
ingest_sender: Sender<[u8; 65424]>,
|
||||||
rt_handle: Handle,
|
rt_handle: Handle,
|
||||||
proc_terminator: Arc<Mutex<Option<Terminator>>>,
|
proc_terminator: Arc<Mutex<Option<Terminator>>>,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<Mutex<bool>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let mut buffer: [u8; 32256] = [0; 32256];
|
let mut buffer: [u8; 65424] = [0; 65424];
|
||||||
let mut filter = format!(
|
let mut filter = format!(
|
||||||
"[0:v]fps={},scale={}:{},setdar=dar={}",
|
"[0:v]fps={},scale={}:{},setdar=dar={}",
|
||||||
config.processing.fps,
|
config.processing.fps,
|
||||||
@ -50,7 +69,8 @@ pub async fn ingest_server(
|
|||||||
|
|
||||||
filter.push_str(&overlay(&config));
|
filter.push_str(&overlay(&config));
|
||||||
filter.push_str("[vout1]");
|
filter.push_str("[vout1]");
|
||||||
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
|
filter.push_str(audio_filter(&config).as_str());
|
||||||
|
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "[aout1]"];
|
||||||
|
|
||||||
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
|
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
|
||||||
let stream_input = config.ingest.stream_input.clone();
|
let stream_input = config.ingest.stream_input.clone();
|
||||||
@ -69,7 +89,7 @@ pub async fn ingest_server(
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
if *is_terminated.lock().unwrap() {
|
if *is_terminated.lock().unwrap() {
|
||||||
break
|
break;
|
||||||
}
|
}
|
||||||
let mut server_proc = match Command::new("ffmpeg")
|
let mut server_proc = match Command::new("ffmpeg")
|
||||||
.args(server_cmd.clone())
|
.args(server_cmd.clone())
|
||||||
|
@ -8,7 +8,7 @@ use tokio::runtime::Handle;
|
|||||||
|
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
|
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
|
||||||
sec_to_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN,
|
seek_and_length, GlobalConfig, Media, DUMMY_LEN,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -161,11 +161,6 @@ impl CurrentProgram {
|
|||||||
self.index = i + 1;
|
self.index = i + 1;
|
||||||
item.seek = time_sec - item.begin.unwrap();
|
item.seek = time_sec - item.begin.unwrap();
|
||||||
|
|
||||||
println!("time_sec: {}", sec_to_time(time_sec));
|
|
||||||
println!("item.begin: {}", sec_to_time(item.begin.unwrap()));
|
|
||||||
println!("item.seek: {}", item.seek);
|
|
||||||
println!("item.out: {}", item.out);
|
|
||||||
|
|
||||||
self.current_node = handle_list_init(item.clone());
|
self.current_node = handle_list_init(item.clone());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,6 @@ pub fn play(rt_handle: &Handle) {
|
|||||||
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
||||||
|
|
||||||
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||||
let encoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
|
||||||
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||||
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
|
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
|
||||||
@ -78,18 +77,12 @@ pub fn play(rt_handle: &Handle) {
|
|||||||
_ => panic!("Output mode doesn't exists!"),
|
_ => panic!("Output mode doesn't exists!"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let enc_terminator = match enc_proc.terminator() {
|
|
||||||
Ok(proc) => Some(proc),
|
|
||||||
Err(_) => None,
|
|
||||||
};
|
|
||||||
*encoder_term.lock().unwrap() = enc_terminator;
|
|
||||||
|
|
||||||
rt_handle.spawn(stderr_reader(
|
rt_handle.spawn(stderr_reader(
|
||||||
enc_proc.stderr.take().unwrap(),
|
enc_proc.stderr.take().unwrap(),
|
||||||
"Encoder".to_string(),
|
"Encoder".to_string(),
|
||||||
));
|
));
|
||||||
|
|
||||||
let (ingest_sender, ingest_receiver): (Sender<[u8; 32256]>, Receiver<([u8; 32256])>) =
|
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) =
|
||||||
channel();
|
channel();
|
||||||
|
|
||||||
if config.ingest.enable {
|
if config.ingest.enable {
|
||||||
@ -160,11 +153,6 @@ pub fn play(rt_handle: &Handle) {
|
|||||||
let mut kill_dec = true;
|
let mut kill_dec = true;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
|
|
||||||
Ok(length) => length,
|
|
||||||
Err(e) => panic!("Reading error from decoder: {:?}", e),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||||
if let Err(e) = enc_writer.write_all(&receive) {
|
if let Err(e) = enc_writer.write_all(&receive) {
|
||||||
error!("Ingest receiver error: {:?}", e);
|
error!("Ingest receiver error: {:?}", e);
|
||||||
@ -189,20 +177,31 @@ pub fn play(rt_handle: &Handle) {
|
|||||||
*init.lock().unwrap() = true;
|
*init.lock().unwrap() = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if dec_bytes_len > 0 {
|
|
||||||
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
|
|
||||||
error!("Encoder write error: {:?}", e);
|
|
||||||
|
|
||||||
break 'source_iter;
|
|
||||||
};
|
|
||||||
} else {
|
} else {
|
||||||
if live_on {
|
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
|
||||||
info!("Switch from live ingest to {}", config.processing.mode);
|
Ok(length) => length,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Reading error from decoder: {:?}", e);
|
||||||
|
|
||||||
live_on = false;
|
break 'source_iter;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
if dec_bytes_len > 0 {
|
||||||
|
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
|
||||||
|
error!("Encoder write error: {:?}", e);
|
||||||
|
|
||||||
|
break 'source_iter;
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
if live_on {
|
||||||
|
info!("Switch from live ingest to {}", config.processing.mode);
|
||||||
|
|
||||||
|
live_on = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,31 +212,30 @@ pub fn play(rt_handle: &Handle) {
|
|||||||
|
|
||||||
*is_terminated.lock().unwrap() = true;
|
*is_terminated.lock().unwrap() = true;
|
||||||
|
|
||||||
sleep(Duration::from_secs(1));
|
if let Some(server) = &*server_term.lock().unwrap() {
|
||||||
|
unsafe {
|
||||||
|
if let Ok(_) = server.terminate() {
|
||||||
|
info!("Terminate ingest server done");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(dec) = &*decoder_term.lock().unwrap() {
|
if let Some(dec) = &*decoder_term.lock().unwrap() {
|
||||||
unsafe {
|
unsafe {
|
||||||
if let Ok(_) = dec.terminate() {
|
if let Ok(_) = dec.terminate() {
|
||||||
debug!("Terminate decoder done");
|
info!("Terminate decoder done");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(enc) = &*encoder_term.lock().unwrap() {
|
sleep(Duration::from_secs(1));
|
||||||
unsafe {
|
|
||||||
if let Ok(_) = enc.terminate() {
|
|
||||||
debug!("Terminate encoder done");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(server) = &*server_term.lock().unwrap() {
|
match enc_proc.kill() {
|
||||||
unsafe {
|
Ok(_) => info!("Playout done..."),
|
||||||
if let Ok(_) = server.terminate() {
|
Err(e) => panic!("Encoder error: {:?}", e),
|
||||||
debug!("Terminate server done");
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("Playout done...");
|
if let Err(e) = enc_proc.wait() {
|
||||||
|
error!("Encoder: {e}")
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user