From f908e29fc595fd3111c27fb072d8afe178fe263a Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 10 Jun 2024 18:44:43 +0200 Subject: [PATCH] migrate config to db --- Cargo.lock | 443 ++------------- ffplayout-engine/Cargo.toml | 2 +- ffplayout-engine/src/rpc/zmq_cmd.rs | 3 +- ffplayout/Cargo.toml | 7 +- ffplayout/examples/flexi3.rs | 8 +- ffplayout/src/api/routes.rs | 94 +--- ffplayout/src/db/handles.rs | 67 ++- ffplayout/src/db/mod.rs | 7 +- ffplayout/src/db/models.rs | 135 ++++- ffplayout/src/main.rs | 28 +- ffplayout/src/player/controller.rs | 30 +- ffplayout/src/player/filter/mod.rs | 149 ++---- ffplayout/src/player/filter/v_drawtext.rs | 12 +- ffplayout/src/player/input/ingest.rs | 8 +- ffplayout/src/player/input/mod.rs | 10 +- ffplayout/src/player/input/playlist.rs | 24 +- ffplayout/src/player/output/desktop.rs | 8 +- ffplayout/src/player/output/hls.rs | 29 +- ffplayout/src/player/output/mod.rs | 27 +- ffplayout/src/player/output/null.rs | 6 +- ffplayout/src/player/output/stream.rs | 6 +- ffplayout/src/player/utils/json_validate.rs | 8 +- ffplayout/src/player/utils/mod.rs | 71 +-- ffplayout/src/sse/broadcast.rs | 62 +-- ffplayout/src/sse/routes.rs | 14 +- ffplayout/src/utils/advanced_config.rs | 97 ++-- ffplayout/src/utils/channels.rs | 26 +- ffplayout/src/utils/config.rs | 563 +++++++++++--------- ffplayout/src/utils/control.rs | 240 +++++++-- ffplayout/src/utils/logging.rs | 7 +- ffplayout/src/utils/mod.rs | 134 ++++- ffplayout/src/utils/task_runner.rs | 12 +- migrations/00001_create_tables.sql | 62 ++- 33 files changed, 1197 insertions(+), 1202 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d7455f0..dca81cb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,9 +21,9 @@ dependencies = [ [[package]] name = "actix-files" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0bdd6ff79de7c9a021f5d9ea79ce23e108d8bfc9b49b5b4a2cf6fad5a35212" +checksum = "0773d59061dedb49a8aed04c67291b9d8cf2fe0b60130a381aab53c6dd86e9be" dependencies = [ "actix-http", "actix-service", @@ -93,9 +93,9 @@ dependencies = [ [[package]] name = "actix-multipart" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b960e2aea75f49c8f069108063d12a48d329fc8b60b786dfc7552a9d5918d2d" +checksum = "d974dd6c4f78d102d057c672dcf6faa618fafa9df91d44f9c466688fc1275a3a" dependencies = [ "actix-multipart-derive", "actix-utils", @@ -109,6 +109,7 @@ dependencies = [ "log", "memchr", "mime", + "rand", "serde", "serde_json", "serde_plain", @@ -146,9 +147,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" dependencies = [ "futures-core", "tokio", @@ -156,9 +157,9 @@ dependencies = [ [[package]] name = "actix-server" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" +checksum = "b02303ce8d4e8be5b855af6cf3c3a08f3eff26880faad82bab679c22d3650cb5" dependencies = [ "actix-rt", "actix-service", @@ -166,7 +167,7 @@ dependencies = [ "futures-core", "futures-util", "mio", - "socket2 0.5.7", + "socket2", "tokio", "tracing", ] @@ -194,9 +195,9 @@ dependencies = [ [[package]] name = "actix-web" -version = "4.6.0" +version = "4.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1cf67dadb19d7c95e5a299e2dda24193b89d5d4f33a3b9800888ede9e19aa32" +checksum = "5d6316df3fa569627c98b12557a8b6ff0674e5be4bb9b5e4ae2550ddb4964ed6" dependencies = [ "actix-codec", "actix-http", @@ -228,16 +229,16 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2 0.5.7", + "socket2", "time", "url", ] [[package]] name = "actix-web-codegen" -version = "4.2.2" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" +checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8" dependencies = [ "actix-router", "proc-macro2", @@ -483,159 +484,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn 1.0.109", -] - -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-executor" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8828ec6e544c02b0d6691d21ed9f9218d0384a82542855073c2a3f58304aaf0" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand 2.1.0", - "futures-lite 2.3.0", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.3.1", - "async-executor", - "async-io 2.3.3", - "async-lock 3.4.0", - "blocking", - "futures-lite 2.3.0", - "once_cell", -] - -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock 2.8.0", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite 1.13.0", - "log", - "parking", - "polling 2.8.0", - "rustix 0.37.27", - "slab", - "socket2 0.4.10", - "waker-fn", -] - -[[package]] -name = "async-io" -version = "2.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" -dependencies = [ - "async-lock 3.4.0", - "cfg-if", - "concurrent-queue", - "futures-io", - "futures-lite 2.3.0", - "parking", - "polling 3.7.1", - "rustix 0.38.34", - "slab", - "tracing", - "windows-sys 0.52.0", -] - -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener 2.5.3", -] - -[[package]] -name = "async-lock" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" -dependencies = [ - "event-listener 5.3.1", - "event-listener-strategy", - "pin-project-lite", -] - -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-attributes", - "async-channel 1.9.0", - "async-global-executor", - "async-io 1.13.0", - "async-lock 2.8.0", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite 1.13.0", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async-trait" version = "0.1.80" @@ -669,12 +517,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.3.0" @@ -747,19 +589,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" -dependencies = [ - "async-channel 2.3.1", - "async-task", - "futures-io", - "futures-lite 2.3.0", - "piper", -] - [[package]] name = "brotli" version = "6.0.0" @@ -810,9 +639,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.98" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" +checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" dependencies = [ "jobserver", "libc", @@ -912,15 +741,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" -[[package]] -name = "concurrent-queue" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -1233,27 +1053,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "event-listener" -version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" -dependencies = [ - "event-listener 5.3.1", - "pin-project-lite", -] - [[package]] name = "faccess" version = "0.2.4" @@ -1265,15 +1064,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "fastrand" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] - [[package]] name = "fastrand" version = "2.1.0" @@ -1300,6 +1090,7 @@ dependencies = [ "ffplayout-lib", "ffprobe", "flexi_logger", + "futures", "futures-util", "home", "itertools 0.13.0", @@ -1336,6 +1127,7 @@ dependencies = [ "toml_edit", "uuid", "walkdir", + "zeromq", ] [[package]] @@ -1486,9 +1278,9 @@ dependencies = [ [[package]] name = "flexi_logger" -version = "0.28.1" +version = "0.28.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "419c99d8fc346ea0eaeaac2cc3945024d8fe82aa435aefc2fb9fcda1065f8774" +checksum = "e66e9fce047f849b42d25931c6442c10605a828a43b1419d3a9d89cfcf55d3f7" dependencies = [ "chrono", "glob", @@ -1593,34 +1385,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand 1.9.0", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - -[[package]] -name = "futures-lite" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" -dependencies = [ - "fastrand 2.1.0", - "futures-core", - "futures-io", - "parking", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.30" @@ -1697,18 +1461,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "gloo-timers" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "h2" version = "0.3.26" @@ -1913,7 +1665,7 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "socket2 0.5.7", + "socket2", "tokio", "tower", "tower-service", @@ -1995,26 +1747,6 @@ dependencies = [ "libc", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - -[[package]] -name = "io-lifetimes" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" -dependencies = [ - "hermit-abi", - "libc", - "windows-sys 0.48.0", -] - [[package]] name = "ipnet" version = "2.9.0" @@ -2115,15 +1847,6 @@ dependencies = [ "libc", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "language-tags" version = "0.3.2" @@ -2150,7 +1873,7 @@ dependencies = [ "chumsky", "email-encoding", "email_address", - "fastrand 2.1.0", + "fastrand", "futures-io", "futures-util", "httpdate", @@ -2161,7 +1884,7 @@ dependencies = [ "quoted_printable", "rustls 0.23.9", "rustls-pemfile", - "socket2 0.5.7", + "socket2", "tokio", "tokio-rustls 0.26.0", "url", @@ -2200,12 +1923,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linux-raw-sys" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" - [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -2516,12 +2233,6 @@ version = "1.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fecab3723493c7851f292cb060f3ee1c42f19b8d749345d0d7eaf3fd19aa62d" -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.12.3" @@ -2646,17 +2357,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391" -dependencies = [ - "atomic-waker", - "fastrand 2.1.0", - "futures-io", -] - [[package]] name = "pkcs1" version = "0.7.5" @@ -2684,37 +2384,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - -[[package]] -name = "polling" -version = "3.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e6a007746f34ed64099e88783b0ae369eaa3da6392868ba262e2af9b8fbaea1" -dependencies = [ - "cfg-if", - "concurrent-queue", - "hermit-abi", - "pin-project-lite", - "rustix 0.38.34", - "tracing", - "windows-sys 0.52.0", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -2842,9 +2511,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.4" +version = "1.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick", "memchr", @@ -2854,9 +2523,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" dependencies = [ "aho-corasick", "memchr", @@ -2865,15 +2534,15 @@ dependencies = [ [[package]] name = "regex-lite" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" [[package]] name = "regex-syntax" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "relative-path" @@ -2994,20 +2663,6 @@ dependencies = [ "semver", ] -[[package]] -name = "rustix" -version = "0.37.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" -dependencies = [ - "bitflags 1.3.2", - "errno", - "io-lifetimes", - "libc", - "linux-raw-sys 0.3.8", - "windows-sys 0.48.0", -] - [[package]] name = "rustix" version = "0.38.34" @@ -3017,7 +2672,7 @@ dependencies = [ "bitflags 2.5.0", "errno", "libc", - "linux-raw-sys 0.4.14", + "linux-raw-sys", "windows-sys 0.52.0", ] @@ -3329,16 +2984,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.7" @@ -3411,7 +3056,7 @@ dependencies = [ "crc", "crossbeam-queue", "either", - "event-listener 2.5.3", + "event-listener", "futures-channel", "futures-core", "futures-intrusive", @@ -3753,8 +3398,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" dependencies = [ "cfg-if", - "fastrand 2.1.0", - "rustix 0.38.34", + "fastrand", + "rustix", "windows-sys 0.52.0", ] @@ -3886,7 +3531,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.7", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] @@ -3943,6 +3588,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -4120,9 +3766,9 @@ checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" [[package]] name = "utf8parse" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" @@ -4187,12 +3833,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" - [[package]] name = "walkdir" version = "2.5.0" @@ -4559,7 +4199,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb0560d00172817b7f7c2265060783519c475702ae290b154115ca75e976d4d0" dependencies = [ - "async-std", "async-trait", "asynchronous-codec", "bytes", @@ -4576,6 +4215,8 @@ dependencies = [ "rand", "regex", "thiserror", + "tokio", + "tokio-util", "uuid", ] diff --git a/ffplayout-engine/Cargo.toml b/ffplayout-engine/Cargo.toml index c896c781..e5c31b11 100644 --- a/ffplayout-engine/Cargo.toml +++ b/ffplayout-engine/Cargo.toml @@ -27,7 +27,7 @@ serde_json = "1.0" simplelog = { version = "0.12", features = ["paris"] } tiny_http = { version = "0.12", default-features = false } zeromq = { version = "0.4", default-features = false, features = [ - "async-std-runtime", + "tokio-runtime", "tcp-transport", ] } diff --git a/ffplayout-engine/src/rpc/zmq_cmd.rs b/ffplayout-engine/src/rpc/zmq_cmd.rs index 9238b3f5..41356867 100644 --- a/ffplayout-engine/src/rpc/zmq_cmd.rs +++ b/ffplayout-engine/src/rpc/zmq_cmd.rs @@ -1,7 +1,6 @@ use std::error::Error; -use zeromq::Socket; -use zeromq::{SocketRecv, SocketSend, ZmqMessage}; +use zeromq::{Socket, SocketRecv, SocketSend, ZmqMessage}; pub async fn zmq_send(msg: &str, socket_addr: &str) -> Result> { let mut socket = zeromq::ReqSocket::new(); diff --git a/ffplayout/Cargo.toml b/ffplayout/Cargo.toml index 2b4e4e2e..24477a66 100644 --- a/ffplayout/Cargo.toml +++ b/ffplayout/Cargo.toml @@ -30,6 +30,7 @@ faccess = "0.2" ffprobe = "0.4" flexi_logger = { version = "0.28", features = ["kv", "colors"] } futures-util = { version = "0.3", default-features = false, features = ["std"] } +futures = "0.3" home = "0.5" itertools = "0.13" jsonwebtoken = "9" @@ -41,7 +42,7 @@ log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", notify = "6.0" notify-debouncer-full = { version = "*", default-features = false } num-traits = "0.2" -once_cell = "1.18" +once_cell = "1" paris = "1.5" parking_lot = "0.12" path-clean = "1.0" @@ -64,6 +65,10 @@ tokio-stream = "0.1" toml_edit = {version ="0.22", features = ["serde"]} uuid = "1.8" walkdir = "2" +zeromq = { version = "0.4", default-features = false, features = [ + "tokio-runtime", + "tcp-transport", +] } [target.'cfg(not(target_arch = "windows"))'.dependencies] signal-child = "1" diff --git a/ffplayout/examples/flexi3.rs b/ffplayout/examples/flexi3.rs index 98e6b016..10da1055 100644 --- a/ffplayout/examples/flexi3.rs +++ b/ffplayout/examples/flexi3.rs @@ -22,13 +22,15 @@ impl MultiFileLogger { let writer = FileLogWriter::builder( FileSpec::default() .suppress_timestamp() - .basename("ffplayout") - .discriminant(channel), + .basename("ffplayout"), ) .append() .rotate( Criterion::Age(Age::Day), - Naming::Timestamps, + Naming::TimestampsCustomFormat { + current_infix: Some(""), + format: "%Y-%m-%d", + }, Cleanup::KeepLogFiles(7), ) .print_message() diff --git a/ffplayout/src/api/routes.rs b/ffplayout/src/api/routes.rs index 0d3a1b16..e9f67085 100644 --- a/ffplayout/src/api/routes.rs +++ b/ffplayout/src/api/routes.rs @@ -8,7 +8,7 @@ /// /// For all endpoints an (Bearer) authentication is required.\ /// `{id}` represent the channel id, and at default is 1. -use std::{collections::HashMap, env, path::PathBuf}; +use std::{env, path::PathBuf, sync::Mutex}; use actix_files; use actix_multipart::Multipart; @@ -27,24 +27,20 @@ use argon2::{ Argon2, PasswordHasher, PasswordVerifier, }; use chrono::{DateTime, Datelike, Local, NaiveDateTime, TimeDelta, TimeZone, Utc}; +use log::*; use path_clean::PathClean; use regex::Regex; use serde::{Deserialize, Serialize}; -use simplelog::*; use sqlx::{Pool, Sqlite}; use tokio::{fs, task}; -use crate::db::{ - handles, - models::{Channel, LoginUser, TextPreset, User}, -}; use crate::player::utils::{ - get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, + get_data_map, get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, }; use crate::utils::{ channels::{create_channel, delete_channel}, config::{PlayoutConfig, Template}, - control::{control_state, media_info, send_message, ControlParams, Process}, + control::{control_state, send_message, ControlParams, Process}, errors::ServiceError, files::{ browser, create_directory, norm_abs_path, remove_file_or_folder, rename_file, upload, @@ -52,13 +48,20 @@ use crate::utils::{ }, naive_date_time_from_str, playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist}, - playout_config, public_path, read_log_file, read_playout_config, system, Role, + public_path, read_log_file, system, Role, TextFilter, }; use crate::vec_strings; use crate::{ api::auth::{create_jwt, Claims}, utils::control::ProcessControl, }; +use crate::{ + db::{ + handles, + models::{Channel, LoginUser, TextPreset, User}, + }, + player::controller::ChannelController, +}; #[derive(Serialize)] struct UserObj { @@ -491,9 +494,7 @@ async fn get_playout_config( _details: AuthDetails, ) -> Result { if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await { - if let Ok(config) = read_playout_config(&channel.config_path) { - return Ok(web::Json(config)); - } + // TODO: get config }; Err(ServiceError::InternalServerError) @@ -513,8 +514,7 @@ async fn update_playout_config( data: web::Json, ) -> Result { if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await { - let toml_string = toml_edit::ser::to_string_pretty(&data)?; - fs::write(&channel.config_path, toml_string).await?; + // TODO: update config return Ok("Update playout config success."); }; @@ -632,14 +632,14 @@ async fn delete_preset( #[post("/control/{id}/text/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn send_text_message( - pool: web::Data>, id: web::Path, - data: web::Json>, + data: web::Json, + controllers: web::Data>, ) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); - match send_message(&config, data.into_inner()).await { - Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), + match send_message(manager, data.into_inner()).await { + Ok(res) => Ok(web::Json(res)), Err(e) => Err(e), } } @@ -660,11 +660,12 @@ pub async fn control_playout( pool: web::Data>, id: web::Path, control: web::Json, + controllers: web::Data>, ) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); - match control_state(&config, &control.control).await { - Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), + match control_state(&pool, manager, &control.control).await { + Ok(res) => Ok(web::Json(res)), Err(e) => Err(e), } } @@ -696,54 +697,13 @@ pub async fn control_playout( #[get("/control/{id}/media/current")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn media_current( - pool: web::Data>, id: web::Path, + controllers: web::Data>, ) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let media_map = get_data_map(&manager); - match media_info(&config, "current".into()).await { - Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), - Err(e) => Err(e), - } -} - -/// **Get next Clip** -/// -/// ```BASH -/// curl -X GET http://127.0.0.1:8787/api/control/1/media/next -H 'Authorization: Bearer ' -/// ``` -#[get("/control/{id}/media/next")] -#[protect(any("Role::Admin", "Role::User"), ty = "Role")] -pub async fn media_next( - pool: web::Data>, - id: web::Path, -) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; - - match media_info(&config, "next".into()).await { - Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), - Err(e) => Err(e), - } -} - -/// **Get last Clip** -/// -/// ```BASH -/// curl -X GET http://127.0.0.1:8787/api/control/1/media/last -/// -H 'Content-Type: application/json' -H 'Authorization: Bearer ' -/// ``` -#[get("/control/{id}/media/last")] -#[protect(any("Role::Admin", "Role::User"), ty = "Role")] -pub async fn media_last( - pool: web::Data>, - id: web::Path, -) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; - - match media_info(&config, "last".into()).await { - Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())), - Err(e) => Err(e), - } + Ok(web::Json(media_map)) } /// #### ffplayout Process Control diff --git a/ffplayout/src/db/handles.rs b/ffplayout/src/db/handles.rs index 93401f90..c8bd30ca 100644 --- a/ffplayout/src/db/handles.rs +++ b/ffplayout/src/db/handles.rs @@ -5,30 +5,20 @@ use argon2::{ use rand::{distributions::Alphanumeric, Rng}; use simplelog::*; -use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite}; +use sqlx::{sqlite::SqliteQueryResult, Pool, Sqlite}; use tokio::task; -use crate::db::{ - db_pool, - models::{Channel, TextPreset, User}, -}; -use crate::utils::{db_path, local_utc_offset, GlobalSettings, Role}; +use super::models::{AdvancedConfiguration, Configuration}; +use crate::db::models::{Channel, TextPreset, User}; +use crate::utils::{local_utc_offset, GlobalSettings, Role}; -pub async fn db_migrate() -> Result<&'static str, Box> { - let db_path = db_path()?; - - if !Sqlite::database_exists(db_path).await.unwrap_or(false) { - Sqlite::create_database(db_path).await.unwrap(); - } - - let pool = db_pool().await?; - - match sqlx::migrate!("../migrations").run(&pool).await { +pub async fn db_migrate(conn: &Pool) -> Result<&'static str, Box> { + match sqlx::migrate!("../migrations").run(conn).await { Ok(_) => info!("Database migration successfully"), Err(e) => panic!("{e}"), } - if let Err(_) = select_global(&pool).await { + if let Err(_) = select_global(conn).await { let secret: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(80) @@ -43,7 +33,7 @@ pub async fn db_migrate() -> Result<&'static str, Box> { END; INSERT INTO global(secret) VALUES($1);"; - sqlx::query(query).bind(secret).execute(&pool).await?; + sqlx::query(query).bind(secret).execute(conn).await?; } Ok("Database migrated!") @@ -80,24 +70,39 @@ pub async fn update_channel( id: i32, channel: Channel, ) -> Result { - let query = "UPDATE channels SET name = $2, preview_url = $3, config_path = $4, extra_extensions = $5 WHERE id = $1"; + let query = + "UPDATE channels SET name = $2, preview_url = $3, extra_extensions = $4 WHERE id = $1"; sqlx::query(query) .bind(id) .bind(channel.name) .bind(channel.preview_url) - .bind(channel.config_path) .bind(channel.extra_extensions) .execute(conn) .await } +pub async fn update_stat( + conn: &Pool, + id: i32, + current_date: String, + time_shift: f64, +) -> Result { + let query = "UPDATE channels SET current_date = $2, time_shift = $3 WHERE id = $1"; + + sqlx::query(query) + .bind(id) + .bind(current_date) + .bind(time_shift) + .execute(conn) + .await +} + pub async fn insert_channel(conn: &Pool, channel: Channel) -> Result { - let query = "INSERT INTO channels (name, preview_url, config_path, extra_extensions) VALUES($1, $2, $3, $4)"; + let query = "INSERT INTO channels (name, preview_url, extra_extensions) VALUES($1, $2, $3)"; let result = sqlx::query(query) .bind(channel.name) .bind(channel.preview_url) - .bind(channel.config_path) .bind(channel.extra_extensions) .execute(conn) .await?; @@ -123,6 +128,24 @@ pub async fn select_last_channel(conn: &Pool) -> Result, + channel: i32, +) -> Result { + let query = "SELECT * FROM configurations WHERE channel_id = $1"; + + sqlx::query_as(query).bind(channel).fetch_one(conn).await +} + +pub async fn select_advanced_configuration( + conn: &Pool, + channel: i32, +) -> Result { + let query = "SELECT * FROM advanced_configurations WHERE channel_id = $1"; + + sqlx::query_as(query).bind(channel).fetch_one(conn).await +} + pub async fn select_role(conn: &Pool, id: &i32) -> Result { let query = "SELECT name FROM roles WHERE id = $1"; let result: Role = sqlx::query_as(query).bind(id).fetch_one(conn).await?; diff --git a/ffplayout/src/db/mod.rs b/ffplayout/src/db/mod.rs index fdf6adbd..2f89848b 100644 --- a/ffplayout/src/db/mod.rs +++ b/ffplayout/src/db/mod.rs @@ -1,4 +1,4 @@ -use sqlx::{Pool, Sqlite, SqlitePool}; +use sqlx::{migrate::MigrateDatabase, Pool, Sqlite, SqlitePool}; pub mod handles; pub mod models; @@ -7,6 +7,11 @@ use crate::utils::db_path; pub async fn db_pool() -> Result, sqlx::Error> { let db_path = db_path().unwrap(); + + if !Sqlite::database_exists(db_path).await.unwrap_or(false) { + Sqlite::create_database(db_path).await.unwrap(); + } + let conn = SqlitePool::connect(db_path).await?; Ok(conn) diff --git a/ffplayout/src/db/models.rs b/ffplayout/src/db/models.rs index a65aa674..3b6f124f 100644 --- a/ffplayout/src/db/models.rs +++ b/ffplayout/src/db/models.rs @@ -111,10 +111,143 @@ pub struct Channel { pub config_path: String, pub extra_extensions: String, pub active: bool, - pub modified: Option, + pub current_date: Option, pub time_shift: f64, #[sqlx(default)] #[serde(default)] pub utc_offset: i32, } + +#[derive(Clone, Debug, Deserialize, Serialize, sqlx::FromRow)] +pub struct Configuration { + pub id: i32, + pub channel_id: i32, + pub general_help: String, + pub stop_threshold: f64, + + pub mail_help: String, + pub subject: String, + pub smtp_server: String, + pub starttls: bool, + pub sender_addr: String, + pub sender_pass: String, + pub recipient: String, + pub mail_level: String, + pub interval: i64, + + pub logging_help: String, + pub ffmpeg_level: String, + pub ingest_level: String, + #[serde(default)] + pub detect_silence: bool, + #[serde(default)] + pub ignore_lines: String, + + pub processing_help: String, + pub processing_mode: String, + #[serde(default)] + pub audio_only: bool, + #[serde(default = "default_track_index")] + pub audio_track_index: i32, + #[serde(default)] + pub copy_audio: bool, + #[serde(default)] + pub copy_video: bool, + pub width: i64, + pub height: i64, + pub aspect: f64, + pub fps: f64, + pub add_logo: bool, + pub logo: String, + pub logo_scale: String, + pub logo_opacity: f32, + pub logo_position: String, + #[serde(default = "default_tracks")] + pub audio_tracks: i32, + #[serde(default = "default_channels")] + pub audio_channels: u8, + pub volume: f64, + #[serde(default)] + pub decoder_filter: String, + + pub ingest_help: String, + pub ingest_enable: bool, + pub ingest_param: String, + #[serde(default)] + pub ingest_filter: String, + + pub playlist_help: String, + pub playlist_path: String, + pub day_start: String, + pub length: String, + pub infinit: bool, + + pub storage_help: String, + pub storage_path: String, + + #[serde(alias = "filler_clip")] + pub filler: String, + pub extensions: String, + pub shuffle: bool, + + pub text_help: String, + pub add_text: bool, + + pub fontfile: String, + pub text_from_filename: bool, + pub style: String, + pub regex: String, + + pub task_help: String, + pub task_enable: bool, + pub task_path: String, + + pub output_help: String, + pub output_mode: String, + pub output_param: String, +} + +fn default_track_index() -> i32 { + -1 +} + +fn default_tracks() -> i32 { + 1 +} + +fn default_channels() -> u8 { + 2 +} + +#[derive(Clone, Debug, Deserialize, Serialize, sqlx::FromRow)] +pub struct AdvancedConfiguration { + pub id: i32, + pub channel_id: i32, + pub decoder_input_param: Option, + pub decoder_output_param: Option, + pub encoder_input_param: Option, + pub ingest_input_param: Option, + pub deinterlace: Option, + pub pad_scale_w: Option, + pub pad_scale_h: Option, + pub pad_video: Option, + pub fps: Option, + pub scale: Option, + pub set_dar: Option, + pub fade_in: Option, + pub fade_out: Option, + pub overlay_logo_scale: Option, + pub overlay_logo_fade_in: Option, + pub overlay_logo_fade_out: Option, + pub overlay_logo: Option, + pub tpad: Option, + pub drawtext_from_file: Option, + pub drawtext_from_zmq: Option, + pub aevalsrc: Option, + pub afade_in: Option, + pub afade_out: Option, + pub apad: Option, + pub volume: Option, + pub split: Option, +} diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index c1dddc1e..d416f6b5 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -1,7 +1,6 @@ use std::{ collections::HashSet, env, io, - path::PathBuf, process::{self, exit}, sync::{Arc, Mutex}, thread, @@ -68,14 +67,18 @@ async fn validator( #[actix_web::main] async fn main() -> std::io::Result<()> { - if let Err(c) = run_args().await { - exit(c); - } - let pool = db_pool() .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + if let Err(e) = handles::db_migrate(&pool).await { + panic!("{e}"); + }; + + if let Err(c) = run_args().await { + exit(c); + } + let channel_controllers = Arc::new(Mutex::new(ChannelController::new())); let channels = handles::select_all_channels(&pool) .await @@ -87,14 +90,7 @@ async fn main() -> std::io::Result<()> { init_logging(mail_queues.clone())?; for channel in channels.iter() { - let config_path = PathBuf::from(&channel.config_path); - let config = match web::block(move || PlayoutConfig::new(Some(config_path), None)).await { - Ok(config) => config, - Err(e) => { - error!("Failed to load configuration: {}", e); - continue; - } - }; + let config = PlayoutConfig::new(&pool, channel.id).await; let channel_manager = ChannelManager::new(channel.clone(), config.clone()); @@ -110,8 +106,10 @@ async fn main() -> std::io::Result<()> { } if channel.active { + let pool_clone = pool.clone(); + thread::spawn(move || { - if let Err(e) = controller::start(channel_manager) { + if let Err(e) = controller::start(pool_clone, channel_manager) { error!("{e}"); }; @@ -182,8 +180,6 @@ async fn main() -> std::io::Result<()> { .service(send_text_message) .service(control_playout) .service(media_current) - .service(media_next) - .service(media_last) .service(process_control) .service(get_playlist) .service(save_playlist) diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index c74ae791..8895ab43 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -13,6 +13,7 @@ use signal_child::Signalable; use log::*; use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Sqlite}; use crate::db::models::Channel; use crate::player::{ @@ -55,6 +56,14 @@ pub struct ChannelManager { pub ingest_is_running: Arc, pub is_terminated: Arc, pub is_alive: Arc, + pub chain: Option>>>, + pub current_date: Arc>, + pub list_init: Arc, + pub current_media: Arc>>, + pub current_list: Arc>>, + pub filler_list: Arc>>, + pub current_index: Arc, + pub filler_index: Arc, } impl ChannelManager { @@ -72,10 +81,9 @@ impl ChannelManager { channel.name = other.name.clone(); channel.preview_url = other.preview_url.clone(); - channel.config_path = other.config_path.clone(); channel.extra_extensions = other.extra_extensions.clone(); channel.active = other.active.clone(); - channel.modified = other.modified.clone(); + channel.current_date = other.current_date.clone(); channel.time_shift = other.time_shift.clone(); channel.utc_offset = other.utc_offset.clone(); } @@ -240,6 +248,16 @@ impl ChannelController { self.channels.push(manager); } + pub fn get(&self, id: i32) -> Option { + for manager in self.channels.iter() { + if manager.channel.lock().unwrap().id == id { + return Some(manager.clone()); + } + } + + None + } + pub fn remove(&mut self, channel_id: i32) { self.channels.retain(|manager| { let channel = manager.channel.lock().unwrap(); @@ -255,9 +273,9 @@ impl ChannelController { } } -pub fn start(channel: ChannelManager) -> Result<(), ProcessError> { +pub fn start(db_pool: Pool, channel: ChannelManager) -> Result<(), ProcessError> { let config = channel.config.lock()?.clone(); - let mode = config.out.mode.clone(); + let mode = config.output.mode.clone(); let play_control = PlayerControl::new(); let play_control_clone = play_control.clone(); let play_status = PlayoutStatus::new(); @@ -269,8 +287,8 @@ pub fn start(channel: ChannelManager) -> Result<(), ProcessError> { match mode { // write files/playlist to HLS m3u8 playlist - HLS => write_hls(channel, play_control, play_status), + HLS => write_hls(channel, db_pool, play_control, play_status), // play on desktop or stream to a remote target - _ => player(channel, &play_control, play_status), + _ => player(channel, db_pool, &play_control, play_status), } } diff --git a/ffplayout/src/player/filter/mod.rs b/ffplayout/src/player/filter/mod.rs index fec64061..70876b89 100644 --- a/ffplayout/src/player/filter/mod.rs +++ b/ffplayout/src/player/filter/mod.rs @@ -179,18 +179,14 @@ impl Filters { impl Default for Filters { fn default() -> Self { - Self::new(PlayoutConfig::new(None, None), 0) + Self::new(PlayoutConfig::default(), 0) } } fn deinterlace(field_order: &Option, chain: &mut Filters, config: &PlayoutConfig) { if let Some(order) = field_order { if order != "progressive" { - let deinterlace = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.deinterlace.clone()) - { + let deinterlace = match config.advanced.filter.deinterlace.clone() { Some(deinterlace) => deinterlace, None => "yadif=0:-1:0".to_string(), }; @@ -206,22 +202,14 @@ fn pad(aspect: f64, chain: &mut Filters, v_stream: &ffprobe::Stream, config: &Pl if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) { if w > config.processing.width && aspect > config.processing.aspect { - scale = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.pad_scale_w.clone()) - { + scale = match config.advanced.filter.pad_scale_w.clone() { Some(pad_scale_w) => { custom_format(&format!("{pad_scale_w},"), &[&config.processing.width]) } None => format!("scale={}:-1,", config.processing.width), }; } else if h > config.processing.height && aspect < config.processing.aspect { - scale = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.pad_scale_h.clone()) - { + scale = match config.advanced.filter.pad_scale_h.clone() { Some(pad_scale_h) => { custom_format(&format!("{pad_scale_h},"), &[&config.processing.width]) } @@ -230,11 +218,7 @@ fn pad(aspect: f64, chain: &mut Filters, v_stream: &ffprobe::Stream, config: &Pl } } - let pad = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.pad_video.clone()) - { + let pad = match config.advanced.filter.pad_video.clone() { Some(pad_video) => custom_format( &format!("{scale}{pad_video}"), &[ @@ -254,11 +238,7 @@ fn pad(aspect: f64, chain: &mut Filters, v_stream: &ffprobe::Stream, config: &Pl fn fps(fps: f64, chain: &mut Filters, config: &PlayoutConfig) { if fps != config.processing.fps { - let fps_filter = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.fps.clone()) - { + let fps_filter = match config.advanced.filter.fps.clone() { Some(fps) => custom_format(&fps, &[&config.processing.fps]), None => format!("fps={}", config.processing.fps), }; @@ -277,11 +257,7 @@ fn scale( // width: i64, height: i64 if let (Some(w), Some(h)) = (width, height) { if w != config.processing.width || h != config.processing.height { - let scale = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.scale.clone()) - { + let scale = match config.advanced.filter.scale.clone() { Some(scale) => custom_format( &scale, &[&config.processing.width, &config.processing.height], @@ -298,11 +274,7 @@ fn scale( } if !is_close(aspect, config.processing.aspect, 0.03) { - let dar = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.set_dar.clone()) - { + let dar = match config.advanced.filter.set_dar.clone() { Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]), None => format!("setdar=dar={}", config.processing.aspect), }; @@ -310,11 +282,7 @@ fn scale( chain.add_filter(&dar, 0, Video); } } else { - let scale = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.scale.clone()) - { + let scale = match config.advanced.filter.scale.clone() { Some(scale) => custom_format( &scale, &[&config.processing.width, &config.processing.height], @@ -326,11 +294,7 @@ fn scale( }; chain.add_filter(&scale, 0, Video); - let dar = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.set_dar.clone()) - { + let dar = match config.advanced.filter.set_dar.clone() { Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]), None => format!("setdar=dar={}", config.processing.aspect), }; @@ -361,18 +325,10 @@ fn fade( let mut fade_in = format!("{t}fade=in:st=0:d=0.5"); if t == "a" { - if let Some(fade) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.afade_in.clone()) - { + if let Some(fade) = config.advanced.filter.afade_in.clone() { fade_in = custom_format(&fade, &[t]); } - } else if let Some(fade) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.fade_in.clone()) - { + } else if let Some(fade) = config.advanced.filter.fade_in.clone() { fade_in = custom_format(&fade, &[t]); }; @@ -383,19 +339,10 @@ fn fade( let mut fade_out = format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0)); if t == "a" { - if let Some(fade) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.afade_out.clone()) - { + if let Some(fade) = config.advanced.filter.afade_out.clone() { fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]); } - } else if let Some(fade) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.fade_out.clone()) - .clone() - { + } else if let Some(fade) = config.advanced.filter.fade_out.clone().clone() { fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]); }; @@ -419,11 +366,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { ); if node.last_ad { - match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.overlay_logo_fade_in.clone()) - { + match config.advanced.filter.overlay_logo_fade_in.clone() { Some(fade_in) => logo_chain.push_str(&format!(",{fade_in}")), None => logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1"), }; @@ -432,11 +375,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { if node.next_ad { let length = node.out - node.seek - 1.0; - match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.overlay_logo_fade_out.clone()) - { + match config.advanced.filter.overlay_logo_fade_out.clone() { Some(fade_out) => { logo_chain.push_str(&custom_format(&format!(",{fade_out}"), &[length])) } @@ -445,11 +384,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { } if !config.processing.logo_scale.is_empty() { - match &config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.overlay_logo_scale.clone()) - { + match &config.advanced.filter.overlay_logo_scale.clone() { Some(logo_scale) => logo_chain.push_str(&custom_format( &format!(",{logo_scale}"), &[&config.processing.logo_scale], @@ -458,11 +393,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { } } - match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.overlay_logo.clone()) - { + match config.advanced.filter.overlay_logo.clone() { Some(overlay) => { if !overlay.starts_with(',') { logo_chain.push(','); @@ -494,11 +425,7 @@ fn extend_video(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { if node.out - node.seek > video_duration - node.seek + 0.1 && node.duration >= node.out { let duration = (node.out - node.seek) - (video_duration - node.seek); - let tpad = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.tpad.clone()) - { + let tpad = match config.advanced.filter.tpad.clone() { Some(pad) => custom_format(&pad, &[duration]), None => format!("tpad=stop_mode=add:stop_duration={duration}"), }; @@ -516,7 +443,7 @@ fn add_text( filter_chain: &Option>>>, ) { if config.text.add_text - && (config.text.text_from_filename || config.out.mode == HLS || node.unit == Encoder) + && (config.text.text_from_filename || config.output.mode == HLS || node.unit == Encoder) { let filter = v_drawtext::filter_node(config, Some(node), filter_chain); @@ -525,11 +452,7 @@ fn add_text( } fn add_audio(node: &Media, chain: &mut Filters, nr: i32, config: &PlayoutConfig) { - let audio = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.aevalsrc.clone()) - { + let audio = match config.advanced.filter.aevalsrc.clone() { Some(aevalsrc) => custom_format(&aevalsrc, &[node.out - node.seek]), None => format!( "aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000", @@ -551,11 +474,7 @@ fn extend_audio(node: &mut Media, chain: &mut Filters, nr: i32, config: &Playout { if node.out - node.seek > audio_duration - node.seek + 0.1 && node.duration >= node.out { - let apad = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.apad.clone()) - { + let apad = match config.advanced.filter.apad.clone() { Some(apad) => custom_format(&apad, &[node.out - node.seek]), None => format!("apad=whole_dur={}", node.out - node.seek), }; @@ -568,11 +487,7 @@ fn extend_audio(node: &mut Media, chain: &mut Filters, nr: i32, config: &Playout fn audio_volume(chain: &mut Filters, config: &PlayoutConfig, nr: i32) { if config.processing.volume != 1.0 { - let volume = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.volume.clone()) - { + let volume = match config.advanced.filter.volume.clone() { Some(volume) => custom_format(&volume, &[config.processing.volume]), None => format!("volume={}", config.processing.volume), }; @@ -614,11 +529,7 @@ pub fn split_filter( } } - let split = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.split.clone()) - { + let split = match config.advanced.filter.split.clone() { Some(split) => custom_format(&split, &[count.to_string(), out_link.join("")]), None => format!("split={count}{}", out_link.join("")), }; @@ -630,7 +541,7 @@ pub fn split_filter( /// Process output filter chain and add new filters to existing ones. fn process_output_filters(config: &PlayoutConfig, chain: &mut Filters, custom_filter: &str) { let filter = - if (config.text.add_text && !config.text.text_from_filename) || config.out.mode == HLS { + if (config.text.add_text && !config.text.text_from_filename) || config.output.mode == HLS { let re_v = Regex::new(r"\[[0:]+[v^\[]+([:0]+)?\]").unwrap(); // match video filter input link let _re_a = Regex::new(r"\[[0:]+[a^\[]+([:0]+)?\]").unwrap(); // match video filter input link let mut cf = custom_filter.to_string(); @@ -683,10 +594,10 @@ pub fn filter_chains( add_text(node, &mut filters, config, filter_chain); } - if let Some(f) = config.out.output_filter.clone() { + if let Some(f) = config.output.output_filter.clone() { process_output_filters(config, &mut filters, &f) - } else if config.out.output_count > 1 && !config.processing.audio_only { - split_filter(&mut filters, config.out.output_count, 0, Video, config); + } else if config.output.output_count > 1 && !config.processing.audio_only { + split_filter(&mut filters, config.output.output_count, 0, Video, config); } return filters; @@ -783,8 +694,8 @@ pub fn filter_chains( error!("Setting 'audio_track_index' other than '-1' is not allowed in audio copy mode!") } - if config.out.mode == HLS { - if let Some(f) = config.out.output_filter.clone() { + if config.output.mode == HLS { + if let Some(f) = config.output.output_filter.clone() { process_output_filters(config, &mut filters, &f) } } diff --git a/ffplayout/src/player/filter/v_drawtext.rs b/ffplayout/src/player/filter/v_drawtext.rs index 80a53997..58835135 100644 --- a/ffplayout/src/player/filter/v_drawtext.rs +++ b/ffplayout/src/player/filter/v_drawtext.rs @@ -48,11 +48,7 @@ pub fn filter_node( .replace('%', "\\\\\\%") .replace(':', "\\:"); - filter = match &config - .advanced - .clone() - .and_then(|a| a.decoder.filters.drawtext_from_file) - { + filter = match &config.advanced.filter.drawtext_from_file { Some(drawtext) => custom_format(drawtext, &[&escaped_text, &config.text.style, &font]), None => format!("drawtext=text='{escaped_text}':{}{font}", config.text.style), }; @@ -65,11 +61,7 @@ pub fn filter_node( } } - filter = match config - .advanced - .as_ref() - .and_then(|a| a.decoder.filters.drawtext_from_zmq.clone()) - { + filter = match config.advanced.filter.drawtext_from_zmq.clone() { Some(drawtext) => custom_format(&drawtext, &[&socket.replace(':', "\\:"), &filter_cmd]), None => format!( "zmq=b=tcp\\\\://'{}',drawtext@dyntext={filter_cmd}", diff --git a/ffplayout/src/player/input/ingest.rs b/ffplayout/src/player/input/ingest.rs index 4f17f7be..5d98459b 100644 --- a/ffplayout/src/player/input/ingest.rs +++ b/ffplayout/src/player/input/ingest.rs @@ -70,11 +70,7 @@ pub fn ingest_server( let is_terminated = channel_mgr.is_terminated.clone(); let ingest_is_running = channel_mgr.ingest_is_running.clone(); - if let Some(ingest_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.ingest.input_cmd.clone()) - { + if let Some(ingest_input_cmd) = config.advanced.ingest.input_cmd { server_cmd.append(&mut ingest_input_cmd.clone()); } @@ -107,7 +103,7 @@ pub fn ingest_server( while !is_terminated.load(Ordering::SeqCst) { let proc_ctl = channel_mgr.clone(); - let level = config.logging.ingest_level.clone().unwrap(); + let level = config.logging.ingest_level.clone(); let ignore = config.logging.ignore_lines.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) diff --git a/ffplayout/src/player/input/mod.rs b/ffplayout/src/player/input/mod.rs index ba87b075..30c99833 100644 --- a/ffplayout/src/player/input/mod.rs +++ b/ffplayout/src/player/input/mod.rs @@ -4,6 +4,7 @@ use std::{ }; use simplelog::*; +use sqlx::{Pool, Sqlite}; pub mod folder; pub mod ingest; @@ -22,6 +23,7 @@ use crate::utils::config::{PlayoutConfig, ProcessMode::*}; /// Create a source iterator from playlist, or from folder. pub fn source_generator( config: PlayoutConfig, + db_pool: Pool, player_control: &PlayerControl, playout_stat: PlayoutStatus, is_terminated: Arc, @@ -45,7 +47,13 @@ pub fn source_generator( } Playlist => { info!("Playout in playlist mode"); - let program = CurrentProgram::new(&config, playout_stat, is_terminated, player_control); + let program = CurrentProgram::new( + &config, + db_pool, + playout_stat, + is_terminated, + player_control, + ); Box::new(program) as Box> } diff --git a/ffplayout/src/player/input/playlist.rs b/ffplayout/src/player/input/playlist.rs index b2e07269..9dd05276 100644 --- a/ffplayout/src/player/input/playlist.rs +++ b/ffplayout/src/player/input/playlist.rs @@ -1,5 +1,4 @@ use std::{ - fs, path::Path, sync::{ atomic::{AtomicBool, Ordering}, @@ -7,9 +6,11 @@ use std::{ }, }; -use serde_json::json; +use futures::executor; use simplelog::*; +use sqlx::{Pool, Sqlite}; +use crate::db::handles; use crate::player::{ controller::{PlayerControl, PlayoutStatus}, utils::{ @@ -27,6 +28,7 @@ use crate::utils::config::{PlayoutConfig, IMAGE_FORMAT}; #[derive(Debug)] pub struct CurrentProgram { config: PlayoutConfig, + db_pool: Pool, start_sec: f64, end_sec: f64, json_playlist: JsonPlaylist, @@ -42,12 +44,14 @@ pub struct CurrentProgram { impl CurrentProgram { pub fn new( config: &PlayoutConfig, + db_pool: Pool, playout_stat: PlayoutStatus, is_terminated: Arc, player_control: &PlayerControl, ) -> Self { Self { config: config.clone(), + db_pool, start_sec: config.playlist.start_sec.unwrap(), end_sec: config.playlist.length_sec.unwrap(), json_playlist: JsonPlaylist::new( @@ -205,15 +209,13 @@ impl CurrentProgram { .clone_from(&date); *self.playout_stat.time_shift.lock().unwrap() = 0.0; - if let Err(e) = fs::write( - &self.config.general.stat_file, - serde_json::to_string(&json!({ - "time_shift": 0.0, - "date": date, - })) - .unwrap(), - ) { - error!("Unable to write status file: {e}"); + if let Err(e) = executor::block_on(handles::update_stat( + &self.db_pool, + self.config.general.channel_id, + date, + 0.0, + )) { + error!("Unable to write status: {e}"); }; } diff --git a/ffplayout/src/player/output/desktop.rs b/ffplayout/src/player/output/desktop.rs index 88333a41..eb370cdd 100644 --- a/ffplayout/src/player/output/desktop.rs +++ b/ffplayout/src/player/output/desktop.rs @@ -14,11 +14,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; - if let Some(encoder_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.encoder.input_cmd.clone()) - { + if let Some(encoder_input_cmd) = &config.advanced.encoder.input_cmd { enc_cmd.append(&mut encoder_input_cmd.clone()); } @@ -30,7 +26,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { "ffplayout" ]); - if let Some(mut cmd) = config.out.output_cmd.clone() { + if let Some(mut cmd) = config.output.output_cmd.clone() { if !cmd.iter().any(|i| { [ "-c", diff --git a/ffplayout/src/player/output/hls.rs b/ffplayout/src/player/output/hls.rs index 7997f0fc..8f7523f8 100644 --- a/ffplayout/src/player/output/hls.rs +++ b/ffplayout/src/player/output/hls.rs @@ -26,6 +26,7 @@ use std::{ }; use log::*; +use sqlx::{Pool, Sqlite}; use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner}; use crate::vec_strings; @@ -57,11 +58,7 @@ fn ingest_to_hls_server( let is_terminated = channel_mgr.is_terminated.clone(); let ingest_is_running = channel_mgr.ingest_is_running.clone(); - if let Some(ingest_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.ingest.input_cmd.clone()) - { + if let Some(ingest_input_cmd) = &config.advanced.ingest.input_cmd { server_prefix.append(&mut ingest_input_cmd.clone()); } @@ -151,6 +148,7 @@ fn ingest_to_hls_server( /// Write with single ffmpeg instance directly to a HLS playlist. pub fn write_hls( channel_mgr: ChannelManager, + db_pool: Pool, player_control: PlayerControl, playout_stat: PlayoutStatus, ) -> Result<(), ProcessError> { @@ -158,13 +156,13 @@ pub fn write_hls( let config_clone = config.clone(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let play_stat = playout_stat.clone(); - let play_stat2 = playout_stat.clone(); - let channel_mgr_c = channel_mgr.clone(); + let channel_mgr_2 = channel_mgr.clone(); let is_terminated = channel_mgr.is_terminated.clone(); let ingest_is_running = channel_mgr.ingest_is_running.clone(); let get_source = source_generator( config.clone(), + db_pool, &player_control, playout_stat, is_terminated.clone(), @@ -172,7 +170,7 @@ pub fn write_hls( // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { - thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, channel_mgr_c)); + thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, channel_mgr_2)); } for node in get_source { @@ -196,14 +194,9 @@ pub fn write_hls( if config.task.enable { if config.task.path.is_file() { - let task_config = config.clone(); - let task_node = node.clone(); - let server_running = ingest_is_running.load(Ordering::SeqCst); - let stat = play_stat2.clone(); + let channel_mgr_3 = channel_mgr.clone(); - thread::spawn(move || { - task_runner::run(task_config, task_node, stat, server_running) - }); + thread::spawn(move || task_runner::run(channel_mgr_3)); } else { error!( "{:?} executable not exists!", @@ -214,11 +207,7 @@ pub fn write_hls( let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; - if let Some(encoder_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.encoder.input_cmd.clone()) - { + if let Some(encoder_input_cmd) = &config.advanced.encoder.input_cmd { enc_prefix.append(&mut encoder_input_cmd.clone()); } diff --git a/ffplayout/src/player/output/mod.rs b/ffplayout/src/player/output/mod.rs index fe83697e..a35c11a6 100644 --- a/ffplayout/src/player/output/mod.rs +++ b/ffplayout/src/player/output/mod.rs @@ -7,7 +7,8 @@ use std::{ }; use crossbeam_channel::bounded; -use simplelog::*; +use log::*; +use sqlx::{Pool, Sqlite}; mod desktop; mod hls; @@ -35,6 +36,7 @@ use crate::vec_strings; /// When ingest stops, it switch back to playlist/folder mode. pub fn player( channel_mgr: ChannelManager, + db_pool: Pool, play_control: &PlayerControl, playout_stat: PlayoutStatus, ) -> Result<(), ProcessError> { @@ -45,7 +47,6 @@ pub fn player( let mut buffer = [0; 65088]; let mut live_on = false; let playlist_init = playout_stat.list_init.clone(); - let play_stat = playout_stat.clone(); let is_terminated = channel_mgr.is_terminated.clone(); let ingest_is_running = channel_mgr.ingest_is_running.clone(); @@ -53,13 +54,14 @@ pub fn player( // get source iterator let node_sources = source_generator( config.clone(), + db_pool, play_control, playout_stat, is_terminated.clone(), ); // get ffmpeg output instance - let mut enc_proc = match config.out.mode { + let mut enc_proc = match config.output.mode { Desktop => desktop::output(&config, &ff_log_format), Null => null::output(&config, &ff_log_format), Stream => stream::output(&config, &ff_log_format), @@ -76,14 +78,14 @@ pub fn player( let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl)); - let channel_mgr_c = channel_mgr.clone(); + let channel_mgr_2 = channel_mgr.clone(); let mut ingest_receiver = None; // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { let (ingest_sender, rx) = bounded(96); ingest_receiver = Some(rx); - thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_c)); + thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_2)); } 'source_iter: for node in node_sources { @@ -128,14 +130,9 @@ pub fn player( if config.task.enable { if config.task.path.is_file() { - let task_config = config.clone(); - let task_node = node.clone(); - let server_running = ingest_is_running.load(Ordering::SeqCst); - let stat = play_stat.clone(); + let channel_mgr_3 = channel_mgr.clone(); - thread::spawn(move || { - task_runner::run(task_config, task_node, stat, server_running) - }); + thread::spawn(move || task_runner::run(channel_mgr_3)); } else { error!( "{:?} executable not exists!", @@ -146,11 +143,7 @@ pub fn player( let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; - if let Some(decoder_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.input_cmd.clone()) - { + if let Some(decoder_input_cmd) = &config.advanced.decoder.input_cmd { dec_cmd.append(&mut decoder_input_cmd.clone()); } diff --git a/ffplayout/src/player/output/null.rs b/ffplayout/src/player/output/null.rs index 68d4f903..28c3631e 100644 --- a/ffplayout/src/player/output/null.rs +++ b/ffplayout/src/player/output/null.rs @@ -19,11 +19,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; - if let Some(input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.encoder.input_cmd.clone()) - { + if let Some(input_cmd) = &config.advanced.encoder.input_cmd { enc_prefix.append(&mut input_cmd.clone()); } diff --git a/ffplayout/src/player/output/stream.rs b/ffplayout/src/player/output/stream.rs index 28ddfaaa..da6f016e 100644 --- a/ffplayout/src/player/output/stream.rs +++ b/ffplayout/src/player/output/stream.rs @@ -19,11 +19,7 @@ pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; - if let Some(input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.encoder.input_cmd.clone()) - { + if let Some(input_cmd) = &config.advanced.encoder.input_cmd { enc_prefix.append(&mut input_cmd.clone()); } diff --git a/ffplayout/src/player/utils/json_validate.rs b/ffplayout/src/player/utils/json_validate.rs index 41bd9f44..7c3ddaf2 100644 --- a/ffplayout/src/player/utils/json_validate.rs +++ b/ffplayout/src/player/utils/json_validate.rs @@ -37,15 +37,11 @@ fn check_media( let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let mut error_list = vec![]; let mut config = config.clone(); - config.out.mode = Null; + config.output.mode = Null; let mut process_length = 0.1; - if let Some(decoder_input_cmd) = config - .advanced - .as_ref() - .and_then(|a| a.decoder.input_cmd.clone()) - { + if let Some(decoder_input_cmd) = &config.advanced.decoder.input_cmd { dec_cmd.append(&mut decoder_input_cmd.clone()); } diff --git a/ffplayout/src/player/utils/mod.rs b/ffplayout/src/player/utils/mod.rs index 694bf717..64baf44d 100644 --- a/ffplayout/src/player/utils/mod.rs +++ b/ffplayout/src/player/utils/mod.rs @@ -1,13 +1,13 @@ use std::{ ffi::OsStr, fmt, - fs::{self, metadata, File}, + fs::{metadata, File}, io::{BufRead, BufReader, Error}, net::TcpListener, path::{Path, PathBuf}, process::{exit, ChildStderr, Command, Stdio}, str::FromStr, - sync::{Arc, Mutex}, + sync::{atomic::Ordering, Arc, Mutex}, }; use chrono::{prelude::*, TimeDelta}; @@ -26,7 +26,7 @@ pub mod json_validate; use crate::player::{ controller::{ - ChannelManager, PlayoutStatus, + ChannelManager, ProcessUnit::{self, *}, }, filter::{filter_chains, Filters}, @@ -62,7 +62,7 @@ pub fn prepare_output_cmd( mut cmd: Vec, filters: &Option, ) -> Vec { - let mut output_params = config.out.clone().output_cmd.unwrap(); + let mut output_params = config.output.clone().output_cmd.unwrap(); let mut new_params = vec![]; let mut count = 0; let re_v = Regex::new(r"\[?0:v(:0)?\]?").unwrap(); @@ -143,21 +143,26 @@ pub fn get_media_map(media: Media) -> Value { } /// prepare json object for response -pub fn get_data_map( - config: &PlayoutConfig, - media: Media, - playout_stat: &PlayoutStatus, - server_is_running: bool, -) -> Map { +pub fn get_data_map(manager: &ChannelManager) -> Map { + let media = manager + .current_media + .lock() + .unwrap() + .clone() + .unwrap_or(Media::new(0, "", false)); + let channel = manager.channel.lock().unwrap().clone(); + let config = manager.config.lock().unwrap().processing.clone(); + let ingest_is_running = manager.ingest_is_running.load(Ordering::SeqCst); + let mut data_map = Map::new(); let current_time = time_in_seconds(); - let shift = *playout_stat.time_shift.lock().unwrap(); + let shift = channel.time_shift; let begin = media.begin.unwrap_or(0.0) - shift; let played_time = current_time - begin; data_map.insert("index".to_string(), json!(media.index)); - data_map.insert("ingest".to_string(), json!(server_is_running)); - data_map.insert("mode".to_string(), json!(config.processing.mode)); + data_map.insert("ingest".to_string(), json!(ingest_is_running)); + data_map.insert("mode".to_string(), json!(config.mode)); data_map.insert( "shift".to_string(), json!((shift * 1000.0).round() / 1000.0), @@ -438,34 +443,6 @@ pub fn json_writer(path: &PathBuf, data: JsonPlaylist) -> Result<(), Error> { Ok(()) } -/// Write current status to status file in temp folder. -/// -/// The status file is init in main function and mostly modified in RPC server. -pub fn write_status(config: &PlayoutConfig, date: &str, shift: f64) { - let data = json!({ - "time_shift": shift, - "date": date, - }); - - match serde_json::to_string(&data) { - Ok(status) => { - if let Err(e) = fs::write(&config.general.stat_file, status) { - error!( - "Unable to write to status file {}: {e}", - config.general.stat_file - ) - }; - } - Err(e) => error!("Serialize status data failed: {e}"), - }; -} - -// pub fn get_timestamp() -> i32 { -// let local: DateTime = time_now(); - -// local.timestamp_millis() as i32 -// } - /// Get current time in seconds. pub fn time_in_seconds() -> f64 { let local: DateTime = time_now(); @@ -758,9 +735,9 @@ pub fn include_file_extension(config: &PlayoutConfig, file_path: &Path) -> bool } } - if config.out.mode == HLS { + if config.output.mode == HLS { if let Some(ts_path) = config - .out + .output .output_cmd .clone() .unwrap_or_else(|| vec![String::new()]) @@ -775,7 +752,7 @@ pub fn include_file_extension(config: &PlayoutConfig, file_path: &Path) -> bool } if let Some(m3u8_path) = config - .out + .output .output_cmd .clone() .unwrap_or_else(|| vec![String::new()]) @@ -932,14 +909,14 @@ pub fn validate_ffmpeg(config: &mut PlayoutConfig) -> Result<(), String> { is_in_system("ffmpeg")?; is_in_system("ffprobe")?; - if config.out.mode == Desktop { + if config.output.mode == Desktop { is_in_system("ffplay")?; } ffmpeg_filter_and_libs(config)?; if config - .out + .output .output_cmd .as_ref() .unwrap() @@ -960,7 +937,7 @@ pub fn validate_ffmpeg(config: &mut PlayoutConfig) -> Result<(), String> { } if config - .out + .output .output_cmd .as_ref() .unwrap() diff --git a/ffplayout/src/sse/broadcast.rs b/ffplayout/src/sse/broadcast.rs index b0cb58c3..88b4984b 100644 --- a/ffplayout/src/sse/broadcast.rs +++ b/ffplayout/src/sse/broadcast.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{atomic::Ordering, Arc}, + time::Duration, +}; use actix_web::{rt::time::interval, web}; use actix_web_lab::{ @@ -10,26 +13,20 @@ use parking_lot::Mutex; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use crate::utils::{config::PlayoutConfig, control::media_info, system}; +use crate::player::{controller::ChannelManager, utils::get_data_map}; +use crate::utils::system; #[derive(Debug, Clone)] struct Client { - _channel: i32, - config: PlayoutConfig, + manager: ChannelManager, endpoint: String, sender: mpsc::Sender, } impl Client { - fn new( - _channel: i32, - config: PlayoutConfig, - endpoint: String, - sender: mpsc::Sender, - ) -> Self { + fn new(manager: ChannelManager, endpoint: String, sender: mpsc::Sender) -> Self { Self { - _channel, - config, + manager, endpoint, sender, } @@ -102,8 +99,7 @@ impl Broadcaster { /// Registers client with broadcaster, returning an SSE response body. pub async fn new_client( &self, - channel: i32, - config: PlayoutConfig, + manager: ChannelManager, endpoint: String, ) -> Sse>> { let (tx, rx) = mpsc::channel(10); @@ -113,7 +109,7 @@ impl Broadcaster { self.inner .lock() .clients - .push(Client::new(channel, config, endpoint, tx)); + .push(Client::new(manager, endpoint, tx)); Sse::from_infallible_receiver(rx) } @@ -123,23 +119,22 @@ impl Broadcaster { let clients = self.inner.lock().clients.clone(); for client in clients.iter().filter(|client| client.endpoint == "playout") { - match media_info(&client.config, "current".into()).await { - Ok(res) => { - let _ = client - .sender - .send( - sse::Data::new(res.text().await.unwrap_or_else(|_| "Success".into())) - .into(), - ) - .await; - } - Err(_) => { - let _ = client - .sender - .send(sse::Data::new("not running").into()) - .await; - } - }; + let media_map = get_data_map(&client.manager); + + if client.manager.is_alive.load(Ordering::SeqCst) { + let _ = client + .sender + .send( + sse::Data::new(serde_json::to_string(&media_map).unwrap_or_default()) + .into(), + ) + .await; + } else { + let _ = client + .sender + .send(sse::Data::new("not running").into()) + .await; + } } } @@ -149,7 +144,8 @@ impl Broadcaster { for client in clients { if &client.endpoint == "system" { - if let Ok(stat) = web::block(move || system::stat(client.config.clone())).await { + let config = client.manager.config.lock().unwrap().clone(); + if let Ok(stat) = web::block(move || system::stat(config.clone())).await { let stat_string = stat.to_string(); let _ = client.sender.send(sse::Data::new(stat_string).into()).await; }; diff --git a/ffplayout/src/sse/routes.rs b/ffplayout/src/sse/routes.rs index a33bf02b..406080c5 100644 --- a/ffplayout/src/sse/routes.rs +++ b/ffplayout/src/sse/routes.rs @@ -1,11 +1,13 @@ +use std::sync::Mutex; + use actix_web::{get, post, web, Responder}; use actix_web_grants::proc_macro::protect; use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Sqlite}; use super::{check_uuid, prune_uuids, AuthState, UuidData}; +use crate::player::controller::ChannelController; use crate::sse::broadcast::Broadcaster; -use crate::utils::{errors::ServiceError, playout_config, Role}; +use crate::utils::{errors::ServiceError, Role}; #[derive(Deserialize, Serialize)] struct User { @@ -62,21 +64,21 @@ async fn validate_uuid( /// ```BASH /// curl -X GET 'http://127.0.0.1:8787/data/event/1?endpoint=system&uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a' /// ``` -#[get("/event/{channel}")] +#[get("/event/{id}")] async fn event_stream( - pool: web::Data>, broadcaster: web::Data, data: web::Data, id: web::Path, user: web::Query, + controllers: web::Data>, ) -> Result { let mut uuids = data.uuids.lock().await; check_uuid(&mut uuids, user.uuid.as_str())?; - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); Ok(broadcaster - .new_client(*id, config, user.endpoint.clone()) + .new_client(manager.clone(), user.endpoint.clone()) .await) } diff --git a/ffplayout/src/utils/advanced_config.rs b/ffplayout/src/utils/advanced_config.rs index fd5833cd..3bc2fe6b 100644 --- a/ffplayout/src/utils/advanced_config.rs +++ b/ffplayout/src/utils/advanced_config.rs @@ -1,12 +1,13 @@ -use std::{fs::File, io::Read, path::PathBuf}; - use serde::{Deserialize, Serialize}; use shlex::split; +use crate::db::models::AdvancedConfiguration; + #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct AdvancedConfig { pub decoder: DecoderConfig, pub encoder: EncoderConfig, + pub filter: FilterConfig, pub ingest: IngestConfig, } @@ -14,29 +15,24 @@ pub struct AdvancedConfig { pub struct DecoderConfig { pub input_param: Option, pub output_param: Option, - pub filters: Filters, - #[serde(skip_serializing, skip_deserializing)] pub input_cmd: Option>, - #[serde(skip_serializing, skip_deserializing)] pub output_cmd: Option>, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct EncoderConfig { pub input_param: Option, - #[serde(skip_serializing, skip_deserializing)] pub input_cmd: Option>, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct IngestConfig { pub input_param: Option, - #[serde(skip_serializing, skip_deserializing)] pub input_cmd: Option>, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] -pub struct Filters { +pub struct FilterConfig { pub deinterlace: Option, pub pad_scale_w: Option, pub pad_scale_h: Option, @@ -62,37 +58,58 @@ pub struct Filters { } impl AdvancedConfig { - pub fn new(cfg_path: PathBuf) -> Self { - let mut config: AdvancedConfig = Default::default(); - - if let Ok(mut file) = File::open(cfg_path) { - let mut contents = String::new(); - - if let Err(e) = file.read_to_string(&mut contents) { - eprintln!("Read advanced config file: {e}") - }; - - if let Ok(tm) = toml_edit::de::from_str(&contents) { - config = tm - }; - - if let Some(input_parm) = &config.decoder.input_param { - config.decoder.input_cmd = split(input_parm); - } - - if let Some(output_param) = &config.decoder.output_param { - config.decoder.output_cmd = split(output_param); - } - - if let Some(input_param) = &config.encoder.input_param { - config.encoder.input_cmd = split(input_param); - } - - if let Some(input_param) = &config.ingest.input_param { - config.ingest.input_cmd = split(input_param); - } - }; - - config + pub fn new(config: AdvancedConfiguration) -> Self { + Self { + decoder: DecoderConfig { + input_param: config.decoder_input_param.clone(), + output_param: config.decoder_output_param.clone(), + input_cmd: match config.decoder_input_param { + Some(input_param) => split(&input_param), + None => None, + }, + output_cmd: match config.decoder_output_param { + Some(output_param) => split(&output_param), + None => None, + }, + }, + encoder: EncoderConfig { + input_param: config.encoder_input_param.clone(), + input_cmd: match config.encoder_input_param { + Some(input_param) => split(&input_param), + None => None, + }, + }, + filter: FilterConfig { + deinterlace: config.deinterlace, + pad_scale_w: config.pad_scale_w, + pad_scale_h: config.pad_scale_h, + pad_video: config.pad_video, + fps: config.fps, + scale: config.scale, + set_dar: config.set_dar, + fade_in: config.fade_in, + fade_out: config.fade_out, + overlay_logo_scale: config.overlay_logo_scale, + overlay_logo_fade_in: config.overlay_logo_fade_in, + overlay_logo_fade_out: config.overlay_logo_fade_out, + overlay_logo: config.overlay_logo, + tpad: config.tpad, + drawtext_from_file: config.drawtext_from_file, + drawtext_from_zmq: config.drawtext_from_zmq, + aevalsrc: config.aevalsrc, + afade_in: config.afade_in, + afade_out: config.afade_out, + apad: config.apad, + volume: config.volume, + split: config.split, + }, + ingest: IngestConfig { + input_param: config.ingest_input_param.clone(), + input_cmd: match config.ingest_input_param { + Some(input_param) => split(&input_param), + None => None, + }, + }, + } } } diff --git a/ffplayout/src/utils/channels.rs b/ffplayout/src/utils/channels.rs index 30542566..ca61353f 100644 --- a/ffplayout/src/utils/channels.rs +++ b/ffplayout/src/utils/channels.rs @@ -1,4 +1,4 @@ -use std::{fs, path::PathBuf}; +use std::fs; use rand::prelude::*; use simplelog::*; @@ -11,34 +11,22 @@ pub async fn create_channel( conn: &Pool, target_channel: Channel, ) -> Result { - if !target_channel.config_path.starts_with("/etc/ffplayout") { - return Err(ServiceError::BadRequest("Bad config path!".to_string())); - } - let channel_name = target_channel.name.to_lowercase().replace(' ', ""); let channel_num = match handles::select_last_channel(conn).await { Ok(num) => num + 1, Err(_) => rand::thread_rng().gen_range(71..99), }; - let mut config = PlayoutConfig::new( - Some(PathBuf::from("/usr/share/ffplayout/ffplayout.toml.orig")), - None, - ); + let mut config = PlayoutConfig::new(conn, channel_num).await; - config.general.stat_file = format!(".ffp_{channel_name}",); - config.rpc_server.address = format!("127.0.0.1:70{:7>2}", channel_num); config.playlist.path = config.playlist.path.join(channel_name); - config.out.output_param = config - .out + config.output.output_param = config + .output .output_param .replace("stream.m3u8", &format!("stream{channel_num}.m3u8")) .replace("stream-%d.ts", &format!("stream{channel_num}-%d.ts")); - let toml_string = toml_edit::ser::to_string(&config)?; - fs::write(&target_channel.config_path, toml_string)?; - let new_channel = handles::insert_channel(conn, target_channel).await?; // TODO: Create Channel controller @@ -46,15 +34,11 @@ pub async fn create_channel( } pub async fn delete_channel(conn: &Pool, id: i32) -> Result<(), ServiceError> { - let channel = handles::select_channel(conn, &id).await?; + let _channel = handles::select_channel(conn, &id).await?; let (_config, _) = playout_config(conn, &id).await?; // TODO: Remove Channel controller - if let Err(e) = fs::remove_file(channel.config_path) { - error!("{e}"); - }; - handles::delete_channel(conn, &id).await?; Ok(()) diff --git a/ffplayout/src/utils/config.rs b/ffplayout/src/utils/config.rs index 76d90fdc..88359d87 100644 --- a/ffplayout/src/utils/config.rs +++ b/ffplayout/src/utils/config.rs @@ -1,21 +1,19 @@ use std::{ - env, fmt, - fs::File, - io::Read, + fmt, path::{Path, PathBuf}, - process, str::FromStr, }; use chrono::NaiveTime; use flexi_logger::Level; -use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use shlex::split; +use sqlx::{Pool, Sqlite}; -use crate::AdvancedConfig; - +use crate::db::{handles, models::Configuration}; use crate::utils::{free_tcp_socket, time_to_sec}; use crate::vec_strings; +use crate::AdvancedConfig; pub const DUMMY_LEN: f64 = 60.0; pub const IMAGE_FORMAT: [&str; 21] = [ @@ -46,8 +44,7 @@ pub const FFMPEG_UNRECOVERABLE_ERRORS: [&str; 5] = [ "Error while decoding stream #0:0: Invalid data found when processing input", ]; -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] pub enum OutputMode { Desktop, HLS, @@ -55,6 +52,23 @@ pub enum OutputMode { Stream, } +impl OutputMode { + fn new(s: &str) -> Self { + match s { + "desktop" => Self::Desktop, + "null" => Self::Null, + "stream" => Self::Stream, + _ => Self::HLS, + } + } +} + +impl Default for OutputMode { + fn default() -> Self { + Self::HLS + } +} + impl FromStr for OutputMode { type Err = String; @@ -69,13 +83,28 @@ impl FromStr for OutputMode { } } -#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] #[serde(rename_all = "lowercase")] pub enum ProcessMode { Folder, Playlist, } +impl ProcessMode { + fn new(s: &str) -> Self { + match s { + "folder" => Self::Folder, + _ => Self::Playlist, + } + } +} + +impl Default for ProcessMode { + fn default() -> Self { + ProcessMode::Playlist + } +} + impl fmt::Display for ProcessMode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { @@ -97,12 +126,12 @@ impl FromStr for ProcessMode { } } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Template { pub sources: Vec, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Source { pub start: NaiveTime, pub duration: NaiveTime, @@ -113,12 +142,10 @@ pub struct Source { /// Global Config /// /// This we init ones, when ffplayout is starting and use them globally in the hole program. -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct PlayoutConfig { - #[serde(default, skip_serializing, skip_deserializing)] - pub advanced: Option, + pub advanced: AdvancedConfig, pub general: General, - pub rpc_server: RpcServer, pub mail: Mail, pub logging: Logging, pub processing: Processing, @@ -126,50 +153,40 @@ pub struct PlayoutConfig { pub playlist: Playlist, pub storage: Storage, pub text: Text, - #[serde(default)] pub task: Task, - pub out: Out, + pub output: Output, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Clone, Deserialize, Serialize)] pub struct General { pub help_text: String, + pub channel_id: i32, pub stop_threshold: f64, - - #[serde(default, skip_serializing, skip_deserializing)] - pub config_path: String, - - #[serde(default)] - pub stat_file: String, - - #[serde(skip_serializing, skip_deserializing)] pub generate: Option>, - - #[serde(skip_serializing, skip_deserializing)] pub ffmpeg_filters: Vec, - - #[serde(skip_serializing, skip_deserializing)] pub ffmpeg_libs: Vec, - - #[serde(skip_serializing, skip_deserializing)] pub template: Option