From da2fdd2d75334af6feed2780986e659d269c4f88 Mon Sep 17 00:00:00 2001 From: Jonathan Baecker Date: Sun, 27 Oct 2024 17:23:21 +0100 Subject: [PATCH] better wait after kill(), update notify, remove crossbeam, error counter for ingest, show all lines in log window --- Cargo.lock | 197 +++++++++++++++--------------- Cargo.toml | 2 +- engine/Cargo.toml | 3 +- engine/examples/flexi2.rs | 184 ++++++---------------------- engine/src/player/controller.rs | 43 ++++--- engine/src/player/input/folder.rs | 10 +- engine/src/player/input/ingest.rs | 27 ++-- engine/src/player/output/hls.rs | 29 ++++- engine/src/player/output/mod.rs | 5 +- engine/src/player/utils/mod.rs | 12 +- engine/src/utils/config.rs | 6 +- engine/src/utils/control.rs | 36 +----- engine/src/utils/mod.rs | 8 +- frontend/pages/logging.vue | 2 +- 14 files changed, 229 insertions(+), 335 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42e76d1b..0e5afef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,7 +113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -151,7 +151,7 @@ dependencies = [ "parse-size", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -191,7 +191,7 @@ dependencies = [ "actix-utils", "futures-core", "futures-util", - "mio 1.0.2", + "mio", "socket2", "tokio", "tracing", @@ -311,7 +311,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -385,7 +385,7 @@ checksum = "4c221da13534b9352f3f79fcbbd6095f6d8aee63bdf1da8a73d36f9eeea17d5a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -475,9 +475,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.15" +version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +checksum = "23a1e53f0f5d86382dafe1cf314783b2044280f406e7e1506368220ad11b1338" dependencies = [ "anstyle", "anstyle-parse", @@ -490,36 +490,36 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.8" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" +checksum = "8365de52b16c035ff4fcafe0092ba9390540e3e352870ac09933bebcaa2c8c56" [[package]] name = "anstyle-parse" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.4" +version = "3.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" dependencies = [ "anstyle", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -554,7 +554,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -821,7 +821,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -832,9 +832,9 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "colorchoice" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "concurrent-queue" @@ -1002,7 +1002,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1013,7 +1013,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1060,7 +1060,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1080,7 +1080,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "unicode-xid", ] @@ -1104,7 +1104,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1140,9 +1140,9 @@ checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" [[package]] name = "encoding_rs" -version = "0.8.34" +version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ "cfg-if", ] @@ -1214,7 +1214,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] name = "ffplayout" -version = "0.24.0-rc3" +version = "0.24.0" dependencies = [ "actix-files", "actix-multipart", @@ -1226,7 +1226,6 @@ dependencies = [ "argon2", "chrono", "clap", - "crossbeam-channel", "derive_more 1.0.0", "faccess", "ffprobe", @@ -1282,11 +1281,11 @@ dependencies = [ [[package]] name = "file-id" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6584280525fb2059cba3db2c04abf947a1a29a45ddae89f3870f8281704fafc9" +checksum = "6bc904b9bbefcadbd8e3a9fb0d464a9b979de6324c03b3c663e8994f46a5be36" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1436,7 +1435,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1856,7 +1855,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -1917,9 +1916,9 @@ dependencies = [ [[package]] name = "inotify" -version = "0.9.6" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +checksum = "fdd168d97690d0b8c412d6b6c10360277f4d7ee495c5d0d5d5fe0854923255cc" dependencies = [ "bitflags 1.3.2", "inotify-sys", @@ -1935,6 +1934,15 @@ dependencies = [ "libc", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.10.1" @@ -2032,9 +2040,9 @@ dependencies = [ [[package]] name = "lettre" -version = "0.11.9" +version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f204773bab09b150320ea1c83db41dc6ee606a4bc36dc1f43005fe7b58ce06" +checksum = "0161e452348e399deb685ba05e55ee116cae9410f4f51fe42d597361444521d9" dependencies = [ "async-trait", "base64 0.22.1", @@ -2077,9 +2085,9 @@ checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" [[package]] name = "libm" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "3bda4c6077b0b08da2c48b172195795498381a7c8988c9e6212a6c55c5b9bd70" [[package]] name = "libredox" @@ -2229,18 +2237,6 @@ dependencies = [ "adler2", ] -[[package]] -name = "mio" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" -dependencies = [ - "libc", - "log", - "wasi", - "windows-sys 0.48.0", -] - [[package]] name = "mio" version = "1.0.2" @@ -2303,36 +2299,45 @@ dependencies = [ [[package]] name = "notify" -version = "6.1.1" +version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +checksum = "c533b4c39709f9ba5005d8002048266593c1cfaf3c5f0739d5b8ab0c6c504009" dependencies = [ "bitflags 2.6.0", - "crossbeam-channel", "filetime", "fsevent-sys", "inotify", "kqueue", "libc", "log", - "mio 0.8.11", + "mio", + "notify-types", "walkdir", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "notify-debouncer-full" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fd166739789c9ff169e654dc1501373db9d80a4c3f972817c8a4d7cf8f34e" +checksum = "9dcf855483228259b2353f89e99df35fc639b2b2510d1166e4858e3f67ec1afb" dependencies = [ "file-id", "log", "notify", - "parking_lot", + "notify-types", "walkdir", ] +[[package]] +name = "notify-types" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7393c226621f817964ffb3dc5704f9509e107a8b024b489cc2c1b217378785df" +dependencies = [ + "instant", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -2535,9 +2540,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -2605,7 +2610,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -2741,9 +2746,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -2981,9 +2986,9 @@ dependencies = [ [[package]] name = "scc" -version = "2.2.2" +version = "2.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2c1f7fc6deb21665a9060dfc7d271be784669295a31babdcd4dd2c79ae8cbfb" +checksum = "d8d25269dd3a12467afe2e510f69fb0b46b698e5afb296b59f2145259deaf8e8" dependencies = [ "sdd", ] @@ -3023,7 +3028,7 @@ checksum = "7e85ad2009c50b58e87caa8cd6dac16bdf511bbfb7af6c33df902396aa480fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3127,7 +3132,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3152,7 +3157,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3333,7 +3338,7 @@ dependencies = [ "quote", "sqlx-core", "sqlx-macros-core", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3356,7 +3361,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 2.0.82", + "syn 2.0.85", "tempfile", "tokio", "url", @@ -3609,9 +3614,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.82" +version = "2.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83540f837a8afc019423a8edb95b52a8effe46957ee402287f4292fae35be021" +checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" dependencies = [ "proc-macro2", "quote", @@ -3635,7 +3640,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3676,7 +3681,7 @@ dependencies = [ [[package]] name = "tests" -version = "0.24.0-rc3" +version = "0.24.0" dependencies = [ "actix-rt", "actix-test", @@ -3721,7 +3726,7 @@ checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3789,7 +3794,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio 1.0.2", + "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -3806,7 +3811,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3893,7 +3898,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -3931,7 +3936,7 @@ checksum = "0ea0b99e8ec44abd6f94a18f28f7934437809dd062820797c52401298116f70e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "termcolor", ] @@ -4044,9 +4049,9 @@ checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" [[package]] name = "value-bag" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" dependencies = [ "value-bag-serde1", "value-bag-sval2", @@ -4054,9 +4059,9 @@ dependencies = [ [[package]] name = "value-bag-serde1" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccacf50c5cb077a9abb723c5bcb5e0754c1a433f1e1de89edc328e2760b6328b" +checksum = "4bb773bd36fd59c7ca6e336c94454d9c66386416734817927ac93d81cb3c5b0b" dependencies = [ "erased-serde", "serde", @@ -4065,9 +4070,9 @@ dependencies = [ [[package]] name = "value-bag-sval2" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1785bae486022dfb9703915d42287dcb284c1ee37bd1080eeba78cc04721285b" +checksum = "53a916a702cac43a88694c97657d449775667bcd14b70419441d05b7fea4a83a" dependencies = [ "sval", "sval_buffer", @@ -4143,7 +4148,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "wasm-bindgen-shared", ] @@ -4177,7 +4182,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4287,7 +4292,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -4298,7 +4303,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -4529,7 +4534,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "synstructure", ] @@ -4551,7 +4556,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] @@ -4571,7 +4576,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", "synstructure", ] @@ -4627,7 +4632,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.82", + "syn 2.0.85", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f89d844a..4390ceac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ resolver = "2" [workspace.package] description = "24/7 playout based on rust and ffmpeg" readme = "README.md" -version = "0.24.0-rc3" +version = "0.24.0" license = "GPL-3.0" repository = "https://github.com/ffplayout/ffplayout" authors = ["Jonathan Baecker "] diff --git a/engine/Cargo.toml b/engine/Cargo.toml index d43362e2..6b060bac 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -23,7 +23,6 @@ actix-web-static-files = "4.0" argon2 = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] } clap = { version = "4.3", features = ["derive", "env"] } -crossbeam-channel = "0.5" derive_more = { version = "1", features = ["display"] } faccess = "0.2" ffprobe = "0.4" @@ -37,7 +36,7 @@ local-ip-address = "0.6" log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] } m3u8-rs = "6" nix = { version = "0.29", features = ["user", "fs"] } -notify = "6.0" +notify = "7.0" notify-debouncer-full = { version = "*", default-features = false } num-traits = "0.2" once_cell = "1" diff --git a/engine/examples/flexi2.rs b/engine/examples/flexi2.rs index 74cb0844..7182f4e4 100644 --- a/engine/examples/flexi2.rs +++ b/engine/examples/flexi2.rs @@ -1,176 +1,60 @@ use log::*; use std::io::Write; -// use std::io::{Error, ErrorKind}; -// use std::sync::{Arc, Mutex}; use flexi_logger::writers::{FileLogWriter, LogWriter}; use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, Logger, Naming}; -use paris::formatter::colorize_string; -pub struct LogMailer; - -impl LogWriter for LogMailer { - fn write(&self, now: &mut DeferredNow, record: &Record<'_>) -> std::io::Result<()> { - println!("target: {:?}", record.target()); - println!("key/value: {:?}", record.key_values().get("channel".into())); - println!( - "[{}] [{:>5}] Mail logger: {:?}", - now.now().format("%Y-%m-%d %H:%M:%S"), - record.level(), - record.args() - ); - Ok(()) - } - fn flush(&self) -> std::io::Result<()> { - Ok(()) - } -} - -pub struct LogConsole; - -impl LogWriter for LogConsole { - fn write(&self, now: &mut DeferredNow, record: &Record<'_>) -> std::io::Result<()> { - console_formatter(&mut std::io::stderr(), now, record)?; - - println!(); - Ok(()) - } - fn flush(&self) -> std::io::Result<()> { - Ok(()) - } -} - -pub fn file_logger(to_file: bool) -> Box { - if to_file { - Box::new( - FileLogWriter::builder( - FileSpec::default() - .suppress_timestamp() - // .directory("/var/log") - .basename("ffplayout"), - ) - .append() - .format(file_formatter) - .rotate( - Criterion::Age(Age::Day), - Naming::Timestamps, - Cleanup::KeepLogFiles(7), - ) - .print_message() - .try_build() - .unwrap(), +pub fn file_logger() -> Box { + Box::new( + FileLogWriter::builder( + FileSpec::default() + .suppress_timestamp() + .directory("./logs") + .discriminant("1") + .basename("ffplayout"), ) - } else { - Box::new(LogConsole) - } -} - -// struct MyWriter { -// file: Arc>, -// } - -// impl LogWriter for MyWriter { -// fn write( -// &self, -// now: &mut flexi_logger::DeferredNow, -// record: &flexi_logger::Record, -// ) -> std::io::Result<()> { -// let mut file = self -// .file -// .lock() -// .map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; -// flexi_logger::default_format(&mut *file, now, record) -// } - -// fn flush(&self) -> std::io::Result<()> { -// let mut file = self -// .file -// .lock() -// .map_err(|e| Error::new(ErrorKind::Other, e.to_string()))?; -// file.flush() -// } -// } - -// Define a macro for writing messages to the alert log and to the normal log -#[macro_use] -mod macros { - #[macro_export] - macro_rules! file_error { - ($($arg:tt)*) => ( - error!(target: "{File}", $($arg)*); + .append() + .format(file_formatter) + .rotate( + Criterion::Age(Age::Day), + Naming::TimestampsCustomFormat { + current_infix: Some(""), + format: "%Y-%m-%d", + }, + Cleanup::KeepLogFiles(4), ) - } -} - -pub fn console_formatter( - w: &mut dyn Write, - now: &mut DeferredNow, - record: &Record, -) -> std::io::Result<()> { - let timestamp = colorize_string(format!( - "[{}]", - now.now().format("%Y-%m-%d %H:%M:%S%.6f") - )); - - let level = match record.level() { - Level::Debug => colorize_string("[DEBUG]"), - Level::Error => colorize_string("[ERROR]"), - Level::Info => colorize_string("[ INFO]"), - Level::Trace => colorize_string("[TRACE]"), - Level::Warn => colorize_string("[ WARN]"), - }; - - write!( - w, - "{} {} {}", - timestamp, - level, - colorize_string(record.args().to_string()), + .print_message() + .try_build() + .unwrap(), ) } -pub fn file_formatter( +fn file_formatter( w: &mut dyn Write, now: &mut DeferredNow, record: &Record, ) -> std::io::Result<()> { - let timestamp = format!("[{}]", now.now().format("%Y-%m-%d %H:%M:%S%.6f")); - - let level = match record.level() { - Level::Debug => "[DEBUG]", - Level::Error => "[ERROR]", - Level::Info => "[ INFO]", - Level::Trace => "[TRACE]", - Level::Warn => "[ WARN]", - }; - - write!(w, "{} {} {}", timestamp, level, record.args()) + write!( + w, + "[{}] [{:>5}] {}", + now.now().format("%Y-%m-%d %H:%M:%S%.6f"), + record.level(), + record.args() + ) } fn main() { - let to_file = true; - Logger::try_with_str("WARN") .expect("LogSpecification String has errors") - .format(console_formatter) .print_message() .log_to_stderr() - .add_writer("File", file_logger(to_file)) - .add_writer("Mail", Box::new(LogMailer)) + .add_writer("Alert", file_logger()) .start() .unwrap(); - // Explicitly send logs to different loggers - info!(target: "{Mail}", "This logs only to Mail"); - warn!(target: "{File,Mail}", channel = 1; "This logs to File and Mail"); - error!(target: "{File}", "This logs only to file"); - error!(target: "{_Default}", "This logs to console"); - - file_error!("This is another file log"); - - error!("This is a normal error message"); - warn!("This is a warning"); - info!("This is an info message"); - debug!("This is an debug message"); - trace!("This is an trace message"); + error!(target : "{Alert,_Default}", "This is error message"); + warn!(target : "{Alert,_Default}", "This is a warning"); + info!(target : "{Alert,_Default}", "This is an info message"); + debug!(target : "{Alert,_Default}", "This is an debug message"); + trace!(target : "{Alert,_Default}", "This is an trace message"); } diff --git a/engine/src/player/controller.rs b/engine/src/player/controller.rs index 04d5ad06..d06b650b 100644 --- a/engine/src/player/controller.rs +++ b/engine/src/player/controller.rs @@ -227,27 +227,40 @@ impl ChannelManager { /// Wait for process to proper close. /// This prevents orphaned/zombi processes in system pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> { - match unit { - Decoder => { - if let Some(proc) = self.decoder.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + loop { + match unit { + Decoder => { + if let Some(proc) = self.decoder.lock().unwrap().as_mut() { + match proc.try_wait() { + Ok(Some(_)) => break, + Ok(None) => thread::sleep(Duration::from_millis(10)), + Err(e) => return Err(ProcessError::Custom(format!("Decoder: {e}"))), + } + } } - } - Encoder => { - if let Some(proc) = self.encoder.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; + Encoder => { + if let Some(proc) = self.encoder.lock().unwrap().as_mut() { + match proc.try_wait() { + Ok(Some(_)) => break, + Ok(None) => thread::sleep(Duration::from_millis(10)), + Err(e) => return Err(ProcessError::Custom(format!("Encoder: {e}"))), + } + } } - } - Ingest => { - if let Some(proc) = self.ingest.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; + Ingest => { + if let Some(proc) = self.ingest.lock().unwrap().as_mut() { + match proc.try_wait() { + Ok(Some(_)) => break, + Ok(None) => thread::sleep(Duration::from_millis(10)), + Err(e) => return Err(ProcessError::Custom(format!("Ingest: {e}"))), + } + } } } } + thread::sleep(Duration::from_millis(50)); + Ok(()) } diff --git a/engine/src/player/input/folder.rs b/engine/src/player/input/folder.rs index cb85e075..66487a2e 100644 --- a/engine/src/player/input/folder.rs +++ b/engine/src/player/input/folder.rs @@ -13,7 +13,7 @@ use log::*; use notify::{ event::{CreateKind, ModifyKind, RemoveKind, RenameMode}, EventKind::{Create, Modify, Remove}, - RecursiveMode, Watcher, + RecursiveMode, }; use notify_debouncer_full::new_debouncer; @@ -41,11 +41,7 @@ pub fn watchman( let mut debouncer = new_debouncer(Duration::from_secs(1), None, tx).unwrap(); - debouncer - .watcher() - .watch(path, RecursiveMode::Recursive) - .unwrap(); - debouncer.cache().add_root(path, RecursiveMode::Recursive); + debouncer.watch(path, RecursiveMode::Recursive).unwrap(); while !is_terminated.load(Ordering::SeqCst) { if let Ok(result) = rx.try_recv() { @@ -93,7 +89,7 @@ pub fn watchman( info!(target: Target::file_mail(), channel = id; "Create new file: {new_path:?}"); } } - _ => debug!(target: Target::file_mail(), channel = id; "Not tracked file event: {event:?}") + _ => trace!(target: Target::file_mail(), channel = id; "Not tracked file event: {event:?}") }), Err(errors) => errors.iter().for_each(|error| error!(target: Target::file_mail(), channel = id; "{error:?}")), } diff --git a/engine/src/player/input/ingest.rs b/engine/src/player/input/ingest.rs index 63b9b582..97b33e0b 100644 --- a/engine/src/player/input/ingest.rs +++ b/engine/src/player/input/ingest.rs @@ -1,11 +1,10 @@ use std::{ io::{BufRead, BufReader, Read}, process::{ChildStderr, Command, Stdio}, - sync::atomic::Ordering, + sync::{atomic::Ordering, mpsc::SyncSender}, thread, }; -use crossbeam_channel::Sender; use log::*; use crate::utils::{ @@ -16,7 +15,7 @@ use crate::vec_strings; use crate::{ player::{ controller::{ChannelManager, ProcessUnit::*}, - utils::{test_tcp_port, valid_stream, Media}, + utils::{is_free_tcp_port, valid_stream, Media}, }, utils::errors::ProcessError, }; @@ -38,6 +37,8 @@ fn server_monitor( } if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) { + warn!(target: Target::file_mail(), channel = id; "Unexpected ingest stream: {line}"); + if let Err(e) = channel_mgr.stop(Ingest) { error!(target: Target::file_mail(), channel = id; "{e}"); }; @@ -61,7 +62,7 @@ fn server_monitor( /// Start ffmpeg in listen mode, and wait for input. pub fn ingest_server( config: PlayoutConfig, - ingest_sender: Sender<(usize, [u8; 65088])>, + ingest_sender: SyncSender<(usize, [u8; 65088])>, channel_mgr: ChannelManager, ) -> Result<(), ProcessError> { let id = config.general.channel_id; @@ -103,20 +104,20 @@ pub fn ingest_server( let mut is_running; - if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { - if !test_tcp_port(id, url) { - channel_mgr.channel.lock().unwrap().active = false; - channel_mgr.stop_all(); - } - - info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}",); - }; - debug!(target: Target::file_mail(), channel = id; "Server CMD: \"ffmpeg {}\"", server_cmd.join(" ") ); + if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { + if !is_free_tcp_port(id, url) { + channel_mgr.channel.lock().unwrap().active = false; + channel_mgr.stop_all(); + } else { + info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}"); + } + }; + while !is_terminated.load(Ordering::SeqCst) { let proc_ctl = channel_mgr.clone(); let level = config.logging.ingest_level.clone(); diff --git a/engine/src/player/output/hls.rs b/engine/src/player/output/hls.rs index 2bd7b690..3fdd5a6e 100644 --- a/engine/src/player/output/hls.rs +++ b/engine/src/player/output/hls.rs @@ -34,8 +34,8 @@ use crate::{ controller::{ChannelManager, ProcessUnit::*}, input::source_generator, utils::{ - get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream, - Media, + get_delta, is_free_tcp_port, prepare_output_cmd, sec_to_time, stderr_reader, + valid_stream, Media, }, }, utils::{errors::ProcessError, logging::Target}, @@ -47,6 +47,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { let id = config.general.channel_id; let playlist_init = manager.list_init.clone(); let chain = manager.filter_chain.clone(); + let mut error_count = 0; let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let stream_input = config.ingest.input_cmd.clone().unwrap(); @@ -76,12 +77,12 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { let mut is_running; if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { - if !test_tcp_port(id, url) { + if !is_free_tcp_port(id, url) { manager.channel.lock().unwrap().active = false; manager.stop_all(); + } else { + info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}"); } - - info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}"); }; drop(config); @@ -90,6 +91,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { let config = manager.config.lock().unwrap().clone(); dummy_media.add_filter(&config, &chain); let server_cmd = prepare_output_cmd(&config, server_prefix.clone(), &dummy_media.filter); + let timer = SystemTime::now(); debug!(target: Target::file_mail(), channel = id; "Server CMD: \"ffmpeg {}\"", @@ -117,6 +119,8 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { let line = line?; if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) { + warn!(target: Target::file_mail(), channel = id; "Unexpected ingest stream: {line}"); + if let Err(e) = proc_ctl.stop(Ingest) { error!(target: Target::file_mail(), channel = id; "{e}"); }; @@ -150,6 +154,21 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { if is_terminated.load(Ordering::SeqCst) { break; } + + if let Ok(elapsed) = timer.elapsed() { + if elapsed.as_millis() < 300 { + error_count += 1; + + if error_count > 10 { + error!(target: Target::file_mail(), channel = id; "Reach fatal error count in ingest, terminate channel!"); + manager.channel.lock().unwrap().active = false; + manager.stop_all(); + break; + } + } else { + error_count = 0; + } + } } Ok(()) diff --git a/engine/src/player/output/mod.rs b/engine/src/player/output/mod.rs index ac8af22f..5248f3a8 100644 --- a/engine/src/player/output/mod.rs +++ b/engine/src/player/output/mod.rs @@ -1,12 +1,11 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, process::{Command, Stdio}, - sync::atomic::Ordering, + sync::{atomic::Ordering, mpsc::sync_channel}, thread::{self, sleep}, time::{Duration, SystemTime}, }; -use crossbeam_channel::bounded; use log::*; mod desktop; @@ -72,7 +71,7 @@ pub fn player(manager: ChannelManager) -> Result<(), ProcessError> { // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { - let (ingest_sender, rx) = bounded(96); + let (ingest_sender, rx) = sync_channel(96); ingest_receiver = Some(rx); thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_2)); } diff --git a/engine/src/player/utils/mod.rs b/engine/src/player/utils/mod.rs index 56cc9249..d07b692f 100644 --- a/engine/src/player/utils/mod.rs +++ b/engine/src/player/utils/mod.rs @@ -1059,7 +1059,7 @@ pub fn validate_ffmpeg(config: &mut PlayoutConfig) -> Result<(), String> { } /// get a free tcp socket -pub fn free_tcp_socket(exclude_socket: String) -> Option { +pub fn gen_tcp_socket(exclude_socket: String) -> Option { for _ in 0..100 { let port = rand::thread_rng().gen_range(45321..54268); let socket = format!("127.0.0.1:{port}"); @@ -1073,12 +1073,12 @@ pub fn free_tcp_socket(exclude_socket: String) -> Option { } /// check if tcp port is free -pub fn test_tcp_port(id: i32, url: &str) -> bool { - let re = Regex::new(r"^[\w]+\://").unwrap(); +pub fn is_free_tcp_port(id: i32, url: &str) -> bool { + let re = Regex::new(r"^[\w]+://([^/]+)").unwrap(); let mut addr = url.to_string(); - if re.is_match(url) { - addr = re.replace(url, "").to_string(); + if let Some(base_url) = re.captures(url).and_then(|u| u.get(1)) { + addr = base_url.as_str().to_string(); } if let Some(socket) = addr.split_once(':') { @@ -1092,7 +1092,7 @@ pub fn test_tcp_port(id: i32, url: &str) -> bool { } }; - error!(target: Target::file_mail(), channel = id; "Address {url} already in use!"); + error!(target: Target::file_mail(), channel = id; "Address {addr} already in use!"); false } diff --git a/engine/src/utils/config.rs b/engine/src/utils/config.rs index dfbd936d..c73c455e 100644 --- a/engine/src/utils/config.rs +++ b/engine/src/utils/config.rs @@ -14,7 +14,7 @@ use tokio::{fs, io::AsyncReadExt}; use ts_rs::TS; use crate::db::{handles, models}; -use crate::utils::{files::norm_abs_path, free_tcp_socket, time_to_sec}; +use crate::utils::{files::norm_abs_path, gen_tcp_socket, time_to_sec}; use crate::vec_strings; use crate::AdvancedConfig; use crate::ARGS; @@ -797,9 +797,9 @@ impl PlayoutConfig { // when text overlay without text_from_filename is on, turn also the RPC server on, // to get text messages from it if text.add_text && !text.text_from_filename { - text.zmq_stream_socket = free_tcp_socket(String::new()); + text.zmq_stream_socket = gen_tcp_socket(String::new()); text.zmq_server_socket = - free_tcp_socket(text.zmq_stream_socket.clone().unwrap_or_default()); + gen_tcp_socket(text.zmq_stream_socket.clone().unwrap_or_default()); text.node_pos = Some(2); } else { text.zmq_stream_socket = None; diff --git a/engine/src/utils/control.rs b/engine/src/utils/control.rs index 212846fc..fc6b6d68 100644 --- a/engine/src/utils/control.rs +++ b/engine/src/utils/control.rs @@ -161,17 +161,9 @@ pub async fn control_state( date.clone_from(¤t_date); handles::update_stat(conn, config.general.channel_id, current_date, delta).await?; - if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { - if let Err(e) = proc.kill() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - - if let Err(e) = proc.wait() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - } else { + if manager.stop(Decoder).is_err() { return Err(ServiceError::InternalServerError); - } + }; data_map.insert("operation".to_string(), json!("move_to_last")); data_map.insert("shifted_seconds".to_string(), json!(delta)); @@ -197,17 +189,9 @@ pub async fn control_state( date.clone_from(¤t_date); handles::update_stat(conn, config.general.channel_id, current_date, delta).await?; - if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { - if let Err(e) = proc.kill() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - - if let Err(e) = proc.wait() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - } else { + if manager.stop(Decoder).is_err() { return Err(ServiceError::InternalServerError); - } + }; data_map.insert("operation".to_string(), json!("move_to_next")); data_map.insert("shifted_seconds".to_string(), json!(delta)); @@ -228,17 +212,9 @@ pub async fn control_state( handles::update_stat(conn, config.general.channel_id, current_date, 0.0).await?; - if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { - if let Err(e) = proc.kill() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - - if let Err(e) = proc.wait() { - error!(target: Target::file_mail(), channel = id; "Decoder {e:?}") - }; - } else { + if manager.stop(Decoder).is_err() { return Err(ServiceError::InternalServerError); - } + }; data_map.insert("operation".to_string(), json!("reset_playout_state")); diff --git a/engine/src/utils/mod.rs b/engine/src/utils/mod.rs index 8365aa35..6b1f6b29 100644 --- a/engine/src/utils/mod.rs +++ b/engine/src/utils/mod.rs @@ -230,12 +230,14 @@ pub async fn read_log_file(channel_id: &i32, date: &str) -> Result 5000000.0 { error!("Log file to big: {}", sizeof_fmt(file_size)); - format!("The log file is larger ({}) than the hard limit of 5MB, the probability is very high that something is wrong with the playout. Check this on the server with `less {log_path:?}`.", sizeof_fmt(file_size)) + format!("The log file is larger ({}) than the hard limit of 5MB, the probability is very high that something is wrong with the playout.\nCheck this on the server with `less {log_path:?}`.", sizeof_fmt(file_size)) } else { fs::read_to_string(log_path).await? }; @@ -293,7 +295,7 @@ where } /// get a free tcp socket -pub fn free_tcp_socket(exclude_socket: String) -> Option { +pub fn gen_tcp_socket(exclude_socket: String) -> Option { for _ in 0..100 { let port = rand::thread_rng().gen_range(45321..54268); let socket = format!("127.0.0.1:{port}"); diff --git a/frontend/pages/logging.vue b/frontend/pages/logging.vue index 4f074a8d..f6a01819 100644 --- a/frontend/pages/logging.vue +++ b/frontend/pages/logging.vue @@ -110,7 +110,7 @@ function filterLogsBySeverity(logString: string, minSeverity: string): string { const logLevel = match[1] return indexStore.severityLevels[logLevel] >= minLevel } - return false + return true }) return filteredLogs.join('\n') }