use sync_channel for correct server transfer

This commit is contained in:
jb-alvarado 2022-03-21 14:36:09 +01:00
parent 151508d4f1
commit e988953212
5 changed files with 300 additions and 112 deletions

193
Cargo.lock generated
View File

@ -46,6 +46,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cc"
version = "1.0.73"
@ -143,7 +149,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"chrono",
"clap",
@ -300,7 +306,7 @@ checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
@ -460,6 +466,15 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.14"
@ -522,12 +537,26 @@ dependencies = [
"kernel32-sys",
"libc",
"log",
"miow",
"miow 0.2.2",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8"
dependencies = [
"libc",
"log",
"miow 0.3.7",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
@ -536,7 +565,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio",
"mio 0.6.23",
"slab",
]
@ -552,6 +581,15 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "native-tls"
version = "0.2.8"
@ -603,12 +641,21 @@ dependencies = [
"fsevent-sys",
"inotify",
"libc",
"mio",
"mio 0.6.23",
"mio-extras",
"walkdir",
"winapi 0.3.9",
]
[[package]]
name = "ntapi"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -702,6 +749,29 @@ version = "1.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5"
[[package]]
name = "parking_lot"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.32.0",
]
[[package]]
name = "pin-project-lite"
version = "0.2.8"
@ -766,7 +836,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f"
dependencies = [
"libc",
"windows-sys",
"windows-sys 0.33.0",
]
[[package]]
@ -874,6 +944,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "security-framework"
version = "2.6.1"
@ -940,6 +1016,15 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "simplelog"
version = "0.11.2"
@ -958,6 +1043,22 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "smallvec"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "strsim"
version = "0.10.0"
@ -1035,8 +1136,29 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.1",
"num_cpus",
"once_cell",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"winapi 0.3.9",
]
[[package]]
name = "tokio-macros"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -1089,6 +1211,12 @@ version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.2.8"
@ -1132,43 +1260,86 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6"
dependencies = [
"windows_aarch64_msvc 0.32.0",
"windows_i686_gnu 0.32.0",
"windows_i686_msvc 0.32.0",
"windows_x86_64_gnu 0.32.0",
"windows_x86_64_msvc 0.32.0",
]
[[package]]
name = "windows-sys"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
"windows_aarch64_msvc 0.33.0",
"windows_i686_gnu 0.33.0",
"windows_i686_msvc 0.33.0",
"windows_x86_64_gnu 0.33.0",
"windows_x86_64_msvc 0.33.0",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5"
[[package]]
name = "windows_aarch64_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807"
[[package]]
name = "windows_i686_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615"
[[package]]
name = "windows_i686_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e"
[[package]]
name = "windows_i686_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172"
[[package]]
name = "windows_i686_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0"
[[package]]
name = "windows_x86_64_gnu"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc"
[[package]]
name = "windows_x86_64_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784"
[[package]]
name = "windows_x86_64_msvc"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"
[[package]]
name = "windows_x86_64_msvc"
version = "0.33.0"

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
[dependencies]
@ -19,7 +19,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
simplelog = { version = "^0.11.2", features = ["paris"] }
tokio = { version = "1.16.1", features = ["rt-multi-thread"] }
tokio = { version = "1.16.1", features = ["full"] }
walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies]

View File

@ -1,8 +1,8 @@
use std::{
io::{prelude::*, Error, Read},
io::{prelude::*, BufReader, Error, Read},
process::{Command, Stdio},
sync::{
mpsc::{channel, Receiver, Sender},
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread::sleep,
@ -14,11 +14,12 @@ use tokio::runtime::Runtime;
async fn ingest_server(
dec_setting: Vec<&str>,
ingest_sender: Sender<[u8; 65424]>,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let mut buffer: [u8; 65424] = [0; 65424];
let mut buffer: [u8; 65088] = [0; 65088];
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"];
@ -36,6 +37,8 @@ async fn ingest_server(
server_cmd.append(&mut filter_list);
server_cmd.append(&mut dec_setting.clone());
let mut is_running;
loop {
if *is_terminated.lock().unwrap() {
break;
@ -55,39 +58,54 @@ async fn ingest_server(
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
let ingest_reader = server_proc.stdout.as_mut().unwrap();
is_running = false;
loop {
if *is_terminated.lock().unwrap() {
break;
}
match ingest_reader.read_exact(&mut buffer[..]) {
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(_) => break,
Err(e) => {
println!("Reading error from ingest server: {:?}", e);
break;
}
};
if let Err(e) = ingest_sender.send(buffer) {
println!("Ingest server error: {:?}", e);
if !is_running {
*server_is_running.lock().unwrap() = true;
is_running = true;
}
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
println!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
break;
}
} else {
break;
}
}
*server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.kill() {
print!("Ingest server {:?}", e)
};
if let Err(e) = server_proc.wait() {
panic!("Decoder error: {:?}", e)
};
}
println!("after server loop");
Ok(())
}
fn main() {
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let dec_setting: Vec<&str> = vec![
"-pix_fmt",
@ -117,7 +135,7 @@ fn main() {
"-",
];
let player_proc = match Command::new("ffplay")
let mut player_proc = match Command::new("ffplay")
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
.stdin(Stdio::piped())
.spawn()
@ -126,13 +144,10 @@ fn main() {
Ok(proc) => proc,
};
let player_terminator = match player_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*player_term.lock().unwrap() = player_terminator;
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(1);
let runtime = Runtime::new().unwrap();
runtime.spawn(ingest_server(
@ -140,9 +155,10 @@ fn main() {
ingest_sender,
server_term.clone(),
is_terminated.clone(),
server_is_running.clone(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let mut buffer: [u8; 65088] = [0; 65088];
let mut dec_cmd = vec![
"-v",
@ -152,11 +168,11 @@ fn main() {
"-f",
"lavfi",
"-i",
"testsrc=duration=20:size=1024x576:rate=25",
"testsrc=duration=120:size=1024x576:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=20:c=pink:r=48000:a=0.5",
"anoisesrc=d=120:c=pink:r=48000:a=0.5",
];
dec_cmd.append(&mut dec_setting.clone());
@ -170,78 +186,77 @@ fn main() {
Ok(proc) => proc,
};
let dec_terminator = match dec_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*decoder_term.lock().unwrap() = dec_terminator;
let mut player_writer = player_proc.stdin.as_ref().unwrap();
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let dec_reader = dec_proc.stdout.as_mut().unwrap();
let mut live_on = false;
'outer: loop {
let bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
let mut count = 0;
if let Ok(receive) = ingest_receiver.try_recv() {
println!("in receiver");
if let Err(e) = player_writer.write_all(&receive) {
panic!("Err: {:?}", e)
loop {
count += 1;
if *server_is_running.lock().unwrap() {
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = player_writer.write(&receive.1[..receive.0]) {
println!("Ingest receiver error: {:?}", e);
break;
};
}
if !live_on {
println!("Switch from offline source to live");
live_on = true;
}
} else {
println!("{count}");
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
println!("Reading error from decoder: {:?}", e);
break;
}
};
continue;
if dec_bytes_len > 0 {
if let Err(e) = player_writer.write(&buffer[..dec_bytes_len]) {
println!("Encoder write error: {:?}", e);
break;
};
} else {
if live_on {
println!("Switch from live ingest to offline source");
live_on = false;
}
player_writer.flush().unwrap();
}
}
if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
println!("write to player: {:?}", e);
break 'outer
};
if bytes_len == 0 {
break;
}
}
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
println!("Terminate ingest server done");
}
}
};
sleep(Duration::from_secs(1));
println!("Terminate decoder...");
match &*decoder_term.lock().unwrap() {
Some(dec) => unsafe {
if let Ok(_) = dec.terminate() {
println!("Terminate decoder done");
}
},
None => (),
match player_proc.kill() {
Ok(_) => println!("Playout done..."),
Err(e) => panic!("Encoder error: {:?}", e),
}
println!("Terminate encoder...");
match &*player_term.lock().unwrap() {
Some(enc) => unsafe {
if let Ok(_) = enc.terminate() {
println!("Terminate encoder done");
}
},
None => (),
}
println!("Terminate server...");
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
println!("Terminate done...");
if let Err(e) = player_proc.wait() {
println!("Encoder: {e}")
};
}

View File

@ -1,8 +1,8 @@
use std::{
io::{Error, Read},
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::{mpsc::Sender, Arc, Mutex},
sync::{mpsc::SyncSender, Arc, Mutex},
thread::sleep,
time::Duration,
};
@ -55,14 +55,14 @@ fn audio_filter(config: &GlobalConfig) -> String {
pub async fn ingest_server(
log_format: String,
ingest_sender: Sender<(usize, [u8; 32256])>,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
rt_handle: Handle,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 32256] = [0; 32256];
let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={}",
config.processing.fps,
@ -125,7 +125,7 @@ pub async fn ingest_server(
"Server".to_string(),
));
let ingest_reader = server_proc.stdout.as_mut().unwrap();
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
is_running = false;
loop {

View File

@ -5,7 +5,7 @@ use std::{
process,
process::{Command, Stdio},
sync::{
mpsc::{channel, Receiver, Sender},
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread::sleep,
@ -33,7 +33,7 @@ pub fn play(rt_handle: &Handle) {
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false;
let mut buffer: [u8; 65424] = [0; 65424];
let mut buffer: [u8; 65088] = [0; 65088];
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
@ -83,9 +83,9 @@ pub fn play(rt_handle: &Handle) {
));
let (ingest_sender, ingest_receiver): (
Sender<(usize, [u8; 32256])>,
Receiver<(usize, [u8; 32256])>,
) = channel();
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(4);
if config.ingest.enable {
rt_handle.spawn(ingest_server(
@ -162,12 +162,14 @@ pub fn play(rt_handle: &Handle) {
live_on = true;
if kill_dec {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = dec_proc.kill() {
panic!("Decoder error: {:?}", e)
error!("Decoder error: {e}")
};
if let Err(e) = dec_proc.wait() {
panic!("Decoder error: {:?}", e)
error!("Decoder error: {e}")
};
kill_dec = false;