correct switch from ingest to decoder
This commit is contained in:
parent
7c7bc06b7f
commit
8cb9163691
@ -35,10 +35,13 @@ fn audio_filter(config: &GlobalConfig) -> String {
|
||||
let mut audio_chain = ";[0:a]anull".to_string();
|
||||
|
||||
if config.processing.add_loudnorm {
|
||||
audio_chain.push_str(format!(
|
||||
audio_chain.push_str(
|
||||
format!(
|
||||
",loudnorm=I={}:TP={}:LRA={}",
|
||||
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
|
||||
).as_str());
|
||||
)
|
||||
.as_str(),
|
||||
);
|
||||
}
|
||||
|
||||
if config.processing.volume != 1.0 {
|
||||
@ -52,13 +55,14 @@ fn audio_filter(config: &GlobalConfig) -> String {
|
||||
|
||||
pub async fn ingest_server(
|
||||
log_format: String,
|
||||
ingest_sender: Sender<[u8; 65424]>,
|
||||
ingest_sender: Sender<(usize, [u8; 32256])>,
|
||||
rt_handle: Handle,
|
||||
proc_terminator: Arc<Mutex<Option<Terminator>>>,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
server_is_running: Arc<Mutex<bool>>,
|
||||
) -> Result<(), Error> {
|
||||
let config = GlobalConfig::global();
|
||||
let mut buffer: [u8; 65424] = [0; 65424];
|
||||
let mut buffer: [u8; 32256] = [0; 32256];
|
||||
let mut filter = format!(
|
||||
"[0:v]fps={},scale={}:{},setdar=dar={}",
|
||||
config.processing.fps,
|
||||
@ -70,7 +74,14 @@ pub async fn ingest_server(
|
||||
filter.push_str(&overlay(&config));
|
||||
filter.push_str("[vout1]");
|
||||
filter.push_str(audio_filter(&config).as_str());
|
||||
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "[aout1]"];
|
||||
let mut filter_list = vec![
|
||||
"-filter_complex",
|
||||
&filter,
|
||||
"-map",
|
||||
"[vout1]",
|
||||
"-map",
|
||||
"[aout1]",
|
||||
];
|
||||
|
||||
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
|
||||
let stream_input = config.ingest.stream_input.clone();
|
||||
@ -80,6 +91,8 @@ pub async fn ingest_server(
|
||||
server_cmd.append(&mut filter_list);
|
||||
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
|
||||
|
||||
let mut is_running;
|
||||
|
||||
info!(
|
||||
"Start ingest server, listening on: <b><magenta>{}</></b>",
|
||||
stream_input.last().unwrap()
|
||||
@ -113,30 +126,45 @@ pub async fn ingest_server(
|
||||
));
|
||||
|
||||
let ingest_reader = server_proc.stdout.as_mut().unwrap();
|
||||
is_running = false;
|
||||
|
||||
loop {
|
||||
if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) {
|
||||
if !e.to_string().contains("failed to fill whole buffer") {
|
||||
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(e) => {
|
||||
debug!("Ingest server read {:?}", e);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = ingest_sender.send(buffer) {
|
||||
if !is_running {
|
||||
*server_is_running.lock().unwrap() = true;
|
||||
is_running = true;
|
||||
}
|
||||
|
||||
if bytes_len > 0 {
|
||||
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
|
||||
error!("Ingest server write error: {:?}", e);
|
||||
|
||||
*is_terminated.lock().unwrap() = true;
|
||||
server_proc.kill().expect("Ingest server could not killed");
|
||||
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*server_is_running.lock().unwrap() = false;
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
if let Err(e) = server_proc.kill() {
|
||||
error!("Ingest server {:?}", e)
|
||||
};
|
||||
|
||||
if let Err(e) = server_proc.wait() {
|
||||
panic!("Ingest server {:?}", e)
|
||||
error!("Ingest server {:?}", e)
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
use std::{
|
||||
io::{prelude::*, Read},
|
||||
io::{prelude::*, BufReader, Read},
|
||||
path::Path,
|
||||
process,
|
||||
process::{Command, Stdio},
|
||||
@ -12,7 +12,7 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use process_control::{ChildExt, Terminator};
|
||||
use process_control::Terminator;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
@ -27,9 +27,9 @@ pub fn play(rt_handle: &Handle) {
|
||||
let dec_settings = config.processing.clone().settings.unwrap();
|
||||
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
||||
|
||||
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
|
||||
let mut live_on = false;
|
||||
|
||||
@ -82,8 +82,10 @@ pub fn play(rt_handle: &Handle) {
|
||||
"Encoder".to_string(),
|
||||
));
|
||||
|
||||
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) =
|
||||
channel();
|
||||
let (ingest_sender, ingest_receiver): (
|
||||
Sender<(usize, [u8; 32256])>,
|
||||
Receiver<(usize, [u8; 32256])>,
|
||||
) = channel();
|
||||
|
||||
if config.ingest.enable {
|
||||
rt_handle.spawn(ingest_server(
|
||||
@ -92,6 +94,7 @@ pub fn play(rt_handle: &Handle) {
|
||||
rt_handle.clone(),
|
||||
server_term.clone(),
|
||||
is_terminated.clone(),
|
||||
server_is_running.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
@ -136,14 +139,8 @@ pub fn play(rt_handle: &Handle) {
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let dec_terminator = match dec_proc.terminator() {
|
||||
Ok(proc) => Some(proc),
|
||||
Err(_) => None,
|
||||
};
|
||||
*decoder_term.lock().unwrap() = dec_terminator;
|
||||
|
||||
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
|
||||
let dec_reader = dec_proc.stdout.as_mut().unwrap();
|
||||
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
dec_proc.stderr.take().unwrap(),
|
||||
@ -153,22 +150,24 @@ pub fn play(rt_handle: &Handle) {
|
||||
let mut kill_dec = true;
|
||||
|
||||
loop {
|
||||
if *server_is_running.lock().unwrap() {
|
||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||
if let Err(e) = enc_writer.write_all(&receive) {
|
||||
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
|
||||
error!("Ingest receiver error: {:?}", e);
|
||||
|
||||
break 'source_iter;
|
||||
};
|
||||
}
|
||||
|
||||
live_on = true;
|
||||
|
||||
if kill_dec {
|
||||
if let Some(dec) = &*decoder_term.lock().unwrap() {
|
||||
unsafe {
|
||||
if let Ok(_) = dec.terminate() {
|
||||
info!("Switch from {} to live ingest", config.processing.mode);
|
||||
}
|
||||
}
|
||||
if let Err(e) = dec_proc.kill() {
|
||||
panic!("Decoder error: {:?}", e)
|
||||
};
|
||||
|
||||
if let Err(e) = dec_proc.wait() {
|
||||
panic!("Decoder error: {:?}", e)
|
||||
};
|
||||
|
||||
kill_dec = false;
|
||||
@ -184,7 +183,7 @@ pub fn play(rt_handle: &Handle) {
|
||||
error!("Reading error from decoder: {:?}", e);
|
||||
|
||||
break 'source_iter;
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
if dec_bytes_len > 0 {
|
||||
@ -200,6 +199,8 @@ pub fn play(rt_handle: &Handle) {
|
||||
live_on = false;
|
||||
}
|
||||
|
||||
enc_writer.flush().unwrap();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -220,14 +221,6 @@ pub fn play(rt_handle: &Handle) {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(dec) = &*decoder_term.lock().unwrap() {
|
||||
unsafe {
|
||||
if let Ok(_) = dec.terminate() {
|
||||
info!("Terminate decoder done");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
match enc_proc.kill() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user