From 5e7df9c3b17859c0a899c31d1b5c274992cabfca Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Fri, 18 Mar 2022 17:04:43 +0100 Subject: [PATCH] reorganize inputs, continue work on ingest server --- Cargo.lock | 54 +++++ Cargo.toml | 1 + examples/pipe_ffmpeg.rs | 327 ++++++++++++++++++++++++++----- src/{utils => input}/folder.rs | 0 src/input/ingest.rs | 123 ++++++++++++ src/input/mod.rs | 7 + src/{utils => input}/playlist.rs | 24 +-- src/main.rs | 1 + src/output/mod.rs | 134 ++++++++++--- src/utils/ingest.rs | 51 ----- src/utils/mod.rs | 49 +++-- 11 files changed, 618 insertions(+), 153 deletions(-) rename src/{utils => input}/folder.rs (100%) create mode 100644 src/input/ingest.rs create mode 100644 src/input/mod.rs rename src/{utils => input}/playlist.rs (96%) delete mode 100644 src/utils/ingest.rs diff --git a/Cargo.lock b/Cargo.lock index 19ceff15..26e13903 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,6 +154,7 @@ dependencies = [ "notify", "once_cell", "openssl", + "process_control", "rand", "regex", "serde", @@ -758,6 +759,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "process_control" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "quote" version = "1.0.15" @@ -1121,6 +1132,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" + +[[package]] +name = "windows_i686_gnu" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" + +[[package]] +name = "windows_i686_msvc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa" + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 9088b9ac..879fbcf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ notify = "4.0.17" rand = "0.8.5" regex = "1" once_cell = "1.10" +process_control = "3.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.8" diff --git a/examples/pipe_ffmpeg.rs b/examples/pipe_ffmpeg.rs index 882d5ef2..06183501 100644 --- a/examples/pipe_ffmpeg.rs +++ b/examples/pipe_ffmpeg.rs @@ -1,13 +1,187 @@ use std::{ - io::{prelude::*, Read}, - process::{Command, Stdio}, + io::{prelude::*, BufReader, Error, Read}, + process::{ChildStderr, Command, Stdio}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, thread::sleep, time::Duration, }; +use process_control::{ChildExt, Terminator}; +use tokio::runtime::{Handle, Runtime}; + +async fn ingest_server( + dec_setting: Vec<&str>, + ingest_sender: Sender<[u8; 65424]>, + proc_terminator: Arc>>, + is_terminated: Arc>, + rt_handle: Handle, +) -> Result<(), Error> { + let mut buffer: [u8; 65424] = [0; 65424]; + let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]"; + let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; + let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+error"]; + let mut stream_input = vec![ + "-f", + "live_flv", + "-listen", + "1", + "-i", + "rtmp://localhost:1936/live/stream", + ]; + + server_cmd.append(&mut stream_input); + server_cmd.append(&mut filter_list); + server_cmd.append(&mut dec_setting.clone()); + + loop { + if *is_terminated.lock().unwrap() { + break; + } + + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + panic!("couldn't spawn ingest server: {}", e) + } + Ok(proc) => proc, + }; + + let serv_terminator = server_proc.terminator()?; + *proc_terminator.lock().unwrap() = Some(serv_terminator); + + rt_handle.spawn(stderr_reader( + server_proc.stderr.take().unwrap(), + "Server".to_string(), + proc_terminator.clone(), + is_terminated.clone(), + + )); + + let ingest_reader = server_proc.stdout.as_mut().unwrap(); + + loop { + match ingest_reader.read_exact(&mut buffer[..]) { + Ok(length) => length, + Err(_) => break, + }; + + if let Err(e) = ingest_sender.send(buffer) { + println!("Ingest server error: {:?}", e); + break; + } + } + + sleep(Duration::from_secs(1)); + + if let Err(e) = server_proc.wait() { + panic!("Server error: {:?}", e) + }; + } + + Ok(()) +} + +pub async fn stderr_reader( + std_errors: ChildStderr, + suffix: String, + server_term: Arc>>, + is_terminated: Arc>, +) -> Result<(), Error> { + // read ffmpeg stderr decoder and encoder instance + // and log the output + + fn format_line(line: String, level: String) -> String { + line.replace(&format!("[{}] ", level), "") + } + + let buffer = BufReader::new(std_errors); + + for line in buffer.lines() { + let line = line?; + + if line.contains("[info]") { + println!("[{suffix}] {}", format_line(line, "info".to_string())) + } else if line.contains("[warning]") { + println!("[{suffix}] {}", format_line(line, "warning".to_string())) + } else { + if suffix != "server" && !line.contains("Input/output error") { + println!( + "[{suffix}] {}", + format_line(line.clone(), "level+error".to_string()) + ); + } + + if line.contains("Error closing file pipe:: Broken pipe") { + *is_terminated.lock().unwrap() = true; + + match &*server_term.lock().unwrap() { + Some(serv) => unsafe { + if let Ok(_) = serv.terminate() { + println!("Terminate server done"); + } + }, + None => (), + } + } + } + } + + Ok(()) +} + fn main() { - let mut enc_proc = match Command::new("ffplay") - .args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"]) + let decoder_term: Arc>> = Arc::new(Mutex::new(None)); + let player_term: Arc>> = Arc::new(Mutex::new(None)); + let server_term: Arc>> = Arc::new(Mutex::new(None)); + let is_terminated: Arc> = Arc::new(Mutex::new(false)); + + let runtime = Runtime::new().unwrap(); + let rt_handle = runtime.handle(); + + let dec_setting: Vec<&str> = vec![ + "-pix_fmt", + "yuv420p", + "-c:v", + "mpeg2video", + "-g", + "1", + "-b:v", + "50000k", + "-minrate", + "50000k", + "-maxrate", + "50000k", + "-bufsize", + "25000k", + "-c:a", + "s302m", + "-strict", + "-2", + "-ar", + "48000", + "-ac", + "2", + "-f", + "mpegts", + "-", + ]; + + let mut player_proc = match Command::new("ffplay") + .args([ + "-v", + "level+error", + "-hide_banner", + "-nostats", + "-i", + "pipe:0", + ]) .stdin(Stdio::piped()) .stderr(Stdio::piped()) .spawn() @@ -16,44 +190,49 @@ fn main() { Ok(proc) => proc, }; + rt_handle.spawn(stderr_reader( + player_proc.stderr.take().unwrap(), + "Player".to_string(), + server_term.clone(), + is_terminated.clone(), + )); + + let player_terminator = match player_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + *player_term.lock().unwrap() = player_terminator; + + let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel(); + + rt_handle.spawn(ingest_server( + dec_setting.clone(), + ingest_sender, + server_term.clone(), + is_terminated.clone(), + rt_handle.clone(), + )); + let mut buffer: [u8; 65424] = [0; 65424]; + let mut dec_cmd = vec![ + "-v", + "level+error", + "-hide_banner", + "-nostats", + "-f", + "lavfi", + "-i", + "testsrc=duration=20:size=1024x576:rate=25", + "-f", + "lavfi", + "-i", + "anoisesrc=d=20:c=pink:r=48000:a=0.5", + ]; + + dec_cmd.append(&mut dec_setting.clone()); let mut dec_proc = match Command::new("ffmpeg") - .args([ - "-f", - "lavfi", - "-i", - "testsrc=duration=6:size=1280x720:rate=25", - "-f", - "lavfi", - "-i", - "anoisesrc=d=6:c=pink:r=48000:a=0.5", - "-pix_fmt", - "yuv420p", - "-c:v", - "mpeg2video", - "-g", - "1", - "-b:v", - "50000k", - "-minrate", - "50000k", - "-maxrate", - "50000k", - "-bufsize", - "25000k", - "-c:a", - "s302m", - "-strict", - "-2", - "-ar", - "48000", - "-ac", - "2", - "-f", - "mpegts", - "-", - ]) + .args(dec_cmd) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() @@ -62,18 +241,37 @@ fn main() { Ok(proc) => proc, }; - let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); + rt_handle.spawn(stderr_reader( + dec_proc.stderr.take().unwrap(), + "Decoder".to_string(), + server_term.clone(), + is_terminated.clone(), + )); + + let dec_terminator = match dec_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + *decoder_term.lock().unwrap() = dec_terminator; + + let mut player_writer = player_proc.stdin.as_ref().unwrap(); let dec_reader = dec_proc.stdout.as_mut().unwrap(); loop { let bytes_len = match dec_reader.read(&mut buffer[..]) { Ok(length) => length, - Err(e) => panic!("Reading error from decoder: {:?}", e) + Err(e) => panic!("Reading error from decoder: {:?}", e), }; - match enc_writer.write(&buffer[..bytes_len]) { - Ok(_) => (), - Err(e) => panic!("Err: {:?}", e), + if let Ok(receive) = ingest_receiver.try_recv() { + if let Err(e) = player_writer.write_all(&receive) { + panic!("Err: {:?}", e) + }; + continue; + } + + if let Err(e) = player_writer.write(&buffer[..bytes_len]) { + panic!("Err: {:?}", e) }; if bytes_len == 0 { @@ -81,15 +279,42 @@ fn main() { } } - match dec_proc.wait() { - Ok(_) => println!("decoding done..."), - Err(e) => panic!("Enc error: {:?}", e), - } + *is_terminated.lock().unwrap() = true; sleep(Duration::from_secs(1)); - match enc_proc.kill() { - Ok(_) => println!("Playout done..."), - Err(e) => panic!("Enc error: {:?}", e), + println!("Terminate decoder..."); + + match &*decoder_term.lock().unwrap() { + Some(dec) => unsafe { + if let Ok(_) = dec.terminate() { + println!("Terminate decoder done"); + } + }, + None => (), } + + println!("Terminate encoder..."); + + match &*player_term.lock().unwrap() { + Some(enc) => unsafe { + if let Ok(_) = enc.terminate() { + println!("Terminate encoder done"); + } + }, + None => (), + } + + println!("Terminate server..."); + + match &*server_term.lock().unwrap() { + Some(serv) => unsafe { + if let Ok(_) = serv.terminate() { + println!("Terminate server done"); + } + }, + None => (), + } + + println!("Terminate done..."); } diff --git a/src/utils/folder.rs b/src/input/folder.rs similarity index 100% rename from src/utils/folder.rs rename to src/input/folder.rs diff --git a/src/input/ingest.rs b/src/input/ingest.rs new file mode 100644 index 00000000..5950e091 --- /dev/null +++ b/src/input/ingest.rs @@ -0,0 +1,123 @@ +use std::{ + io::{Error, Read}, + path::Path, + process::{Command, Stdio}, + sync::{mpsc::Sender, Arc, Mutex}, + thread::sleep, + time::Duration, +}; + +use process_control::{ChildExt, Terminator}; +use simplelog::*; +use tokio::runtime::Handle; + +use crate::utils::{stderr_reader, GlobalConfig}; + +fn overlay(config: &GlobalConfig) -> String { + let mut logo_chain = String::new(); + + if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { + let opacity = format!( + "format=rgba,colorchannelmixer=aa={}", + config.processing.logo_opacity + ); + let logo_loop = "loop=loop=-1:size=1:start=0"; + logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo); + + logo_chain + .push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str()); + } + + logo_chain +} + +pub async fn ingest_server( + log_format: String, + ingest_sender: Sender<[u8; 65424]>, + rt_handle: Handle, + proc_terminator: Arc>>, + is_terminated: Arc>, +) -> Result<(), Error> { + let config = GlobalConfig::global(); + let mut buffer: [u8; 65424] = [0; 65424]; + let mut filter = format!( + "[0:v]fps={},scale={}:{},setdar=dar={}", + config.processing.fps, + config.processing.width, + config.processing.height, + config.processing.aspect + ); + + filter.push_str(&overlay(&config)); + filter.push_str("[vout1]"); + let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; + + let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; + let stream_input = config.ingest.stream_input.clone(); + let stream_settings = config.processing.settings.clone().unwrap(); + + server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); + server_cmd.append(&mut filter_list); + server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); + + info!( + "Start ingest server, listening on: {}", + stream_input.last().unwrap() + ); + + debug!("Server CMD: {:?}", server_cmd); + + loop { + if *is_terminated.lock().unwrap() { + break + } + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn ingest server: {}", e); + panic!("couldn't spawn ingest server: {}", e) + } + Ok(proc) => proc, + }; + + let serv_terminator = server_proc.terminator()?; + *proc_terminator.lock().unwrap() = Some(serv_terminator); + + rt_handle.spawn(stderr_reader( + server_proc.stderr.take().unwrap(), + "Server".to_string(), + proc_terminator.clone(), + is_terminated.clone(), + )); + + let ingest_reader = server_proc.stdout.as_mut().unwrap(); + + loop { + if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) { + debug!("Ingest server read {:?}", e); + break; + }; + + if let Err(e) = ingest_sender.send(buffer) { + error!("Ingest server write error: {:?}", e); + + *is_terminated.lock().unwrap() = true; + server_proc.kill().expect("Ingest server could not killed"); + + break; + } + } + + sleep(Duration::from_secs(1)); + + if let Err(e) = server_proc.wait() { + panic!("Ingest server {:?}", e) + }; + } + + Ok(()) +} diff --git a/src/input/mod.rs b/src/input/mod.rs new file mode 100644 index 00000000..ccdf5271 --- /dev/null +++ b/src/input/mod.rs @@ -0,0 +1,7 @@ +pub mod folder; +pub mod ingest; +pub mod playlist; + +pub use ingest::ingest_server; +pub use folder::{watch_folder, Source}; +pub use playlist::CurrentProgram; diff --git a/src/utils/playlist.rs b/src/input/playlist.rs similarity index 96% rename from src/utils/playlist.rs rename to src/input/playlist.rs index 2e3b8295..77e3715e 100644 --- a/src/utils/playlist.rs +++ b/src/input/playlist.rs @@ -1,4 +1,7 @@ -use std::path::Path; +use std::{ + path::Path, + sync::{Arc, Mutex}, +}; use simplelog::*; use tokio::runtime::Handle; @@ -16,7 +19,7 @@ pub struct CurrentProgram { json_path: Option, nodes: Vec, current_node: Media, - init: bool, + pub init: Arc>, index: usize, rt_handle: Handle, } @@ -33,7 +36,7 @@ impl CurrentProgram { json_path: json.current_file, nodes: json.program, current_node: Media::new(0, "".to_string()), - init: true, + init: Arc::new(Mutex::new(true)), index: 0, rt_handle, } @@ -73,7 +76,7 @@ impl CurrentProgram { self.json_path = None; self.nodes = vec![media.clone()]; self.current_node = media; - self.init = true; + *self.init.lock().unwrap() = true; self.index = 0; } } @@ -103,7 +106,7 @@ impl CurrentProgram { self.index = 0; if json.current_file.is_none() { - self.init = true; + *self.init.lock().unwrap() = true; } } } @@ -139,7 +142,7 @@ impl CurrentProgram { for (i, item) in self.nodes.iter_mut().enumerate() { if start_sec + item.out - item.seek > time_sec { - self.init = false; + *self.init.lock().unwrap() = false; self.index = i + 1; item.seek = time_sec - start_sec; @@ -155,7 +158,7 @@ impl Iterator for CurrentProgram { type Item = Media; fn next(&mut self) -> Option { - if self.init { + if *self.init.lock().unwrap() { debug!("Playlist init"); self.check_update(true); @@ -163,7 +166,7 @@ impl Iterator for CurrentProgram { self.get_init_clip(); } - if self.init { + if *self.init.lock().unwrap() { // on init load playlist, could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. @@ -185,7 +188,7 @@ impl Iterator for CurrentProgram { if DUMMY_LEN > total_delta { duration = total_delta; - self.init = false; + *self.init.lock().unwrap() = false; } if self.config.playlist.start_sec.unwrap() > current_time { @@ -225,8 +228,7 @@ impl Iterator for CurrentProgram { } else { let last_playlist = self.json_path.clone(); self.check_for_next_playlist(); - let (_, total_delta) = - get_delta(&self.config.playlist.start_sec.unwrap()); + let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); let mut last_ad = self.is_ad(self.index, false); if last_playlist == self.json_path diff --git a/src/main.rs b/src/main.rs index ca5e670c..b3b91c29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate log; extern crate simplelog; mod filter; +mod input; mod output; mod utils; diff --git a/src/output/mod.rs b/src/output/mod.rs index 4dd0c017..f7f0f69b 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -4,26 +4,37 @@ use std::{ path::Path, process, process::{Command, Stdio}, - sync::{mpsc::channel, Arc, Mutex}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, thread::sleep, time::Duration, }; +use process_control::{ChildExt, Terminator}; use simplelog::*; use tokio::runtime::Handle; mod desktop; mod stream; -use crate::utils::{ - ingest_server, sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media, - Source, -}; +use crate::input::{ingest_server, watch_folder, CurrentProgram, Source}; +use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media}; pub fn play(rt_handle: &Handle) { let config = GlobalConfig::global(); + let dec_settings = config.processing.clone().settings.unwrap(); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let dec_pid: Arc> = Arc::new(Mutex::new(0)); + let decoder_term: Arc>> = Arc::new(Mutex::new(None)); + let encoder_term: Arc>> = Arc::new(Mutex::new(None)); + let server_term: Arc>> = Arc::new(Mutex::new(None)); + let is_terminated: Arc> = Arc::new(Mutex::new(false)); + let mut init_playlist: Option>> = None; + let mut live_on = false; + + let mut buffer: [u8; 65424] = [0; 65424]; let get_source = match config.processing.clone().mode.as_str() { "folder" => { @@ -51,7 +62,9 @@ pub fn play(rt_handle: &Handle) { } "playlist" => { info!("Playout in playlist mode"); - Box::new(CurrentProgram::new(rt_handle.clone())) as Box> + let program = CurrentProgram::new(rt_handle.clone()); + init_playlist = Some(program.init.clone()); + Box::new(program) as Box> } _ => { error!("Process Mode not exists!"); @@ -59,23 +72,36 @@ pub fn play(rt_handle: &Handle) { } }; - let dec_settings = config.processing.clone().settings.unwrap(); - let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(ff_log_format.clone()), "stream" => stream::output(ff_log_format.clone()), _ => panic!("Output mode doesn't exists!"), }; + let enc_terminator = match enc_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + *encoder_term.lock().unwrap() = enc_terminator; + rt_handle.spawn(stderr_reader( enc_proc.stderr.take().unwrap(), "Encoder".to_string(), + server_term.clone(), + is_terminated.clone(), )); - let mut buffer: [u8; 65424] = [0; 65424]; + let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = channel(); - ingest_server(ff_log_format.clone()); + if config.ingest.enable { + rt_handle.spawn(ingest_server( + ff_log_format.clone(), + ingest_sender, + rt_handle.clone(), + server_term.clone(), + is_terminated.clone(), + )); + } for node in get_source { let cmd = match node.cmd { @@ -94,8 +120,7 @@ pub fn play(rt_handle: &Handle) { ); let filter = node.filter.unwrap(); - - let mut dec_cmd = vec!["-v", ff_log_format.as_str(), "-hide_banner", "-nostats"]; + let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()]; dec_cmd.append(&mut cmd.iter().map(String::as_str).collect()); @@ -119,31 +144,65 @@ pub fn play(rt_handle: &Handle) { Ok(proc) => proc, }; - *dec_pid.lock().unwrap() = dec_proc.id(); + let dec_terminator = match dec_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + *decoder_term.lock().unwrap() = dec_terminator; let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); let dec_reader = dec_proc.stdout.as_mut().unwrap(); - // debug!("Decoder PID: {}", dec_pid.lock().unwrap()); - rt_handle.spawn(stderr_reader( dec_proc.stderr.take().unwrap(), "Decoder".to_string(), + server_term.clone(), + is_terminated.clone(), )); + let mut kill_dec = true; + loop { let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { Ok(length) => length, Err(e) => panic!("Reading error from decoder: {:?}", e), }; - if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { - panic!("Err: {:?}", e) - }; + if let Ok(receive) = ingest_receiver.try_recv() { + if let Err(e) = enc_writer.write_all(&receive) { + panic!("Ingest receiver error: {:?}", e) + }; + + live_on = true; + + if kill_dec { + if let Some(dec) = &*decoder_term.lock().unwrap() { + unsafe { + if let Ok(_) = dec.terminate() { + info!("Switch from decoder to live ingest"); + } + } + }; + + kill_dec = false; + + if let Some(init) = &init_playlist { + *init.lock().unwrap() = true; + } + } + } else if dec_bytes_len > 0 { + if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { + panic!("Encoder write error: {:?}", e) + }; + } else { + if live_on { + info!("Switch from live ingest to decoder"); + + live_on = false; + } - if dec_bytes_len == 0 { break; - }; + } } if let Err(e) = dec_proc.wait() { @@ -151,10 +210,33 @@ pub fn play(rt_handle: &Handle) { }; } + *is_terminated.lock().unwrap() = true; + sleep(Duration::from_secs(1)); - match enc_proc.kill() { - Ok(_) => info!("Playout done..."), - Err(e) => panic!("Encoder error: {:?}", e), - } + if let Some(dec) = &*decoder_term.lock().unwrap() { + unsafe { + if let Ok(_) = dec.terminate() { + debug!("Terminate decoder done"); + } + } + }; + + if let Some(enc) = &*encoder_term.lock().unwrap() { + unsafe { + if let Ok(_) = enc.terminate() { + debug!("Terminate encoder done"); + } + } + }; + + if let Some(server) = &*server_term.lock().unwrap() { + unsafe { + if let Ok(_) = server.terminate() { + debug!("Terminate server done"); + } + } + }; + + info!("Playout done..."); } diff --git a/src/utils/ingest.rs b/src/utils/ingest.rs deleted file mode 100644 index 5c913b4e..00000000 --- a/src/utils/ingest.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::path::Path; - -use simplelog::*; - -use crate::utils::GlobalConfig; - -fn overlay(config: &GlobalConfig) -> String { - let mut logo_chain = String::new(); - - if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { - let opacity = format!( - "format=rgba,colorchannelmixer=aa={}", - config.processing.logo_opacity - ); - let logo_loop = "loop=loop=-1:size=1:start=0"; - logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo); - - logo_chain - .push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str()); - } - - logo_chain -} - -pub fn ingest_server(log_format: String) { - let config = GlobalConfig::global(); - let mut filter = format!( - "[0:v]fps={},scale={}:{},'setdar=dar={}", - config.processing.fps, - config.processing.width, - config.processing.height, - config.processing.aspect - ); - - filter.push_str(&overlay(&config)); - filter.push_str("[vout1]"); - let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; - - let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; - let stream_input = config.ingest.stream_input.clone(); - let stream_settings = config.processing.settings.clone().unwrap(); - - server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); - server_cmd.append(&mut filter_list); - server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); - - info!( - "Start ingest server, listening on: {}", - stream_input.last().unwrap() - ); -} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 4d18d746..9ec24cdc 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,29 +7,25 @@ use std::{ io::{BufRead, BufReader, Error}, path::Path, process::ChildStderr, + sync::{Arc, Mutex}, time, time::UNIX_EPOCH, }; +use process_control::Terminator; use simplelog::*; mod arg_parse; mod config; -mod folder; -mod ingest; -mod json_reader; +pub mod json_reader; mod json_validate; mod logging; -mod playlist; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; -pub use ingest::ingest_server; -pub use folder::{watch_folder, Source}; -pub use json_reader::{read_json, DUMMY_LEN, Playlist}; +pub use json_reader::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::init_logging; -pub use playlist::CurrentProgram; use crate::filter::filter_chains; @@ -82,11 +78,11 @@ impl Media { } } - fn add_probe(&mut self) { + pub fn add_probe(&mut self) { self.probe = Some(MediaProbe::new(self.source.clone())) } - fn add_filter(&mut self) { + pub fn add_filter(&mut self) { let mut node = self.clone(); self.filter = Some(filter_chains(&mut node)) } @@ -258,7 +254,7 @@ pub fn check_sync(delta: f64) -> bool { let config = GlobalConfig::global(); if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 { - error!("Start time out of sync for {} seconds", delta); + error!("Clip begin out of sync for {} seconds", delta); return false; } @@ -316,6 +312,8 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec>>, + is_terminated: Arc>, ) -> Result<(), Error> { // read ffmpeg stderr decoder and encoder instance // and log the output @@ -330,11 +328,34 @@ pub async fn stderr_reader( let line = line?; if line.contains("[info]") { - info!("[{suffix}] {}", format_line(line, "info".to_string())) + info!( + "[{suffix}] {}", + format_line(line, "info".to_string()) + ) } else if line.contains("[warning]") { - warn!("[{suffix}] {}", format_line(line, "warning".to_string())) + warn!( + "[{suffix}] {}", + format_line(line, "warning".to_string()) + ) } else { - error!("[{suffix}] {}", format_line(line, "error".to_string())) + if suffix != "server" && !line.contains("Input/output error") { + error!( + "[{suffix}] {}", + format_line(line.clone(), "error".to_string()) + ); + } + + if line.contains("Error closing file pipe:: Broken pipe") { + *is_terminated.lock().unwrap() = true; + + if let Some(server) = &*server_term.lock().unwrap() { + unsafe { + if let Ok(_) = server.terminate() { + info!("Terminate ingest server"); + } + } + }; + } } }