better error handling on broken pipe

This commit is contained in:
jb-alvarado 2022-03-19 20:53:01 +01:00
parent 5e7df9c3b1
commit 25fccf1a67
7 changed files with 282 additions and 39 deletions

247
examples/pipe_ffmpeg2.rs Normal file
View File

@ -0,0 +1,247 @@
use std::{
io::{prelude::*, Error, Read},
process::{Command, Stdio},
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use tokio::runtime::Runtime;
async fn ingest_server(
dec_setting: Vec<&str>,
ingest_sender: Sender<[u8; 65424]>,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let mut buffer: [u8; 65424] = [0; 65424];
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"];
let mut stream_input = vec![
"-f",
"live_flv",
"-listen",
"1",
"-i",
"rtmp://localhost:1936/live/stream",
];
server_cmd.append(&mut stream_input);
server_cmd.append(&mut filter_list);
server_cmd.append(&mut dec_setting.clone());
loop {
if *is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.spawn()
{
Err(e) => {
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
let ingest_reader = server_proc.stdout.as_mut().unwrap();
loop {
if *is_terminated.lock().unwrap() {
break;
}
match ingest_reader.read_exact(&mut buffer[..]) {
Ok(length) => length,
Err(_) => break,
};
if let Err(e) = ingest_sender.send(buffer) {
println!("Ingest server error: {:?}", e);
break;
}
}
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.wait() {
panic!("Decoder error: {:?}", e)
};
}
println!("after server loop");
Ok(())
}
fn main() {
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let dec_setting: Vec<&str> = vec![
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
];
let player_proc = match Command::new("ffplay")
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
.stdin(Stdio::piped())
.spawn()
{
Err(e) => panic!("couldn't spawn ffplay: {}", e),
Ok(proc) => proc,
};
let player_terminator = match player_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*player_term.lock().unwrap() = player_terminator;
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
let runtime = Runtime::new().unwrap();
runtime.spawn(ingest_server(
dec_setting.clone(),
ingest_sender,
server_term.clone(),
is_terminated.clone(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let mut dec_cmd = vec![
"-v",
"error",
"-hide_banner",
"-nostats",
"-f",
"lavfi",
"-i",
"testsrc=duration=20:size=1024x576:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=20:c=pink:r=48000:a=0.5",
];
dec_cmd.append(&mut dec_setting.clone());
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stdout(Stdio::piped())
.spawn()
{
Err(e) => panic!("couldn't spawn ffmpeg: {}", e),
Ok(proc) => proc,
};
let dec_terminator = match dec_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*decoder_term.lock().unwrap() = dec_terminator;
let mut player_writer = player_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap();
'outer: loop {
let bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
if let Ok(receive) = ingest_receiver.try_recv() {
println!("in receiver");
if let Err(e) = player_writer.write_all(&receive) {
panic!("Err: {:?}", e)
};
continue;
}
if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
println!("write to player: {:?}", e);
break 'outer
};
if bytes_len == 0 {
break;
}
}
*is_terminated.lock().unwrap() = true;
sleep(Duration::from_secs(1));
println!("Terminate decoder...");
match &*decoder_term.lock().unwrap() {
Some(dec) => unsafe {
if let Ok(_) = dec.terminate() {
println!("Terminate decoder done");
}
},
None => (),
}
println!("Terminate encoder...");
match &*player_term.lock().unwrap() {
Some(enc) => unsafe {
if let Ok(_) = enc.terminate() {
println!("Terminate encoder done");
}
},
None => (),
}
println!("Terminate server...");
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
println!("Terminate done...");
}

View File

@ -90,15 +90,16 @@ pub async fn ingest_server(
rt_handle.spawn(stderr_reader( rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(), server_proc.stderr.take().unwrap(),
"Server".to_string(), "Server".to_string(),
proc_terminator.clone(),
is_terminated.clone(),
)); ));
let ingest_reader = server_proc.stdout.as_mut().unwrap(); let ingest_reader = server_proc.stdout.as_mut().unwrap();
loop { loop {
if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) { if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) {
debug!("Ingest server read {:?}", e); if !e.to_string().contains("failed to fill whole buffer") {
debug!("Ingest server read {:?}", e);
}
break; break;
}; };

View File

@ -22,12 +22,13 @@ pub struct CurrentProgram {
pub init: Arc<Mutex<bool>>, pub init: Arc<Mutex<bool>>,
index: usize, index: usize,
rt_handle: Handle, rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
} }
impl CurrentProgram { impl CurrentProgram {
pub fn new(rt_handle: Handle) -> Self { pub fn new(rt_handle: Handle, is_terminated: Arc<Mutex<bool>>) -> Self {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let json = read_json(rt_handle.clone(), true, 0.0); let json = read_json(rt_handle.clone(), is_terminated.clone(), true, 0.0);
Self { Self {
config: config.clone(), config: config.clone(),
@ -39,12 +40,13 @@ impl CurrentProgram {
init: Arc::new(Mutex::new(true)), init: Arc::new(Mutex::new(true)),
index: 0, index: 0,
rt_handle, rt_handle,
is_terminated,
} }
} }
fn check_update(&mut self, seek: bool) { fn check_update(&mut self, seek: bool) {
if self.json_path.is_none() { if self.json_path.is_none() {
let json = read_json(self.rt_handle.clone(), seek, 0.0); let json = read_json(self.rt_handle.clone(), self.is_terminated.clone(), seek, 0.0);
self.json_path = json.current_file; self.json_path = json.current_file;
self.json_mod = json.modified; self.json_mod = json.modified;
@ -58,7 +60,7 @@ impl CurrentProgram {
.eq(&self.json_mod.clone().unwrap()) .eq(&self.json_mod.clone().unwrap())
{ {
// when playlist has changed, reload it // when playlist has changed, reload it
let json = read_json(self.rt_handle.clone(), false, 0.0); let json = read_json(self.rt_handle.clone(), self.is_terminated.clone(), false, 0.0);
self.json_mod = json.modified; self.json_mod = json.modified;
self.nodes = json.program; self.nodes = json.program;
@ -98,7 +100,12 @@ impl CurrentProgram {
|| is_close(total_delta, 0.0, 2.0) || is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0) || is_close(total_delta, target_length, 2.0)
{ {
let json = read_json(self.rt_handle.clone(), false, next_start); let json = read_json(
self.rt_handle.clone(),
self.is_terminated.clone(),
false,
next_start,
);
self.json_path = json.current_file.clone(); self.json_path = json.current_file.clone();
self.json_mod = json.modified; self.json_mod = json.modified;

View File

@ -62,7 +62,7 @@ pub fn play(rt_handle: &Handle) {
} }
"playlist" => { "playlist" => {
info!("Playout in playlist mode"); info!("Playout in playlist mode");
let program = CurrentProgram::new(rt_handle.clone()); let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone());
init_playlist = Some(program.init.clone()); init_playlist = Some(program.init.clone());
Box::new(program) as Box<dyn Iterator<Item = Media>> Box::new(program) as Box<dyn Iterator<Item = Media>>
} }
@ -87,8 +87,6 @@ pub fn play(rt_handle: &Handle) {
rt_handle.spawn(stderr_reader( rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(), enc_proc.stderr.take().unwrap(),
"Encoder".to_string(), "Encoder".to_string(),
server_term.clone(),
is_terminated.clone(),
)); ));
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = channel(); let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = channel();
@ -103,7 +101,7 @@ pub fn play(rt_handle: &Handle) {
)); ));
} }
for node in get_source { 'source_iter: for node in get_source {
let cmd = match node.cmd { let cmd = match node.cmd {
Some(cmd) => cmd, Some(cmd) => cmd,
None => break, None => break,
@ -156,8 +154,6 @@ pub fn play(rt_handle: &Handle) {
rt_handle.spawn(stderr_reader( rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(), dec_proc.stderr.take().unwrap(),
"Decoder".to_string(), "Decoder".to_string(),
server_term.clone(),
is_terminated.clone(),
)); ));
let mut kill_dec = true; let mut kill_dec = true;
@ -170,7 +166,9 @@ pub fn play(rt_handle: &Handle) {
if let Ok(receive) = ingest_receiver.try_recv() { if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write_all(&receive) { if let Err(e) = enc_writer.write_all(&receive) {
panic!("Ingest receiver error: {:?}", e) error!("Ingest receiver error: {:?}", e);
break 'source_iter
}; };
live_on = true; live_on = true;
@ -192,7 +190,9 @@ pub fn play(rt_handle: &Handle) {
} }
} else if dec_bytes_len > 0 { } else if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
panic!("Encoder write error: {:?}", e) error!("Encoder write error: {:?}", e);
break 'source_iter
}; };
} else { } else {
if live_on { if live_on {

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fs::File, path::Path}; use std::{fs::File, path::Path, sync::{Arc, Mutex}};
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -33,7 +33,7 @@ impl Playlist {
} }
} }
pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist { pub fn read_json(rt_handle: Handle, is_terminated: Arc<Mutex<bool>>, seek: bool, next_start: f64) -> Playlist {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let mut playlist_path = Path::new(&config.playlist.path).to_owned(); let mut playlist_path = Path::new(&config.playlist.path).to_owned();
@ -82,7 +82,7 @@ pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist {
start_sec += item.out - item.seek; start_sec += item.out - item.seek;
} }
rt_handle.spawn(validate_playlist(playlist.clone(), config.clone())); rt_handle.spawn(validate_playlist(playlist.clone(), is_terminated, config.clone()));
playlist playlist
} }

View File

@ -1,10 +1,10 @@
use std::path::Path; use std::{path::Path, sync::{Arc, Mutex},};
use simplelog::*; use simplelog::*;
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) { pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
let date = playlist.date; let date = playlist.date;
let length = config.playlist.length_sec.unwrap(); let length = config.playlist.length_sec.unwrap();
let mut start_sec = 0.0; let mut start_sec = 0.0;
@ -12,6 +12,10 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) {
debug!("validate playlist from: <yellow>{date}</>"); debug!("validate playlist from: <yellow>{date}</>");
for item in playlist.program.iter() { for item in playlist.program.iter() {
if *is_terminated.lock().unwrap() {
break
}
if Path::new(&item.source).is_file() { if Path::new(&item.source).is_file() {
let probe = MediaProbe::new(item.source.clone()); let probe = MediaProbe::new(item.source.clone());
@ -33,7 +37,7 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) {
start_sec += item.out - item.seek; start_sec += item.out - item.seek;
} }
if length > start_sec { if length > start_sec && !*is_terminated.lock().unwrap() {
error!( error!(
"Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!", "Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!",
sec_to_time(length - start_sec), sec_to_time(length - start_sec),

View File

@ -7,12 +7,10 @@ use std::{
io::{BufRead, BufReader, Error}, io::{BufRead, BufReader, Error},
path::Path, path::Path,
process::ChildStderr, process::ChildStderr,
sync::{Arc, Mutex},
time, time,
time::UNIX_EPOCH, time::UNIX_EPOCH,
}; };
use process_control::Terminator;
use simplelog::*; use simplelog::*;
mod arg_parse; mod arg_parse;
@ -311,9 +309,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
pub async fn stderr_reader( pub async fn stderr_reader(
std_errors: ChildStderr, std_errors: ChildStderr,
suffix: String, suffix: String
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance // read ffmpeg stderr decoder and encoder instance
// and log the output // and log the output
@ -344,18 +340,6 @@ pub async fn stderr_reader(
format_line(line.clone(), "error".to_string()) format_line(line.clone(), "error".to_string())
); );
} }
if line.contains("Error closing file pipe:: Broken pipe") {
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server");
}
}
};
}
} }
} }