Merge pull request #124 from jb-alvarado/master

fix log suffix, thread hanging
This commit is contained in:
jb-alvarado 2022-05-18 14:31:22 +02:00 committed by GitHub
commit 1da890ad0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 323 additions and 254 deletions

70
Cargo.lock generated
View File

@ -82,13 +82,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
source = "git+https://github.com/sbrocket/chrono?branch=parse-error-kind-public#edd600cc111162573bb81e1850100205849f957d"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time 0.1.43",
"time 0.1.44",
"winapi 0.3.9",
]
@ -196,7 +195,7 @@ dependencies = [
[[package]]
name = "ffplayout-engine"
version = "0.9.5"
version = "0.9.6"
dependencies = [
"chrono",
"clap",
@ -232,7 +231,7 @@ dependencies = [
[[package]]
name = "file-rotate"
version = "0.6.0"
source = "git+https://github.com/jb-alvarado/file-rotate.git#ee1dc1cea05885b8cb472191b50a044869da7e04"
source = "git+https://github.com/Ploppz/file-rotate.git?branch=timestamp-parse-fix#cb1874a15a7a18de820a57df48d3513e5a4076f4"
dependencies = [
"chrono",
"flate2",
@ -415,7 +414,7 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
"wasi 0.10.0+wasi-snapshot-preview1",
]
[[package]]
@ -581,9 +580,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d"
[[package]]
name = "jsonrpc-core"
@ -679,9 +678,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.125"
version = "0.2.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b"
checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836"
[[package]]
name = "linked-hash-map"
@ -960,9 +959,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.0.0"
version = "6.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
[[package]]
name = "paris"
@ -1045,11 +1044,11 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.38"
version = "1.0.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa"
checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f"
dependencies = [
"unicode-xid",
"unicode-ident",
]
[[package]]
@ -1134,9 +1133,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.9"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"
checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695"
[[package]]
name = "same-file"
@ -1149,12 +1148,12 @@ dependencies = [
[[package]]
name = "schannel"
version = "0.1.19"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2"
dependencies = [
"lazy_static",
"winapi 0.3.9",
"windows-sys",
]
[[package]]
@ -1277,13 +1276,13 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.93"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04066589568b72ec65f42d65a1a52436e954b168773148893c020269563decf2"
checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
"unicode-ident",
]
[[package]]
@ -1317,11 +1316,12 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "time"
version = "0.1.43"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
@ -1388,9 +1388,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.6.9"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
dependencies = [
"bytes",
"futures-core",
@ -1447,6 +1447,12 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee"
[[package]]
name = "unicode-normalization"
version = "0.1.19"
@ -1456,12 +1462,6 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-xid"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -1497,9 +1497,9 @@ dependencies = [
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"

View File

@ -4,15 +4,15 @@ description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.5"
version = "0.9.6"
edition = "2021"
[dependencies]
chrono = "0.4"
chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" }
clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
ffprobe = "0.3"
file-rotate = { git = "https://github.com/jb-alvarado/file-rotate.git" }
file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" }
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.6"
log = "0.4"

View File

@ -1,5 +1,5 @@
[Unit]
Description=Rust based 24/7 playout solution
Description=Rust and ffmpeg based playout solution
After=network.target remote-fs.target
[Service]

26
docs/README.md Normal file
View File

@ -0,0 +1,26 @@
**ffplayout-engine Documentation**
================
### **[For Developer](/docs/developer.md)**
Learn how to setup a developer environment and to cross compile for different platforms.
### **[Folder Mode](/docs/folder_mode.md)**
Learn more about playing the content of a folder.
### **[Live Ingest](/docs/live_ingest.md)**
Using live ingest to inject a live stream.
### **[Output Modes](/docs/output.md)**
The different output modes.
### **[Preview Stream](/docs/preview_stream.md)**
Setup and use a preview stream.
### **[Remove Sources](/docs/remote_source.md)**
Use of remote sources, like https://example.org/video.mp4

View File

@ -5,7 +5,7 @@ use crate::utils::GlobalConfig;
/// Add loudness normalization.
pub fn filter_node(config: &GlobalConfig) -> String {
format!(
",loudnorm=I={}:TP={}:LRA={}",
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
}

View File

@ -8,6 +8,7 @@ fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.loudnorm_ingest {
audio_chain.push(',');
audio_chain.push_str(&a_loudnorm::filter_node(config));
}

View File

@ -13,8 +13,8 @@ use crate::utils::{get_delta, is_close, GlobalConfig, Media};
struct Filters {
audio_chain: Option<String>,
video_chain: Option<String>,
audio_map: Option<String>,
video_map: Option<String>,
audio_map: String,
video_map: String,
}
impl Filters {
@ -22,8 +22,8 @@ impl Filters {
Filters {
audio_chain: None,
video_chain: None,
audio_map: Some("0:a".to_string()),
video_map: Some("0:v".to_string()),
audio_map: "1:a".to_string(),
video_map: "0:v".to_string(),
}
}
@ -41,10 +41,9 @@ impl Filters {
if filter.contains("aevalsrc") || filter.contains("anoisesrc") {
self.audio_chain = Some(filter.to_string());
} else {
self.audio_chain =
Some(format!("[{}]{filter}", self.audio_map.clone().unwrap()));
self.audio_chain = Some(format!("[{}]{filter}", self.audio_map.clone()));
}
self.audio_map = Some("[aout1]".to_string());
self.audio_map = "[aout1]".to_string();
}
},
"video" => match &self.video_chain {
@ -57,7 +56,7 @@ impl Filters {
}
None => {
self.video_chain = Some(format!("[0:v]{filter}"));
self.video_map = Some("[vout1]".to_string());
self.video_map = "[vout1]".to_string();
}
},
_ => (),
@ -101,18 +100,30 @@ fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
}
}
fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
if width != config.processing.width || height != config.processing.height {
fn scale(v_stream: &ffprobe::Stream, aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
// width: i64, height: i64
if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) {
if w != config.processing.width || h != config.processing.height {
chain.add_filter(
&format!(
"scale={}:{}",
config.processing.width, config.processing.height
),
"video",
)
}
if !is_close(aspect, config.processing.aspect, 0.03) {
chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video")
}
} else {
chain.add_filter(
&format!(
"scale={}:{}",
config.processing.width, config.processing.height
),
"video",
)
}
if !is_close(aspect, config.processing.aspect, 0.03) {
);
chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video")
}
}
@ -161,20 +172,22 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
}
fn extend_video(node: &mut Media, chain: &mut Filters) {
let video_streams = node.probe.clone().unwrap().video_streams.unwrap();
if !video_streams.is_empty() {
if let Some(duration) = &video_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
if let Some(duration) = node
.probe
.as_ref()
.and_then(|p| p.video_streams.as_ref())
.and_then(|v| v[0].duration.as_ref())
{
let duration_float = duration.clone().parse::<f64>().unwrap();
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
&format!(
"tpad=stop_mode=add:stop_duration={}",
(node.out - node.seek) - (duration_float - node.seek)
),
"video",
)
}
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
&format!(
"tpad=stop_mode=add:stop_duration={}",
(node.out - node.seek) - (duration_float - node.seek)
),
"video",
)
}
}
}
@ -198,8 +211,13 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
}
fn add_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.is_empty() {
if node
.probe
.as_ref()
.and_then(|p| p.audio_streams.as_ref())
.unwrap_or(&vec![])
.is_empty()
{
warn!("Clip: '{}' has no audio!", node.source);
let audio = format!(
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
@ -210,29 +228,29 @@ fn add_audio(node: &mut Media, chain: &mut Filters) {
}
fn extend_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if !audio_streams.is_empty() {
if let Some(duration) = &audio_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
if let Some(duration) = node
.probe
.as_ref()
.and_then(|p| p.audio_streams.as_ref())
.and_then(|a| a[0].duration.as_ref())
{
let duration_float = duration.clone().parse::<f64>().unwrap();
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio")
}
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio")
}
}
}
/// Add single pass loudnorm filter to audio line.
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
if node.probe.is_some()
if config.processing.add_loudnorm
&& !node
.probe
.clone()
.unwrap()
.audio_streams
.unwrap()
.as_ref()
.and_then(|p| p.audio_streams.as_ref())
.unwrap_or(&vec![])
.is_empty()
&& config.processing.add_loudnorm
{
let loud_filter = a_loudnorm::filter_node(config);
chain.add_filter(&loud_filter, "audio");
@ -259,7 +277,7 @@ fn aspect_calc(aspect_string: &Option<String>, config: &GlobalConfig) -> f64 {
}
fn fps_calc(r_frame_rate: &str) -> f64 {
let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect();
let frame_rate_vec = r_frame_rate.split('/').collect::<Vec<&str>>();
let rate: f64 = frame_rate_vec[0].parse().unwrap();
let factor: f64 = frame_rate_vec[1].parse().unwrap();
let fps: f64 = rate / factor;
@ -294,29 +312,24 @@ fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig,
pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec<String> {
let mut filters = Filters::new();
let mut audio_map = "1:a".to_string();
filters.audio_map = Some(audio_map);
if let Some(probe) = node.probe.clone() {
if let Some(probe) = node.probe.as_ref() {
if probe.audio_streams.is_some() {
audio_map = "0:a".to_string();
filters.audio_map = Some(audio_map);
filters.audio_map = "0:a".to_string();
}
let v_stream = &probe.video_streams.unwrap()[0];
let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
let frame_per_sec = fps_calc(&v_stream.r_frame_rate);
if let Some(v_streams) = &probe.video_streams.as_ref() {
let v_stream = &v_streams[0];
let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
let frame_per_sec = fps_calc(&v_stream.r_frame_rate);
deinterlace(&v_stream.field_order, &mut filters);
pad(aspect, &mut filters, config);
fps(frame_per_sec, &mut filters, config);
scale(v_stream, aspect, &mut filters, config);
}
deinterlace(&v_stream.field_order, &mut filters);
pad(aspect, &mut filters, config);
fps(frame_per_sec, &mut filters, config);
scale(
v_stream.width.unwrap(),
v_stream.height.unwrap(),
aspect,
&mut filters,
config,
);
extend_video(node, &mut filters);
add_audio(node, &mut filters);
@ -339,8 +352,8 @@ pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec<String> {
if let Some(v_filters) = filters.video_chain {
filter_str.push_str(v_filters.as_str());
filter_str.push_str(filters.video_map.clone().unwrap().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.video_map.unwrap()]);
filter_str.push_str(filters.video_map.clone().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.video_map]);
} else {
filter_map.append(&mut vec!["-map".to_string(), "0:v".to_string()]);
}
@ -350,10 +363,10 @@ pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec<String> {
filter_str.push(';')
}
filter_str.push_str(a_filters.as_str());
filter_str.push_str(filters.audio_map.clone().unwrap().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]);
filter_str.push_str(filters.audio_map.clone().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map]);
} else {
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]);
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map]);
}
if filter_str.len() > 10 {

View File

@ -3,7 +3,7 @@ use std::{
path::Path,
process::exit,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::channel,
{Arc, Mutex},
},
@ -51,17 +51,15 @@ impl FolderSource {
for entry in WalkDir::new(config.storage.path.clone())
.into_iter()
.filter_map(|e| e.ok())
.flat_map(|e| e.ok())
.filter(|f| f.path().is_file())
{
if entry.path().is_file() {
let ext = file_extension(entry.path());
if ext.is_some()
&& config
.storage
.extensions
.clone()
.contains(&ext.unwrap().to_lowercase())
if let Some(ext) = file_extension(entry.path()) {
if config
.storage
.extensions
.clone()
.contains(&ext.to_lowercase())
{
let media = Media::new(0, entry.path().display().to_string(), false);
media_list.push(media);
@ -95,20 +93,21 @@ impl FolderSource {
fn shuffle(&mut self) {
let mut rng = thread_rng();
self.nodes.lock().unwrap().shuffle(&mut rng);
let mut nodes = self.nodes.lock().unwrap();
for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
nodes.shuffle(&mut rng);
for (index, item) in nodes.iter_mut().enumerate() {
item.index = Some(index);
}
}
fn sort(&mut self) {
self.nodes
.lock()
.unwrap()
.sort_by(|d1, d2| d1.source.cmp(&d2.source));
let mut nodes = self.nodes.lock().unwrap();
for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
nodes.sort_by(|d1, d2| d1.source.cmp(&d2.source));
for (index, item) in nodes.iter_mut().enumerate() {
item.index = Some(index);
}
}
@ -163,7 +162,11 @@ fn file_extension(filename: &Path) -> Option<&str> {
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(config: GlobalConfig, sources: Arc<Mutex<Vec<Media>>>) {
pub fn watchman(
config: GlobalConfig,
is_terminated: Arc<AtomicBool>,
sources: Arc<Mutex<Vec<Media>>>,
) {
let (tx, rx) = channel();
let path = config.storage.path;
@ -176,7 +179,7 @@ pub fn watchman(config: GlobalConfig, sources: Arc<Mutex<Vec<Media>>>) {
let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
loop {
while !is_terminated.load(Ordering::SeqCst) {
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {

View File

@ -40,7 +40,7 @@ pub fn source_generator(
let node_clone = folder_source.nodes.clone();
// Spawn a thread to monitor folder for file changes.
thread::spawn(move || watchman(config_clone, node_clone));
thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}

View File

@ -80,30 +80,28 @@ impl CurrentProgram {
} else if Path::new(&self.json_path.clone().unwrap()).is_file() {
let mod_time = modified_time(&self.json_path.clone().unwrap());
if !mod_time
.unwrap()
.to_string()
.eq(&self.json_mod.clone().unwrap())
{
// when playlist has changed, reload it
info!(
"Reload playlist <b><magenta>{}</></b>",
self.json_path.clone().unwrap()
);
if let Some(m) = mod_time {
if !m.to_string().eq(&self.json_mod.clone().unwrap()) {
// when playlist has changed, reload it
info!(
"Reload playlist <b><magenta>{}</></b>",
self.json_path.clone().unwrap()
);
let json = read_json(
&self.config,
self.json_path.clone(),
self.is_terminated.clone(),
false,
0.0,
);
let json = read_json(
&self.config,
self.json_path.clone(),
self.is_terminated.clone(),
false,
0.0,
);
self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program;
self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program;
self.get_current_clip();
self.index.fetch_add(1, Ordering::SeqCst);
self.get_current_clip();
self.index.fetch_add(1, Ordering::SeqCst);
}
}
} else {
error!(
@ -211,14 +209,14 @@ impl CurrentProgram {
// On init or reload we need to seek for the current clip.
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
let shift = self.playout_stat.time_shift.lock().unwrap();
if *self.playout_stat.current_date.lock().unwrap()
== *self.playout_stat.date.lock().unwrap()
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
&& *shift != 0.0
{
let shift = *self.playout_stat.time_shift.lock().unwrap();
info!("Shift playlist start for <yellow>{shift}</> seconds");
time_sec += shift;
info!("Shift playlist start for <yellow>{}</> seconds", *shift);
time_sec += *shift;
}
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
@ -513,7 +511,7 @@ fn handle_list_end(mut node: Media, total_delta: f64) -> Media {
return node;
} else {
error!("Playlist is not long enough: <yellow>{total_delta:.2}</> seconds needed");
warn!("Playlist is not long enough: <yellow>{total_delta:.2}</> seconds needed");
}
node.process = Some(true);

View File

@ -5,6 +5,7 @@ use std::{
fs::{self, File},
path::PathBuf,
process::exit,
sync::{Arc, Mutex},
thread,
};
@ -23,8 +24,8 @@ mod utils;
use crate::output::{player, write_hls};
use crate::utils::{
generate_playlist, init_logging, validate_ffmpeg, GlobalConfig, PlayerControl, PlayoutStatus,
ProcessControl,
generate_playlist, init_logging, send_mail, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
};
use rpc::json_rpc_server;
@ -70,8 +71,10 @@ fn main() {
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let messages = Arc::new(Mutex::new(Vec::new()));
let logging = init_logging(&config);
let logging = init_logging(&config, proc_ctl, messages.clone());
CombinedLogger::init(logging).unwrap();
validate_ffmpeg(&config);
@ -101,5 +104,11 @@ fn main() {
player(&config, play_control, playout_stat, proc_control);
}
let msg = messages.lock().unwrap();
if msg.len() > 0 {
send_mail(&config, msg.join("\n"));
}
info!("Playout done...");
}

View File

@ -17,8 +17,7 @@ pub use hls::write_hls;
use crate::input::{ingest_server, source_generator};
use crate::utils::{
sec_to_time, stderr_reader, Decoder, Encoder, GlobalConfig, PlayerControl, PlayoutStatus,
ProcessControl,
sec_to_time, stderr_reader, Decoder, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
};
use crate::vec_strings;
@ -202,9 +201,7 @@ pub fn player(
sleep(Duration::from_secs(1));
if let Err(e) = proc_control.kill(Encoder) {
error!("{e}")
}
proc_control.kill_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");

View File

@ -1,7 +1,4 @@
use std::{
process,
process::{Command, Stdio},
};
use std::process::{self, Command, Stdio};
use simplelog::*;

View File

@ -71,12 +71,13 @@ pub fn json_rpc_server(
let mut time_shift = playout_stat.time_shift.lock().unwrap();
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
let current_list = play_control.current_list.lock().unwrap();
// get next clip
if map.contains_key("control") && &map["control"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < play_control.current_list.lock().unwrap().len() {
if index < current_list.len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -89,7 +90,7 @@ pub fn json_rpc_server(
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = play_control.current_list.lock().unwrap()[index].clone();
let mut media = current_list[index].clone();
media.add_probe();
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
@ -114,7 +115,7 @@ pub fn json_rpc_server(
if map.contains_key("control") && &map["control"] == "back" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && play_control.current_list.lock().unwrap().len() > 1 {
if index > 1 && current_list.len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -126,8 +127,7 @@ pub fn json_rpc_server(
info!("Move to last clip");
let mut data_map = Map::new();
let mut media =
play_control.current_list.lock().unwrap()[index - 2].clone();
let mut media = current_list[index - 2].clone();
play_control.index.fetch_sub(2, Ordering::SeqCst);
media.add_probe();
@ -189,8 +189,8 @@ pub fn json_rpc_server(
if map.contains_key("media") && &map["media"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index].clone();
if index < current_list.len() {
let media = current_list[index].clone();
let data_map = get_data_map(&config, media);
@ -204,8 +204,8 @@ pub fn json_rpc_server(
if map.contains_key("media") && &map["media"] == "last" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && index - 2 < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index - 2].clone();
if index > 1 && index - 2 < current_list.len() {
let media = current_list[index - 2].clone();
let data_map = get_data_map(&config, media);
@ -219,6 +219,8 @@ pub fn json_rpc_server(
Ok(Value::String("No, or wrong parameters set!".to_string()))
});
info!("Run JSON RPC server, listening on: <b><magenta>http://{addr}</></b>");
// build rpc server
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
@ -230,7 +232,6 @@ pub fn json_rpc_server(
&& request.headers()["authorization"] == auth
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()

View File

@ -1,4 +1,5 @@
use std::{
sync::{Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
@ -27,18 +28,20 @@ fn playlist_change_at_midnight() {
config.playlist.day_start = "00:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config);
let logging = init_logging(&config, proc_ctl, messages);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T23:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(30, proc_ctl2));
player(&config, play_control, playout_stat, proc_control);
}
@ -52,18 +55,20 @@ fn playlist_change_at_six() {
config.playlist.day_start = "06:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config);
let logging = init_logging(&config, proc_ctl, messages);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T05:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(30, proc_ctl2));
player(&config, play_control, playout_stat, proc_control);
}

View File

@ -138,22 +138,25 @@ impl GlobalConfig {
/// Read config from YAML file, and set some extra config values.
pub fn new() -> Self {
let args = get_args();
let mut config_path = match env::current_exe() {
Ok(path) => path.parent().unwrap().join("ffplayout.yml"),
Err(_) => PathBuf::from("./ffplayout.yml"),
};
let mut config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml");
if let Some(cfg) = args.config {
config_path = PathBuf::from(cfg);
} else if Path::new("/etc/ffplayout/ffplayout.yml").is_file() {
config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml");
}
if !config_path.is_file() {
if Path::new("./assets/ffplayout.yml").is_file() {
config_path = PathBuf::from("./assets/ffplayout.yml")
} else if let Some(p) = env::current_exe().ok().as_ref().and_then(|op| op.parent()) {
config_path = p.join("ffplayout.yml")
};
}
let f = match File::open(&config_path) {
Ok(file) => file,
Err(err) => {
Err(_) => {
println!(
"{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!\n\nSystem error: {err}"
"{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
);
process::exit(0x0100);
}
@ -237,6 +240,7 @@ impl GlobalConfig {
if let Some(folder) = args.folder {
config.storage.path = folder;
config.processing.mode = "folder".into();
}
if let Some(start) = args.start {

View File

@ -3,7 +3,7 @@ extern crate simplelog;
use std::{
path::Path,
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
@ -22,40 +22,47 @@ use log::{Level, LevelFilter, Log, Metadata, Record};
use regex::Regex;
use simplelog::*;
use crate::utils::GlobalConfig;
use crate::utils::{GlobalConfig, ProcessControl};
/// send log messages to mail recipient
fn send_mail(cfg: &GlobalConfig, msg: String) {
let email = Message::builder()
pub fn send_mail(cfg: &GlobalConfig, msg: String) {
if let Ok(email) = Message::builder()
.from(cfg.mail.sender_addr.parse().unwrap())
.to(cfg.mail.recipient.parse().unwrap())
.subject(cfg.mail.subject.clone())
.header(header::ContentType::TEXT_PLAIN)
.body(clean_string(&msg))
.unwrap();
{
let credentials =
Credentials::new(cfg.mail.sender_addr.clone(), cfg.mail.sender_pass.clone());
let credentials = Credentials::new(cfg.mail.sender_addr.clone(), cfg.mail.sender_pass.clone());
let mut transporter = SmtpTransport::relay(cfg.mail.smtp_server.clone().as_str());
let mut transporter = SmtpTransport::relay(cfg.mail.smtp_server.clone().as_str());
if cfg.mail.starttls {
transporter = SmtpTransport::starttls_relay(cfg.mail.smtp_server.clone().as_str())
}
if cfg.mail.starttls {
transporter = SmtpTransport::starttls_relay(cfg.mail.smtp_server.clone().as_str())
}
let mailer = transporter.unwrap().credentials(credentials).build();
let mailer = transporter.unwrap().credentials(credentials).build();
// Send the email
match mailer.send(&email) {
Ok(_) => (),
Err(e) => info!("Could not send email: {:?}", e),
// Send the email
if let Err(e) = mailer.send(&email) {
error!("Could not send email: {:?}", e)
}
} else {
error!("Mail Message failed!")
}
}
/// Basic Mail Queue
///
/// Check every give seconds for messages and send them.
fn mail_queue(cfg: GlobalConfig, messages: Arc<Mutex<Vec<String>>>, interval: u64) {
loop {
fn mail_queue(
cfg: GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
interval: u64,
) {
while !proc_ctl.is_terminated.load(Ordering::SeqCst) {
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
send_mail(&cfg, msg);
@ -72,6 +79,7 @@ pub struct LogMailer {
level: LevelFilter,
pub config: Config,
messages: Arc<Mutex<Vec<String>>>,
last_message: Arc<Mutex<String>>,
}
impl LogMailer {
@ -84,6 +92,7 @@ impl LogMailer {
level: log_level,
config,
messages,
last_message: Arc::new(Mutex::new(String::new())),
})
}
}
@ -99,9 +108,16 @@ impl Log for LogMailer {
let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]");
let level = record.level().to_string().to_uppercase();
let rec = record.args().to_string();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
let mut last_msg = self.last_message.lock().unwrap();
self.messages.lock().unwrap().push(full_line);
// put message only to mail queue when it differs from last message
// this we do to prevent spamming the mail box
if *last_msg != rec {
*last_msg = rec.clone();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
self.messages.lock().unwrap().push(full_line);
}
}
}
@ -136,7 +152,11 @@ fn clean_string(text: &str) -> String {
/// - console logger
/// - file logger
/// - mail logger
pub fn init_logging(config: &GlobalConfig) -> Vec<Box<dyn SharedLogger>> {
pub fn init_logging(
config: &GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
) -> Vec<Box<dyn SharedLogger>> {
let config_clone = config.clone();
let app_config = config.logging.clone();
let mut time_level = LevelFilter::Off;
@ -214,11 +234,10 @@ pub fn init_logging(config: &GlobalConfig) -> Vec<Box<dyn SharedLogger>> {
// set mail logger only the recipient is set in config
if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') {
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let messages_clone = messages.clone();
let interval = config.mail.interval;
thread::spawn(move || mail_queue(config_clone, messages_clone, interval));
thread::spawn(move || mail_queue(config_clone, proc_ctl, messages_clone, interval));
let mail_config = log_config.build();

View File

@ -1,17 +1,13 @@
use chrono::prelude::*;
use chrono::Duration;
use ffprobe::{ffprobe, Format, Stream};
use std::{
fs,
fs::metadata,
fs::{self, metadata},
io::{BufRead, BufReader, Error},
path::Path,
process::exit,
process::{ChildStderr, Command, Stdio},
time,
time::UNIX_EPOCH,
process::{exit, ChildStderr, Command, Stdio},
time::{self, UNIX_EPOCH},
};
use chrono::{prelude::*, Duration};
use ffprobe::{ffprobe, Format, Stream};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -31,7 +27,7 @@ pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::
pub use generator::generate_playlist;
pub use json_serializer::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use logging::{init_logging, send_mail};
use crate::filter::filter_chains;
@ -73,16 +69,19 @@ pub struct Media {
impl Media {
pub fn new(index: usize, src: String, do_probe: bool) -> Self {
let mut duration: f64 = 0.0;
let mut duration = 0.0;
let mut probe = None;
if do_probe && Path::new(&src).is_file() {
probe = Some(MediaProbe::new(src.clone()));
duration = match probe.clone().unwrap().format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
if let Some(dur) = probe
.as_ref()
.and_then(|p| p.format.as_ref())
.and_then(|f| f.duration.as_ref())
{
duration = dur.parse().unwrap()
}
}
Self {
@ -107,14 +106,17 @@ impl Media {
let probe = MediaProbe::new(self.source.clone());
self.probe = Some(probe.clone());
if self.duration == 0.0 {
let duration = match probe.format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
if let Some(dur) = probe
.format
.and_then(|f| f.duration)
.map(|d| d.parse().unwrap())
.filter(|d| !is_close(*d, self.duration, 0.5))
{
self.duration = dur;
self.out = duration;
self.duration = duration;
if self.out == 0.0 {
self.out = dur;
}
}
}
}
@ -136,25 +138,22 @@ pub struct MediaProbe {
impl MediaProbe {
fn new(input: String) -> Self {
let probe = ffprobe(&input);
let mut a_stream: Vec<Stream> = vec![];
let mut v_stream: Vec<Stream> = vec![];
let mut a_stream = vec![];
let mut v_stream = vec![];
match probe {
Ok(obj) => {
for stream in obj.streams {
let cp_stream = stream.clone();
match cp_stream.codec_type {
Some(codec_type) => {
if codec_type == "audio" {
a_stream.push(stream)
} else if codec_type == "video" {
v_stream.push(stream)
}
}
_ => {
error!("No codec type found for stream: {stream:?}")
if let Some(c_type) = cp_stream.codec_type {
match c_type.as_str() {
"audio" => a_stream.push(stream),
"video" => v_stream.push(stream),
_ => {}
}
} else {
error!("No codec type found for stream: {stream:?}")
}
}
@ -191,15 +190,13 @@ impl MediaProbe {
///
/// The status file is init in main function and mostly modified in RPC server.
pub fn write_status(config: &GlobalConfig, date: &str, shift: f64) {
let stat_file = config.general.stat_file.clone();
let data = json!({
"time_shift": shift,
"date": date,
});
let status_data: String = serde_json::to_string(&data).expect("Serialize status data failed");
if let Err(e) = fs::write(stat_file, &status_data) {
if let Err(e) = fs::write(&config.general.stat_file, &status_data) {
error!("Unable to write file: {e:?}")
};
}
@ -238,9 +235,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
/// Get file modification time.
pub fn modified_time(path: &str) -> Option<DateTime<Local>> {
let metadata = metadata(path).unwrap();
if let Ok(time) = metadata.modified() {
if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) {
let date_time: DateTime<Local> = time.into();
return Some(date_time);
}
@ -318,7 +313,7 @@ pub fn get_delta(config: &GlobalConfig, begin: &f64) -> (f64, f64) {
/// Check if clip in playlist is in sync with global time.
pub fn check_sync(config: &GlobalConfig, delta: f64) -> bool {
if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 {
error!("Clip begin out of sync for <yellow>{}</> seconds", delta);
error!("Clip begin out of sync for <yellow>{delta:.3}</> seconds. Stop playout!");
return false;
}
@ -523,10 +518,11 @@ pub mod mock_time {
}
pub fn set_mock_time(date_time: &str) {
let date_obj = NaiveDateTime::parse_from_str(date_time, "%Y-%m-%dT%H:%M:%S");
let time = Local.from_local_datetime(&date_obj.unwrap()).unwrap();
if let Ok(d) = NaiveDateTime::parse_from_str(date_time, "%Y-%m-%dT%H:%M:%S") {
let time = Local.from_local_datetime(&d).unwrap();
DATE_TIME_DIFF.with(|cell| *cell.borrow_mut() = Some(Local::now() - time));
DATE_TIME_DIFF.with(|cell| *cell.borrow_mut() = Some(Local::now() - time));
}
}
}