reorganize inputs, continue work on ingest server

This commit is contained in:
jb-alvarado 2022-03-18 17:04:43 +01:00
parent 4e117af07b
commit 5e7df9c3b1
11 changed files with 618 additions and 153 deletions

54
Cargo.lock generated
View File

@ -154,6 +154,7 @@ dependencies = [
"notify",
"once_cell",
"openssl",
"process_control",
"rand",
"regex",
"serde",
@ -758,6 +759,16 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "process_control"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "quote"
version = "1.0.15"
@ -1121,6 +1132,49 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807"
[[package]]
name = "windows_i686_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e"
[[package]]
name = "windows_i686_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0"
[[package]]
name = "windows_x86_64_gnu"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784"
[[package]]
name = "windows_x86_64_msvc"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"

View File

@ -14,6 +14,7 @@ notify = "4.0.17"
rand = "0.8.5"
regex = "1"
once_cell = "1.10"
process_control = "3.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"

View File

@ -1,13 +1,187 @@
use std::{
io::{prelude::*, Read},
process::{Command, Stdio},
io::{prelude::*, BufReader, Error, Read},
process::{ChildStderr, Command, Stdio},
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use tokio::runtime::{Handle, Runtime};
async fn ingest_server(
dec_setting: Vec<&str>,
ingest_sender: Sender<[u8; 65424]>,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
rt_handle: Handle,
) -> Result<(), Error> {
let mut buffer: [u8; 65424] = [0; 65424];
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+error"];
let mut stream_input = vec![
"-f",
"live_flv",
"-listen",
"1",
"-i",
"rtmp://localhost:1936/live/stream",
];
server_cmd.append(&mut stream_input);
server_cmd.append(&mut filter_list);
server_cmd.append(&mut dec_setting.clone());
loop {
if *is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
"Server".to_string(),
proc_terminator.clone(),
is_terminated.clone(),
));
let ingest_reader = server_proc.stdout.as_mut().unwrap();
loop {
match ingest_reader.read_exact(&mut buffer[..]) {
Ok(length) => length,
Err(_) => break,
};
if let Err(e) = ingest_sender.send(buffer) {
println!("Ingest server error: {:?}", e);
break;
}
}
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.wait() {
panic!("Server error: {:?}", e)
};
}
Ok(())
}
pub async fn stderr_reader(
std_errors: ChildStderr,
suffix: String,
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance
// and log the output
fn format_line(line: String, level: String) -> String {
line.replace(&format!("[{}] ", level), "")
}
let buffer = BufReader::new(std_errors);
for line in buffer.lines() {
let line = line?;
if line.contains("[info]") {
println!("[{suffix}] {}", format_line(line, "info".to_string()))
} else if line.contains("[warning]") {
println!("[{suffix}] {}", format_line(line, "warning".to_string()))
} else {
if suffix != "server" && !line.contains("Input/output error") {
println!(
"[{suffix}] {}",
format_line(line.clone(), "level+error".to_string())
);
}
if line.contains("Error closing file pipe:: Broken pipe") {
*is_terminated.lock().unwrap() = true;
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
}
}
}
Ok(())
}
fn main() {
let mut enc_proc = match Command::new("ffplay")
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let runtime = Runtime::new().unwrap();
let rt_handle = runtime.handle();
let dec_setting: Vec<&str> = vec![
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
];
let mut player_proc = match Command::new("ffplay")
.args([
"-v",
"level+error",
"-hide_banner",
"-nostats",
"-i",
"pipe:0",
])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
@ -16,44 +190,49 @@ fn main() {
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
player_proc.stderr.take().unwrap(),
"Player".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let player_terminator = match player_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*player_term.lock().unwrap() = player_terminator;
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
rt_handle.spawn(ingest_server(
dec_setting.clone(),
ingest_sender,
server_term.clone(),
is_terminated.clone(),
rt_handle.clone(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let mut dec_cmd = vec![
"-v",
"level+error",
"-hide_banner",
"-nostats",
"-f",
"lavfi",
"-i",
"testsrc=duration=20:size=1024x576:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=20:c=pink:r=48000:a=0.5",
];
dec_cmd.append(&mut dec_setting.clone());
let mut dec_proc = match Command::new("ffmpeg")
.args([
"-f",
"lavfi",
"-i",
"testsrc=duration=6:size=1280x720:rate=25",
"-f",
"lavfi",
"-i",
"anoisesrc=d=6:c=pink:r=48000:a=0.5",
"-pix_fmt",
"yuv420p",
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
"50000k",
"-minrate",
"50000k",
"-maxrate",
"50000k",
"-bufsize",
"25000k",
"-c:a",
"s302m",
"-strict",
"-2",
"-ar",
"48000",
"-ac",
"2",
"-f",
"mpegts",
"-",
])
.args(dec_cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
@ -62,18 +241,37 @@ fn main() {
Ok(proc) => proc,
};
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let dec_terminator = match dec_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*decoder_term.lock().unwrap() = dec_terminator;
let mut player_writer = player_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap();
loop {
let bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e)
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
match enc_writer.write(&buffer[..bytes_len]) {
Ok(_) => (),
Err(e) => panic!("Err: {:?}", e),
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = player_writer.write_all(&receive) {
panic!("Err: {:?}", e)
};
continue;
}
if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
panic!("Err: {:?}", e)
};
if bytes_len == 0 {
@ -81,15 +279,42 @@ fn main() {
}
}
match dec_proc.wait() {
Ok(_) => println!("decoding done..."),
Err(e) => panic!("Enc error: {:?}", e),
}
*is_terminated.lock().unwrap() = true;
sleep(Duration::from_secs(1));
match enc_proc.kill() {
Ok(_) => println!("Playout done..."),
Err(e) => panic!("Enc error: {:?}", e),
println!("Terminate decoder...");
match &*decoder_term.lock().unwrap() {
Some(dec) => unsafe {
if let Ok(_) = dec.terminate() {
println!("Terminate decoder done");
}
},
None => (),
}
println!("Terminate encoder...");
match &*player_term.lock().unwrap() {
Some(enc) => unsafe {
if let Ok(_) = enc.terminate() {
println!("Terminate encoder done");
}
},
None => (),
}
println!("Terminate server...");
match &*server_term.lock().unwrap() {
Some(serv) => unsafe {
if let Ok(_) = serv.terminate() {
println!("Terminate server done");
}
},
None => (),
}
println!("Terminate done...");
}

123
src/input/ingest.rs Normal file
View File

@ -0,0 +1,123 @@
use std::{
io::{Error, Read},
path::Path,
process::{Command, Stdio},
sync::{mpsc::Sender, Arc, Mutex},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{stderr_reader, GlobalConfig};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
pub async fn ingest_server(
log_format: String,
ingest_sender: Sender<[u8; 65424]>,
rt_handle: Handle,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65424] = [0; 65424];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={}",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(&config));
filter.push_str("[vout1]");
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.stream_input.clone();
let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list);
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
debug!("Server CMD: <bright-blue>{:?}</>", server_cmd);
loop {
if *is_terminated.lock().unwrap() {
break
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {}", e);
panic!("couldn't spawn ingest server: {}", e)
}
Ok(proc) => proc,
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
"Server".to_string(),
proc_terminator.clone(),
is_terminated.clone(),
));
let ingest_reader = server_proc.stdout.as_mut().unwrap();
loop {
if let Err(e) = ingest_reader.read_exact(&mut buffer[..]) {
debug!("Ingest server read {:?}", e);
break;
};
if let Err(e) = ingest_sender.send(buffer) {
error!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
server_proc.kill().expect("Ingest server could not killed");
break;
}
}
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.wait() {
panic!("Ingest server {:?}", e)
};
}
Ok(())
}

7
src/input/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use ingest::ingest_server;
pub use folder::{watch_folder, Source};
pub use playlist::CurrentProgram;

View File

@ -1,4 +1,7 @@
use std::path::Path;
use std::{
path::Path,
sync::{Arc, Mutex},
};
use simplelog::*;
use tokio::runtime::Handle;
@ -16,7 +19,7 @@ pub struct CurrentProgram {
json_path: Option<String>,
nodes: Vec<Media>,
current_node: Media,
init: bool,
pub init: Arc<Mutex<bool>>,
index: usize,
rt_handle: Handle,
}
@ -33,7 +36,7 @@ impl CurrentProgram {
json_path: json.current_file,
nodes: json.program,
current_node: Media::new(0, "".to_string()),
init: true,
init: Arc::new(Mutex::new(true)),
index: 0,
rt_handle,
}
@ -73,7 +76,7 @@ impl CurrentProgram {
self.json_path = None;
self.nodes = vec![media.clone()];
self.current_node = media;
self.init = true;
*self.init.lock().unwrap() = true;
self.index = 0;
}
}
@ -103,7 +106,7 @@ impl CurrentProgram {
self.index = 0;
if json.current_file.is_none() {
self.init = true;
*self.init.lock().unwrap() = true;
}
}
}
@ -139,7 +142,7 @@ impl CurrentProgram {
for (i, item) in self.nodes.iter_mut().enumerate() {
if start_sec + item.out - item.seek > time_sec {
self.init = false;
*self.init.lock().unwrap() = false;
self.index = i + 1;
item.seek = time_sec - start_sec;
@ -155,7 +158,7 @@ impl Iterator for CurrentProgram {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.init {
if *self.init.lock().unwrap() {
debug!("Playlist init");
self.check_update(true);
@ -163,7 +166,7 @@ impl Iterator for CurrentProgram {
self.get_init_clip();
}
if self.init {
if *self.init.lock().unwrap() {
// on init load playlist, could be not long enough,
// so we check if we can take the next playlist already,
// or we fill the gap with a dummy.
@ -185,7 +188,7 @@ impl Iterator for CurrentProgram {
if DUMMY_LEN > total_delta {
duration = total_delta;
self.init = false;
*self.init.lock().unwrap() = false;
}
if self.config.playlist.start_sec.unwrap() > current_time {
@ -225,8 +228,7 @@ impl Iterator for CurrentProgram {
} else {
let last_playlist = self.json_path.clone();
self.check_for_next_playlist();
let (_, total_delta) =
get_delta(&self.config.playlist.start_sec.unwrap());
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
let mut last_ad = self.is_ad(self.index, false);
if last_playlist == self.json_path

View File

@ -2,6 +2,7 @@ extern crate log;
extern crate simplelog;
mod filter;
mod input;
mod output;
mod utils;

View File

@ -4,26 +4,37 @@ use std::{
path::Path,
process,
process::{Command, Stdio},
sync::{mpsc::channel, Arc, Mutex},
sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use simplelog::*;
use tokio::runtime::Handle;
mod desktop;
mod stream;
use crate::utils::{
ingest_server, sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media,
Source,
};
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
pub fn play(rt_handle: &Handle) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let dec_pid: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let encoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false;
let mut buffer: [u8; 65424] = [0; 65424];
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
@ -51,7 +62,9 @@ pub fn play(rt_handle: &Handle) {
}
"playlist" => {
info!("Playout in playlist mode");
Box::new(CurrentProgram::new(rt_handle.clone())) as Box<dyn Iterator<Item = Media>>
let program = CurrentProgram::new(rt_handle.clone());
init_playlist = Some(program.init.clone());
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
@ -59,23 +72,36 @@ pub fn play(rt_handle: &Handle) {
}
};
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
"stream" => stream::output(ff_log_format.clone()),
_ => panic!("Output mode doesn't exists!"),
};
let enc_terminator = match enc_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*encoder_term.lock().unwrap() = enc_terminator;
rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(),
"Encoder".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let mut buffer: [u8; 65424] = [0; 65424];
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<([u8; 65424])>) = channel();
ingest_server(ff_log_format.clone());
if config.ingest.enable {
rt_handle.spawn(ingest_server(
ff_log_format.clone(),
ingest_sender,
rt_handle.clone(),
server_term.clone(),
is_terminated.clone(),
));
}
for node in get_source {
let cmd = match node.cmd {
@ -94,8 +120,7 @@ pub fn play(rt_handle: &Handle) {
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-v", ff_log_format.as_str(), "-hide_banner", "-nostats"];
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
@ -119,31 +144,65 @@ pub fn play(rt_handle: &Handle) {
Ok(proc) => proc,
};
*dec_pid.lock().unwrap() = dec_proc.id();
let dec_terminator = match dec_proc.terminator() {
Ok(proc) => Some(proc),
Err(_) => None,
};
*decoder_term.lock().unwrap() = dec_terminator;
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap();
// debug!("Decoder PID: <yellow>{}</>", dec_pid.lock().unwrap());
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder".to_string(),
server_term.clone(),
is_terminated.clone(),
));
let mut kill_dec = true;
loop {
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => panic!("Reading error from decoder: {:?}", e),
};
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
panic!("Err: {:?}", e)
};
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write_all(&receive) {
panic!("Ingest receiver error: {:?}", e)
};
live_on = true;
if kill_dec {
if let Some(dec) = &*decoder_term.lock().unwrap() {
unsafe {
if let Ok(_) = dec.terminate() {
info!("Switch from decoder to live ingest");
}
}
};
kill_dec = false;
if let Some(init) = &init_playlist {
*init.lock().unwrap() = true;
}
}
} else if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
panic!("Encoder write error: {:?}", e)
};
} else {
if live_on {
info!("Switch from live ingest to decoder");
live_on = false;
}
if dec_bytes_len == 0 {
break;
};
}
}
if let Err(e) = dec_proc.wait() {
@ -151,10 +210,33 @@ pub fn play(rt_handle: &Handle) {
};
}
*is_terminated.lock().unwrap() = true;
sleep(Duration::from_secs(1));
match enc_proc.kill() {
Ok(_) => info!("Playout done..."),
Err(e) => panic!("Encoder error: {:?}", e),
}
if let Some(dec) = &*decoder_term.lock().unwrap() {
unsafe {
if let Ok(_) = dec.terminate() {
debug!("Terminate decoder done");
}
}
};
if let Some(enc) = &*encoder_term.lock().unwrap() {
unsafe {
if let Ok(_) = enc.terminate() {
debug!("Terminate encoder done");
}
}
};
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
debug!("Terminate server done");
}
}
};
info!("Playout done...");
}

View File

@ -1,51 +0,0 @@
use std::path::Path;
use simplelog::*;
use crate::utils::GlobalConfig;
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
pub fn ingest_server(log_format: String) {
let config = GlobalConfig::global();
let mut filter = format!(
"[0:v]fps={},scale={}:{},'setdar=dar={}",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(&config));
filter.push_str("[vout1]");
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.stream_input.clone();
let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list);
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
}

View File

@ -7,29 +7,25 @@ use std::{
io::{BufRead, BufReader, Error},
path::Path,
process::ChildStderr,
sync::{Arc, Mutex},
time,
time::UNIX_EPOCH,
};
use process_control::Terminator;
use simplelog::*;
mod arg_parse;
mod config;
mod folder;
mod ingest;
mod json_reader;
pub mod json_reader;
mod json_validate;
mod logging;
mod playlist;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use ingest::ingest_server;
pub use folder::{watch_folder, Source};
pub use json_reader::{read_json, DUMMY_LEN, Playlist};
pub use json_reader::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use playlist::CurrentProgram;
use crate::filter::filter_chains;
@ -82,11 +78,11 @@ impl Media {
}
}
fn add_probe(&mut self) {
pub fn add_probe(&mut self) {
self.probe = Some(MediaProbe::new(self.source.clone()))
}
fn add_filter(&mut self) {
pub fn add_filter(&mut self) {
let mut node = self.clone();
self.filter = Some(filter_chains(&mut node))
}
@ -258,7 +254,7 @@ pub fn check_sync(delta: f64) -> bool {
let config = GlobalConfig::global();
if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 {
error!("Start time out of sync for <yellow>{}</> seconds", delta);
error!("Clip begin out of sync for <yellow>{}</> seconds", delta);
return false;
}
@ -316,6 +312,8 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
pub async fn stderr_reader(
std_errors: ChildStderr,
suffix: String,
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance
// and log the output
@ -330,11 +328,34 @@ pub async fn stderr_reader(
let line = line?;
if line.contains("[info]") {
info!("<bright black>[{suffix}]</> {}", format_line(line, "info".to_string()))
info!(
"<bright black>[{suffix}]</> {}",
format_line(line, "info".to_string())
)
} else if line.contains("[warning]") {
warn!("<bright black>[{suffix}]</> {}", format_line(line, "warning".to_string()))
warn!(
"<bright black>[{suffix}]</> {}",
format_line(line, "warning".to_string())
)
} else {
error!("<bright black>[{suffix}]</> {}", format_line(line, "error".to_string()))
if suffix != "server" && !line.contains("Input/output error") {
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error".to_string())
);
}
if line.contains("Error closing file pipe:: Broken pipe") {
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server");
}
}
};
}
}
}