From 8cb916369187831639ee847fbdd2e21917b8b11b Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 20 Mar 2022 22:35:08 +0100 Subject: [PATCH] correct switch from ingest to decoder --- src/input/ingest.rs | 60 +++++++++++++++++++++++++++++++++------------ src/output/mod.rs | 57 +++++++++++++++++++----------------------- 2 files changed, 69 insertions(+), 48 deletions(-) diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 1e26b7e9..eca8b768 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -35,10 +35,13 @@ fn audio_filter(config: &GlobalConfig) -> String { let mut audio_chain = ";[0:a]anull".to_string(); if config.processing.add_loudnorm { - audio_chain.push_str(format!( - ",loudnorm=I={}:TP={}:LRA={}", - config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra - ).as_str()); + audio_chain.push_str( + format!( + ",loudnorm=I={}:TP={}:LRA={}", + config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra + ) + .as_str(), + ); } if config.processing.volume != 1.0 { @@ -52,13 +55,14 @@ fn audio_filter(config: &GlobalConfig) -> String { pub async fn ingest_server( log_format: String, - ingest_sender: Sender<[u8; 65424]>, + ingest_sender: Sender<(usize, [u8; 32256])>, rt_handle: Handle, proc_terminator: Arc>>, is_terminated: Arc>, + server_is_running: Arc>, ) -> Result<(), Error> { let config = GlobalConfig::global(); - let mut buffer: [u8; 65424] = [0; 65424]; + let mut buffer: [u8; 32256] = [0; 32256]; let mut filter = format!( "[0:v]fps={},scale={}:{},setdar=dar={}", config.processing.fps, @@ -70,7 +74,14 @@ pub async fn ingest_server( filter.push_str(&overlay(&config)); filter.push_str("[vout1]"); filter.push_str(audio_filter(&config).as_str()); - let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "[aout1]"]; + let mut filter_list = vec![ + "-filter_complex", + &filter, + "-map", + "[vout1]", + "-map", + "[aout1]", + ]; let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; let stream_input = config.ingest.stream_input.clone(); @@ -80,6 +91,8 @@ pub async fn ingest_server( server_cmd.append(&mut filter_list); server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); + let mut is_running; + info!( "Start ingest server, listening on: {}", stream_input.last().unwrap() @@ -113,30 +126,45 @@ pub async fn ingest_server( )); let ingest_reader = server_proc.stdout.as_mut().unwrap(); + is_running = false; loop { - if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) { - if !e.to_string().contains("failed to fill whole buffer") { + let bytes_len = match ingest_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => { debug!("Ingest server read {:?}", e); - } - break; + break; + } }; - if let Err(e) = ingest_sender.send(buffer) { - error!("Ingest server write error: {:?}", e); + if !is_running { + *server_is_running.lock().unwrap() = true; + is_running = true; + } - *is_terminated.lock().unwrap() = true; - server_proc.kill().expect("Ingest server could not killed"); + if bytes_len > 0 { + if let Err(e) = ingest_sender.send((bytes_len, buffer)) { + error!("Ingest server write error: {:?}", e); + *is_terminated.lock().unwrap() = true; + break; + } + } else { break; } } + *server_is_running.lock().unwrap() = false; + sleep(Duration::from_secs(1)); + if let Err(e) = server_proc.kill() { + error!("Ingest server {:?}", e) + }; + if let Err(e) = server_proc.wait() { - panic!("Ingest server {:?}", e) + error!("Ingest server {:?}", e) }; } diff --git a/src/output/mod.rs b/src/output/mod.rs index 41777e2e..9c413913 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,6 +1,6 @@ use notify::{watcher, RecursiveMode, Watcher}; use std::{ - io::{prelude::*, Read}, + io::{prelude::*, BufReader, Read}, path::Path, process, process::{Command, Stdio}, @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use process_control::{ChildExt, Terminator}; +use process_control::Terminator; use simplelog::*; use tokio::runtime::Handle; @@ -27,9 +27,9 @@ pub fn play(rt_handle: &Handle) { let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let decoder_term: Arc>> = Arc::new(Mutex::new(None)); let server_term: Arc>> = Arc::new(Mutex::new(None)); let is_terminated: Arc> = Arc::new(Mutex::new(false)); + let server_is_running: Arc> = Arc::new(Mutex::new(false)); let mut init_playlist: Option>> = None; let mut live_on = false; @@ -82,8 +82,10 @@ pub fn play(rt_handle: &Handle) { "Encoder".to_string(), )); - let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = - channel(); + let (ingest_sender, ingest_receiver): ( + Sender<(usize, [u8; 32256])>, + Receiver<(usize, [u8; 32256])>, + ) = channel(); if config.ingest.enable { rt_handle.spawn(ingest_server( @@ -92,6 +94,7 @@ pub fn play(rt_handle: &Handle) { rt_handle.clone(), server_term.clone(), is_terminated.clone(), + server_is_running.clone(), )); } @@ -136,14 +139,8 @@ pub fn play(rt_handle: &Handle) { Ok(proc) => proc, }; - let dec_terminator = match dec_proc.terminator() { - Ok(proc) => Some(proc), - Err(_) => None, - }; - *decoder_term.lock().unwrap() = dec_terminator; - let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); - let dec_reader = dec_proc.stdout.as_mut().unwrap(); + let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); rt_handle.spawn(stderr_reader( dec_proc.stderr.take().unwrap(), @@ -153,22 +150,24 @@ pub fn play(rt_handle: &Handle) { let mut kill_dec = true; loop { - if let Ok(receive) = ingest_receiver.try_recv() { - if let Err(e) = enc_writer.write_all(&receive) { - error!("Ingest receiver error: {:?}", e); + if *server_is_running.lock().unwrap() { + if let Ok(receive) = ingest_receiver.try_recv() { + if let Err(e) = enc_writer.write(&receive.1[..receive.0]) { + error!("Ingest receiver error: {:?}", e); - break 'source_iter; - }; + break 'source_iter; + }; + } live_on = true; if kill_dec { - if let Some(dec) = &*decoder_term.lock().unwrap() { - unsafe { - if let Ok(_) = dec.terminate() { - info!("Switch from {} to live ingest", config.processing.mode); - } - } + if let Err(e) = dec_proc.kill() { + panic!("Decoder error: {:?}", e) + }; + + if let Err(e) = dec_proc.wait() { + panic!("Decoder error: {:?}", e) }; kill_dec = false; @@ -184,7 +183,7 @@ pub fn play(rt_handle: &Handle) { error!("Reading error from decoder: {:?}", e); break 'source_iter; - }, + } }; if dec_bytes_len > 0 { @@ -200,6 +199,8 @@ pub fn play(rt_handle: &Handle) { live_on = false; } + enc_writer.flush().unwrap(); + break; } } @@ -220,14 +221,6 @@ pub fn play(rt_handle: &Handle) { } }; - if let Some(dec) = &*decoder_term.lock().unwrap() { - unsafe { - if let Ok(_) = dec.terminate() { - info!("Terminate decoder done"); - } - } - }; - sleep(Duration::from_secs(1)); match enc_proc.kill() {