diff --git a/Cargo.lock b/Cargo.lock index ac4216f3..be80d786 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,15 +88,15 @@ dependencies = [ "libc", "num-integer", "num-traits", - "time", + "time 0.1.43", "winapi 0.3.9", ] [[package]] name = "clap" -version = "3.1.12" +version = "3.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db" +checksum = "85a35a599b11c089a7f49105658d089b8f2cf0882993c17daf6de15285c2c35d" dependencies = [ "atty", "bitflags", @@ -124,9 +124,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" dependencies = [ "os_str_bytes", ] @@ -196,7 +196,7 @@ dependencies = [ [[package]] name = "ffplayout-engine" -version = "0.9.4" +version = "0.9.5" dependencies = [ "chrono", "clap", @@ -216,6 +216,7 @@ dependencies = [ "serde_yaml", "shlex", "simplelog", + "time 0.3.9", "walkdir", ] @@ -232,8 +233,7 @@ dependencies = [ [[package]] name = "file-rotate" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8071df7315b1cd4006ce687043f393cca212761889b3626c1444ae06e8f72d0" +source = "git+https://github.com/jb-alvarado/file-rotate.git#ae5062a5b82626b4d1f9fea2a17325fe1d160d4c" dependencies = [ "chrono", "flate2", @@ -466,9 +466,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes", "fnv", @@ -488,9 +488,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" [[package]] name = "httpdate" @@ -659,9 +659,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lettre" -version = "0.10.0-rc.5" +version = "0.10.0-rc.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5144148f337be14dabfc0f0d85b691a68ac6c77ef22a5c47c5504b70a7c9fcf3" +checksum = "2f6c70001f7ee6c93b6687a06607c7a38f9a7ae460139a496c23da21e95bc289" dependencies = [ "base64", "email-encoding", @@ -680,9 +680,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.124" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" +checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" [[package]] name = "linked-hash-map" @@ -702,9 +702,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", ] @@ -723,9 +723,9 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" [[package]] name = "memchr" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "mime" @@ -882,9 +882,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ "autocfg", "num-traits", @@ -892,9 +892,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", ] @@ -909,6 +909,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -917,18 +926,30 @@ checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "openssl" -version = "0.10.38" +version = "0.10.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" dependencies = [ "bitflags", "cfg-if 1.0.0", "foreign-types", "libc", "once_cell", + "openssl-macros", "openssl-sys", ] +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -946,9 +967,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.72" +version = "0.9.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" dependencies = [ "autocfg", "cc", @@ -966,9 +987,9 @@ checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" [[package]] name = "paris" -version = "1.5.11" +version = "1.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5" +checksum = "2eaf2319cd71dd9ff38c72bebde61b9ea657134abcf26ae4205f54f772a32810" [[package]] name = "parking_lot" @@ -997,9 +1018,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -1188,18 +1209,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", @@ -1208,9 +1229,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ "itoa", "ryu", @@ -1219,9 +1240,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.23" +version = "0.8.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a521f2940385c165a24ee286aa8599633d162077a54bdcae2a6fd5a7bfa7a0" +checksum = "707d15895415db6628332b737c838b88c598522e4dc70647e59b72312924aebc" dependencies = [ "indexmap", "ryu", @@ -1237,14 +1258,14 @@ checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" [[package]] name = "simplelog" -version = "0.11.2" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1348164456f72ca0116e4538bdaabb0ddb622c7d9f16387c725af3e96d6001c" +checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786" dependencies = [ - "chrono", "log", "paris", "termcolor", + "time 0.3.9", ] [[package]] @@ -1277,9 +1298,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52" dependencies = [ "proc-macro2", "quote", @@ -1325,6 +1346,24 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "time" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" +dependencies = [ + "itoa", + "libc", + "num_threads", + "time-macros", +] + +[[package]] +name = "time-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" + [[package]] name = "tinyvec" version = "1.6.0" @@ -1342,15 +1381,16 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc" dependencies = [ "bytes", "libc", "memchr", "mio 0.8.2", "num_cpus", + "once_cell", "pin-project-lite", "socket2", "winapi 0.3.9", @@ -1424,9 +1464,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-normalization" @@ -1439,9 +1479,9 @@ dependencies = [ [[package]] name = "unicode-xid" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" [[package]] name = "vcpkg" diff --git a/Cargo.toml b/Cargo.toml index 6fb0b364..1ffcec92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" authors = ["Jonathan Baecker jonbae77@gmail.com"] readme = "README.md" -version = "0.9.4" +version = "0.9.5" edition = "2021" [dependencies] @@ -12,9 +12,9 @@ chrono = "0.4" clap = { version = "3.1", features = ["derive"] } crossbeam-channel = "0.5" ffprobe = "0.3" -file-rotate = "0.6" +file-rotate = { git = "https://github.com/jb-alvarado/file-rotate.git" } jsonrpc-http-server = "18.0" -lettre = "0.10.0-rc.5" +lettre = "0.10.0-rc.6" log = "0.4" notify = "4.0" once_cell = "1.10" @@ -24,7 +24,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.8" shlex = "1.1" -simplelog = { version = "^0.11", features = ["paris"] } +simplelog = { version = "^0.12", features = ["paris"] } +time = { version = "0.3", features = ["formatting", "macros"] } walkdir = "2" [target.x86_64-unknown-linux-musl.dependencies] @@ -48,6 +49,7 @@ license-file = ["LICENSE", "0"] depends = "" suggests = "ffmpeg" copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved." +conf-files = ["/etc/ffplayout/ffplayout.yml"] assets = [ ["target/x86_64-unknown-linux-musl/release/ffplayout", "/usr/bin/ffplayout", "755"], ["assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"], @@ -62,7 +64,7 @@ name = "ffplayout-engine" license = "GPL-3.0" assets = [ { source = "target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" }, - { source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644" }, + { source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644", config = true }, { source = "assets/ffplayout-engine.service", dest = "/lib/systemd/system/ffplayout-engine.service", mode = "644" }, { source = "README.md", dest = "/usr/share/doc/ffplayout-engine/README", mode = "644", doc = true }, { source = "LICENSE", dest = "/usr/share/doc/ffplayout-engine/LICENSE", mode = "644" }, diff --git a/assets/ffplayout-engine.service b/assets/ffplayout-engine.service index 0471626a..4deae5d5 100644 --- a/assets/ffplayout-engine.service +++ b/assets/ffplayout-engine.service @@ -1,6 +1,6 @@ [Unit] Description=Rust based 24/7 playout solution -After=network.target +After=network.target remote-fs.target [Service] ExecStart= /usr/bin/ffplayout diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 44697edb..e121438a 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -62,6 +62,7 @@ processing: logo_opacity: 0.7 logo_filter: overlay=W-w-12:12 add_loudnorm: false + loudnorm_ingest: false loud_i: -18 loud_tp: -1.5 loud_lra: 11 diff --git a/src/filter/a_loudnorm.rs b/src/filter/a_loudnorm.rs new file mode 100644 index 00000000..2d503372 --- /dev/null +++ b/src/filter/a_loudnorm.rs @@ -0,0 +1,11 @@ +use crate::utils::GlobalConfig; + +/// Loudnorm Audio Filter +/// +/// Add loudness normalization. +pub fn filter_node(config: &GlobalConfig) -> String { + format!( + ",loudnorm=I={}:TP={}:LRA={}", + config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra + ) +} diff --git a/src/filter/ingest_filter.rs b/src/filter/ingest_filter.rs new file mode 100644 index 00000000..98be5654 --- /dev/null +++ b/src/filter/ingest_filter.rs @@ -0,0 +1,53 @@ +use crate::filter::{a_loudnorm, v_overlay}; +use crate::utils::GlobalConfig; + +/// Audio Filter +/// +/// If needed we add audio filters to the server instance. +fn audio_filter(config: &GlobalConfig) -> String { + let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); + + if config.processing.loudnorm_ingest { + audio_chain.push_str(&a_loudnorm::filter_node(config)); + } + + if config.processing.volume != 1.0 { + audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str()); + } + + audio_chain.push_str("[aout1]"); + + audio_chain +} + +/// Create filter nodes for ingest live stream. +pub fn filter_cmd() -> Vec { + let config = GlobalConfig::global(); + + let mut filter = format!( + "[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5", + config.processing.fps, + config.processing.width, + config.processing.height, + config.processing.aspect + ); + + let overlay = v_overlay::filter_node(config, true); + + if !overlay.is_empty() { + filter.push(','); + } + + filter.push_str(&overlay); + filter.push_str("[vout1]"); + filter.push_str(audio_filter(config).as_str()); + + vec![ + "-filter_complex".to_string(), + filter, + "-map".to_string(), + "[vout1]".to_string(), + "-map".to_string(), + "[aout1]".to_string(), + ] +} diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 6bb2d23f..66ea47f1 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -2,7 +2,10 @@ use std::path::Path; use simplelog::*; +pub mod a_loudnorm; +pub mod ingest_filter; pub mod v_drawtext; +pub mod v_overlay; use crate::utils::{get_delta, is_close, GlobalConfig, Media}; @@ -28,7 +31,7 @@ impl Filters { match codec_type { "audio" => match &self.audio_chain { Some(ac) => { - if filter.starts_with(";") || filter.starts_with("[") { + if filter.starts_with(';') || filter.starts_with('[') { self.audio_chain = Some(format!("{ac}{filter}")) } else { self.audio_chain = Some(format!("{ac},{filter}")) @@ -46,7 +49,7 @@ impl Filters { }, "video" => match &self.video_chain { Some(vc) => { - if filter.starts_with(";") || filter.starts_with("[") { + if filter.starts_with(';') || filter.starts_with('[') { self.video_chain = Some(format!("{vc}{filter}")) } else { self.video_chain = Some(format!("{vc},{filter}")) @@ -62,9 +65,9 @@ impl Filters { } } -fn deinterlace(field_order: Option, chain: &mut Filters) { +fn deinterlace(field_order: &Option, chain: &mut Filters) { if let Some(order) = field_order { - if &order != "progressive" { + if order != "progressive" { chain.add_filter("yadif=0:-1:0", "video") } } @@ -94,10 +97,7 @@ fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) { fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) { if fps != config.processing.fps { - chain.add_filter( - &format!("fps={}", config.processing.fps), - "video", - ) + chain.add_filter(&format!("fps={}", config.processing.fps), "video") } } @@ -113,10 +113,7 @@ fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &Glo } if !is_close(aspect, config.processing.aspect, 0.03) { - chain.add_filter( - &format!("setdar=dar={}", config.processing.aspect), - "video" - ) + chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video") } } @@ -142,17 +139,9 @@ fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) { fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { if config.processing.add_logo && Path::new(&config.processing.logo).is_file() - && &node.category.clone().unwrap_or(String::new()) != "advertisement" + && &node.category.clone().unwrap_or_default() != "advertisement" { - let opacity = format!( - "format=rgba,colorchannelmixer=aa={}", - config.processing.logo_opacity - ); - let logo_loop = "loop=loop=-1:size=1:start=0"; - let mut logo_chain = format!( - "null[v];movie={},{logo_loop},{opacity}", - config.processing.logo - ); + let mut logo_chain = v_overlay::filter_node(config, false); if node.last_ad.unwrap() { logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1") @@ -173,7 +162,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { fn extend_video(node: &mut Media, chain: &mut Filters) { let video_streams = node.probe.clone().unwrap().video_streams.unwrap(); - if video_streams.len() > 0 { + if !video_streams.is_empty() { if let Some(duration) = &video_streams[0].duration { let duration_float = duration.clone().parse::().unwrap(); @@ -190,16 +179,15 @@ fn extend_video(node: &mut Media, chain: &mut Filters) { } } +/// add drawtext filter for lower thirds messages fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { - // add drawtext filter for lower thirds messages - if config.text.add_text && config.text.over_pre { let filter = v_drawtext::filter_node(node); chain.add_filter(&filter, "video"); if let Some(filters) = &chain.video_chain { - for (i, f) in filters.split(",").enumerate() { + for (i, f) in filters.split(',').enumerate() { if f.contains("drawtext") && !config.text.text_from_filename { debug!("drawtext node is on index: {i}"); break; @@ -211,7 +199,7 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { fn add_audio(node: &mut Media, chain: &mut Filters) { let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap(); - if audio_streams.len() == 0 { + if audio_streams.is_empty() { warn!("Clip: '{}' has no audio!", node.source); let audio = format!( "aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000", @@ -223,55 +211,54 @@ fn add_audio(node: &mut Media, chain: &mut Filters) { fn extend_audio(node: &mut Media, chain: &mut Filters) { let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap(); - if audio_streams.len() > 0 { + if !audio_streams.is_empty() { if let Some(duration) = &audio_streams[0].duration { let duration_float = duration.clone().parse::().unwrap(); if node.out - node.seek > duration_float - node.seek + 0.1 { - chain.add_filter( - &format!("apad=whole_dur={}", node.out - node.seek), - "audio", - ) + chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio") } } } } +/// Add single pass loudnorm filter to audio line. fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { - // add single pass loudnorm filter to audio line - if node.probe.is_some() - && node.probe.clone().unwrap().audio_streams.unwrap().len() > 0 + && !node + .probe + .clone() + .unwrap() + .audio_streams + .unwrap() + .is_empty() && config.processing.add_loudnorm { - let loud_filter = format!( - "loudnorm=I={}:TP={}:LRA={}", - config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra - ); - + let loud_filter = a_loudnorm::filter_node(config); chain.add_filter(&loud_filter, "audio"); } } fn audio_volume(chain: &mut Filters, config: &GlobalConfig) { if config.processing.volume != 1.0 { - chain.add_filter( - &format!("volume={}", config.processing.volume), - "audio", - ) + chain.add_filter(&format!("volume={}", config.processing.volume), "audio") } } -fn aspect_calc(aspect_string: String) -> f64 { - let aspect_vec: Vec<&str> = aspect_string.split(':').collect(); - let w: f64 = aspect_vec[0].parse().unwrap(); - let h: f64 = aspect_vec[1].parse().unwrap(); - let source_aspect: f64 = w as f64 / h as f64; +fn aspect_calc(aspect_string: &Option, config: &GlobalConfig) -> f64 { + let mut source_aspect = config.processing.aspect; + + if let Some(aspect) = aspect_string { + let aspect_vec: Vec<&str> = aspect.split(':').collect(); + let w: f64 = aspect_vec[0].parse().unwrap(); + let h: f64 = aspect_vec[1].parse().unwrap(); + source_aspect = w as f64 / h as f64; + } source_aspect } -fn fps_calc(r_frame_rate: String) -> f64 { +fn fps_calc(r_frame_rate: &str) -> f64 { let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect(); let rate: f64 = frame_rate_vec[0].parse().unwrap(); let factor: f64 = frame_rate_vec[1].parse().unwrap(); @@ -280,14 +267,8 @@ fn fps_calc(r_frame_rate: String) -> f64 { fps } -fn realtime_filter( - node: &mut Media, - chain: &mut Filters, - config: &GlobalConfig, - codec_type: &str, -) { - // this realtime filter is important for HLS output to stay in sync - +/// This realtime filter is important for HLS output to stay in sync. +fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig, codec_type: &str) { let mut t = ""; if codec_type == "audio" { @@ -325,18 +306,18 @@ pub fn filter_chains(node: &mut Media) -> Vec { } let v_stream = &probe.video_streams.unwrap()[0]; - let aspect = aspect_calc(v_stream.display_aspect_ratio.clone().unwrap()); - let frame_per_sec = fps_calc(v_stream.r_frame_rate.clone()); + let aspect = aspect_calc(&v_stream.display_aspect_ratio, config); + let frame_per_sec = fps_calc(&v_stream.r_frame_rate); - deinterlace(v_stream.field_order.clone(), &mut filters); - pad(aspect, &mut filters, &config); - fps(frame_per_sec, &mut filters, &config); + deinterlace(&v_stream.field_order, &mut filters); + pad(aspect, &mut filters, config); + fps(frame_per_sec, &mut filters, config); scale( v_stream.width.unwrap(), v_stream.height.unwrap(), aspect, &mut filters, - &config, + config, ); extend_video(node, &mut filters); @@ -344,15 +325,15 @@ pub fn filter_chains(node: &mut Media) -> Vec { extend_audio(node, &mut filters); } - add_text(node, &mut filters, &config); - fade(node, &mut filters, "video".into()); - overlay(node, &mut filters, &config); - realtime_filter(node, &mut filters, &config, "video".into()); + add_text(node, &mut filters, config); + fade(node, &mut filters, "video"); + overlay(node, &mut filters, config); + realtime_filter(node, &mut filters, config, "video"); - add_loudnorm(node, &mut filters, &config); - fade(node, &mut filters, "audio".into()); - audio_volume(&mut filters, &config); - realtime_filter(node, &mut filters, &config, "audio".into()); + add_loudnorm(node, &mut filters, config); + fade(node, &mut filters, "audio"); + audio_volume(&mut filters, config); + realtime_filter(node, &mut filters, config, "audio"); let mut filter_cmd = vec![]; let mut filter_str: String = String::new(); @@ -368,7 +349,7 @@ pub fn filter_chains(node: &mut Media) -> Vec { if let Some(a_filters) = filters.audio_chain { if filter_str.len() > 10 { - filter_str.push_str(";") + filter_str.push(';') } filter_str.push_str(a_filters.as_str()); filter_str.push_str(filters.audio_map.clone().unwrap().as_str()); diff --git a/src/filter/v_drawtext.rs b/src/filter/v_drawtext.rs index 9b429eaf..3a787684 100644 --- a/src/filter/v_drawtext.rs +++ b/src/filter/v_drawtext.rs @@ -24,14 +24,14 @@ pub fn filter_node(node: &mut Media) -> String { }; let escape = text - .replace("'", "'\\\\\\''") - .replace("%", "\\\\\\%") - .replace(":", "\\:"); + .replace('\'', "'\\\\\\''") + .replace('%', "\\\\\\%") + .replace(':', "\\:"); filter = format!("drawtext=text='{escape}':{}{font}", config.text.style) } else { filter = format!( "zmq=b=tcp\\\\://'{}',drawtext=text=''{font}", - config.text.bind_address.replace(":", "\\:") + config.text.bind_address.replace(':', "\\:") ) } } diff --git a/src/filter/v_overlay.rs b/src/filter/v_overlay.rs new file mode 100644 index 00000000..0607090c --- /dev/null +++ b/src/filter/v_overlay.rs @@ -0,0 +1,30 @@ +use std::path::Path; + +use crate::utils::GlobalConfig; + +/// Overlay Filter +/// +/// When a logo is set, we create here the filter for the server. +pub fn filter_node(config: &GlobalConfig, add_tail: bool) -> String { + let mut logo_chain = String::new(); + + if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { + let opacity = format!( + "format=rgba,colorchannelmixer=aa={}", + config.processing.logo_opacity + ); + let logo_loop = "loop=loop=-1:size=1:start=0"; + logo_chain = format!( + "null[v];movie={},{logo_loop},{opacity}", + config.processing.logo + ); + + if add_tail { + logo_chain.push_str( + format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str(), + ); + } + } + + logo_chain +} diff --git a/src/input/folder.rs b/src/input/folder.rs index ee0be6c3..ca8c49a8 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -21,15 +21,18 @@ use walkdir::WalkDir; use crate::utils::{get_sec, GlobalConfig, Media}; +/// Folder Sources +/// +/// Like playlist source, we create here a folder list for iterate over it. #[derive(Debug, Clone)] -pub struct Source { +pub struct FolderSource { config: GlobalConfig, pub nodes: Arc>>, current_node: Media, index: Arc, } -impl Source { +impl FolderSource { pub fn new(current_list: Arc>>, global_index: Arc) -> Self { let config = GlobalConfig::global(); let mut media_list = vec![]; @@ -90,12 +93,9 @@ impl Source { fn shuffle(&mut self) { let mut rng = thread_rng(); self.nodes.lock().unwrap().shuffle(&mut rng); - let mut index: usize = 0; - for item in self.nodes.lock().unwrap().iter_mut() { + for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { item.index = Some(index); - - index += 1; } } @@ -104,17 +104,15 @@ impl Source { .lock() .unwrap() .sort_by(|d1, d2| d1.source.cmp(&d2.source)); - let mut index: usize = 0; - for item in self.nodes.lock().unwrap().iter_mut() { + for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { item.index = Some(index); - - index += 1; } } } -impl Iterator for Source { +/// Create iterator for folder source +impl Iterator for FolderSource { type Item = Media; fn next(&mut self) -> Option { @@ -159,6 +157,9 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } +/// Create a watcher, which monitor file changes. +/// When a change is register, update the current file list. +/// This makes it possible, to play infinitely and and always new files to it. pub fn watchman(sources: Arc>>) { let config = GlobalConfig::global(); let (tx, rx) = channel(); diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 66ecc59d..05961723 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -1,6 +1,5 @@ use std::{ io::{BufReader, Error, Read}, - path::Path, process::{Command, Stdio}, sync::atomic::Ordering, thread, @@ -9,48 +8,12 @@ use std::{ use crossbeam_channel::Sender; use simplelog::*; +use crate::filter::ingest_filter::filter_cmd; use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; -fn overlay(config: &GlobalConfig) -> String { - let mut logo_chain = String::new(); - - if config.processing.add_logo && Path::new(&config.processing.logo).is_file() { - let opacity = format!( - "format=rgba,colorchannelmixer=aa={}", - config.processing.logo_opacity - ); - let logo_loop = "loop=loop=-1:size=1:start=0"; - logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo); - - logo_chain - .push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str()); - } - - logo_chain -} - -fn audio_filter(config: &GlobalConfig) -> String { - let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); - - if config.processing.add_loudnorm { - audio_chain.push_str( - format!( - ",loudnorm=I={}:TP={}:LRA={}", - config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra - ) - .as_str(), - ); - } - - if config.processing.volume != 1.0 { - audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str()); - } - - audio_chain.push_str("[aout1]"); - - audio_chain -} - +/// ffmpeg Ingest Server +/// +/// Start ffmpeg in listen mode, and wait for input. pub fn ingest_server( log_format: String, ingest_sender: Sender<(usize, [u8; 65088])>, @@ -58,32 +21,14 @@ pub fn ingest_server( ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; - let mut filter = format!( - "[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5", - config.processing.fps, - config.processing.width, - config.processing.height, - config.processing.aspect - ); - - filter.push_str(&overlay(&config)); - filter.push_str("[vout1]"); - filter.push_str(audio_filter(&config).as_str()); - let mut filter_list = vec![ - "-filter_complex", - &filter, - "-map", - "[vout1]", - "-map", - "[aout1]", - ]; + let filter_list = filter_cmd(); let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()]; let stream_input = config.ingest.input_cmd.clone().unwrap(); let stream_settings = config.processing.settings.clone().unwrap(); server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); - server_cmd.append(&mut filter_list); + server_cmd.append(&mut filter_list.iter().map(String::as_str).collect()); server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect()); let mut is_running; @@ -116,7 +61,6 @@ pub fn ingest_server( let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); *proc_control.server_term.lock().unwrap() = Some(server_proc); - is_running = false; loop { @@ -146,7 +90,9 @@ pub fn ingest_server( } drop(ingest_reader); - proc_control.server_is_running.store(false, Ordering::SeqCst); + proc_control + .server_is_running + .store(false, Ordering::SeqCst); if let Err(e) = proc_control.wait(Ingest) { error!("{e}") diff --git a/src/input/mod.rs b/src/input/mod.rs index f4112487..a8863045 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,6 +1,9 @@ use std::{ process, - sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize}}, + sync::{ + atomic::{AtomicBool, AtomicUsize}, + Arc, Mutex, + }, thread, }; @@ -12,10 +15,11 @@ pub mod folder; pub mod ingest; pub mod playlist; -pub use folder::{watchman, Source}; +pub use folder::{watchman, FolderSource}; pub use ingest::ingest_server; pub use playlist::CurrentProgram; +/// Create a source iterator from playlist, or from folder. pub fn source_generator( config: GlobalConfig, current_list: Arc>>, @@ -23,31 +27,31 @@ pub fn source_generator( playout_stat: PlayoutStatus, is_terminated: Arc, ) -> Box> { - let get_source = match config.processing.clone().mode.as_str() { + let get_source = match config.processing.mode.as_str() { "folder" => { info!("Playout in folder mode"); - debug!("Monitor folder: {}", &config.storage.path); + debug!( + "Monitor folder: {}", + &config.storage.path + ); - let folder_source = Source::new(current_list, index); + let folder_source = FolderSource::new(current_list, index); let node_clone = folder_source.nodes.clone(); + + // Spawn a thread to monitor folder for file changes. thread::spawn(move || watchman(node_clone)); Box::new(folder_source) as Box> } "playlist" => { info!("Playout in playlist mode"); - let program = CurrentProgram::new( - playout_stat, - is_terminated.clone(), - current_list, - index, - ); + let program = CurrentProgram::new(playout_stat, is_terminated, current_list, index); Box::new(program) as Box> } _ => { error!("Process Mode not exists!"); - process::exit(0x0100); + process::exit(1); } }; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 4a488485..36a2dd61 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -1,7 +1,10 @@ use std::{ fs, path::Path, - sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use serde_json::json; @@ -12,6 +15,9 @@ use crate::utils::{ seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN, }; +/// Struct for current playlist. +/// +/// Here we prepare the init clip and build a iterator where we pull our clips. #[derive(Debug)] pub struct CurrentProgram { config: GlobalConfig, @@ -63,6 +69,7 @@ impl CurrentProgram { } } + // Check if playlist file got updated, and when yes we reload it and setup everything in place. fn check_update(&mut self, seek: bool) { if self.json_path.is_none() { let json = read_json(None, self.is_terminated.clone(), seek, 0.0); @@ -115,15 +122,16 @@ impl CurrentProgram { } } + // Check if day is past and it is time for a new playlist. fn check_for_next_playlist(&mut self) { let current_time = get_sec(); let start_sec = self.config.playlist.start_sec.unwrap(); let target_length = self.config.playlist.length_sec.unwrap(); let (delta, total_delta) = get_delta(¤t_time); - let mut duration = self.current_node.out.clone(); + let mut duration = self.current_node.out; if self.current_node.duration > self.current_node.out { - duration = self.current_node.duration.clone() + duration = self.current_node.duration } let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta; @@ -158,32 +166,27 @@ impl CurrentProgram { } } + // Check if last and/or next clip is a advertisement. fn last_next_ad(&mut self) { let index = self.index.load(Ordering::SeqCst); let current_list = self.nodes.lock().unwrap(); if index + 1 < current_list.len() - && ¤t_list[index + 1] - .category - .clone() - .unwrap_or(String::new()) - == "advertisement" + && ¤t_list[index + 1].category.clone().unwrap_or_default() == "advertisement" { self.current_node.next_ad = Some(true); } if index > 0 && index < current_list.len() - && ¤t_list[index - 1] - .category - .clone() - .unwrap_or(String::new()) - == "advertisement" + && ¤t_list[index - 1].category.clone().unwrap_or_default() == "advertisement" { self.current_node.last_ad = Some(true); } } + // Get current time and when we are before start time, + // we add full seconds of a day to it. fn get_current_time(&mut self) -> f64 { let mut time_sec = get_sec(); @@ -194,6 +197,7 @@ impl CurrentProgram { time_sec } + // On init or reload we need to seek for the current clip. fn get_current_clip(&mut self) { let mut time_sec = self.get_current_time(); @@ -216,6 +220,7 @@ impl CurrentProgram { } } + // Prepare init clip. fn init_clip(&mut self) { self.get_current_clip(); @@ -232,6 +237,7 @@ impl CurrentProgram { } } +/// Build the playlist iterator impl Iterator for CurrentProgram { type Item = Media; @@ -245,7 +251,7 @@ impl Iterator for CurrentProgram { } if self.playout_stat.list_init.load(Ordering::SeqCst) { - // on init load playlist, could be not long enough, + // On init load, playlist could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. let list_length = self.nodes.lock().unwrap().len(); @@ -281,7 +287,8 @@ impl Iterator for CurrentProgram { self.current_node = gen_source(media); self.nodes.lock().unwrap().push(self.current_node.clone()); - self.index.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst); + self.index + .store(self.nodes.lock().unwrap().len(), Ordering::SeqCst); } } @@ -314,7 +321,7 @@ impl Iterator for CurrentProgram { Some(self.current_node.clone()) } else { let last_playlist = self.json_path.clone(); - let last_ad = self.current_node.last_ad.clone(); + let last_ad = self.current_node.last_ad; self.check_for_next_playlist(); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); @@ -357,22 +364,22 @@ impl Iterator for CurrentProgram { } } +/// Prepare input clip: +/// +/// - check begin and length from clip +/// - return clip only if we are in 24 hours time range fn timed_source( node: Media, config: &GlobalConfig, last: bool, playout_stat: &PlayoutStatus, ) -> Media { - // prepare input clip - // check begin and length from clip - // return clip only if we are in 24 hours time range - let (delta, total_delta) = get_delta(&node.begin.unwrap()); let mut shifted_delta = delta; let mut new_node = node.clone(); new_node.process = Some(false); - if config.playlist.length.contains(":") { + if config.playlist.length.contains(':') { let time_shift = playout_stat.time_shift.lock().unwrap(); if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap() @@ -398,7 +405,7 @@ fn timed_source( if (total_delta > node.out - node.seek && !last) || node.index.unwrap() < 2 - || !config.playlist.length.contains(":") + || !config.playlist.length.contains(':') { // when we are in the 24 hour range, get the clip new_node = gen_source(node); @@ -412,6 +419,7 @@ fn timed_source( new_node } +/// Generate the source CMD, or when clip not exist, get a dummy. fn gen_source(mut node: Media) -> Media { if Path::new(&node.source).is_file() { node.add_probe(); @@ -440,10 +448,9 @@ fn gen_source(mut node: Media) -> Media { node } +/// Handle init clip, but this clip can be the last one in playlist, +/// this we have to figure out and calculate the right length. fn handle_list_init(mut node: Media) -> Media { - // handle init clip, but this clip can be the last one in playlist, - // this we have to figure out and calculate the right length - let (_, total_delta) = get_delta(&node.begin.unwrap()); let mut out = node.out; @@ -452,16 +459,13 @@ fn handle_list_init(mut node: Media) -> Media { } node.out = out; - - let new_node = gen_source(node); - new_node + gen_source(node) } +/// when we come to last clip in playlist, +/// or when we reached total playtime, +/// we end up here fn handle_list_end(mut node: Media, total_delta: f64) -> Media { - // when we come to last clip in playlist, - // or when we reached total playtime, - // we end up here - debug!("Playlist end"); let mut out = if node.seek > 0.0 { diff --git a/src/main.rs b/src/main.rs index 0427caf6..5de25627 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,10 @@ extern crate log; extern crate simplelog; use std::{ - {fs, fs::File}, + fs::{self, File}, path::PathBuf, process::exit, thread, - }; use serde::{Deserialize, Serialize}; @@ -32,26 +31,26 @@ struct StatusData { date: String, } -fn main() { - init_config(); - let config = GlobalConfig::global(); - let play_control = PlayerControl::new(); - let playout_stat = PlayoutStatus::new(); - let proc_control = ProcessControl::new(); - - if !PathBuf::from(config.general.stat_file.clone()).exists() { +/// Here we create a status file in temp folder. +/// We need this for reading/saving program status. +/// For example when we skip a playing file, +/// we save the time difference, so we stay in sync. +/// +/// When file not exists we create it, and when it exists we get its values. +fn status_file(stat_file: &str, playout_stat: &PlayoutStatus) { + if !PathBuf::from(stat_file).exists() { let data = json!({ "time_shift": 0.0, "date": String::new(), }); let json: String = serde_json::to_string(&data).expect("Serialize status data failed"); - fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file"); + fs::write(stat_file, &json).expect("Unable to write file"); } else { let stat_file = File::options() .read(true) .write(false) - .open(&config.general.stat_file) + .open(&stat_file) .expect("Could not open status file"); let data: StatusData = @@ -60,13 +59,24 @@ fn main() { *playout_stat.time_shift.lock().unwrap() = data.time_shift; *playout_stat.date.lock().unwrap() = data.date; } +} + +fn main() { + // Init the config, set process controller, create logging. + init_config(); + let config = GlobalConfig::global(); + let play_control = PlayerControl::new(); + let playout_stat = PlayoutStatus::new(); + let proc_control = ProcessControl::new(); let logging = init_logging(); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); + status_file(&config.general.stat_file, &playout_stat); if let Some(range) = config.general.generate.clone() { + // run a simple playlist generator and save them to disk generate_playlist(range); exit(0); @@ -77,16 +87,15 @@ fn main() { let proc_ctl = proc_control.clone(); if config.rpc_server.enable { - thread::spawn( move || json_rpc_server( - play_ctl, - play_stat, - proc_ctl, - )); + // If RPC server is enable we also fire up a JSON RPC server. + thread::spawn(move || json_rpc_server(play_ctl, play_stat, proc_ctl)); } if &config.out.mode.to_lowercase() == "hls" { + // write files/playlist to HLS m3u8 playlist write_hls(play_control, playout_stat, proc_control); } else { + // play on desktop or stream to a remote target player(play_control, playout_stat, proc_control); } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 3afd04b1..a94fa2f5 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -5,6 +5,9 @@ use simplelog::*; use crate::filter::v_drawtext; use crate::utils::{GlobalConfig, Media}; +/// Desktop Output +/// +/// Instead of streaming, we run a ffplay instance and play on desktop. pub fn output(log_format: &str) -> process::Child { let config = GlobalConfig::global(); diff --git a/src/output/hls.rs b/src/output/hls.rs index 51d5869b..deabfbf2 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -18,26 +18,129 @@ out: */ use std::{ - io::BufReader, + io::{BufRead, BufReader, Error}, process::{Command, Stdio}, - thread, + sync::atomic::Ordering, + thread::{self, sleep}, + time::Duration, }; use simplelog::*; +use crate::filter::ingest_filter::filter_cmd; use crate::input::source_generator; use crate::utils::{ - sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, + sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, + ProcessControl, }; +fn format_line(line: String, level: &str) -> String { + line.replace(&format!("[{level: >5}] "), "") +} + +/// Ingest Server for HLS +fn ingest_to_hls_server( + playout_stat: PlayoutStatus, + mut proc_control: ProcessControl, +) -> Result<(), Error> { + let config = GlobalConfig::global(); + let dec_settings = config.out.clone().output_cmd.unwrap(); + let playlist_init = playout_stat.list_init; + let filter_list = filter_cmd(); + + let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+info"]; + let stream_input = config.ingest.input_cmd.clone().unwrap(); + + server_cmd.append(&mut stream_input.iter().map(String::as_str).collect()); + server_cmd.append(&mut filter_list.iter().map(String::as_str).collect()); + server_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); + + let mut is_running; + + info!( + "Start ingest server, listening on: {}", + stream_input.last().unwrap() + ); + + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); + + loop { + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn ingest server: {e}"); + panic!("couldn't spawn ingest server: {e}") + } + Ok(proc) => proc, + }; + + let server_err = BufReader::new(server_proc.stderr.take().unwrap()); + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; + + for line in server_err.lines() { + let line = line?; + + if !is_running { + proc_control.server_is_running.store(true, Ordering::SeqCst); + playlist_init.store(true, Ordering::SeqCst); + is_running = true; + + info!("Switch from {} to live ingest", config.processing.mode); + + if let Err(e) = proc_control.kill(Decoder) { + error!("{e}"); + } + } + + if line.contains("[error]") + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!( + "[server] {}", + format_line(line.clone(), "error") + ); + } + } + + info!("Switch from live ingest to {}", config.processing.mode); + + proc_control + .server_is_running + .store(false, Ordering::SeqCst); + + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } + + if proc_control.is_terminated.load(Ordering::SeqCst) { + break; + } + } + + Ok(()) +} + +/// HLS Writer +/// +/// Write with single ffmpeg instance directly to a HLS playlist. pub fn write_hls( play_control: PlayerControl, playout_stat: PlayoutStatus, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); + let play_stat = playout_stat.clone(); + let proc_control_c = proc_control.clone(); let get_source = source_generator( config.clone(), @@ -47,6 +150,11 @@ pub fn write_hls( proc_control.is_terminated.clone(), ); + // spawn a thread for ffmpeg ingest server and create a channel for package sending + if config.ingest.enable { + thread::spawn(move || ingest_to_hls_server(play_stat, proc_control_c)); + } + for node in get_source { *play_control.current_media.lock().unwrap() = Some(node.clone()); @@ -93,14 +201,18 @@ pub fn write_hls( }; let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); - let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer")); + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); - if let Err(e) = dec_proc.wait() { - error!("Writer: {e}") + if let Err(e) = stderr_reader(dec_err, "Writer") { + error!("{e:?}") }; - if let Err(e) = error_decoder_thread.join() { - error!("{e:?}"); - }; + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}"); + } + + while proc_control.server_is_running.load(Ordering::SeqCst) { + sleep(Duration::from_secs(1)); + } } } diff --git a/src/output/mod.rs b/src/output/mod.rs index 618e5da4..fd2ece80 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -21,6 +21,15 @@ use crate::utils::{ ProcessControl, }; +/// Player +/// +/// Here we create the input file loop, from playlist, or folder source. +/// Then we read the stdout from the reader ffmpeg instance +/// and write it to the stdin from the streamer ffmpeg instance. +/// If it is configured we also fire up a ffmpeg ingest server instance, +/// for getting live feeds. +/// When a live ingest arrive, it stops the current playing and switch to the live source. +/// When ingest stops, it switch back to playlist/folder mode. pub fn player( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -33,6 +42,7 @@ pub fn player( let mut live_on = false; let playlist_init = playout_stat.list_init.clone(); + // get source iterator let get_source = source_generator( config.clone(), play_control.current_list.clone(), @@ -41,6 +51,7 @@ pub fn player( proc_control.is_terminated.clone(), ); + // get ffmpeg output instance let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(&ff_log_format), "stream" => stream::output(&ff_log_format), @@ -49,16 +60,20 @@ pub fn player( let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); let enc_err = BufReader::new(enc_proc.stderr.take().unwrap()); + + // spawn a thread to log ffmpeg output error messages let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); - let (ingest_sender, ingest_receiver) = bounded(96); - let ff_log_format_c = ff_log_format.clone(); let proc_control_c = proc_control.clone(); + let mut ingest_receiver = None; + // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { + let (ingest_sender, rx) = bounded(96); + ingest_receiver = Some(rx); thread::spawn(move || ingest_server(ff_log_format_c, ingest_sender, proc_control_c)); } @@ -95,6 +110,7 @@ pub fn player( dec_cmd.join(" ") ); + // create ffmpeg decoder instance, for reading the input files let mut dec_proc = match Command::new("ffmpeg") .args(dec_cmd) .stdout(Stdio::piped()) @@ -115,6 +131,7 @@ pub fn player( *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { + // when server is running, read from channel if proc_control.server_is_running.load(Ordering::SeqCst) { if !live_on { info!("Switch from {} to live ingest", config.processing.mode); @@ -131,13 +148,14 @@ pub fn player( playlist_init.store(true, Ordering::SeqCst); } - for rx in ingest_receiver.try_iter() { + for rx in ingest_receiver.as_ref().unwrap().try_iter() { if let Err(e) = enc_writer.write(&rx.1[..rx.0]) { error!("Encoder write error: {:?}", e); break 'source_iter; }; } + // read from decoder instance } else { if live_on { info!("Switch from live ingest to {}", config.processing.mode); diff --git a/src/output/stream.rs b/src/output/stream.rs index 8dc7b276..9c203a09 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -8,6 +8,9 @@ use simplelog::*; use crate::filter::v_drawtext; use crate::utils::{GlobalConfig, Media}; +/// Streaming Output +/// +/// Prepare the ffmpeg command for streaming output pub fn output(log_format: &str) -> process::Child { let config = GlobalConfig::global(); let mut enc_filter: Vec = vec![]; @@ -51,7 +54,10 @@ pub fn output(log_format: &str) -> process::Child { enc_cmd.append(&mut preview); enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect()); - debug!("Encoder CMD: \"ffmpeg {}\"", enc_cmd.join(" ")); + debug!( + "Encoder CMD: \"ffmpeg {}\"", + enc_cmd.join(" ") + ); let enc_proc = match Command::new("ffmpeg") .args(enc_cmd) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f15ecc20..a8e56d1e 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -13,6 +13,7 @@ use crate::utils::{ PlayoutStatus, ProcessControl, }; +/// map media struct to json object fn get_media_map(media: Media) -> Value { json!({ "seek": media.seek, @@ -23,6 +24,7 @@ fn get_media_map(media: Media) -> Value { }) } +/// prepare json object for response fn get_data_map(config: &GlobalConfig, media: Media) -> Map { let mut data_map = Map::new(); let begin = media.begin.unwrap_or(0.0); @@ -45,6 +47,14 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } +/// JSON RPC Server +/// +/// A simple rpc server for getting status information and controlling player: +/// +/// - current clip information +/// - jump to next clip +/// - get last clip +/// - reset player state to original clip pub fn json_rpc_server( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -52,7 +62,6 @@ pub fn json_rpc_server( ) { let config = GlobalConfig::global(); let mut io = IoHandler::default(); - let play = play_control.clone(); let proc = proc_control.clone(); io.add_sync_method("player", move |params: Params| { @@ -61,10 +70,11 @@ pub fn json_rpc_server( let current_date = playout_stat.current_date.lock().unwrap().clone(); let mut date = playout_stat.date.lock().unwrap(); + // get next clip if map.contains_key("control") && &map["control"] == "next" { - let index = play.index.load(Ordering::SeqCst); + let index = play_control.index.load(Ordering::SeqCst); - if index < play.current_list.lock().unwrap().len() { + if index < play_control.current_list.lock().unwrap().len() { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { error!("Decoder {e:?}") @@ -77,7 +87,7 @@ pub fn json_rpc_server( info!("Move to next clip"); let mut data_map = Map::new(); - let mut media = play.current_list.lock().unwrap()[index].clone(); + let mut media = play_control.current_list.lock().unwrap()[index].clone(); media.add_probe(); let (delta, _) = get_delta(&media.begin.unwrap_or(0.0)); @@ -98,10 +108,11 @@ pub fn json_rpc_server( return Ok(Value::String("Last clip can not be skipped".to_string())); } + // get last clip if map.contains_key("control") && &map["control"] == "back" { - let index = play.index.load(Ordering::SeqCst); + let index = play_control.index.load(Ordering::SeqCst); - if index > 1 && play.current_list.lock().unwrap().len() > 1 { + if index > 1 && play_control.current_list.lock().unwrap().len() > 1 { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { error!("Decoder {e:?}") @@ -113,8 +124,9 @@ pub fn json_rpc_server( info!("Move to last clip"); let mut data_map = Map::new(); - let mut media = play.current_list.lock().unwrap()[index - 2].clone(); - play.index.fetch_sub(2, Ordering::SeqCst); + let mut media = + play_control.current_list.lock().unwrap()[index - 2].clone(); + play_control.index.fetch_sub(2, Ordering::SeqCst); media.add_probe(); let (delta, _) = get_delta(&media.begin.unwrap_or(0.0)); @@ -135,6 +147,7 @@ pub fn json_rpc_server( return Ok(Value::String("Clip index out of range".to_string())); } + // reset player state if map.contains_key("control") && &map["control"] == "reset" { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { @@ -161,19 +174,21 @@ pub fn json_rpc_server( return Ok(Value::String("Reset playout state failed".to_string())); } + // get infos about current clip if map.contains_key("media") && &map["media"] == "current" { - if let Some(media) = play.current_media.lock().unwrap().clone() { + if let Some(media) = play_control.current_media.lock().unwrap().clone() { let data_map = get_data_map(config, media); return Ok(Value::Object(data_map)); }; } + // get infos about next clip if map.contains_key("media") && &map["media"] == "next" { - let index = play.index.load(Ordering::SeqCst); + let index = play_control.index.load(Ordering::SeqCst); - if index < play.current_list.lock().unwrap().len() { - let media = play.current_list.lock().unwrap()[index].clone(); + if index < play_control.current_list.lock().unwrap().len() { + let media = play_control.current_list.lock().unwrap()[index].clone(); let data_map = get_data_map(config, media); @@ -183,11 +198,12 @@ pub fn json_rpc_server( return Ok(Value::String("There is no next clip".to_string())); } + // get infos about last clip if map.contains_key("media") && &map["media"] == "last" { - let index = play.index.load(Ordering::SeqCst); + let index = play_control.index.load(Ordering::SeqCst); - if index > 1 && index - 2 < play.current_list.lock().unwrap().len() { - let media = play.current_list.lock().unwrap()[index - 2].clone(); + if index > 1 && index - 2 < play_control.current_list.lock().unwrap().len() { + let media = play_control.current_list.lock().unwrap()[index - 2].clone(); let data_map = get_data_map(config, media); @@ -201,10 +217,12 @@ pub fn json_rpc_server( Ok(Value::String("No, or wrong parameters set!".to_string())) }); + // build rpc server let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Null, ])) + // add middleware, for authentication .request_middleware(|request: hyper::Request| { if request.headers().contains_key("authorization") && request.headers()["authorization"] == config.rpc_server.authorization @@ -223,7 +241,7 @@ pub fn json_rpc_server( .start_http(&config.rpc_server.address.parse().unwrap()) .expect("Unable to start RPC server"); - *proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone()); + *proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle()); server.wait(); } diff --git a/src/utils/arg_parse.rs b/src/utils/arg_parse.rs index ff612d34..7adb6211 100644 --- a/src/utils/arg_parse.rs +++ b/src/utils/arg_parse.rs @@ -17,7 +17,7 @@ pub struct Args { long, help = "Generate playlist for date. Date-range is possible, like: 2022-01-01 - 2022-01-10.", name = "YYYY-MM-DD", - multiple_values=true + multiple_values = true )] pub generate: Option>, @@ -54,8 +54,7 @@ pub struct Args { pub volume: Option, } +/// Get arguments from command line, and return them. pub fn get_args() -> Args { - let args = Args::parse(); - - args + Args::parse() } diff --git a/src/utils/config.rs b/src/utils/config.rs index 07489213..0c753f44 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -12,6 +12,9 @@ use shlex::split; use crate::utils::{get_args, time_to_sec}; +/// Global Config +/// +/// This we init ones, when ffplayout is starting and use them globally in the hole program. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct GlobalConfig { pub general: General, @@ -78,6 +81,7 @@ pub struct Processing { pub logo_opacity: f32, pub logo_filter: String, pub add_loudnorm: bool, + pub loudnorm_ingest: bool, pub loud_i: f32, pub loud_tp: f32, pub loud_lra: f32, @@ -132,6 +136,7 @@ pub struct Out { } impl GlobalConfig { + /// Read config from YAML file, and set some extra config values. fn new() -> Self { let args = get_args(); let mut config_path = match env::current_exe() { @@ -149,8 +154,7 @@ impl GlobalConfig { Ok(file) => file, Err(err) => { println!( - "{config_path:?} doesn't exists!\n{}\n\nSystem error: {err}", - "Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!" + "{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!\n\nSystem error: {err}" ); process::exit(0x0100); } @@ -167,12 +171,13 @@ impl GlobalConfig { let bitrate = config.processing.width * config.processing.height / 10; config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start)); - if config.playlist.length.contains(":") { + if config.playlist.length.contains(':') { config.playlist.length_sec = Some(time_to_sec(&config.playlist.length)); } else { config.playlist.length_sec = Some(86400.0); } + // We set the decoder settings here, so we only define them ones. let mut settings: Vec = vec![ "-pix_fmt", "yuv420p", @@ -209,6 +214,8 @@ impl GlobalConfig { config.out.preview_cmd = split(config.out.preview_param.as_str()); config.out.output_cmd = split(config.out.output_param.as_str()); + // Read command line arguments, and override the config with them. + if let Some(gen) = args.generate { config.general.generate = Some(gen); } @@ -237,7 +244,7 @@ impl GlobalConfig { if let Some(length) = args.length { config.playlist.length = length.clone(); - if length.contains(":") { + if length.contains(':') { config.playlist.length_sec = Some(time_to_sec(&length)); } else { config.playlist.length_sec = Some(86400.0); @@ -266,11 +273,10 @@ impl GlobalConfig { static INSTANCE: OnceCell = OnceCell::new(); +/// When add_loudnorm is False we use a different audio encoder, +/// s302m has higher quality, but is experimental +/// and works not well together with the loudnorm filter. fn pre_audio_codec(add_loudnorm: bool) -> Vec { - // when add_loudnorm is False we use a different audio encoder, - // s302m has higher quality, but is experimental - // and works not well together with the loudnorm filter - let mut codec = vec!["-c:a", "s302m", "-strict", "-2"]; if add_loudnorm { diff --git a/src/utils/controller.rs b/src/utils/controller.rs index c51b00ba..50add407 100644 --- a/src/utils/controller.rs +++ b/src/utils/controller.rs @@ -12,6 +12,7 @@ use simplelog::*; use crate::utils::Media; +/// Defined process units. pub enum ProcessUnit { Decoder, Encoder, @@ -30,6 +31,10 @@ impl fmt::Display for ProcessUnit { use ProcessUnit::*; +/// Process Controller +/// +/// We save here some global states, about what is running and which processes are alive. +/// This we need for process termination, skipping clip decoder etc. #[derive(Clone)] pub struct ProcessControl { pub decoder_term: Arc>>, @@ -88,6 +93,8 @@ impl ProcessControl { Ok(()) } + /// Wait for process to proper close. + /// This prevents orphaned/zombi processes in system pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> { match proc { Decoder => { @@ -116,6 +123,7 @@ impl ProcessControl { Ok(()) } + /// No matter what is running, terminate them all. pub fn kill_all(&mut self) { self.is_terminated.store(true, Ordering::SeqCst); @@ -141,6 +149,7 @@ impl Drop for ProcessControl { } } +/// Global player control, to get infos about current clip etc. #[derive(Clone)] pub struct PlayerControl { pub current_media: Arc>>, @@ -158,6 +167,7 @@ impl PlayerControl { } } +/// Global playout control, for move forward/backward clip, or resetting playlist/state. #[derive(Clone, Debug)] pub struct PlayoutStatus { pub time_shift: Arc>, diff --git a/src/utils/generator.rs b/src/utils/generator.rs index 49e86ed4..eb32bf7c 100644 --- a/src/utils/generator.rs +++ b/src/utils/generator.rs @@ -1,3 +1,11 @@ +/// Simple Playlist Generator +/// +/// You can call ffplayout[.exe] -g YYYY-mm-dd - YYYY-mm-dd to generate JSON playlists. +/// +/// The generator takes the files from storage, which are set in config. +/// It also respect the shuffle/sort mode. +/// +/// Beside that it is really very basic, without any logic. use std::{ fs::{create_dir_all, write}, path::Path, @@ -8,33 +16,28 @@ use std::{ use chrono::{Duration, NaiveDate}; use simplelog::*; -use crate::input::Source; +use crate::input::FolderSource; use crate::utils::{json_serializer::Playlist, GlobalConfig, Media}; -fn get_date_range(date_range: &Vec) -> Vec { +/// Generate a vector with dates, from given range. +fn get_date_range(date_range: &[String]) -> Vec { let mut range = vec![]; - let start; - let end; - match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") { - Ok(s) => { - start = s; - } + let start = match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") { + Ok(s) => s, Err(_) => { error!("date format error in: {:?}", date_range[0]); exit(1); } - } + }; - match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") { - Ok(e) => { - end = e; - } + let end = match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") { + Ok(e) => e, Err(_) => { error!("date format error in: {:?}", date_range[2]); exit(1); } - } + }; let duration = end.signed_duration_since(start); let days = duration.num_days() + 1; @@ -46,9 +49,10 @@ fn get_date_range(date_range: &Vec) -> Vec { range } +/// Generate playlists pub fn generate_playlist(mut date_range: Vec) { let config = GlobalConfig::global(); - let total_length = config.playlist.length_sec.unwrap().clone(); + let total_length = config.playlist.length_sec.unwrap(); let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)])); let index = Arc::new(AtomicUsize::new(0)); let playlist_root = Path::new(&config.playlist.path); @@ -66,7 +70,7 @@ pub fn generate_playlist(mut date_range: Vec) { date_range = get_date_range(&date_range) } - let media_list = Source::new(current_list, index); + let media_list = FolderSource::new(current_list, index); let list_length = media_list.nodes.lock().unwrap().len(); for date in date_range { @@ -96,7 +100,7 @@ pub fn generate_playlist(mut date_range: Vec) { ); let mut filler = Media::new(0, config.storage.filler_clip.clone(), true); - let filler_length = filler.duration.clone(); + let filler_length = filler.duration; let mut length = 0.0; let mut round = 0; @@ -109,7 +113,7 @@ pub fn generate_playlist(mut date_range: Vec) { }; for item in media_list.clone() { - let duration = item.duration.clone(); + let duration = item.duration; if total_length > length + duration { playlist.program.push(item); diff --git a/src/utils/json_serializer.rs b/src/utils/json_serializer.rs index f963b1c3..4214b908 100644 --- a/src/utils/json_serializer.rs +++ b/src/utils/json_serializer.rs @@ -12,6 +12,7 @@ use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Med pub const DUMMY_LEN: f64 = 60.0; +/// This is our main playlist object, it holds all necessary information for the current day. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Playlist { pub date: String, @@ -44,6 +45,8 @@ impl Playlist { } } +/// Read json playlist file, fills Playlist struct and set some extra values, +/// which we need to process. pub fn read_json( path: Option, is_terminated: Arc, @@ -89,13 +92,14 @@ pub fn read_json( serde_json::from_reader(f).expect("Could not read json playlist file."); playlist.current_file = Some(current_file.clone()); - playlist.start_sec = Some(start_sec.clone()); + playlist.start_sec = Some(start_sec); let modify = modified_time(¤t_file); if let Some(modi) = modify { playlist.modified = Some(modi.to_string()); } + // Add extra values to every media clip for (i, item) in playlist.program.iter_mut().enumerate() { item.begin = Some(start_sec); item.index = Some(i); diff --git a/src/utils/json_validate.rs b/src/utils/json_validate.rs index 753df56f..f2dddf33 100644 --- a/src/utils/json_validate.rs +++ b/src/utils/json_validate.rs @@ -1,9 +1,22 @@ -use std::{path::Path, sync::{atomic::{AtomicBool, Ordering}, Arc}}; +use std::{ + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use simplelog::*; use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; +/// Validate a given playlist, to check if: +/// +/// - the source files are existing +/// - file can be read by ffprobe and metadata exists +/// - total playtime fits target length from config +/// +/// This function we run in a thread, to don't block the main function. pub fn validate_playlist(playlist: Playlist, is_terminated: Arc, config: GlobalConfig) { let date = playlist.date; let mut length = config.playlist.length_sec.unwrap(); @@ -15,7 +28,7 @@ pub fn validate_playlist(playlist: Playlist, is_terminated: Arc, con for item in playlist.program.iter() { if is_terminated.load(Ordering::SeqCst) { - return + return; } if Path::new(&item.source).is_file() { diff --git a/src/utils/logging.rs b/src/utils/logging.rs index bf83d732..e60e1005 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -8,19 +8,23 @@ use std::{ time::Duration, }; -use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; +use chrono::prelude::*; +use file_rotate::{ + compression::Compression, + suffix::{AppendTimestamp, DateFrom, FileLimit}, + ContentLimit, FileRotate, TimeFrequency, +}; use lettre::{ message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport, }; - -use chrono::prelude::*; use log::{Level, LevelFilter, Log, Metadata, Record}; use regex::Regex; use simplelog::*; use crate::utils::GlobalConfig; +/// send log messages to mail recipient fn send_mail(msg: String) { let config = GlobalConfig::global(); @@ -52,9 +56,10 @@ fn send_mail(msg: String) { } } +/// Basic Mail Queue +/// +/// Check every give seconds for messages and send them. fn mail_queue(messages: Arc>>, interval: u64) { - // check every give seconds for messages and send them - loop { if messages.lock().unwrap().len() > 0 { let msg = messages.lock().unwrap().join("\n"); @@ -67,6 +72,7 @@ fn mail_queue(messages: Arc>>, interval: u64) { } } +/// Self made Mail Log struct, to extend simplelog. pub struct LogMailer { level: LevelFilter, pub config: Config, @@ -121,12 +127,20 @@ impl SharedLogger for LogMailer { } } +/// Workaround to remove color information from log +/// +/// ToDo: maybe in next version from simplelog this is not necessary anymore. fn clean_string(text: &str) -> String { let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap(); regex.replace_all(text, "").to_string() } +/// Initialize our logging, to have: +/// +/// - console logger +/// - file logger +/// - mail logger pub fn init_logging() -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); @@ -137,18 +151,26 @@ pub fn init_logging() -> Vec> { time_level = LevelFilter::Error; } - let log_config = simplelog::ConfigBuilder::new() + let mut log_config = ConfigBuilder::new() .set_thread_level(LevelFilter::Off) .set_target_level(LevelFilter::Off) .set_level_padding(LevelPadding::Left) - .set_time_to_local(app_config.local_time) .set_time_level(time_level) .clone(); + if app_config.local_time { + log_config = match log_config.set_time_offset_to_local() { + Ok(local) => local.clone(), + Err(_) => log_config, + }; + }; + if app_config.log_to_file { let file_config = log_config .clone() - .set_time_format("[%Y-%m-%d %H:%M:%S%.3f]".into()) + .set_time_format_custom(format_description!( + "[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond]]" + )) .build(); let mut log_path = "logs/ffplayout.log".to_string(); @@ -166,8 +188,12 @@ pub fn init_logging() -> Vec> { let log = || { FileRotate::new( log_path, - AppendCount::new(app_config.backup_count), - ContentLimit::Lines(1000), + AppendTimestamp::with_format( + "%Y-%m-%d", + FileLimit::MaxFiles(app_config.backup_count), + DateFrom::DateYesterday, + ), + ContentLimit::Time(TimeFrequency::Daily), Compression::None, ) }; @@ -180,7 +206,9 @@ pub fn init_logging() -> Vec> { .set_level_color(Level::Info, Some(Color::Ansi256(10))) .set_level_color(Level::Warn, Some(Color::Ansi256(208))) .set_level_color(Level::Error, Some(Color::Ansi256(9))) - .set_time_format_str("\x1b[30;1m[%Y-%m-%d %H:%M:%S%.3f]\x1b[0m") + .set_time_format_custom(format_description!( + "\x1b[[30;1m[[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:4]]\x1b[[0m" + )) .build(); app_logger.push(TermLogger::new( @@ -191,14 +219,15 @@ pub fn init_logging() -> Vec> { )); } - if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") { + // set mail logger only the recipient is set in config + if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') { let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); let messages_clone = messages.clone(); - let interval = config.mail.interval.clone(); + let interval = config.mail.interval; thread::spawn(move || mail_queue(messages_clone, interval)); - let mail_config = log_config.clone().build(); + let mail_config = log_config.build(); let filter = match config.mail.mail_level.to_lowercase().as_str() { "info" => LevelFilter::Info, diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ee26434b..23e6941d 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -35,6 +35,7 @@ pub use logging::init_logging; use crate::filter::filter_chains; +/// Video clip struct to hold some important states and comments for current media. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { #[serde(skip_serializing, skip_deserializing)] @@ -89,12 +90,12 @@ impl Media { index: Some(index), seek: 0.0, out: duration, - duration: duration, + duration, category: None, source: src.clone(), cmd: Some(vec!["-i".to_string(), src]), filter: Some(vec![]), - probe: probe, + probe, last_ad: Some(false), next_ad: Some(false), process: Some(true), @@ -124,6 +125,7 @@ impl Media { } } +/// We use the ffprobe crate, but we map the metadata to our needs. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct MediaProbe { pub format: Option, @@ -158,12 +160,12 @@ impl MediaProbe { MediaProbe { format: Some(obj.format), - audio_streams: if a_stream.len() > 0 { + audio_streams: if !a_stream.is_empty() { Some(a_stream) } else { None }, - video_streams: if v_stream.len() > 0 { + video_streams: if !v_stream.is_empty() { Some(v_stream) } else { None @@ -185,6 +187,9 @@ impl MediaProbe { } } +/// Write current status to status file in temp folder. +/// +/// The status file is init in main function and mostly modified in RPC server. pub fn write_status(date: &str, shift: f64) { let config = GlobalConfig::global(); let stat_file = config.general.stat_file.clone(); @@ -206,6 +211,7 @@ pub fn write_status(date: &str, shift: f64) { // local.timestamp_millis() as i64 // } +/// Get current time in seconds. pub fn get_sec() -> f64 { let local: DateTime = Local::now(); @@ -213,6 +219,10 @@ pub fn get_sec() -> f64 { + (local.nanosecond() as f64 / 1000000000.0) } +/// Get current date for playlist, but check time with conditions: +/// +/// - When time is before playlist start, get date from yesterday. +/// - When given next_start is over target length (normally a full day), get date from tomorrow. pub fn get_date(seek: bool, start: f64, next_start: f64) -> String { let local: DateTime = Local::now(); @@ -227,6 +237,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String { local.format("%Y-%m-%d").to_string() } +/// Get file modification time. pub fn modified_time(path: &str) -> Option> { let metadata = metadata(path).unwrap(); @@ -238,8 +249,9 @@ pub fn modified_time(path: &str) -> Option> { None } +/// Convert a formatted time string to seconds. pub fn time_to_sec(time_str: &str) -> f64 { - if ["now", "", "none"].contains(&time_str) || !time_str.contains(":") { + if ["now", "", "none"].contains(&time_str) || !time_str.contains(':') { return get_sec(); } @@ -251,6 +263,7 @@ pub fn time_to_sec(time_str: &str) -> f64 { h * 3600.0 + m * 60.0 + s } +/// Convert floating number (seconds) to a formatted time string. pub fn sec_to_time(sec: f64) -> String { let d = UNIX_EPOCH + time::Duration::from_millis((sec * 1000.0) as u64); // Create DateTime from SystemTime @@ -259,6 +272,8 @@ pub fn sec_to_time(sec: f64) -> String { date_time.format("%H:%M:%S%.3f").to_string() } +/// Test if given numbers are close to each other, +/// with a third number for setting the maximum range. pub fn is_close(a: f64, b: f64, to: f64) -> bool { if (a - b).abs() < to { return true; @@ -267,13 +282,16 @@ pub fn is_close(a: f64, b: f64, to: f64) -> bool { false } +/// Get delta between clip start and current time. This value we need to check, +/// if we still in sync. +/// +/// We also get here the global delta between clip start and time when a new playlist should start. pub fn get_delta(begin: &f64) -> (f64, f64) { let config = GlobalConfig::global(); let mut current_time = get_sec(); let start = config.playlist.start_sec.unwrap(); let length = time_to_sec(&config.playlist.length); let mut target_length = 86400.0; - let total_delta; if length > 0.0 && length != target_length { target_length = length @@ -290,15 +308,16 @@ pub fn get_delta(begin: &f64) -> (f64, f64) { current_delta -= 86400.0 } - if current_time < start { - total_delta = start - current_time; + let total_delta = if current_time < start { + start - current_time } else { - total_delta = target_length + start - current_time; - } + target_length + start - current_time + }; (current_delta, total_delta) } +/// Check if clip in playlist is in sync with global time. pub fn check_sync(delta: f64) -> bool { let config = GlobalConfig::global(); @@ -310,6 +329,7 @@ pub fn check_sync(delta: f64) -> bool { true } +/// Create a dummy clip as a placeholder for missing video files. pub fn gen_dummy(duration: f64) -> (String, Vec) { let config = GlobalConfig::global(); let color = "#121212"; @@ -334,6 +354,7 @@ pub fn gen_dummy(duration: f64) -> (String, Vec) { (source, cmd) } +/// Set clip seek in and length value. pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec { let mut source_cmd: Vec = vec![]; @@ -344,25 +365,19 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec out { - source_cmd.append(&mut vec![ - "-t".to_string(), - format!("{}", out - seek).to_string(), - ]); + source_cmd.append(&mut vec!["-t".to_string(), format!("{}", out - seek)]); } source_cmd } +/// Read ffmpeg stderr decoder, encoder and server instance +/// and log the output. pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), Error> { - // read ffmpeg stderr decoder, encoder and server instance - // and log the output - fn format_line(line: String, level: &str) -> String { line.replace(&format!("[{level: >5}] "), "") } - // let buffer = BufReader::new(std_errors); - for line in buffer.lines() { let line = line?; @@ -373,22 +388,21 @@ pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), "[{suffix}] {}", format_line(line, "warning") ) - } else { - if suffix != "server" - && !line.contains("Input/output error") - && !line.contains("Broken pipe") - { - error!( - "[{suffix}] {}", - format_line(line.clone(), "error") - ); - } + } else if suffix != "server" + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!( + "[{suffix}] {}", + format_line(line.clone(), "error") + ); } } Ok(()) } +/// Run program to test if it is in system. fn is_in_system(name: &str) { if let Ok(mut proc) = Command::new(name) .stderr(Stdio::null()) @@ -407,6 +421,8 @@ fn is_in_system(name: &str) { fn ffmpeg_libs_and_filter() -> (Vec, Vec) { let mut libs: Vec = vec![]; let mut filters: Vec = vec![]; + + // filter lines which contains filter let re: Regex = Regex::new(r"^( ?) [TSC.]+").unwrap(); let mut ff_proc = match Command::new("ffmpeg") @@ -425,27 +441,27 @@ fn ffmpeg_libs_and_filter() -> (Vec, Vec) { let err_buffer = BufReader::new(ff_proc.stderr.take().unwrap()); let out_buffer = BufReader::new(ff_proc.stdout.take().unwrap()); - for line in err_buffer.lines() { - if let Ok(line) = line { - if line.contains("configuration:") { - let configs = line.split_whitespace(); + // stderr shows only the ffmpeg configuration + // get codec library's + for line in err_buffer.lines().flatten() { + if line.contains("configuration:") { + let configs = line.split_whitespace(); - for config in configs { - if config.contains("--enable-lib") { - libs.push(config.replace("--enable-", "")); - } + for config in configs { + if config.contains("--enable-lib") { + libs.push(config.replace("--enable-", "")); } } } } - for line in out_buffer.lines() { - if let Ok(line) = line { - if let Some(_) = re.captures(line.as_str()) { - let filter_line = line.split_whitespace(); + // stdout shows filter help text + // get filters + for line in out_buffer.lines().flatten() { + if re.captures(line.as_str()).is_some() { + let filter_line = line.split_whitespace(); - filters.push(filter_line.collect::>()[1].to_string()); - } + filters.push(filter_line.collect::>()[1].to_string()); } } @@ -455,6 +471,10 @@ fn ffmpeg_libs_and_filter() -> (Vec, Vec) { (libs, filters) } + +/// Validate ffmpeg/ffprobe/ffplay. +/// +/// Check if they are in system and has all filters and codecs we need. pub fn validate_ffmpeg() { let config = GlobalConfig::global();