Merge pull request #121 from jb-alvarado/master

updates and optimizations
This commit is contained in:
jb-alvarado 2022-05-07 22:28:39 +02:00 committed by GitHub
commit 6f66cc5533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 713 additions and 389 deletions

142
Cargo.lock generated
View File

@ -88,15 +88,15 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"time 0.1.43",
"winapi 0.3.9",
]
[[package]]
name = "clap"
version = "3.1.12"
version = "3.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
checksum = "85a35a599b11c089a7f49105658d089b8f2cf0882993c17daf6de15285c2c35d"
dependencies = [
"atty",
"bitflags",
@ -124,9 +124,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "0.1.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213"
dependencies = [
"os_str_bytes",
]
@ -196,7 +196,7 @@ dependencies = [
[[package]]
name = "ffplayout-engine"
version = "0.9.4"
version = "0.9.5"
dependencies = [
"chrono",
"clap",
@ -216,6 +216,7 @@ dependencies = [
"serde_yaml",
"shlex",
"simplelog",
"time 0.3.9",
"walkdir",
]
@ -232,8 +233,7 @@ dependencies = [
[[package]]
name = "file-rotate"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8071df7315b1cd4006ce687043f393cca212761889b3626c1444ae06e8f72d0"
source = "git+https://github.com/jb-alvarado/file-rotate.git#ae5062a5b82626b4d1f9fea2a17325fe1d160d4c"
dependencies = [
"chrono",
"flate2",
@ -466,9 +466,9 @@ dependencies = [
[[package]]
name = "http"
version = "0.2.6"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb"
dependencies = [
"bytes",
"fnv",
@ -488,9 +488,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.7.0"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba"
checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c"
[[package]]
name = "httpdate"
@ -659,9 +659,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lettre"
version = "0.10.0-rc.5"
version = "0.10.0-rc.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5144148f337be14dabfc0f0d85b691a68ac6c77ef22a5c47c5504b70a7c9fcf3"
checksum = "2f6c70001f7ee6c93b6687a06607c7a38f9a7ae460139a496c23da21e95bc289"
dependencies = [
"base64",
"email-encoding",
@ -680,9 +680,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.124"
version = "0.2.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50"
checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b"
[[package]]
name = "linked-hash-map"
@ -702,9 +702,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.16"
version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if 1.0.0",
]
@ -723,9 +723,9 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "memchr"
version = "2.4.1"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mime"
@ -882,9 +882,9 @@ dependencies = [
[[package]]
name = "num-integer"
version = "0.1.44"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
@ -892,9 +892,9 @@ dependencies = [
[[package]]
name = "num-traits"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
@ -909,6 +909,15 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "once_cell"
version = "1.10.0"
@ -917,18 +926,30 @@ checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "openssl"
version = "0.10.38"
version = "0.10.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
@ -946,9 +967,9 @@ dependencies = [
[[package]]
name = "openssl-sys"
version = "0.9.72"
version = "0.9.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0"
dependencies = [
"autocfg",
"cc",
@ -966,9 +987,9 @@ checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
[[package]]
name = "paris"
version = "1.5.11"
version = "1.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5"
checksum = "2eaf2319cd71dd9ff38c72bebde61b9ea657134abcf26ae4205f54f772a32810"
[[package]]
name = "parking_lot"
@ -997,9 +1018,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
@ -1188,18 +1209,18 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.136"
version = "1.0.137"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.136"
version = "1.0.137"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
dependencies = [
"proc-macro2",
"quote",
@ -1208,9 +1229,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.79"
version = "1.0.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c"
dependencies = [
"itoa",
"ryu",
@ -1219,9 +1240,9 @@ dependencies = [
[[package]]
name = "serde_yaml"
version = "0.8.23"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a521f2940385c165a24ee286aa8599633d162077a54bdcae2a6fd5a7bfa7a0"
checksum = "707d15895415db6628332b737c838b88c598522e4dc70647e59b72312924aebc"
dependencies = [
"indexmap",
"ryu",
@ -1237,14 +1258,14 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "simplelog"
version = "0.11.2"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1348164456f72ca0116e4538bdaabb0ddb622c7d9f16387c725af3e96d6001c"
checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786"
dependencies = [
"chrono",
"log",
"paris",
"termcolor",
"time 0.3.9",
]
[[package]]
@ -1277,9 +1298,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.91"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52"
dependencies = [
"proc-macro2",
"quote",
@ -1325,6 +1346,24 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "time"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa",
"libc",
"num_threads",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -1342,15 +1381,16 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.17.0"
version = "1.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc"
dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.2",
"num_cpus",
"once_cell",
"pin-project-lite",
"socket2",
"winapi 0.3.9",
@ -1424,9 +1464,9 @@ dependencies = [
[[package]]
name = "unicode-bidi"
version = "0.3.7"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-normalization"
@ -1439,9 +1479,9 @@ dependencies = [
[[package]]
name = "unicode-xid"
version = "0.2.2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
[[package]]
name = "vcpkg"

View File

@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.4"
version = "0.9.5"
edition = "2021"
[dependencies]
@ -12,9 +12,9 @@ chrono = "0.4"
clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
ffprobe = "0.3"
file-rotate = "0.6"
file-rotate = { git = "https://github.com/jb-alvarado/file-rotate.git" }
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.5"
lettre = "0.10.0-rc.6"
log = "0.4"
notify = "4.0"
once_cell = "1.10"
@ -24,7 +24,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.11", features = ["paris"] }
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }
walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies]
@ -48,6 +49,7 @@ license-file = ["LICENSE", "0"]
depends = ""
suggests = "ffmpeg"
copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved."
conf-files = ["/etc/ffplayout/ffplayout.yml"]
assets = [
["target/x86_64-unknown-linux-musl/release/ffplayout", "/usr/bin/ffplayout", "755"],
["assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"],
@ -62,7 +64,7 @@ name = "ffplayout-engine"
license = "GPL-3.0"
assets = [
{ source = "target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" },
{ source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644" },
{ source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644", config = true },
{ source = "assets/ffplayout-engine.service", dest = "/lib/systemd/system/ffplayout-engine.service", mode = "644" },
{ source = "README.md", dest = "/usr/share/doc/ffplayout-engine/README", mode = "644", doc = true },
{ source = "LICENSE", dest = "/usr/share/doc/ffplayout-engine/LICENSE", mode = "644" },

View File

@ -1,6 +1,6 @@
[Unit]
Description=Rust based 24/7 playout solution
After=network.target
After=network.target remote-fs.target
[Service]
ExecStart= /usr/bin/ffplayout

View File

@ -62,6 +62,7 @@ processing:
logo_opacity: 0.7
logo_filter: overlay=W-w-12:12
add_loudnorm: false
loudnorm_ingest: false
loud_i: -18
loud_tp: -1.5
loud_lra: 11

11
src/filter/a_loudnorm.rs Normal file
View File

@ -0,0 +1,11 @@
use crate::utils::GlobalConfig;
/// Loudnorm Audio Filter
///
/// Add loudness normalization.
pub fn filter_node(config: &GlobalConfig) -> String {
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
}

View File

@ -0,0 +1,53 @@
use crate::filter::{a_loudnorm, v_overlay};
use crate::utils::GlobalConfig;
/// Audio Filter
///
/// If needed we add audio filters to the server instance.
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.loudnorm_ingest {
audio_chain.push_str(&a_loudnorm::filter_node(config));
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
/// Create filter nodes for ingest live stream.
pub fn filter_cmd() -> Vec<String> {
let config = GlobalConfig::global();
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
let overlay = v_overlay::filter_node(config, true);
if !overlay.is_empty() {
filter.push(',');
}
filter.push_str(&overlay);
filter.push_str("[vout1]");
filter.push_str(audio_filter(config).as_str());
vec![
"-filter_complex".to_string(),
filter,
"-map".to_string(),
"[vout1]".to_string(),
"-map".to_string(),
"[aout1]".to_string(),
]
}

View File

@ -2,7 +2,10 @@ use std::path::Path;
use simplelog::*;
pub mod a_loudnorm;
pub mod ingest_filter;
pub mod v_drawtext;
pub mod v_overlay;
use crate::utils::{get_delta, is_close, GlobalConfig, Media};
@ -28,7 +31,7 @@ impl Filters {
match codec_type {
"audio" => match &self.audio_chain {
Some(ac) => {
if filter.starts_with(";") || filter.starts_with("[") {
if filter.starts_with(';') || filter.starts_with('[') {
self.audio_chain = Some(format!("{ac}{filter}"))
} else {
self.audio_chain = Some(format!("{ac},{filter}"))
@ -46,7 +49,7 @@ impl Filters {
},
"video" => match &self.video_chain {
Some(vc) => {
if filter.starts_with(";") || filter.starts_with("[") {
if filter.starts_with(';') || filter.starts_with('[') {
self.video_chain = Some(format!("{vc}{filter}"))
} else {
self.video_chain = Some(format!("{vc},{filter}"))
@ -62,9 +65,9 @@ impl Filters {
}
}
fn deinterlace(field_order: Option<String>, chain: &mut Filters) {
fn deinterlace(field_order: &Option<String>, chain: &mut Filters) {
if let Some(order) = field_order {
if &order != "progressive" {
if order != "progressive" {
chain.add_filter("yadif=0:-1:0", "video")
}
}
@ -94,10 +97,7 @@ fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
if fps != config.processing.fps {
chain.add_filter(
&format!("fps={}", config.processing.fps),
"video",
)
chain.add_filter(&format!("fps={}", config.processing.fps), "video")
}
}
@ -113,10 +113,7 @@ fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &Glo
}
if !is_close(aspect, config.processing.aspect, 0.03) {
chain.add_filter(
&format!("setdar=dar={}", config.processing.aspect),
"video"
)
chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video")
}
}
@ -142,17 +139,9 @@ fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) {
fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
if config.processing.add_logo
&& Path::new(&config.processing.logo).is_file()
&& &node.category.clone().unwrap_or(String::new()) != "advertisement"
&& &node.category.clone().unwrap_or_default() != "advertisement"
{
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
let mut logo_chain = format!(
"null[v];movie={},{logo_loop},{opacity}",
config.processing.logo
);
let mut logo_chain = v_overlay::filter_node(config, false);
if node.last_ad.unwrap() {
logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1")
@ -173,7 +162,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
fn extend_video(node: &mut Media, chain: &mut Filters) {
let video_streams = node.probe.clone().unwrap().video_streams.unwrap();
if video_streams.len() > 0 {
if !video_streams.is_empty() {
if let Some(duration) = &video_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
@ -190,16 +179,15 @@ fn extend_video(node: &mut Media, chain: &mut Filters) {
}
}
/// add drawtext filter for lower thirds messages
fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add drawtext filter for lower thirds messages
if config.text.add_text && config.text.over_pre {
let filter = v_drawtext::filter_node(node);
chain.add_filter(&filter, "video");
if let Some(filters) = &chain.video_chain {
for (i, f) in filters.split(",").enumerate() {
for (i, f) in filters.split(',').enumerate() {
if f.contains("drawtext") && !config.text.text_from_filename {
debug!("drawtext node is on index: <yellow>{i}</>");
break;
@ -211,7 +199,7 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
fn add_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.len() == 0 {
if audio_streams.is_empty() {
warn!("Clip: '{}' has no audio!", node.source);
let audio = format!(
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
@ -223,55 +211,54 @@ fn add_audio(node: &mut Media, chain: &mut Filters) {
fn extend_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.len() > 0 {
if !audio_streams.is_empty() {
if let Some(duration) = &audio_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
&format!("apad=whole_dur={}", node.out - node.seek),
"audio",
)
chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio")
}
}
}
}
/// Add single pass loudnorm filter to audio line.
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add single pass loudnorm filter to audio line
if node.probe.is_some()
&& node.probe.clone().unwrap().audio_streams.unwrap().len() > 0
&& !node
.probe
.clone()
.unwrap()
.audio_streams
.unwrap()
.is_empty()
&& config.processing.add_loudnorm
{
let loud_filter = format!(
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
);
let loud_filter = a_loudnorm::filter_node(config);
chain.add_filter(&loud_filter, "audio");
}
}
fn audio_volume(chain: &mut Filters, config: &GlobalConfig) {
if config.processing.volume != 1.0 {
chain.add_filter(
&format!("volume={}", config.processing.volume),
"audio",
)
chain.add_filter(&format!("volume={}", config.processing.volume), "audio")
}
}
fn aspect_calc(aspect_string: String) -> f64 {
let aspect_vec: Vec<&str> = aspect_string.split(':').collect();
let w: f64 = aspect_vec[0].parse().unwrap();
let h: f64 = aspect_vec[1].parse().unwrap();
let source_aspect: f64 = w as f64 / h as f64;
fn aspect_calc(aspect_string: &Option<String>, config: &GlobalConfig) -> f64 {
let mut source_aspect = config.processing.aspect;
if let Some(aspect) = aspect_string {
let aspect_vec: Vec<&str> = aspect.split(':').collect();
let w: f64 = aspect_vec[0].parse().unwrap();
let h: f64 = aspect_vec[1].parse().unwrap();
source_aspect = w as f64 / h as f64;
}
source_aspect
}
fn fps_calc(r_frame_rate: String) -> f64 {
fn fps_calc(r_frame_rate: &str) -> f64 {
let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect();
let rate: f64 = frame_rate_vec[0].parse().unwrap();
let factor: f64 = frame_rate_vec[1].parse().unwrap();
@ -280,14 +267,8 @@ fn fps_calc(r_frame_rate: String) -> f64 {
fps
}
fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &GlobalConfig,
codec_type: &str,
) {
// this realtime filter is important for HLS output to stay in sync
/// This realtime filter is important for HLS output to stay in sync.
fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig, codec_type: &str) {
let mut t = "";
if codec_type == "audio" {
@ -325,18 +306,18 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
}
let v_stream = &probe.video_streams.unwrap()[0];
let aspect = aspect_calc(v_stream.display_aspect_ratio.clone().unwrap());
let frame_per_sec = fps_calc(v_stream.r_frame_rate.clone());
let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
let frame_per_sec = fps_calc(&v_stream.r_frame_rate);
deinterlace(v_stream.field_order.clone(), &mut filters);
pad(aspect, &mut filters, &config);
fps(frame_per_sec, &mut filters, &config);
deinterlace(&v_stream.field_order, &mut filters);
pad(aspect, &mut filters, config);
fps(frame_per_sec, &mut filters, config);
scale(
v_stream.width.unwrap(),
v_stream.height.unwrap(),
aspect,
&mut filters,
&config,
config,
);
extend_video(node, &mut filters);
@ -344,15 +325,15 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
extend_audio(node, &mut filters);
}
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
add_text(node, &mut filters, config);
fade(node, &mut filters, "video");
overlay(node, &mut filters, config);
realtime_filter(node, &mut filters, config, "video");
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
add_loudnorm(node, &mut filters, config);
fade(node, &mut filters, "audio");
audio_volume(&mut filters, config);
realtime_filter(node, &mut filters, config, "audio");
let mut filter_cmd = vec![];
let mut filter_str: String = String::new();
@ -368,7 +349,7 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
if let Some(a_filters) = filters.audio_chain {
if filter_str.len() > 10 {
filter_str.push_str(";")
filter_str.push(';')
}
filter_str.push_str(a_filters.as_str());
filter_str.push_str(filters.audio_map.clone().unwrap().as_str());

View File

@ -24,14 +24,14 @@ pub fn filter_node(node: &mut Media) -> String {
};
let escape = text
.replace("'", "'\\\\\\''")
.replace("%", "\\\\\\%")
.replace(":", "\\:");
.replace('\'', "'\\\\\\''")
.replace('%', "\\\\\\%")
.replace(':', "\\:");
filter = format!("drawtext=text='{escape}':{}{font}", config.text.style)
} else {
filter = format!(
"zmq=b=tcp\\\\://'{}',drawtext=text=''{font}",
config.text.bind_address.replace(":", "\\:")
config.text.bind_address.replace(':', "\\:")
)
}
}

30
src/filter/v_overlay.rs Normal file
View File

@ -0,0 +1,30 @@
use std::path::Path;
use crate::utils::GlobalConfig;
/// Overlay Filter
///
/// When a logo is set, we create here the filter for the server.
pub fn filter_node(config: &GlobalConfig, add_tail: bool) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!(
"null[v];movie={},{logo_loop},{opacity}",
config.processing.logo
);
if add_tail {
logo_chain.push_str(
format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str(),
);
}
}
logo_chain
}

View File

@ -21,15 +21,18 @@ use walkdir::WalkDir;
use crate::utils::{get_sec, GlobalConfig, Media};
/// Folder Sources
///
/// Like playlist source, we create here a folder list for iterate over it.
#[derive(Debug, Clone)]
pub struct Source {
pub struct FolderSource {
config: GlobalConfig,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: Arc<AtomicUsize>,
}
impl Source {
impl FolderSource {
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<AtomicUsize>) -> Self {
let config = GlobalConfig::global();
let mut media_list = vec![];
@ -90,12 +93,9 @@ impl Source {
fn shuffle(&mut self) {
let mut rng = thread_rng();
self.nodes.lock().unwrap().shuffle(&mut rng);
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
item.index = Some(index);
index += 1;
}
}
@ -104,17 +104,15 @@ impl Source {
.lock()
.unwrap()
.sort_by(|d1, d2| d1.source.cmp(&d2.source));
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
item.index = Some(index);
index += 1;
}
}
}
impl Iterator for Source {
/// Create iterator for folder source
impl Iterator for FolderSource {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
@ -159,6 +157,9 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
let config = GlobalConfig::global();
let (tx, rx) = channel();

View File

@ -1,6 +1,5 @@
use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::atomic::Ordering,
thread,
@ -9,48 +8,12 @@ use std::{
use crossbeam_channel::Sender;
use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.add_loudnorm {
audio_chain.push_str(
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
.as_str(),
);
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
/// ffmpeg Ingest Server
///
/// Start ffmpeg in listen mode, and wait for input.
pub fn ingest_server(
log_format: String,
ingest_sender: Sender<(usize, [u8; 65088])>,
@ -58,32 +21,14 @@ pub fn ingest_server(
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(&config));
filter.push_str("[vout1]");
filter.push_str(audio_filter(&config).as_str());
let mut filter_list = vec![
"-filter_complex",
&filter,
"-map",
"[vout1]",
"-map",
"[aout1]",
];
let filter_list = filter_cmd();
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list);
server_cmd.append(&mut filter_list.iter().map(String::as_str).collect());
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
let mut is_running;
@ -116,7 +61,6 @@ pub fn ingest_server(
let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server"));
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
loop {
@ -146,7 +90,9 @@ pub fn ingest_server(
}
drop(ingest_reader);
proc_control.server_is_running.store(false, Ordering::SeqCst);
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")

View File

@ -1,6 +1,9 @@
use std::{
process,
sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize}},
sync::{
atomic::{AtomicBool, AtomicUsize},
Arc, Mutex,
},
thread,
};
@ -12,10 +15,11 @@ pub mod folder;
pub mod ingest;
pub mod playlist;
pub use folder::{watchman, Source};
pub use folder::{watchman, FolderSource};
pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
/// Create a source iterator from playlist, or from folder.
pub fn source_generator(
config: GlobalConfig,
current_list: Arc<Mutex<Vec<Media>>>,
@ -23,31 +27,31 @@ pub fn source_generator(
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
) -> Box<dyn Iterator<Item = Media>> {
let get_source = match config.processing.clone().mode.as_str() {
let get_source = match config.processing.mode.as_str() {
"folder" => {
info!("Playout in folder mode");
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
debug!(
"Monitor folder: <b><magenta>{}</></b>",
&config.storage.path
);
let folder_source = Source::new(current_list, index);
let folder_source = FolderSource::new(current_list, index);
let node_clone = folder_source.nodes.clone();
// Spawn a thread to monitor folder for file changes.
thread::spawn(move || watchman(node_clone));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(
playout_stat,
is_terminated.clone(),
current_list,
index,
);
let program = CurrentProgram::new(playout_stat, is_terminated, current_list, index);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
process::exit(0x0100);
process::exit(1);
}
};

View File

@ -1,7 +1,10 @@
use std::{
fs,
path::Path,
sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
};
use serde_json::json;
@ -12,6 +15,9 @@ use crate::utils::{
seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN,
};
/// Struct for current playlist.
///
/// Here we prepare the init clip and build a iterator where we pull our clips.
#[derive(Debug)]
pub struct CurrentProgram {
config: GlobalConfig,
@ -63,6 +69,7 @@ impl CurrentProgram {
}
}
// Check if playlist file got updated, and when yes we reload it and setup everything in place.
fn check_update(&mut self, seek: bool) {
if self.json_path.is_none() {
let json = read_json(None, self.is_terminated.clone(), seek, 0.0);
@ -115,15 +122,16 @@ impl CurrentProgram {
}
}
// Check if day is past and it is time for a new playlist.
fn check_for_next_playlist(&mut self) {
let current_time = get_sec();
let start_sec = self.config.playlist.start_sec.unwrap();
let target_length = self.config.playlist.length_sec.unwrap();
let (delta, total_delta) = get_delta(&current_time);
let mut duration = self.current_node.out.clone();
let mut duration = self.current_node.out;
if self.current_node.duration > self.current_node.out {
duration = self.current_node.duration.clone()
duration = self.current_node.duration
}
let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta;
@ -158,32 +166,27 @@ impl CurrentProgram {
}
}
// Check if last and/or next clip is a advertisement.
fn last_next_ad(&mut self) {
let index = self.index.load(Ordering::SeqCst);
let current_list = self.nodes.lock().unwrap();
if index + 1 < current_list.len()
&& &current_list[index + 1]
.category
.clone()
.unwrap_or(String::new())
== "advertisement"
&& &current_list[index + 1].category.clone().unwrap_or_default() == "advertisement"
{
self.current_node.next_ad = Some(true);
}
if index > 0
&& index < current_list.len()
&& &current_list[index - 1]
.category
.clone()
.unwrap_or(String::new())
== "advertisement"
&& &current_list[index - 1].category.clone().unwrap_or_default() == "advertisement"
{
self.current_node.last_ad = Some(true);
}
}
// Get current time and when we are before start time,
// we add full seconds of a day to it.
fn get_current_time(&mut self) -> f64 {
let mut time_sec = get_sec();
@ -194,6 +197,7 @@ impl CurrentProgram {
time_sec
}
// On init or reload we need to seek for the current clip.
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
@ -216,6 +220,7 @@ impl CurrentProgram {
}
}
// Prepare init clip.
fn init_clip(&mut self) {
self.get_current_clip();
@ -232,6 +237,7 @@ impl CurrentProgram {
}
}
/// Build the playlist iterator
impl Iterator for CurrentProgram {
type Item = Media;
@ -245,7 +251,7 @@ impl Iterator for CurrentProgram {
}
if self.playout_stat.list_init.load(Ordering::SeqCst) {
// on init load playlist, could be not long enough,
// On init load, playlist could be not long enough,
// so we check if we can take the next playlist already,
// or we fill the gap with a dummy.
let list_length = self.nodes.lock().unwrap().len();
@ -281,7 +287,8 @@ impl Iterator for CurrentProgram {
self.current_node = gen_source(media);
self.nodes.lock().unwrap().push(self.current_node.clone());
self.index.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst);
self.index
.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst);
}
}
@ -314,7 +321,7 @@ impl Iterator for CurrentProgram {
Some(self.current_node.clone())
} else {
let last_playlist = self.json_path.clone();
let last_ad = self.current_node.last_ad.clone();
let last_ad = self.current_node.last_ad;
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
@ -357,22 +364,22 @@ impl Iterator for CurrentProgram {
}
}
/// Prepare input clip:
///
/// - check begin and length from clip
/// - return clip only if we are in 24 hours time range
fn timed_source(
node: Media,
config: &GlobalConfig,
last: bool,
playout_stat: &PlayoutStatus,
) -> Media {
// prepare input clip
// check begin and length from clip
// return clip only if we are in 24 hours time range
let (delta, total_delta) = get_delta(&node.begin.unwrap());
let mut shifted_delta = delta;
let mut new_node = node.clone();
new_node.process = Some(false);
if config.playlist.length.contains(":") {
if config.playlist.length.contains(':') {
let time_shift = playout_stat.time_shift.lock().unwrap();
if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap()
@ -398,7 +405,7 @@ fn timed_source(
if (total_delta > node.out - node.seek && !last)
|| node.index.unwrap() < 2
|| !config.playlist.length.contains(":")
|| !config.playlist.length.contains(':')
{
// when we are in the 24 hour range, get the clip
new_node = gen_source(node);
@ -412,6 +419,7 @@ fn timed_source(
new_node
}
/// Generate the source CMD, or when clip not exist, get a dummy.
fn gen_source(mut node: Media) -> Media {
if Path::new(&node.source).is_file() {
node.add_probe();
@ -440,10 +448,9 @@ fn gen_source(mut node: Media) -> Media {
node
}
/// Handle init clip, but this clip can be the last one in playlist,
/// this we have to figure out and calculate the right length.
fn handle_list_init(mut node: Media) -> Media {
// handle init clip, but this clip can be the last one in playlist,
// this we have to figure out and calculate the right length
let (_, total_delta) = get_delta(&node.begin.unwrap());
let mut out = node.out;
@ -452,16 +459,13 @@ fn handle_list_init(mut node: Media) -> Media {
}
node.out = out;
let new_node = gen_source(node);
new_node
gen_source(node)
}
/// when we come to last clip in playlist,
/// or when we reached total playtime,
/// we end up here
fn handle_list_end(mut node: Media, total_delta: f64) -> Media {
// when we come to last clip in playlist,
// or when we reached total playtime,
// we end up here
debug!("Playlist end");
let mut out = if node.seek > 0.0 {

View File

@ -2,11 +2,10 @@ extern crate log;
extern crate simplelog;
use std::{
{fs, fs::File},
fs::{self, File},
path::PathBuf,
process::exit,
thread,
};
use serde::{Deserialize, Serialize};
@ -32,26 +31,26 @@ struct StatusData {
date: String,
}
fn main() {
init_config();
let config = GlobalConfig::global();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
if !PathBuf::from(config.general.stat_file.clone()).exists() {
/// Here we create a status file in temp folder.
/// We need this for reading/saving program status.
/// For example when we skip a playing file,
/// we save the time difference, so we stay in sync.
///
/// When file not exists we create it, and when it exists we get its values.
fn status_file(stat_file: &str, playout_stat: &PlayoutStatus) {
if !PathBuf::from(stat_file).exists() {
let data = json!({
"time_shift": 0.0,
"date": String::new(),
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
fs::write(stat_file, &json).expect("Unable to write file");
} else {
let stat_file = File::options()
.read(true)
.write(false)
.open(&config.general.stat_file)
.open(&stat_file)
.expect("Could not open status file");
let data: StatusData =
@ -60,13 +59,24 @@ fn main() {
*playout_stat.time_shift.lock().unwrap() = data.time_shift;
*playout_stat.date.lock().unwrap() = data.date;
}
}
fn main() {
// Init the config, set process controller, create logging.
init_config();
let config = GlobalConfig::global();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let logging = init_logging();
CombinedLogger::init(logging).unwrap();
validate_ffmpeg();
status_file(&config.general.stat_file, &playout_stat);
if let Some(range) = config.general.generate.clone() {
// run a simple playlist generator and save them to disk
generate_playlist(range);
exit(0);
@ -77,16 +87,15 @@ fn main() {
let proc_ctl = proc_control.clone();
if config.rpc_server.enable {
thread::spawn( move || json_rpc_server(
play_ctl,
play_stat,
proc_ctl,
));
// If RPC server is enable we also fire up a JSON RPC server.
thread::spawn(move || json_rpc_server(play_ctl, play_stat, proc_ctl));
}
if &config.out.mode.to_lowercase() == "hls" {
// write files/playlist to HLS m3u8 playlist
write_hls(play_control, playout_stat, proc_control);
} else {
// play on desktop or stream to a remote target
player(play_control, playout_stat, proc_control);
}

View File

@ -5,6 +5,9 @@ use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
/// Desktop Output
///
/// Instead of streaming, we run a ffplay instance and play on desktop.
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();

View File

@ -18,26 +18,129 @@ out:
*/
use std::{
io::BufReader,
io::{BufRead, BufReader, Error},
process::{Command, Stdio},
thread,
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
};
use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::input::source_generator;
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus,
ProcessControl,
};
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
/// Ingest Server for HLS
fn ingest_to_hls_server(
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let playlist_init = playout_stat.list_init;
let filter_list = filter_cmd();
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list.iter().map(String::as_str).collect());
server_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
let mut is_running;
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
debug!(
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
);
loop {
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}")
}
Ok(proc) => proc,
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
let line = line?;
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
playlist_init.store(true, Ordering::SeqCst);
is_running = true;
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.kill(Decoder) {
error!("{e}");
}
}
if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[server]</> {}",
format_line(line.clone(), "error")
);
}
}
info!("Switch from live ingest to {}", config.processing.mode);
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")
}
if proc_control.is_terminated.load(Ordering::SeqCst) {
break;
}
}
Ok(())
}
/// HLS Writer
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
mut proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let proc_control_c = proc_control.clone();
let get_source = source_generator(
config.clone(),
@ -47,6 +150,11 @@ pub fn write_hls(
proc_control.is_terminated.clone(),
);
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(play_stat, proc_control_c));
}
for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone());
@ -93,14 +201,18 @@ pub fn write_hls(
};
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer"));
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
if let Err(e) = dec_proc.wait() {
error!("Writer: {e}")
if let Err(e) = stderr_reader(dec_err, "Writer") {
error!("{e:?}")
};
if let Err(e) = error_decoder_thread.join() {
error!("{e:?}");
};
if let Err(e) = proc_control.wait(Decoder) {
error!("{e}");
}
while proc_control.server_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
}
}
}

View File

@ -21,6 +21,15 @@ use crate::utils::{
ProcessControl,
};
/// Player
///
/// Here we create the input file loop, from playlist, or folder source.
/// Then we read the stdout from the reader ffmpeg instance
/// and write it to the stdin from the streamer ffmpeg instance.
/// If it is configured we also fire up a ffmpeg ingest server instance,
/// for getting live feeds.
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
@ -33,6 +42,7 @@ pub fn player(
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
// get source iterator
let get_source = source_generator(
config.clone(),
play_control.current_list.clone(),
@ -41,6 +51,7 @@ pub fn player(
proc_control.is_terminated.clone(),
);
// get ffmpeg output instance
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(&ff_log_format),
"stream" => stream::output(&ff_log_format),
@ -49,16 +60,20 @@ pub fn player(
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
// spawn a thread to log ffmpeg output error messages
let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder"));
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
let (ingest_sender, ingest_receiver) = bounded(96);
let ff_log_format_c = ff_log_format.clone();
let proc_control_c = proc_control.clone();
let mut ingest_receiver = None;
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
let (ingest_sender, rx) = bounded(96);
ingest_receiver = Some(rx);
thread::spawn(move || ingest_server(ff_log_format_c, ingest_sender, proc_control_c));
}
@ -95,6 +110,7 @@ pub fn player(
dec_cmd.join(" ")
);
// create ffmpeg decoder instance, for reading the input files
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stdout(Stdio::piped())
@ -115,6 +131,7 @@ pub fn player(
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
loop {
// when server is running, read from channel
if proc_control.server_is_running.load(Ordering::SeqCst) {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
@ -131,13 +148,14 @@ pub fn player(
playlist_init.store(true, Ordering::SeqCst);
}
for rx in ingest_receiver.try_iter() {
for rx in ingest_receiver.as_ref().unwrap().try_iter() {
if let Err(e) = enc_writer.write(&rx.1[..rx.0]) {
error!("Encoder write error: {:?}", e);
break 'source_iter;
};
}
// read from decoder instance
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);

View File

@ -8,6 +8,9 @@ use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
/// Streaming Output
///
/// Prepare the ffmpeg command for streaming output
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();
let mut enc_filter: Vec<String> = vec![];
@ -51,7 +54,10 @@ pub fn output(log_format: &str) -> process::Child {
enc_cmd.append(&mut preview);
enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect());
debug!("Encoder CMD: <bright-blue>\"ffmpeg {}\"</>", enc_cmd.join(" "));
debug!(
"Encoder CMD: <bright-blue>\"ffmpeg {}\"</>",
enc_cmd.join(" ")
);
let enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)

View File

@ -13,6 +13,7 @@ use crate::utils::{
PlayoutStatus, ProcessControl,
};
/// map media struct to json object
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
@ -23,6 +24,7 @@ fn get_media_map(media: Media) -> Value {
})
}
/// prepare json object for response
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
@ -45,6 +47,14 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
data_map
}
/// JSON RPC Server
///
/// A simple rpc server for getting status information and controlling player:
///
/// - current clip information
/// - jump to next clip
/// - get last clip
/// - reset player state to original clip
pub fn json_rpc_server(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
@ -52,7 +62,6 @@ pub fn json_rpc_server(
) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let play = play_control.clone();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
@ -61,10 +70,11 @@ pub fn json_rpc_server(
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
// get next clip
if map.contains_key("control") && &map["control"] == "next" {
let index = play.index.load(Ordering::SeqCst);
let index = play_control.index.load(Ordering::SeqCst);
if index < play.current_list.lock().unwrap().len() {
if index < play_control.current_list.lock().unwrap().len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -77,7 +87,7 @@ pub fn json_rpc_server(
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index].clone();
let mut media = play_control.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
@ -98,10 +108,11 @@ pub fn json_rpc_server(
return Ok(Value::String("Last clip can not be skipped".to_string()));
}
// get last clip
if map.contains_key("control") && &map["control"] == "back" {
let index = play.index.load(Ordering::SeqCst);
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
if index > 1 && play_control.current_list.lock().unwrap().len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -113,8 +124,9 @@ pub fn json_rpc_server(
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
play.index.fetch_sub(2, Ordering::SeqCst);
let mut media =
play_control.current_list.lock().unwrap()[index - 2].clone();
play_control.index.fetch_sub(2, Ordering::SeqCst);
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
@ -135,6 +147,7 @@ pub fn json_rpc_server(
return Ok(Value::String("Clip index out of range".to_string()));
}
// reset player state
if map.contains_key("control") && &map["control"] == "reset" {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
@ -161,19 +174,21 @@ pub fn json_rpc_server(
return Ok(Value::String("Reset playout state failed".to_string()));
}
// get infos about current clip
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play.current_media.lock().unwrap().clone() {
if let Some(media) = play_control.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
};
}
// get infos about next clip
if map.contains_key("media") && &map["media"] == "next" {
let index = play.index.load(Ordering::SeqCst);
let index = play_control.index.load(Ordering::SeqCst);
if index < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index].clone();
if index < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index].clone();
let data_map = get_data_map(config, media);
@ -183,11 +198,12 @@ pub fn json_rpc_server(
return Ok(Value::String("There is no next clip".to_string()));
}
// get infos about last clip
if map.contains_key("media") && &map["media"] == "last" {
let index = play.index.load(Ordering::SeqCst);
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index - 2].clone();
if index > 1 && index - 2 < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index - 2].clone();
let data_map = get_data_map(config, media);
@ -201,10 +217,12 @@ pub fn json_rpc_server(
Ok(Value::String("No, or wrong parameters set!".to_string()))
});
// build rpc server
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
// add middleware, for authentication
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
@ -223,7 +241,7 @@ pub fn json_rpc_server(
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle());
server.wait();
}

View File

@ -17,7 +17,7 @@ pub struct Args {
long,
help = "Generate playlist for date. Date-range is possible, like: 2022-01-01 - 2022-01-10.",
name = "YYYY-MM-DD",
multiple_values=true
multiple_values = true
)]
pub generate: Option<Vec<String>>,
@ -54,8 +54,7 @@ pub struct Args {
pub volume: Option<f64>,
}
/// Get arguments from command line, and return them.
pub fn get_args() -> Args {
let args = Args::parse();
args
Args::parse()
}

View File

@ -12,6 +12,9 @@ use shlex::split;
use crate::utils::{get_args, time_to_sec};
/// Global Config
///
/// This we init ones, when ffplayout is starting and use them globally in the hole program.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GlobalConfig {
pub general: General,
@ -78,6 +81,7 @@ pub struct Processing {
pub logo_opacity: f32,
pub logo_filter: String,
pub add_loudnorm: bool,
pub loudnorm_ingest: bool,
pub loud_i: f32,
pub loud_tp: f32,
pub loud_lra: f32,
@ -132,6 +136,7 @@ pub struct Out {
}
impl GlobalConfig {
/// Read config from YAML file, and set some extra config values.
fn new() -> Self {
let args = get_args();
let mut config_path = match env::current_exe() {
@ -149,8 +154,7 @@ impl GlobalConfig {
Ok(file) => file,
Err(err) => {
println!(
"{config_path:?} doesn't exists!\n{}\n\nSystem error: {err}",
"Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
"{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!\n\nSystem error: {err}"
);
process::exit(0x0100);
}
@ -167,12 +171,13 @@ impl GlobalConfig {
let bitrate = config.processing.width * config.processing.height / 10;
config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start));
if config.playlist.length.contains(":") {
if config.playlist.length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&config.playlist.length));
} else {
config.playlist.length_sec = Some(86400.0);
}
// We set the decoder settings here, so we only define them ones.
let mut settings: Vec<String> = vec![
"-pix_fmt",
"yuv420p",
@ -209,6 +214,8 @@ impl GlobalConfig {
config.out.preview_cmd = split(config.out.preview_param.as_str());
config.out.output_cmd = split(config.out.output_param.as_str());
// Read command line arguments, and override the config with them.
if let Some(gen) = args.generate {
config.general.generate = Some(gen);
}
@ -237,7 +244,7 @@ impl GlobalConfig {
if let Some(length) = args.length {
config.playlist.length = length.clone();
if length.contains(":") {
if length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
@ -266,11 +273,10 @@ impl GlobalConfig {
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
/// When add_loudnorm is False we use a different audio encoder,
/// s302m has higher quality, but is experimental
/// and works not well together with the loudnorm filter.
fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> {
// when add_loudnorm is False we use a different audio encoder,
// s302m has higher quality, but is experimental
// and works not well together with the loudnorm filter
let mut codec = vec!["-c:a", "s302m", "-strict", "-2"];
if add_loudnorm {

View File

@ -12,6 +12,7 @@ use simplelog::*;
use crate::utils::Media;
/// Defined process units.
pub enum ProcessUnit {
Decoder,
Encoder,
@ -30,6 +31,10 @@ impl fmt::Display for ProcessUnit {
use ProcessUnit::*;
/// Process Controller
///
/// We save here some global states, about what is running and which processes are alive.
/// This we need for process termination, skipping clip decoder etc.
#[derive(Clone)]
pub struct ProcessControl {
pub decoder_term: Arc<Mutex<Option<Child>>>,
@ -88,6 +93,8 @@ impl ProcessControl {
Ok(())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> {
match proc {
Decoder => {
@ -116,6 +123,7 @@ impl ProcessControl {
Ok(())
}
/// No matter what is running, terminate them all.
pub fn kill_all(&mut self) {
self.is_terminated.store(true, Ordering::SeqCst);
@ -141,6 +149,7 @@ impl Drop for ProcessControl {
}
}
/// Global player control, to get infos about current clip etc.
#[derive(Clone)]
pub struct PlayerControl {
pub current_media: Arc<Mutex<Option<Media>>>,
@ -158,6 +167,7 @@ impl PlayerControl {
}
}
/// Global playout control, for move forward/backward clip, or resetting playlist/state.
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
pub time_shift: Arc<Mutex<f64>>,

View File

@ -1,3 +1,11 @@
/// Simple Playlist Generator
///
/// You can call ffplayout[.exe] -g YYYY-mm-dd - YYYY-mm-dd to generate JSON playlists.
///
/// The generator takes the files from storage, which are set in config.
/// It also respect the shuffle/sort mode.
///
/// Beside that it is really very basic, without any logic.
use std::{
fs::{create_dir_all, write},
path::Path,
@ -8,33 +16,28 @@ use std::{
use chrono::{Duration, NaiveDate};
use simplelog::*;
use crate::input::Source;
use crate::input::FolderSource;
use crate::utils::{json_serializer::Playlist, GlobalConfig, Media};
fn get_date_range(date_range: &Vec<String>) -> Vec<String> {
/// Generate a vector with dates, from given range.
fn get_date_range(date_range: &[String]) -> Vec<String> {
let mut range = vec![];
let start;
let end;
match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") {
Ok(s) => {
start = s;
}
let start = match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") {
Ok(s) => s,
Err(_) => {
error!("date format error in: <yellow>{:?}</>", date_range[0]);
exit(1);
}
}
};
match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") {
Ok(e) => {
end = e;
}
let end = match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") {
Ok(e) => e,
Err(_) => {
error!("date format error in: <yellow>{:?}</>", date_range[2]);
exit(1);
}
}
};
let duration = end.signed_duration_since(start);
let days = duration.num_days() + 1;
@ -46,9 +49,10 @@ fn get_date_range(date_range: &Vec<String>) -> Vec<String> {
range
}
/// Generate playlists
pub fn generate_playlist(mut date_range: Vec<String>) {
let config = GlobalConfig::global();
let total_length = config.playlist.length_sec.unwrap().clone();
let total_length = config.playlist.length_sec.unwrap();
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
let index = Arc::new(AtomicUsize::new(0));
let playlist_root = Path::new(&config.playlist.path);
@ -66,7 +70,7 @@ pub fn generate_playlist(mut date_range: Vec<String>) {
date_range = get_date_range(&date_range)
}
let media_list = Source::new(current_list, index);
let media_list = FolderSource::new(current_list, index);
let list_length = media_list.nodes.lock().unwrap().len();
for date in date_range {
@ -96,7 +100,7 @@ pub fn generate_playlist(mut date_range: Vec<String>) {
);
let mut filler = Media::new(0, config.storage.filler_clip.clone(), true);
let filler_length = filler.duration.clone();
let filler_length = filler.duration;
let mut length = 0.0;
let mut round = 0;
@ -109,7 +113,7 @@ pub fn generate_playlist(mut date_range: Vec<String>) {
};
for item in media_list.clone() {
let duration = item.duration.clone();
let duration = item.duration;
if total_length > length + duration {
playlist.program.push(item);

View File

@ -12,6 +12,7 @@ use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Med
pub const DUMMY_LEN: f64 = 60.0;
/// This is our main playlist object, it holds all necessary information for the current day.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist {
pub date: String,
@ -44,6 +45,8 @@ impl Playlist {
}
}
/// Read json playlist file, fills Playlist struct and set some extra values,
/// which we need to process.
pub fn read_json(
path: Option<String>,
is_terminated: Arc<AtomicBool>,
@ -89,13 +92,14 @@ pub fn read_json(
serde_json::from_reader(f).expect("Could not read json playlist file.");
playlist.current_file = Some(current_file.clone());
playlist.start_sec = Some(start_sec.clone());
playlist.start_sec = Some(start_sec);
let modify = modified_time(&current_file);
if let Some(modi) = modify {
playlist.modified = Some(modi.to_string());
}
// Add extra values to every media clip
for (i, item) in playlist.program.iter_mut().enumerate() {
item.begin = Some(start_sec);
item.index = Some(i);

View File

@ -1,9 +1,22 @@
use std::{path::Path, sync::{atomic::{AtomicBool, Ordering}, Arc}};
use std::{
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use simplelog::*;
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
/// Validate a given playlist, to check if:
///
/// - the source files are existing
/// - file can be read by ffprobe and metadata exists
/// - total playtime fits target length from config
///
/// This function we run in a thread, to don't block the main function.
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<AtomicBool>, config: GlobalConfig) {
let date = playlist.date;
let mut length = config.playlist.length_sec.unwrap();
@ -15,7 +28,7 @@ pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<AtomicBool>, con
for item in playlist.program.iter() {
if is_terminated.load(Ordering::SeqCst) {
return
return;
}
if Path::new(&item.source).is_file() {

View File

@ -8,19 +8,23 @@ use std::{
time::Duration,
};
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
use chrono::prelude::*;
use file_rotate::{
compression::Compression,
suffix::{AppendTimestamp, DateFrom, FileLimit},
ContentLimit, FileRotate, TimeFrequency,
};
use lettre::{
message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport,
Transport,
};
use chrono::prelude::*;
use log::{Level, LevelFilter, Log, Metadata, Record};
use regex::Regex;
use simplelog::*;
use crate::utils::GlobalConfig;
/// send log messages to mail recipient
fn send_mail(msg: String) {
let config = GlobalConfig::global();
@ -52,9 +56,10 @@ fn send_mail(msg: String) {
}
}
/// Basic Mail Queue
///
/// Check every give seconds for messages and send them.
fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
// check every give seconds for messages and send them
loop {
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
@ -67,6 +72,7 @@ fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
}
}
/// Self made Mail Log struct, to extend simplelog.
pub struct LogMailer {
level: LevelFilter,
pub config: Config,
@ -121,12 +127,20 @@ impl SharedLogger for LogMailer {
}
}
/// Workaround to remove color information from log
///
/// ToDo: maybe in next version from simplelog this is not necessary anymore.
fn clean_string(text: &str) -> String {
let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap();
regex.replace_all(text, "").to_string()
}
/// Initialize our logging, to have:
///
/// - console logger
/// - file logger
/// - mail logger
pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
let config = GlobalConfig::global();
let app_config = config.logging.clone();
@ -137,18 +151,26 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
time_level = LevelFilter::Error;
}
let log_config = simplelog::ConfigBuilder::new()
let mut log_config = ConfigBuilder::new()
.set_thread_level(LevelFilter::Off)
.set_target_level(LevelFilter::Off)
.set_level_padding(LevelPadding::Left)
.set_time_to_local(app_config.local_time)
.set_time_level(time_level)
.clone();
if app_config.local_time {
log_config = match log_config.set_time_offset_to_local() {
Ok(local) => local.clone(),
Err(_) => log_config,
};
};
if app_config.log_to_file {
let file_config = log_config
.clone()
.set_time_format("[%Y-%m-%d %H:%M:%S%.3f]".into())
.set_time_format_custom(format_description!(
"[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond]]"
))
.build();
let mut log_path = "logs/ffplayout.log".to_string();
@ -166,8 +188,12 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
let log = || {
FileRotate::new(
log_path,
AppendCount::new(app_config.backup_count),
ContentLimit::Lines(1000),
AppendTimestamp::with_format(
"%Y-%m-%d",
FileLimit::MaxFiles(app_config.backup_count),
DateFrom::DateYesterday,
),
ContentLimit::Time(TimeFrequency::Daily),
Compression::None,
)
};
@ -180,7 +206,9 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
.set_level_color(Level::Info, Some(Color::Ansi256(10)))
.set_level_color(Level::Warn, Some(Color::Ansi256(208)))
.set_level_color(Level::Error, Some(Color::Ansi256(9)))
.set_time_format_str("\x1b[30;1m[%Y-%m-%d %H:%M:%S%.3f]\x1b[0m")
.set_time_format_custom(format_description!(
"\x1b[[30;1m[[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:4]]\x1b[[0m"
))
.build();
app_logger.push(TermLogger::new(
@ -191,14 +219,15 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
));
}
if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") {
// set mail logger only the recipient is set in config
if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') {
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let messages_clone = messages.clone();
let interval = config.mail.interval.clone();
let interval = config.mail.interval;
thread::spawn(move || mail_queue(messages_clone, interval));
let mail_config = log_config.clone().build();
let mail_config = log_config.build();
let filter = match config.mail.mail_level.to_lowercase().as_str() {
"info" => LevelFilter::Info,

View File

@ -35,6 +35,7 @@ pub use logging::init_logging;
use crate::filter::filter_chains;
/// Video clip struct to hold some important states and comments for current media.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Media {
#[serde(skip_serializing, skip_deserializing)]
@ -89,12 +90,12 @@ impl Media {
index: Some(index),
seek: 0.0,
out: duration,
duration: duration,
duration,
category: None,
source: src.clone(),
cmd: Some(vec!["-i".to_string(), src]),
filter: Some(vec![]),
probe: probe,
probe,
last_ad: Some(false),
next_ad: Some(false),
process: Some(true),
@ -124,6 +125,7 @@ impl Media {
}
}
/// We use the ffprobe crate, but we map the metadata to our needs.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MediaProbe {
pub format: Option<Format>,
@ -158,12 +160,12 @@ impl MediaProbe {
MediaProbe {
format: Some(obj.format),
audio_streams: if a_stream.len() > 0 {
audio_streams: if !a_stream.is_empty() {
Some(a_stream)
} else {
None
},
video_streams: if v_stream.len() > 0 {
video_streams: if !v_stream.is_empty() {
Some(v_stream)
} else {
None
@ -185,6 +187,9 @@ impl MediaProbe {
}
}
/// Write current status to status file in temp folder.
///
/// The status file is init in main function and mostly modified in RPC server.
pub fn write_status(date: &str, shift: f64) {
let config = GlobalConfig::global();
let stat_file = config.general.stat_file.clone();
@ -206,6 +211,7 @@ pub fn write_status(date: &str, shift: f64) {
// local.timestamp_millis() as i64
// }
/// Get current time in seconds.
pub fn get_sec() -> f64 {
let local: DateTime<Local> = Local::now();
@ -213,6 +219,10 @@ pub fn get_sec() -> f64 {
+ (local.nanosecond() as f64 / 1000000000.0)
}
/// Get current date for playlist, but check time with conditions:
///
/// - When time is before playlist start, get date from yesterday.
/// - When given next_start is over target length (normally a full day), get date from tomorrow.
pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
let local: DateTime<Local> = Local::now();
@ -227,6 +237,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
local.format("%Y-%m-%d").to_string()
}
/// Get file modification time.
pub fn modified_time(path: &str) -> Option<DateTime<Local>> {
let metadata = metadata(path).unwrap();
@ -238,8 +249,9 @@ pub fn modified_time(path: &str) -> Option<DateTime<Local>> {
None
}
/// Convert a formatted time string to seconds.
pub fn time_to_sec(time_str: &str) -> f64 {
if ["now", "", "none"].contains(&time_str) || !time_str.contains(":") {
if ["now", "", "none"].contains(&time_str) || !time_str.contains(':') {
return get_sec();
}
@ -251,6 +263,7 @@ pub fn time_to_sec(time_str: &str) -> f64 {
h * 3600.0 + m * 60.0 + s
}
/// Convert floating number (seconds) to a formatted time string.
pub fn sec_to_time(sec: f64) -> String {
let d = UNIX_EPOCH + time::Duration::from_millis((sec * 1000.0) as u64);
// Create DateTime from SystemTime
@ -259,6 +272,8 @@ pub fn sec_to_time(sec: f64) -> String {
date_time.format("%H:%M:%S%.3f").to_string()
}
/// Test if given numbers are close to each other,
/// with a third number for setting the maximum range.
pub fn is_close(a: f64, b: f64, to: f64) -> bool {
if (a - b).abs() < to {
return true;
@ -267,13 +282,16 @@ pub fn is_close(a: f64, b: f64, to: f64) -> bool {
false
}
/// Get delta between clip start and current time. This value we need to check,
/// if we still in sync.
///
/// We also get here the global delta between clip start and time when a new playlist should start.
pub fn get_delta(begin: &f64) -> (f64, f64) {
let config = GlobalConfig::global();
let mut current_time = get_sec();
let start = config.playlist.start_sec.unwrap();
let length = time_to_sec(&config.playlist.length);
let mut target_length = 86400.0;
let total_delta;
if length > 0.0 && length != target_length {
target_length = length
@ -290,15 +308,16 @@ pub fn get_delta(begin: &f64) -> (f64, f64) {
current_delta -= 86400.0
}
if current_time < start {
total_delta = start - current_time;
let total_delta = if current_time < start {
start - current_time
} else {
total_delta = target_length + start - current_time;
}
target_length + start - current_time
};
(current_delta, total_delta)
}
/// Check if clip in playlist is in sync with global time.
pub fn check_sync(delta: f64) -> bool {
let config = GlobalConfig::global();
@ -310,6 +329,7 @@ pub fn check_sync(delta: f64) -> bool {
true
}
/// Create a dummy clip as a placeholder for missing video files.
pub fn gen_dummy(duration: f64) -> (String, Vec<String>) {
let config = GlobalConfig::global();
let color = "#121212";
@ -334,6 +354,7 @@ pub fn gen_dummy(duration: f64) -> (String, Vec<String>) {
(source, cmd)
}
/// Set clip seek in and length value.
pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<String> {
let mut source_cmd: Vec<String> = vec![];
@ -344,25 +365,19 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
source_cmd.append(&mut vec!["-i".to_string(), src]);
if duration > out {
source_cmd.append(&mut vec![
"-t".to_string(),
format!("{}", out - seek).to_string(),
]);
source_cmd.append(&mut vec!["-t".to_string(), format!("{}", out - seek)]);
}
source_cmd
}
/// Read ffmpeg stderr decoder, encoder and server instance
/// and log the output.
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
// read ffmpeg stderr decoder, encoder and server instance
// and log the output
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
// let buffer = BufReader::new(std_errors);
for line in buffer.lines() {
let line = line?;
@ -373,22 +388,21 @@ pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(),
"<bright black>[{suffix}]</> {}",
format_line(line, "warning")
)
} else {
if suffix != "server"
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error")
);
}
} else if suffix != "server"
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error")
);
}
}
Ok(())
}
/// Run program to test if it is in system.
fn is_in_system(name: &str) {
if let Ok(mut proc) = Command::new(name)
.stderr(Stdio::null())
@ -407,6 +421,8 @@ fn is_in_system(name: &str) {
fn ffmpeg_libs_and_filter() -> (Vec<String>, Vec<String>) {
let mut libs: Vec<String> = vec![];
let mut filters: Vec<String> = vec![];
// filter lines which contains filter
let re: Regex = Regex::new(r"^( ?) [TSC.]+").unwrap();
let mut ff_proc = match Command::new("ffmpeg")
@ -425,27 +441,27 @@ fn ffmpeg_libs_and_filter() -> (Vec<String>, Vec<String>) {
let err_buffer = BufReader::new(ff_proc.stderr.take().unwrap());
let out_buffer = BufReader::new(ff_proc.stdout.take().unwrap());
for line in err_buffer.lines() {
if let Ok(line) = line {
if line.contains("configuration:") {
let configs = line.split_whitespace();
// stderr shows only the ffmpeg configuration
// get codec library's
for line in err_buffer.lines().flatten() {
if line.contains("configuration:") {
let configs = line.split_whitespace();
for config in configs {
if config.contains("--enable-lib") {
libs.push(config.replace("--enable-", ""));
}
for config in configs {
if config.contains("--enable-lib") {
libs.push(config.replace("--enable-", ""));
}
}
}
}
for line in out_buffer.lines() {
if let Ok(line) = line {
if let Some(_) = re.captures(line.as_str()) {
let filter_line = line.split_whitespace();
// stdout shows filter help text
// get filters
for line in out_buffer.lines().flatten() {
if re.captures(line.as_str()).is_some() {
let filter_line = line.split_whitespace();
filters.push(filter_line.collect::<Vec<&str>>()[1].to_string());
}
filters.push(filter_line.collect::<Vec<&str>>()[1].to_string());
}
}
@ -455,6 +471,10 @@ fn ffmpeg_libs_and_filter() -> (Vec<String>, Vec<String>) {
(libs, filters)
}
/// Validate ffmpeg/ffprobe/ffplay.
///
/// Check if they are in system and has all filters and codecs we need.
pub fn validate_ffmpeg() {
let config = GlobalConfig::global();