diff --git a/examples/pipe_ffmpeg2.rs b/examples/pipe_ffmpeg2.rs new file mode 100644 index 00000000..b9efbabc --- /dev/null +++ b/examples/pipe_ffmpeg2.rs @@ -0,0 +1,247 @@ +use std::{ + io::{prelude::*, Error, Read}, + process::{Command, Stdio}, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, + thread::sleep, + time::Duration, +}; + +use process_control::{ChildExt, Terminator}; +use tokio::runtime::Runtime; + +async fn ingest_server( + dec_setting: Vec<&str>, + ingest_sender: Sender<[u8; 65424]>, + proc_terminator: Arc>>, + is_terminated: Arc>, +) -> Result<(), Error> { + let mut buffer: [u8; 65424] = [0; 65424]; + let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]"; + let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; + let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"]; + + let mut stream_input = vec![ + "-f", + "live_flv", + "-listen", + "1", + "-i", + "rtmp://localhost:1936/live/stream", + ]; + + server_cmd.append(&mut stream_input); + server_cmd.append(&mut filter_list); + server_cmd.append(&mut dec_setting.clone()); + + loop { + if *is_terminated.lock().unwrap() { + break; + } + + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stdout(Stdio::piped()) + .spawn() + { + Err(e) => { + panic!("couldn't spawn ingest server: {}", e) + } + Ok(proc) => proc, + }; + + let serv_terminator = server_proc.terminator()?; + *proc_terminator.lock().unwrap() = Some(serv_terminator); + let ingest_reader = server_proc.stdout.as_mut().unwrap(); + + loop { + if *is_terminated.lock().unwrap() { + break; + } + + match ingest_reader.read_exact(&mut buffer[..]) { + Ok(length) => length, + Err(_) => break, + }; + + if let Err(e) = ingest_sender.send(buffer) { + println!("Ingest server error: {:?}", e); + break; + } + } + + sleep(Duration::from_secs(1)); + + if let Err(e) = server_proc.wait() { + panic!("Decoder error: {:?}", e) + }; + } + + println!("after server loop"); + + Ok(()) +} +fn main() { + let decoder_term: Arc>> = Arc::new(Mutex::new(None)); + let player_term: Arc>> = Arc::new(Mutex::new(None)); + let server_term: Arc>> = Arc::new(Mutex::new(None)); + let is_terminated: Arc> = Arc::new(Mutex::new(false)); + + let dec_setting: Vec<&str> = vec![ + "-pix_fmt", + "yuv420p", + "-c:v", + "mpeg2video", + "-g", + "1", + "-b:v", + "50000k", + "-minrate", + "50000k", + "-maxrate", + "50000k", + "-bufsize", + "25000k", + "-c:a", + "s302m", + "-strict", + "-2", + "-ar", + "48000", + "-ac", + "2", + "-f", + "mpegts", + "-", + ]; + + let player_proc = match Command::new("ffplay") + .args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"]) + .stdin(Stdio::piped()) + .spawn() + { + Err(e) => panic!("couldn't spawn ffplay: {}", e), + Ok(proc) => proc, + }; + + let player_terminator = match player_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + + *player_term.lock().unwrap() = player_terminator; + let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel(); + let runtime = Runtime::new().unwrap(); + + runtime.spawn(ingest_server( + dec_setting.clone(), + ingest_sender, + server_term.clone(), + is_terminated.clone(), + )); + + let mut buffer: [u8; 65424] = [0; 65424]; + + let mut dec_cmd = vec![ + "-v", + "error", + "-hide_banner", + "-nostats", + "-f", + "lavfi", + "-i", + "testsrc=duration=20:size=1024x576:rate=25", + "-f", + "lavfi", + "-i", + "anoisesrc=d=20:c=pink:r=48000:a=0.5", + ]; + + dec_cmd.append(&mut dec_setting.clone()); + + let mut dec_proc = match Command::new("ffmpeg") + .args(dec_cmd) + .stdout(Stdio::piped()) + .spawn() + { + Err(e) => panic!("couldn't spawn ffmpeg: {}", e), + Ok(proc) => proc, + }; + + let dec_terminator = match dec_proc.terminator() { + Ok(proc) => Some(proc), + Err(_) => None, + }; + + *decoder_term.lock().unwrap() = dec_terminator; + let mut player_writer = player_proc.stdin.as_ref().unwrap(); + + let dec_reader = dec_proc.stdout.as_mut().unwrap(); + + 'outer: loop { + let bytes_len = match dec_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => panic!("Reading error from decoder: {:?}", e), + }; + + if let Ok(receive) = ingest_receiver.try_recv() { + println!("in receiver"); + if let Err(e) = player_writer.write_all(&receive) { + panic!("Err: {:?}", e) + }; + continue; + } + + if let Err(e) = player_writer.write(&buffer[..bytes_len]) { + println!("write to player: {:?}", e); + + break 'outer + }; + + if bytes_len == 0 { + break; + } + + } + + *is_terminated.lock().unwrap() = true; + + sleep(Duration::from_secs(1)); + + println!("Terminate decoder..."); + + match &*decoder_term.lock().unwrap() { + Some(dec) => unsafe { + if let Ok(_) = dec.terminate() { + println!("Terminate decoder done"); + } + }, + None => (), + } + + println!("Terminate encoder..."); + + match &*player_term.lock().unwrap() { + Some(enc) => unsafe { + if let Ok(_) = enc.terminate() { + println!("Terminate encoder done"); + } + }, + None => (), + } + + println!("Terminate server..."); + + match &*server_term.lock().unwrap() { + Some(serv) => unsafe { + if let Ok(_) = serv.terminate() { + println!("Terminate server done"); + } + }, + None => (), + } + + println!("Terminate done..."); +} diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 5950e091..d39e7930 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -90,15 +90,16 @@ pub async fn ingest_server( rt_handle.spawn(stderr_reader( server_proc.stderr.take().unwrap(), "Server".to_string(), - proc_terminator.clone(), - is_terminated.clone(), )); let ingest_reader = server_proc.stdout.as_mut().unwrap(); loop { if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) { - debug!("Ingest server read {:?}", e); + if !e.to_string().contains("failed to fill whole buffer") { + debug!("Ingest server read {:?}", e); + } + break; }; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 77e3715e..cc03d729 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -22,12 +22,13 @@ pub struct CurrentProgram { pub init: Arc>, index: usize, rt_handle: Handle, + is_terminated: Arc>, } impl CurrentProgram { - pub fn new(rt_handle: Handle) -> Self { + pub fn new(rt_handle: Handle, is_terminated: Arc>) -> Self { let config = GlobalConfig::global(); - let json = read_json(rt_handle.clone(), true, 0.0); + let json = read_json(rt_handle.clone(), is_terminated.clone(), true, 0.0); Self { config: config.clone(), @@ -39,12 +40,13 @@ impl CurrentProgram { init: Arc::new(Mutex::new(true)), index: 0, rt_handle, + is_terminated, } } fn check_update(&mut self, seek: bool) { if self.json_path.is_none() { - let json = read_json(self.rt_handle.clone(), seek, 0.0); + let json = read_json(self.rt_handle.clone(), self.is_terminated.clone(), seek, 0.0); self.json_path = json.current_file; self.json_mod = json.modified; @@ -58,7 +60,7 @@ impl CurrentProgram { .eq(&self.json_mod.clone().unwrap()) { // when playlist has changed, reload it - let json = read_json(self.rt_handle.clone(), false, 0.0); + let json = read_json(self.rt_handle.clone(), self.is_terminated.clone(), false, 0.0); self.json_mod = json.modified; self.nodes = json.program; @@ -98,7 +100,12 @@ impl CurrentProgram { || is_close(total_delta, 0.0, 2.0) || is_close(total_delta, target_length, 2.0) { - let json = read_json(self.rt_handle.clone(), false, next_start); + let json = read_json( + self.rt_handle.clone(), + self.is_terminated.clone(), + false, + next_start, + ); self.json_path = json.current_file.clone(); self.json_mod = json.modified; diff --git a/src/output/mod.rs b/src/output/mod.rs index f7f0f69b..7a8e364c 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -62,7 +62,7 @@ pub fn play(rt_handle: &Handle) { } "playlist" => { info!("Playout in playlist mode"); - let program = CurrentProgram::new(rt_handle.clone()); + let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone()); init_playlist = Some(program.init.clone()); Box::new(program) as Box> } @@ -87,8 +87,6 @@ pub fn play(rt_handle: &Handle) { rt_handle.spawn(stderr_reader( enc_proc.stderr.take().unwrap(), "Encoder".to_string(), - server_term.clone(), - is_terminated.clone(), )); let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = channel(); @@ -103,7 +101,7 @@ pub fn play(rt_handle: &Handle) { )); } - for node in get_source { + 'source_iter: for node in get_source { let cmd = match node.cmd { Some(cmd) => cmd, None => break, @@ -156,8 +154,6 @@ pub fn play(rt_handle: &Handle) { rt_handle.spawn(stderr_reader( dec_proc.stderr.take().unwrap(), "Decoder".to_string(), - server_term.clone(), - is_terminated.clone(), )); let mut kill_dec = true; @@ -170,7 +166,9 @@ pub fn play(rt_handle: &Handle) { if let Ok(receive) = ingest_receiver.try_recv() { if let Err(e) = enc_writer.write_all(&receive) { - panic!("Ingest receiver error: {:?}", e) + error!("Ingest receiver error: {:?}", e); + + break 'source_iter }; live_on = true; @@ -192,7 +190,9 @@ pub fn play(rt_handle: &Handle) { } } else if dec_bytes_len > 0 { if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { - panic!("Encoder write error: {:?}", e) + error!("Encoder write error: {:?}", e); + + break 'source_iter }; } else { if live_on { diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index 59501c12..34f21e14 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use std::{fs::File, path::Path}; +use std::{fs::File, path::Path, sync::{Arc, Mutex}}; use simplelog::*; use tokio::runtime::Handle; @@ -33,7 +33,7 @@ impl Playlist { } } -pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist { +pub fn read_json(rt_handle: Handle, is_terminated: Arc>, seek: bool, next_start: f64) -> Playlist { let config = GlobalConfig::global(); let mut playlist_path = Path::new(&config.playlist.path).to_owned(); @@ -82,7 +82,7 @@ pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist { start_sec += item.out - item.seek; } - rt_handle.spawn(validate_playlist(playlist.clone(), config.clone())); + rt_handle.spawn(validate_playlist(playlist.clone(), is_terminated, config.clone())); playlist } diff --git a/src/utils/json_validate.rs b/src/utils/json_validate.rs index 27a30b3d..6571e628 100644 --- a/src/utils/json_validate.rs +++ b/src/utils/json_validate.rs @@ -1,10 +1,10 @@ -use std::path::Path; +use std::{path::Path, sync::{Arc, Mutex},}; use simplelog::*; use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; -pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) { +pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc>, config: GlobalConfig) { let date = playlist.date; let length = config.playlist.length_sec.unwrap(); let mut start_sec = 0.0; @@ -12,6 +12,10 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) { debug!("validate playlist from: {date}"); for item in playlist.program.iter() { + if *is_terminated.lock().unwrap() { + break + } + if Path::new(&item.source).is_file() { let probe = MediaProbe::new(item.source.clone()); @@ -33,7 +37,7 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) { start_sec += item.out - item.seek; } - if length > start_sec { + if length > start_sec && !*is_terminated.lock().unwrap() { error!( "Playlist from {date} not long enough, {} needed!", sec_to_time(length - start_sec), diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 9ec24cdc..c4efccf4 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,12 +7,10 @@ use std::{ io::{BufRead, BufReader, Error}, path::Path, process::ChildStderr, - sync::{Arc, Mutex}, time, time::UNIX_EPOCH, }; -use process_control::Terminator; use simplelog::*; mod arg_parse; @@ -311,9 +309,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec>>, - is_terminated: Arc>, + suffix: String ) -> Result<(), Error> { // read ffmpeg stderr decoder and encoder instance // and log the output @@ -344,18 +340,6 @@ pub async fn stderr_reader( format_line(line.clone(), "error".to_string()) ); } - - if line.contains("Error closing file pipe:: Broken pipe") { - *is_terminated.lock().unwrap() = true; - - if let Some(server) = &*server_term.lock().unwrap() { - unsafe { - if let Ok(_) = server.terminate() { - info!("Terminate ingest server"); - } - } - }; - } } }