From e988953212dd2464593cc75829e5447dd3f54ada Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 21 Mar 2022 14:36:09 +0100 Subject: [PATCH] use sync_channel for correct server transfer --- Cargo.lock | 193 ++++++++++++++++++++++++++++++++++++--- Cargo.toml | 4 +- examples/pipe_ffmpeg2.rs | 189 ++++++++++++++++++++------------------ src/input/ingest.rs | 10 +- src/output/mod.rs | 16 ++-- 5 files changed, 300 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26e13903..eec376de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cc" version = "1.0.73" @@ -143,7 +149,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.4.0" +version = "0.5.0" dependencies = [ "chrono", "clap", @@ -300,7 +306,7 @@ checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -460,6 +466,15 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lock_api" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.14" @@ -522,12 +537,26 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi 0.3.9", +] + [[package]] name = "mio-extras" version = "2.0.6" @@ -536,7 +565,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" dependencies = [ "lazycell", "log", - "mio", + "mio 0.6.23", "slab", ] @@ -552,6 +581,15 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "native-tls" version = "0.2.8" @@ -603,12 +641,21 @@ dependencies = [ "fsevent-sys", "inotify", "libc", - "mio", + "mio 0.6.23", "mio-extras", "walkdir", "winapi 0.3.9", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -702,6 +749,29 @@ version = "1.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5" +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec", + "windows-sys 0.32.0", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -766,7 +836,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.33.0", ] [[package]] @@ -874,6 +944,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "security-framework" version = "2.6.1" @@ -940,6 +1016,15 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simplelog" version = "0.11.2" @@ -958,6 +1043,22 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "strsim" version = "0.10.0" @@ -1035,8 +1136,29 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ + "bytes", + "libc", + "memchr", + "mio 0.8.1", "num_cpus", + "once_cell", + "parking_lot", "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1089,6 +1211,12 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.2.8" @@ -1132,43 +1260,86 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc 0.32.0", + "windows_i686_gnu 0.32.0", + "windows_i686_msvc 0.32.0", + "windows_x86_64_gnu 0.32.0", + "windows_x86_64_msvc 0.32.0", +] + [[package]] name = "windows-sys" version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.33.0", + "windows_i686_gnu 0.33.0", + "windows_i686_msvc 0.33.0", + "windows_x86_64_gnu 0.33.0", + "windows_x86_64_msvc 0.33.0", ] +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + [[package]] name = "windows_aarch64_msvc" version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + [[package]] name = "windows_i686_gnu" version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + [[package]] name = "windows_i686_msvc" version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + [[package]] name = "windows_x86_64_gnu" version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "windows_x86_64_msvc" version = "0.33.0" diff --git a/Cargo.toml b/Cargo.toml index 879fbcf5..4ae8a3c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.4.0" +version = "0.5.0" edition = "2021" [dependencies] @@ -19,7 +19,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.8" simplelog = { version = "^0.11.2", features = ["paris"] } -tokio = { version = "1.16.1", features = ["rt-multi-thread"] } +tokio = { version = "1.16.1", features = ["full"] } walkdir = "2" [target.x86_64-unknown-linux-musl.dependencies] diff --git a/examples/pipe_ffmpeg2.rs b/examples/pipe_ffmpeg2.rs index b9efbabc..3f6f3c02 100644 --- a/examples/pipe_ffmpeg2.rs +++ b/examples/pipe_ffmpeg2.rs @@ -1,8 +1,8 @@ use std::{ - io::{prelude::*, Error, Read}, + io::{prelude::*, BufReader, Error, Read}, process::{Command, Stdio}, sync::{ - mpsc::{channel, Receiver, Sender}, + mpsc::{sync_channel, Receiver, SyncSender}, Arc, Mutex, }, thread::sleep, @@ -14,11 +14,12 @@ use tokio::runtime::Runtime; async fn ingest_server( dec_setting: Vec<&str>, - ingest_sender: Sender<[u8; 65424]>, + ingest_sender: SyncSender<(usize, [u8; 65088])>, proc_terminator: Arc>>, is_terminated: Arc>, + server_is_running: Arc>, ) -> Result<(), Error> { - let mut buffer: [u8; 65424] = [0; 65424]; + let mut buffer: [u8; 65088] = [0; 65088]; let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]"; let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"]; @@ -36,6 +37,8 @@ async fn ingest_server( server_cmd.append(&mut filter_list); server_cmd.append(&mut dec_setting.clone()); + let mut is_running; + loop { if *is_terminated.lock().unwrap() { break; @@ -55,39 +58,54 @@ async fn ingest_server( let serv_terminator = server_proc.terminator()?; *proc_terminator.lock().unwrap() = Some(serv_terminator); let ingest_reader = server_proc.stdout.as_mut().unwrap(); + is_running = false; loop { - if *is_terminated.lock().unwrap() { - break; - } - - match ingest_reader.read_exact(&mut buffer[..]) { + let bytes_len = match ingest_reader.read(&mut buffer[..]) { Ok(length) => length, - Err(_) => break, + Err(e) => { + println!("Reading error from ingest server: {:?}", e); + + break; + } }; - if let Err(e) = ingest_sender.send(buffer) { - println!("Ingest server error: {:?}", e); + if !is_running { + *server_is_running.lock().unwrap() = true; + is_running = true; + } + + if bytes_len > 0 { + if let Err(e) = ingest_sender.send((bytes_len, buffer)) { + println!("Ingest server write error: {:?}", e); + + *is_terminated.lock().unwrap() = true; + break; + } + } else { break; } } + *server_is_running.lock().unwrap() = false; + sleep(Duration::from_secs(1)); + if let Err(e) = server_proc.kill() { + print!("Ingest server {:?}", e) + }; + if let Err(e) = server_proc.wait() { panic!("Decoder error: {:?}", e) }; } - println!("after server loop"); - Ok(()) } fn main() { - let decoder_term: Arc>> = Arc::new(Mutex::new(None)); - let player_term: Arc>> = Arc::new(Mutex::new(None)); let server_term: Arc>> = Arc::new(Mutex::new(None)); let is_terminated: Arc> = Arc::new(Mutex::new(false)); + let server_is_running: Arc> = Arc::new(Mutex::new(false)); let dec_setting: Vec<&str> = vec![ "-pix_fmt", @@ -117,7 +135,7 @@ fn main() { "-", ]; - let player_proc = match Command::new("ffplay") + let mut player_proc = match Command::new("ffplay") .args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"]) .stdin(Stdio::piped()) .spawn() @@ -126,13 +144,10 @@ fn main() { Ok(proc) => proc, }; - let player_terminator = match player_proc.terminator() { - Ok(proc) => Some(proc), - Err(_) => None, - }; - - *player_term.lock().unwrap() = player_terminator; - let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel(); + let (ingest_sender, ingest_receiver): ( + SyncSender<(usize, [u8; 65088])>, + Receiver<(usize, [u8; 65088])>, + ) = sync_channel(1); let runtime = Runtime::new().unwrap(); runtime.spawn(ingest_server( @@ -140,9 +155,10 @@ fn main() { ingest_sender, server_term.clone(), is_terminated.clone(), + server_is_running.clone(), )); - let mut buffer: [u8; 65424] = [0; 65424]; + let mut buffer: [u8; 65088] = [0; 65088]; let mut dec_cmd = vec![ "-v", @@ -152,11 +168,11 @@ fn main() { "-f", "lavfi", "-i", - "testsrc=duration=20:size=1024x576:rate=25", + "testsrc=duration=120:size=1024x576:rate=25", "-f", "lavfi", "-i", - "anoisesrc=d=20:c=pink:r=48000:a=0.5", + "anoisesrc=d=120:c=pink:r=48000:a=0.5", ]; dec_cmd.append(&mut dec_setting.clone()); @@ -170,78 +186,77 @@ fn main() { Ok(proc) => proc, }; - let dec_terminator = match dec_proc.terminator() { - Ok(proc) => Some(proc), - Err(_) => None, - }; - - *decoder_term.lock().unwrap() = dec_terminator; let mut player_writer = player_proc.stdin.as_ref().unwrap(); + let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - let dec_reader = dec_proc.stdout.as_mut().unwrap(); + let mut live_on = false; - 'outer: loop { - let bytes_len = match dec_reader.read(&mut buffer[..]) { - Ok(length) => length, - Err(e) => panic!("Reading error from decoder: {:?}", e), - }; + let mut count = 0; - if let Ok(receive) = ingest_receiver.try_recv() { - println!("in receiver"); - if let Err(e) = player_writer.write_all(&receive) { - panic!("Err: {:?}", e) + loop { + count += 1; + + if *server_is_running.lock().unwrap() { + if let Ok(receive) = ingest_receiver.try_recv() { + if let Err(e) = player_writer.write(&receive.1[..receive.0]) { + println!("Ingest receiver error: {:?}", e); + + break; + }; + } + + if !live_on { + println!("Switch from offline source to live"); + + live_on = true; + } + } else { + println!("{count}"); + let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => { + println!("Reading error from decoder: {:?}", e); + + break; + } }; - continue; + + if dec_bytes_len > 0 { + if let Err(e) = player_writer.write(&buffer[..dec_bytes_len]) { + println!("Encoder write error: {:?}", e); + + break; + }; + } else { + if live_on { + println!("Switch from live ingest to offline source"); + + live_on = false; + } + + player_writer.flush().unwrap(); + } } - - if let Err(e) = player_writer.write(&buffer[..bytes_len]) { - println!("write to player: {:?}", e); - - break 'outer - }; - - if bytes_len == 0 { - break; - } - } *is_terminated.lock().unwrap() = true; + if let Some(server) = &*server_term.lock().unwrap() { + unsafe { + if let Ok(_) = server.terminate() { + println!("Terminate ingest server done"); + } + } + }; + sleep(Duration::from_secs(1)); - println!("Terminate decoder..."); - - match &*decoder_term.lock().unwrap() { - Some(dec) => unsafe { - if let Ok(_) = dec.terminate() { - println!("Terminate decoder done"); - } - }, - None => (), + match player_proc.kill() { + Ok(_) => println!("Playout done..."), + Err(e) => panic!("Encoder error: {:?}", e), } - println!("Terminate encoder..."); - - match &*player_term.lock().unwrap() { - Some(enc) => unsafe { - if let Ok(_) = enc.terminate() { - println!("Terminate encoder done"); - } - }, - None => (), - } - - println!("Terminate server..."); - - match &*server_term.lock().unwrap() { - Some(serv) => unsafe { - if let Ok(_) = serv.terminate() { - println!("Terminate server done"); - } - }, - None => (), - } - - println!("Terminate done..."); + if let Err(e) = player_proc.wait() { + println!("Encoder: {e}") + }; } diff --git a/src/input/ingest.rs b/src/input/ingest.rs index eca8b768..e2a61242 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -1,8 +1,8 @@ use std::{ - io::{Error, Read}, + io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::{mpsc::Sender, Arc, Mutex}, + sync::{mpsc::SyncSender, Arc, Mutex}, thread::sleep, time::Duration, }; @@ -55,14 +55,14 @@ fn audio_filter(config: &GlobalConfig) -> String { pub async fn ingest_server( log_format: String, - ingest_sender: Sender<(usize, [u8; 32256])>, + ingest_sender: SyncSender<(usize, [u8; 65088])>, rt_handle: Handle, proc_terminator: Arc>>, is_terminated: Arc>, server_is_running: Arc>, ) -> Result<(), Error> { let config = GlobalConfig::global(); - let mut buffer: [u8; 32256] = [0; 32256]; + let mut buffer: [u8; 65088] = [0; 65088]; let mut filter = format!( "[0:v]fps={},scale={}:{},setdar=dar={}", config.processing.fps, @@ -125,7 +125,7 @@ pub async fn ingest_server( "Server".to_string(), )); - let ingest_reader = server_proc.stdout.as_mut().unwrap(); + let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); is_running = false; loop { diff --git a/src/output/mod.rs b/src/output/mod.rs index 9c413913..d3b7bfc6 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -5,7 +5,7 @@ use std::{ process, process::{Command, Stdio}, sync::{ - mpsc::{channel, Receiver, Sender}, + mpsc::{channel, sync_channel, Receiver, SyncSender}, Arc, Mutex, }, thread::sleep, @@ -33,7 +33,7 @@ pub fn play(rt_handle: &Handle) { let mut init_playlist: Option>> = None; let mut live_on = false; - let mut buffer: [u8; 65424] = [0; 65424]; + let mut buffer: [u8; 65088] = [0; 65088]; let get_source = match config.processing.clone().mode.as_str() { "folder" => { @@ -83,9 +83,9 @@ pub fn play(rt_handle: &Handle) { )); let (ingest_sender, ingest_receiver): ( - Sender<(usize, [u8; 32256])>, - Receiver<(usize, [u8; 32256])>, - ) = channel(); + SyncSender<(usize, [u8; 65088])>, + Receiver<(usize, [u8; 65088])>, + ) = sync_channel(4); if config.ingest.enable { rt_handle.spawn(ingest_server( @@ -162,12 +162,14 @@ pub fn play(rt_handle: &Handle) { live_on = true; if kill_dec { + info!("Switch from {} to live ingest", config.processing.mode); + if let Err(e) = dec_proc.kill() { - panic!("Decoder error: {:?}", e) + error!("Decoder error: {e}") }; if let Err(e) = dec_proc.wait() { - panic!("Decoder error: {:?}", e) + error!("Decoder error: {e}") }; kill_dec = false;