Merge pull request #1 from jb-alvarado/main

Ingest Live Stream
This commit is contained in:
jb-alvarado 2022-03-21 14:53:56 +01:00 committed by GitHub
commit 48b4d69281
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 972 additions and 142 deletions

56
Cargo.lock generated
View File

@ -143,7 +143,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"chrono",
"clap",
@ -154,6 +154,7 @@ dependencies = [
"notify",
"once_cell",
"openssl",
"process_control",
"rand",
"regex",
"serde",
@ -758,6 +759,16 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "process_control"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "quote"
version = "1.0.15"
@ -1121,6 +1132,49 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807"
[[package]]
name = "windows_i686_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e"
[[package]]
name = "windows_i686_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0"
[[package]]
name = "windows_x86_64_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784"
[[package]]
name = "windows_x86_64_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
[dependencies]
@ -14,6 +14,7 @@ notify = "4.0.17"
rand = "0.8.5"
regex = "1"
once_cell = "1.10"
process_control = "3.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"

View File

@ -1,13 +1,187 @@
use std::{
io::{prelude::*, Read},
process::{Command, Stdio},
io::{prelude::*, BufReader, Error, Read},
process::{ChildStderr, Command, Stdio},
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use tokio::runtime::{Handle, Runtime};
async fn ingest_server(
dec_setting: Vec<&str>,
ingest_sender: Sender<[u8; 65424]>,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
rt_handle: Handle,
) -> Result<(), Error> {
let mut buffer: [u8; 65424] = [0; 65424];
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+error"];
let mut stream_input = vec![
"-f",
"live_flv",
"-listen",
"1",
"-i",
"rtmp://localhost:1936/live/stream",
];
server_cmd.append(&mut stream_input);
server_cmd.append(&mut filter_list);
server_cmd.append(&mut dec_setting.clone());
loop {
if *is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
"Server".to_string(),
proc_terminator.clone(),
is_terminated.clone(),
));
let ingest_reader = server_proc.stdout.as_mut().unwrap();
loop {
match ingest_reader.read_exact(&mut buffer[..]) {
Ok(length) => length,
Err(_) => break,
};
if let Err(e) = ingest_sender.send(buffer) {
println!("Ingest server error: {:?}", e);
break;
}
}
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.wait() {
panic!("Server error: {:?}", e)
};
}
Ok(())
}
pub async fn stderr_reader(
std_errors: ChildStderr,
suffix: String,
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance
// and log the output
fn format_line(line: String, level: String) -> String {
line.replace(&format!("[{}] ", level), "")
}
let buffer = BufReader::new(std_errors);
for line in buffer.lines() {
let line = line?;
if line.contains("[info]") {
println!("[{suffix}] {}", format_line(line, "info".to_string()))
} else if line.contains("[warning]") {
println!("[{suffix}] {}", format_line(line, "warning".to_string()))
} else {
if suffix != "server" && !line.contains("Input/output error") {
println!(
"[{suffix}] {}",
format_line(line.clone(), "level+error".to_string())
);
}
if line.contains("Error closing file pipe:: Broken pipe") {
*is_terminated.lock().unwrap() = true;
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
}
}
}
Ok(())
}
fn main() {
let mut enc_proc = match Command::new("ffplay")
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let runtime = Runtime::new().unwrap();
let rt_handle = runtime.handle();
let dec_setting: Vec<&str> = vec![
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
];
let mut player_proc = match Command::new("ffplay")
.args([
"-v",
"level+error",
"-hide_banner",
"-nostats",
"-i",
"pipe:0",
])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
@ -16,44 +190,49 @@ fn main() {
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
player_proc.stderr.take().unwrap(),
"Player".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let player_terminator = match player_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*player_term.lock().unwrap() = player_terminator;
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
rt_handle.spawn(ingest_server(
dec_setting.clone(),
ingest_sender,
server_term.clone(),
is_terminated.clone(),
rt_handle.clone(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let mut dec_cmd = vec![
"-v",
"level+error",
"-hide_banner",
"-nostats",
"-f",
"lavfi",
"-i",
"testsrc=duration=20:size=1024x576:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=20:c=pink:r=48000:a=0.5",
];
dec_cmd.append(&mut dec_setting.clone());
let mut dec_proc = match Command::new("ffmpeg")
.args([
"-f",
"lavfi",
"-i",
"testsrc=duration=6:size=1280x720:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=6:c=pink:r=48000:a=0.5",
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
])
.args(dec_cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
@ -62,18 +241,37 @@ fn main() {
Ok(proc) => proc,
};
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let dec_terminator = match dec_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*decoder_term.lock().unwrap() = dec_terminator;
let mut player_writer = player_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap();
loop {
let bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e)
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
match enc_writer.write(&buffer[..bytes_len]) {
Ok(_) => (),
Err(e) => panic!("Err: {:?}", e),
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = player_writer.write_all(&receive) {
panic!("Err: {:?}", e)
};
continue;
}
if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
panic!("Err: {:?}", e)
};
if bytes_len == 0 {
@ -81,15 +279,42 @@ fn main() {
}
}
match dec_proc.wait() {
Ok(_) => println!("decoding done..."),
Err(e) => panic!("Enc error: {:?}", e),
}
*is_terminated.lock().unwrap() = true;
sleep(Duration::from_secs(1));
match enc_proc.kill() {
Ok(_) => println!("Playout done..."),
Err(e) => panic!("Enc error: {:?}", e),
println!("Terminate decoder...");
match &*decoder_term.lock().unwrap() {
Some(dec) => unsafe {
if let Ok(_) = dec.terminate() {
println!("Terminate decoder done");
}
},
None => (),
}
println!("Terminate encoder...");
match &*player_term.lock().unwrap() {
Some(enc) => unsafe {
if let Ok(_) = enc.terminate() {
println!("Terminate encoder done");
}
},
None => (),
}
println!("Terminate server...");
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
println!("Terminate done...");
}

262
examples/pipe_ffmpeg2.rs Normal file
View File

@ -0,0 +1,262 @@
use std::{
io::{prelude::*, BufReader, Error, Read},
process::{Command, Stdio},
sync::{
mpsc::{sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use tokio::runtime::Runtime;
async fn ingest_server(
dec_setting: Vec<&str>,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let mut buffer: [u8; 65088] = [0; 65088];
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"];
let mut stream_input = vec![
"-f",
"live_flv",
"-listen",
"1",
"-i",
"rtmp://localhost:1936/live/stream",
];
server_cmd.append(&mut stream_input);
server_cmd.append(&mut filter_list);
server_cmd.append(&mut dec_setting.clone());
let mut is_running;
loop {
if *is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.spawn()
{
Err(e) => {
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
let ingest_reader = server_proc.stdout.as_mut().unwrap();
is_running = false;
loop {
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
println!("Reading error from ingest server: {:?}", e);
break;
}
};
if !is_running {
*server_is_running.lock().unwrap() = true;
is_running = true;
}
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
println!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
break;
}
} else {
break;
}
}
*server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.kill() {
print!("Ingest server {:?}", e)
};
if let Err(e) = server_proc.wait() {
panic!("Decoder error: {:?}", e)
};
}
Ok(())
}
fn main() {
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let dec_setting: Vec<&str> = vec![
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
];
let mut player_proc = match Command::new("ffplay")
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
.stdin(Stdio::piped())
.spawn()
{
Err(e) => panic!("couldn't spawn ffplay: {}", e),
Ok(proc) => proc,
};
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(1);
let runtime = Runtime::new().unwrap();
runtime.spawn(ingest_server(
dec_setting.clone(),
ingest_sender,
server_term.clone(),
is_terminated.clone(),
server_is_running.clone(),
));
let mut buffer: [u8; 65088] = [0; 65088];
let mut dec_cmd = vec![
"-v",
"error",
"-hide_banner",
"-nostats",
"-f",
"lavfi",
"-i",
"testsrc=duration=120:size=1024x576:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=120:c=pink:r=48000:a=0.5",
];
dec_cmd.append(&mut dec_setting.clone());
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stdout(Stdio::piped())
.spawn()
{
Err(e) => panic!("couldn't spawn ffmpeg: {}", e),
Ok(proc) => proc,
};
let mut player_writer = player_proc.stdin.as_ref().unwrap();
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let mut live_on = false;
let mut count = 0;
loop {
count += 1;
if *server_is_running.lock().unwrap() {
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = player_writer.write(&receive.1[..receive.0]) {
println!("Ingest receiver error: {:?}", e);
break;
};
}
if !live_on {
println!("Switch from offline source to live");
live_on = true;
}
} else {
println!("{count}");
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
println!("Reading error from decoder: {:?}", e);
break;
}
};
if dec_bytes_len > 0 {
if let Err(e) = player_writer.write(&buffer[..dec_bytes_len]) {
println!("Encoder write error: {:?}", e);
break;
};
} else {
if live_on {
println!("Switch from live ingest to offline source");
live_on = false;
}
player_writer.flush().unwrap();
}
}
}
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
println!("Terminate ingest server done");
}
}
};
sleep(Duration::from_secs(1));
match player_proc.kill() {
Ok(_) => println!("Playout done..."),
Err(e) => panic!("Encoder error: {:?}", e),
}
if let Err(e) = player_proc.wait() {
println!("Encoder: {e}")
};
}

View File

@ -201,17 +201,13 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
chain.add_filter(filter, "video".into());
match &chain.video_chain {
Some(filters) => {
for (i, f) in filters.split(",").enumerate() {
if f.contains("drawtext") && !config.text.text_from_filename {
debug!("drawtext node is on index: <yellow>{i}</>");
break;
}
if let Some(filters) = &chain.video_chain {
for (i, f) in filters.split(",").enumerate() {
if f.contains("drawtext") && !config.text.text_from_filename {
debug!("drawtext node is on index: <yellow>{i}</>");
break;
}
}
None => (),
}
}
}
@ -249,9 +245,10 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add single pass loudnorm filter to audio line
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.len() != 0 && config.processing.add_loudnorm {
if node.probe.is_some()
&& node.probe.clone().unwrap().audio_streams.unwrap().len() > 0
&& config.processing.add_loudnorm
{
let loud_filter = format!(
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
@ -318,15 +315,16 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
&config,
);
extend_video(node, &mut filters);
add_text(node, &mut filters, &config);
add_audio(node, &mut filters);
extend_audio(node, &mut filters);
add_loudnorm(node, &mut filters, &config);
}
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);

172
src/input/ingest.rs Normal file
View File

@ -0,0 +1,172 @@
use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::{mpsc::SyncSender, Arc, Mutex},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{stderr_reader, GlobalConfig};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]anull".to_string();
if config.processing.add_loudnorm {
audio_chain.push_str(
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
.as_str(),
);
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
pub async fn ingest_server(
log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
rt_handle: Handle,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={}",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(&config));
filter.push_str("[vout1]");
filter.push_str(audio_filter(&config).as_str());
let mut filter_list = vec![
"-filter_complex",
&filter,
"-map",
"[vout1]",
"-map",
"[aout1]",
];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.stream_input.clone();
let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list);
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
let mut is_running;
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
debug!("Server CMD: <bright-blue>{:?}</>", server_cmd);
loop {
if *is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {}", e);
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
"Server".to_string(),
));
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
is_running = false;
loop {
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
debug!("Ingest server read {:?}", e);
break;
}
};
if !is_running {
*server_is_running.lock().unwrap() = true;
is_running = true;
}
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
break;
}
} else {
break;
}
}
*server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.kill() {
error!("Ingest server {:?}", e)
};
if let Err(e) = server_proc.wait() {
error!("Ingest server {:?}", e)
};
}
Ok(())
}

7
src/input/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use ingest::ingest_server;
pub use folder::{watch_folder, Source};
pub use playlist::CurrentProgram;

View File

@ -1,4 +1,7 @@
use std::path::Path;
use std::{
path::Path,
sync::{Arc, Mutex},
};
use simplelog::*;
use tokio::runtime::Handle;
@ -16,15 +19,16 @@ pub struct CurrentProgram {
json_path: Option<String>,
nodes: Vec<Media>,
current_node: Media,
init: bool,
pub init: Arc<Mutex<bool>>,
index: usize,
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
}
impl CurrentProgram {
pub fn new(rt_handle: Handle) -> Self {
pub fn new(rt_handle: Handle, is_terminated: Arc<Mutex<bool>>) -> Self {
let config = GlobalConfig::global();
let json = read_json(rt_handle.clone(), true, 0.0);
let json = read_json(rt_handle.clone(), is_terminated.clone(), true, 0.0);
Self {
config: config.clone(),
@ -33,15 +37,21 @@ impl CurrentProgram {
json_path: json.current_file,
nodes: json.program,
current_node: Media::new(0, "".to_string()),
init: true,
init: Arc::new(Mutex::new(true)),
index: 0,
rt_handle,
is_terminated,
}
}
fn check_update(&mut self, seek: bool) {
if self.json_path.is_none() {
let json = read_json(self.rt_handle.clone(), seek, 0.0);
let json = read_json(
self.rt_handle.clone(),
self.is_terminated.clone(),
seek,
0.0,
);
self.json_path = json.current_file;
self.json_mod = json.modified;
@ -55,7 +65,12 @@ impl CurrentProgram {
.eq(&self.json_mod.clone().unwrap())
{
// when playlist has changed, reload it
let json = read_json(self.rt_handle.clone(), false, 0.0);
let json = read_json(
self.rt_handle.clone(),
self.is_terminated.clone(),
false,
0.0,
);
self.json_mod = json.modified;
self.nodes = json.program;
@ -73,7 +88,7 @@ impl CurrentProgram {
self.json_path = None;
self.nodes = vec![media.clone()];
self.current_node = media;
self.init = true;
*self.init.lock().unwrap() = true;
self.index = 0;
}
}
@ -95,7 +110,12 @@ impl CurrentProgram {
|| is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0)
{
let json = read_json(self.rt_handle.clone(), false, next_start);
let json = read_json(
self.rt_handle.clone(),
self.is_terminated.clone(),
false,
next_start,
);
self.json_path = json.current_file.clone();
self.json_mod = json.modified;
@ -103,7 +123,7 @@ impl CurrentProgram {
self.index = 0;
if json.current_file.is_none() {
self.init = true;
*self.init.lock().unwrap() = true;
}
}
}
@ -135,18 +155,19 @@ impl CurrentProgram {
time_sec += self.config.playlist.length_sec.unwrap()
}
let mut start_sec = self.start_sec.clone();
for (i, item) in self.nodes.iter_mut().enumerate() {
if start_sec + item.out - item.seek > time_sec {
self.init = false;
if item.begin.unwrap() + item.out - item.seek > time_sec {
*self.init.lock().unwrap() = false;
self.index = i + 1;
item.seek = time_sec - start_sec;
self.current_node = handle_list_init(item.clone());
// de-instance node to preserve original values in list
let mut node_clone = item.clone();
node_clone.seek = time_sec - node_clone.begin.unwrap();
self.current_node = handle_list_init(node_clone);
break;
}
start_sec += item.out - item.seek;
}
}
}
@ -155,7 +176,7 @@ impl Iterator for CurrentProgram {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.init {
if *self.init.lock().unwrap() {
debug!("Playlist init");
self.check_update(true);
@ -163,7 +184,7 @@ impl Iterator for CurrentProgram {
self.get_init_clip();
}
if self.init {
if *self.init.lock().unwrap() {
// on init load playlist, could be not long enough,
// so we check if we can take the next playlist already,
// or we fill the gap with a dummy.
@ -185,7 +206,7 @@ impl Iterator for CurrentProgram {
if DUMMY_LEN > total_delta {
duration = total_delta;
self.init = false;
*self.init.lock().unwrap() = false;
}
if self.config.playlist.start_sec.unwrap() > current_time {
@ -225,8 +246,7 @@ impl Iterator for CurrentProgram {
} else {
let last_playlist = self.json_path.clone();
self.check_for_next_playlist();
let (_, total_delta) =
get_delta(&self.config.playlist.start_sec.unwrap());
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
let mut last_ad = self.is_ad(self.index, false);
if last_playlist == self.json_path

View File

@ -2,6 +2,7 @@ extern crate log;
extern crate simplelog;
mod filter;
mod input;
mod output;
mod utils;

View File

@ -1,28 +1,39 @@
use notify::{watcher, RecursiveMode, Watcher};
use std::{
io::{prelude::*, Read},
io::{prelude::*, BufReader, Read},
path::Path,
process,
process::{Command, Stdio},
sync::{mpsc::channel, Arc, Mutex},
sync::{
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::Terminator;
use simplelog::*;
use tokio::runtime::Handle;
mod desktop;
mod stream;
use crate::utils::{
sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media, Source,
};
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
pub fn play(rt_handle: &Handle) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let dec_pid: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false;
let mut buffer: [u8; 65088] = [0; 65088];
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
@ -50,7 +61,9 @@ pub fn play(rt_handle: &Handle) {
}
"playlist" => {
info!("Playout in playlist mode");
Box::new(CurrentProgram::new(rt_handle.clone())) as Box<dyn Iterator<Item = Media>>
let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone());
init_playlist = Some(program.init.clone());
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
@ -58,9 +71,6 @@ pub fn play(rt_handle: &Handle) {
}
};
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
"stream" => stream::output(ff_log_format.clone()),
@ -72,9 +82,23 @@ pub fn play(rt_handle: &Handle) {
"Encoder".to_string(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(4);
for node in get_source {
if config.ingest.enable {
rt_handle.spawn(ingest_server(
ff_log_format.clone(),
ingest_sender,
rt_handle.clone(),
server_term.clone(),
is_terminated.clone(),
server_is_running.clone(),
));
}
'source_iter: for node in get_source {
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
@ -91,8 +115,7 @@ pub fn play(rt_handle: &Handle) {
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-v", ff_log_format.as_str(), "-hide_banner", "-nostats"];
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
@ -116,31 +139,73 @@ pub fn play(rt_handle: &Handle) {
Ok(proc) => proc,
};
*dec_pid.lock().unwrap() = dec_proc.id();
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap();
// debug!("Decoder PID: <yellow>{}</>", dec_pid.lock().unwrap());
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder".to_string(),
));
let mut kill_dec = true;
loop {
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
if *server_is_running.lock().unwrap() {
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
error!("Ingest receiver error: {:?}", e);
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
panic!("Err: {:?}", e)
};
break 'source_iter;
};
}
if dec_bytes_len == 0 {
break;
};
live_on = true;
if kill_dec {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = dec_proc.kill() {
error!("Decoder error: {e}")
};
if let Err(e) = dec_proc.wait() {
error!("Decoder error: {e}")
};
kill_dec = false;
if let Some(init) = &init_playlist {
*init.lock().unwrap() = true;
}
}
} else {
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
error!("Reading error from decoder: {:?}", e);
break 'source_iter;
}
};
if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
error!("Encoder write error: {:?}", e);
break 'source_iter;
};
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);
live_on = false;
}
enc_writer.flush().unwrap();
break;
}
}
}
if let Err(e) = dec_proc.wait() {
@ -148,10 +213,24 @@ pub fn play(rt_handle: &Handle) {
};
}
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server done");
}
}
};
sleep(Duration::from_secs(1));
match enc_proc.kill() {
Ok(_) => info!("Playout done..."),
Err(e) => panic!("Encoder error: {:?}", e),
}
if let Err(e) = enc_proc.wait() {
error!("Encoder: {e}")
};
}

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use std::{fs::File, path::Path};
use std::{fs::File, path::Path, sync::{Arc, Mutex}};
use simplelog::*;
use tokio::runtime::Handle;
@ -33,7 +33,7 @@ impl Playlist {
}
}
pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist {
pub fn read_json(rt_handle: Handle, is_terminated: Arc<Mutex<bool>>, seek: bool, next_start: f64) -> Playlist {
let config = GlobalConfig::global();
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
@ -82,7 +82,7 @@ pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist {
start_sec += item.out - item.seek;
}
rt_handle.spawn(validate_playlist(playlist.clone(), config.clone()));
rt_handle.spawn(validate_playlist(playlist.clone(), is_terminated, config.clone()));
playlist
}

View File

@ -1,10 +1,10 @@
use std::path::Path;
use std::{path::Path, sync::{Arc, Mutex},};
use simplelog::*;
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) {
pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
let date = playlist.date;
let length = config.playlist.length_sec.unwrap();
let mut start_sec = 0.0;
@ -12,6 +12,10 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) {
debug!("validate playlist from: <yellow>{date}</>");
for item in playlist.program.iter() {
if *is_terminated.lock().unwrap() {
break
}
if Path::new(&item.source).is_file() {
let probe = MediaProbe::new(item.source.clone());
@ -33,7 +37,7 @@ pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) {
start_sec += item.out - item.seek;
}
if length > start_sec {
if length > start_sec + 1.0 && !*is_terminated.lock().unwrap() {
error!(
"Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!",
sec_to_time(length - start_sec),

View File

@ -15,19 +15,15 @@ use simplelog::*;
mod arg_parse;
mod config;
mod folder;
mod json_reader;
pub mod json_reader;
mod json_validate;
mod logging;
mod playlist;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use folder::{watch_folder, Source};
pub use json_reader::{read_json, DUMMY_LEN, Playlist};
pub use json_reader::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use playlist::CurrentProgram;
use crate::filter::filter_chains;
@ -80,11 +76,11 @@ impl Media {
}
}
fn add_probe(&mut self) {
pub fn add_probe(&mut self) {
self.probe = Some(MediaProbe::new(self.source.clone()))
}
fn add_filter(&mut self) {
pub fn add_filter(&mut self) {
let mut node = self.clone();
self.filter = Some(filter_chains(&mut node))
}
@ -205,11 +201,11 @@ pub fn time_to_sec(time_str: &String) -> f64 {
}
pub fn sec_to_time(sec: f64) -> String {
let d = UNIX_EPOCH + time::Duration::from_secs(sec as u64);
let d = UNIX_EPOCH + time::Duration::from_millis((sec * 1000.0) as u64);
// Create DateTime from SystemTime
let date_time = DateTime::<Utc>::from(d);
date_time.format("%H:%M:%S").to_string()
date_time.format("%H:%M:%S%.3f").to_string()
}
pub fn is_close(a: f64, b: f64, to: f64) -> bool {
@ -256,7 +252,7 @@ pub fn check_sync(delta: f64) -> bool {
let config = GlobalConfig::global();
if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 {
error!("Start time out of sync for <yellow>{}</> seconds", delta);
error!("Clip begin out of sync for <yellow>{}</> seconds", delta);
return false;
}
@ -311,10 +307,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
source_cmd
}
pub async fn stderr_reader(
std_errors: ChildStderr,
suffix: String,
) -> Result<(), Error> {
pub async fn stderr_reader(std_errors: ChildStderr, suffix: String) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance
// and log the output
@ -328,11 +321,25 @@ pub async fn stderr_reader(
let line = line?;
if line.contains("[info]") {
info!("<bright black>[{suffix}]</> {}", format_line(line, "info".to_string()))
info!(
"<bright black>[{suffix}]</> {}",
format_line(line, "info".to_string())
)
} else if line.contains("[warning]") {
warn!("<bright black>[{suffix}]</> {}", format_line(line, "warning".to_string()))
warn!(
"<bright black>[{suffix}]</> {}",
format_line(line, "warning".to_string())
)
} else {
error!("<bright black>[{suffix}]</> {}", format_line(line, "error".to_string()))
if suffix != "server"
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error".to_string())
);
}
}
}