From 8a68c9bd864866a9d6bda1c9d438393e1a455c24 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 5 Apr 2022 17:07:34 +0200 Subject: [PATCH] work on json rpc server --- Cargo.lock | 378 +++++++++++++++++++++++++++++++++++++++- Cargo.toml | 3 +- assets/ffplayout.yml | 27 +-- src/filter/mod.rs | 16 +- src/input/folder.rs | 17 -- src/input/ingest.rs | 20 +-- src/input/playlist.rs | 31 ++-- src/main.rs | 26 +-- src/output/mod.rs | 85 ++------- src/utils/config.rs | 9 + src/utils/logging.rs | 54 +++--- src/utils/mod.rs | 84 ++++++++- src/utils/rpc_server.rs | 74 ++++++++ 13 files changed, 648 insertions(+), 176 deletions(-) create mode 100644 src/utils/rpc_server.rs diff --git a/Cargo.lock b/Cargo.lock index a3f24df9..74ad5dfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,21 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "memchr", +] + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cc" version = "1.0.73" @@ -152,12 +167,13 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.8.1" +version = "0.9.0" dependencies = [ "chrono", "clap", "ffprobe", "file-rotate", + "jsonrpc-http-server", "lettre", "log", "notify", @@ -219,6 +235,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foreign-types" version = "0.3.2" @@ -269,18 +291,71 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "futures-task" version = "0.3.21" @@ -293,8 +368,11 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -310,7 +388,20 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", +] + +[[package]] +name = "globset" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10463d9ff00a2a068db14231982f5132edebad0d7660cd956a1c30292dbcbfbd" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", ] [[package]] @@ -345,12 +436,63 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" + [[package]] name = "httpdate" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -416,6 +558,55 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "jsonrpc-core" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb" +dependencies = [ + "futures", + "futures-executor", + "futures-util", + "log", + "serde", + "serde_derive", + "serde_json", +] + +[[package]] +name = "jsonrpc-http-server" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" +dependencies = [ + "futures", + "hyper", + "jsonrpc-core", + "jsonrpc-server-utils", + "log", + "net2", + "parking_lot", + "unicase", +] + +[[package]] +name = "jsonrpc-server-utils" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4" +dependencies = [ + "bytes", + "futures", + "globset", + "jsonrpc-core", + "lazy_static", + "log", + "tokio", + "tokio-stream", + "tokio-util", + "unicase", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -471,6 +662,16 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.16" @@ -533,12 +734,26 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi 0.3.9", +] + [[package]] name = "mio-extras" version = "2.0.6" @@ -547,7 +762,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" dependencies = [ "lazycell", "log", - "mio", + "mio 0.6.23", "slab", ] @@ -563,6 +778,15 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -614,12 +838,21 @@ dependencies = [ "fsevent-sys", "inotify", "libc", - "mio", + "mio 0.6.23", "mio-extras", "walkdir", "winapi 0.3.9", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -713,6 +946,31 @@ version = "1.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -885,6 +1143,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "security-framework" version = "2.6.1" @@ -975,6 +1239,22 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "strsim" version = "0.10.0" @@ -1052,8 +1332,80 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ + "bytes", + "libc", + "memchr", + "mio 0.8.2", "num_cpus", "pin-project-lite", + "socket2", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", ] [[package]] @@ -1100,12 +1452,28 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 8faa94b8..9fc37cc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.8.1" +version = "0.9.0" edition = "2021" [dependencies] @@ -8,6 +8,7 @@ chrono = "0.4" clap = { version = "3.1", features = ["derive"] } ffprobe = "0.3" file-rotate = "0.6" +jsonrpc-http-server = "18.0" lettre = "0.10.0-rc.5" log = "0.4" notify = "4.0" diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 7ba7df0e..29551e12 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -1,5 +1,5 @@ general: - helptext: Sometimes it can happen, that a file is corrupt but still playable, + help_text: Sometimes it can happen, that a file is corrupt but still playable, this can produce an streaming error over all following files. The only way in this case is, to stop ffplayout and start it again. Here we only say when it stops, the starting process is in your hand. Best way is a systemd service @@ -7,10 +7,17 @@ general: value. A number below 3 can cause unexpected errors. stop_threshold: 11 +rpc_server: + help_text: Run a JSON RPC server, for getting infos about current playing, and + control for some functions. + enable: true + address: 127.0.0.1:7070 + authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs + mail: - helptext: Send error messages to email address, like missing playlist; invalid + help_text: Send error messages to email address, like missing playlist; invalid json format; missing clip path. Leave recipient blank, if you don't need this. - 'mail_level' can be WARNING or ERROR. + 'mail_level' can be INFO, WARNING or ERROR. 'interval' means seconds until a new mail will be sended. subject: "Playout Error" smtp_server: "mail.example.org" starttls: true @@ -20,7 +27,7 @@ mail: mail_level: "ERROR" logging: - helptext: Logging to file, if 'log_to_file' false log to console. 'backup_count' + help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count' says how long log files will be saved in days. 'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon. 'log_level' can be DEBUG, INFO, WARNING, ERROR. 'ffmpeg_level' can be info, @@ -34,7 +41,7 @@ logging: ffmpeg_level: "error" processing: - helptext: Default processing, for all clips that they get prepared in that way, + help_text: Default processing, for all clips that they get prepared in that way, so the output is unique. Set playing mode, like playlist, or folder. 'aspect' must be a float number. 'logo' is only used if the path exist. 'logo_scale' scale the logo to target size, leave it blank when no scaling @@ -60,7 +67,7 @@ processing: volume: 1 ingest: - helptext: Works not with direct hls output, it always needs full processing! Run a server + help_text: Works not with direct hls output, it always needs full processing! Run a server for a ingest stream. This stream will override the normal streaming until is done. There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app. @@ -68,7 +75,7 @@ ingest: input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream playlist: - helptext: > + help_text: > 'path' can be a path to a single file, or a directory. For directory put only the root folder, for example '/playlists', subdirectories are read by the script. Subdirectories needs this structure '/playlists/2018/01'. 'day_start' @@ -82,7 +89,7 @@ playlist: infinit: false storage: - helptext: Play ordered or randomly files from path. 'filler_clip' is for fill + help_text: Play ordered or randomly files from path. 'filler_clip' is for fill the end to reach 24 hours, it will loop when is necessary. 'extensions' search only files with this extension. Set 'shuffle' to 'True' to pick files randomly. path: "/mediaStorage" @@ -93,7 +100,7 @@ storage: shuffle: true text: - helptext: Overlay text in combination with libzmq for remote text manipulation. + help_text: Overlay text in combination with libzmq for remote text manipulation. On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'. In a standard environment the filter drawtext node is Parsed_drawtext_2. 'over_pre' if True text will be overlay in pre processing. Continue same text @@ -110,7 +117,7 @@ text: regex: ^.+[/\\](.*)(.mp4|.mkv)$ out: - helptext: The final playout compression. Set the settings to your needs. + help_text: The final playout compression. Set the settings to your needs. 'mode' has the standard options 'desktop', 'hls', 'stream'. Self made outputs can be define, by adding script in output folder with an 'output' function inside. 'preview' works only in streaming output and creates a separate preview stream. diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 7c79fce8..97006783 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -155,18 +155,14 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { config.processing.logo ); - if let Some(last) = &node.last { - if last.category == "advertisement" { - logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1") - } + if node.last_ad.unwrap() { + logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1") } - if let Some(next) = &node.next { - if next.category == "advertisement" { - logo_chain.push_str( - format!(",fade=out:st={}:d=1.0:alpha=1", node.out - node.seek - 1.0).as_str(), - ) - } + if node.next_ad.unwrap() { + logo_chain.push_str( + format!(",fade=out:st={}:d=1.0:alpha=1", node.out - node.seek - 1.0).as_str(), + ) } logo_chain diff --git a/src/input/folder.rs b/src/input/folder.rs index bed799ec..43ca504e 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -70,18 +70,6 @@ impl Source { fn sort(&mut self) { self.nodes.lock().unwrap().sort(); } - - fn last_next_node(&mut self) { - if self.index + 1 < self.nodes.lock().unwrap().len() { - let next_node = self.nodes.lock().unwrap()[self.index + 1].clone(); - self.current_node.next = Some(Box::new(Media::new(self.index + 1, next_node))); - } - - if self.index > 0 && self.index < self.nodes.lock().unwrap().len() { - let last_node = self.nodes.lock().unwrap()[self.index - 1].clone(); - self.current_node.last = Some(Box::new(Media::new(self.index - 1, last_node))); - } - } } impl Iterator for Source { @@ -93,14 +81,11 @@ impl Iterator for Source { self.current_node = Media::new(self.index, current_file); self.current_node.add_probe(); self.current_node.add_filter(); - self.last_next_node(); self.index += 1; Some(self.current_node.clone()) } else { - let last = self.current_node.clone(); - if self.config.storage.shuffle { info!("Shuffle files"); self.shuffle(); @@ -113,8 +98,6 @@ impl Iterator for Source { self.current_node = Media::new(self.index, current_file); self.current_node.add_probe(); self.current_node.add_filter(); - self.last_next_node(); - self.current_node.last = Some(Box::new(last)); self.index = 1; diff --git a/src/input/ingest.rs b/src/input/ingest.rs index b14bcd27..b411d25f 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,16 +2,16 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::{mpsc::SyncSender, Arc, Mutex}, + sync::{mpsc::SyncSender}, thread::sleep, time::Duration, }; -use process_control::{ChildExt, Terminator}; +use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; -use crate::utils::{stderr_reader, GlobalConfig}; +use crate::utils::{stderr_reader, GlobalConfig, ProcessControl}; fn overlay(config: &GlobalConfig) -> String { let mut logo_chain = String::new(); @@ -57,9 +57,7 @@ pub async fn ingest_server( log_format: String, ingest_sender: SyncSender<(usize, [u8; 65088])>, rt_handle: Handle, - proc_terminator: Arc>>, - is_terminated: Arc>, - server_is_running: Arc>, + proc_control: ProcessControl, ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; @@ -101,7 +99,7 @@ pub async fn ingest_server( debug!("Server CMD: \"ffmpeg {}\"", server_cmd.join(" ")); loop { - if *is_terminated.lock().unwrap() { + if *proc_control.is_terminated.lock().unwrap() { break; } let mut server_proc = match Command::new("ffmpeg") @@ -118,7 +116,7 @@ pub async fn ingest_server( }; let serv_terminator = server_proc.terminator()?; - *proc_terminator.lock().unwrap() = Some(serv_terminator); + *proc_control.server_term.lock().unwrap() = Some(serv_terminator); rt_handle.spawn(stderr_reader( server_proc.stderr.take().unwrap(), @@ -139,7 +137,7 @@ pub async fn ingest_server( }; if !is_running { - *server_is_running.lock().unwrap() = true; + *proc_control.server_is_running.lock().unwrap() = true; is_running = true; } @@ -147,7 +145,7 @@ pub async fn ingest_server( if let Err(e) = ingest_sender.send((bytes_len, buffer)) { error!("Ingest server write error: {:?}", e); - *is_terminated.lock().unwrap() = true; + *proc_control.is_terminated.lock().unwrap() = true; break; } } else { @@ -155,7 +153,7 @@ pub async fn ingest_server( } } - *server_is_running.lock().unwrap() = false; + *proc_control.server_is_running.lock().unwrap() = false; sleep(Duration::from_secs(1)); diff --git a/src/input/playlist.rs b/src/input/playlist.rs index b5e9d32f..5d3d87f0 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -142,13 +142,18 @@ impl CurrentProgram { } } - fn last_next_node(&mut self) { - if self.index + 1 < self.nodes.len() { - self.current_node.next = Some(Box::new(self.nodes[self.index + 1].clone())); + fn last_next_ad(&mut self) { + if self.index + 1 < self.nodes.len() + && self.nodes[self.index + 1].category == "advertisement".to_string() + { + self.current_node.next_ad = Some(true); } - if self.index > 0 && self.index < self.nodes.len() { - self.current_node.last = Some(Box::new(self.nodes[self.index - 1].clone())); + if self.index > 0 + && self.index < self.nodes.len() + && self.nodes[self.index - 1].category == "advertisement".to_string() + { + self.current_node.last_ad = Some(true); } } @@ -242,7 +247,7 @@ impl Iterator for CurrentProgram { } } - self.last_next_node(); + self.last_next_ad(); return Some(self.current_node.clone()); } @@ -256,7 +261,7 @@ impl Iterator for CurrentProgram { } self.current_node = timed_source(self.nodes[self.index].clone(), &self.config, is_last); - self.last_next_node(); + self.last_next_ad(); self.index += 1; // update playlist should happen after current clip, @@ -265,7 +270,7 @@ impl Iterator for CurrentProgram { Some(self.current_node.clone()) } else { let last_playlist = self.json_path.clone(); - let last = self.current_node.clone(); + let last_ad = self.current_node.last_ad.clone(); self.check_for_next_playlist(); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); @@ -274,7 +279,6 @@ impl Iterator for CurrentProgram { { // Test if playlist is to early finish, // and if we have to fill it with a placeholder. - self.index += 1; self.current_node = Media::new(self.index, "".to_string()); self.current_node.begin = Some(get_sec()); let mut duration = total_delta.abs(); @@ -286,17 +290,20 @@ impl Iterator for CurrentProgram { self.current_node.out = duration; self.current_node = gen_source(self.current_node.clone()); self.nodes.push(self.current_node.clone()); + self.last_next_ad(); - self.current_node.last = Some(Box::new(last)); + self.current_node.last_ad = last_ad; self.current_node.add_filter(); + self.index += 1; + return Some(self.current_node.clone()); } self.index = 0; self.current_node = gen_source(self.nodes[self.index].clone()); - self.last_next_node(); - self.current_node.last = Some(Box::new(last)); + self.last_next_ad(); + self.current_node.last_ad = last_ad; self.index = 1; diff --git a/src/main.rs b/src/main.rs index 4d5372f8..ee4a65ed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,39 @@ extern crate log; extern crate simplelog; -use std::sync::{Arc, Mutex}; +use simplelog::*; +use tokio::runtime::Builder; mod filter; mod input; mod output; mod utils; -use simplelog::*; -use tokio::runtime::Builder; - use crate::output::{player, write_hls}; -use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig}; +use crate::utils::{init_config, init_logging, validate_ffmpeg, run_rpc, GlobalConfig, ProcessControl}; fn main() { init_config(); let config = GlobalConfig::global(); + let proc_control = ProcessControl::new(); let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let rt_handle = runtime.handle(); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); - let logging = init_logging(rt_handle.clone(), is_terminated.clone()); + let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); - if config.out.mode.to_lowercase() == "hls".to_string() { - write_hls(rt_handle, is_terminated); - } else { - player(rt_handle, is_terminated); + if config.rpc_server.enable { + rt_handle.spawn(run_rpc(proc_control.clone())); } + + if config.out.mode.to_lowercase() == "hls".to_string() { + write_hls(rt_handle, proc_control.is_terminated.clone()); + } else { + player(rt_handle, proc_control); + } + + info!("Playout done..."); } diff --git a/src/output/mod.rs b/src/output/mod.rs index 0849a5fd..2982c9d3 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -3,7 +3,7 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, path::Path, process, - process::{Child, Command, Stdio}, + process::{Command, Stdio}, sync::{ mpsc::{channel, sync_channel, Receiver, SyncSender}, Arc, Mutex, @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use process_control::Terminator; +use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; @@ -23,62 +23,7 @@ mod stream; pub use hls::write_hls; use crate::input::{ingest_server, watch_folder, CurrentProgram, Source}; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media}; - -#[derive(Debug)] -struct ProcessCleanup { - server_term: Arc>>, - is_terminated: Arc>, - enc_proc: Child, - is_alive: bool, -} - -impl ProcessCleanup { - fn new( - server_term: Arc>>, - is_terminated: Arc>, - enc_proc: Child, - ) -> Self { - Self { - server_term, - is_terminated, - enc_proc, - is_alive: true, - } - } -} - -impl ProcessCleanup { - fn kill(&mut self) { - *self.is_terminated.lock().unwrap() = true; - - if self.is_alive { - if let Some(server) = &*self.server_term.lock().unwrap() { - unsafe { - if let Ok(_) = server.terminate() { - info!("Terminate ingest server done"); - } - } - }; - - self.is_alive = false; - } - - if let Ok(_) = self.enc_proc.kill() { - info!("Playout done...") - } - - if let Err(e) = self.enc_proc.wait() { - error!("Encoder: {e}") - }; - } -} - -impl Drop for ProcessCleanup { - fn drop(&mut self) { - self.kill() - } -} +use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media, ProcessControl}; pub fn source_generator( rt_handle: &Handle, @@ -127,17 +72,17 @@ pub fn source_generator( (get_source, init_playlist) } -pub fn player(rt_handle: &Handle, is_terminated: Arc>) { +pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let server_term: Arc>> = Arc::new(Mutex::new(None)); + let server_is_running: Arc> = Arc::new(Mutex::new(false)); let mut buffer: [u8; 65088] = [0; 65088]; let mut live_on = false; let (get_source, init_playlist) = - source_generator(rt_handle, config.clone(), is_terminated.clone()); + source_generator(rt_handle, config.clone(), proc_control.is_terminated.clone()); let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(ff_log_format.clone()), @@ -162,17 +107,13 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { ff_log_format.clone(), ingest_sender, rt_handle.clone(), - server_term.clone(), - is_terminated.clone(), - server_is_running.clone(), + proc_control.clone(), )); } - let mut proc_cleanup = - ProcessCleanup::new(server_term.clone(), is_terminated.clone(), enc_proc); - 'source_iter: for node in get_source { - println!("{:?}", &node.clone()); + *proc_control.current_media.lock().unwrap() = Some(node.clone()); + let cmd = match node.cmd { Some(cmd) => cmd, None => break, @@ -223,6 +164,10 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { "Decoder".to_string(), )); + if let Ok(dec_terminator) = dec_proc.terminator() { + *proc_control.decoder_term.lock().unwrap() = Some(dec_terminator); + }; + loop { if *server_is_running.lock().unwrap() { if !live_on { @@ -291,5 +236,7 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { sleep(Duration::from_secs(1)); - proc_cleanup.kill(); + if let Err(e) = enc_proc.wait() { + panic!("Encoder error: {:?}", e) + }; } diff --git a/src/utils/config.rs b/src/utils/config.rs index c0dd36f6..57207fe1 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -15,6 +15,7 @@ use crate::utils::{get_args, time_to_sec}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct GlobalConfig { pub general: General, + pub rpc_server: RpcServer, pub mail: Mail, pub logging: Logging, pub processing: Processing, @@ -30,6 +31,13 @@ pub struct General { pub stop_threshold: f64, } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RpcServer { + pub enable: bool, + pub address: String, + pub authorization: String, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Mail { pub subject: String, @@ -39,6 +47,7 @@ pub struct Mail { pub sender_pass: String, pub recipient: String, pub mail_level: String, + pub interval: i32, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 27654b85..6062a081 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -1,6 +1,7 @@ extern crate log; extern crate simplelog; +use chrono::prelude::*; use regex::Regex; use std::{ path::Path, @@ -51,11 +52,15 @@ fn send_mail(msg: String) { } } -async fn mail_queue(messages: Arc>>, is_terminated: Arc>) { - let mut count = 0; +async fn mail_queue( + messages: Arc>>, + is_terminated: Arc>, + interval: i32, +) { + let mut count: i32 = 0; loop { - if *is_terminated.lock().unwrap() || count == 60 { + if *is_terminated.lock().unwrap() || count == interval { // check every 30 seconds for messages and send them if messages.lock().unwrap().len() > 0 { let msg = messages.lock().unwrap().join("\n"); @@ -71,14 +76,14 @@ async fn mail_queue(messages: Arc>>, is_terminated: Arc>>, } @@ -103,21 +108,13 @@ impl Log for LogMailer { fn log(&self, record: &Record<'_>) { if self.enabled(record.metadata()) { - match record.level() { - Level::Error => { - self.messages - .lock() - .unwrap() - .push(record.args().to_string()); - } - Level::Warn => { - self.messages - .lock() - .unwrap() - .push(record.args().to_string()); - } - _ => (), - } + let local: DateTime = Local::now(); + let time_stamp: String = local.format("[%Y-%m-%d %H:%M:%S%.3f]").to_string(); + let level = record.level().to_string().to_uppercase(); + let rec = record.args().to_string(); + let full_line: String = format!("{time_stamp} [{level: >5}] {rec}"); + + self.messages.lock().unwrap().push(full_line); } } @@ -212,19 +209,24 @@ pub fn init_logging( } if config.mail.recipient.len() > 3 { - let mut filter = LevelFilter::Error; let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + let interval = config.mail.interval.clone(); - rt_handle.spawn(mail_queue(messages.clone(), is_terminated.clone())); + rt_handle.spawn(mail_queue( + messages.clone(), + is_terminated.clone(), + interval, + )); let mail_config = log_config .clone() - .set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]") .build(); - if config.mail.mail_level.to_lowercase() == "warning".to_string() { - filter = LevelFilter::Warn - } + let filter = match config.mail.mail_level.to_lowercase().as_str() { + "info" => LevelFilter::Info, + "warning" => LevelFilter::Warn, + _ => LevelFilter::Error, + }; app_logger.push(LogMailer::new(filter, mail_config, messages)); } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 1b046f54..cf29c038 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,10 +8,13 @@ use std::{ path::Path, process::exit, process::{ChildStderr, Command, Stdio}, + sync::{Arc, Mutex, RwLock}, time, time::UNIX_EPOCH, }; +use jsonrpc_http_server::CloseHandle; +use process_control::Terminator; use regex::Regex; use simplelog::*; @@ -20,15 +23,88 @@ mod config; pub mod json_reader; mod json_validate; mod logging; +mod rpc_server; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; pub use json_reader::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::init_logging; +pub use rpc_server::run_rpc; use crate::filter::filter_chains; +#[derive(Clone)] +pub struct ProcessControl { + pub decoder_term: Arc>>, + pub encoder_term: Arc>>, + pub server_term: Arc>>, + pub server_is_running: Arc>, + pub rpc_handle: Arc>>, + pub is_terminated: Arc>, + pub is_alive: Arc>, + pub current_media: Arc>>, +} + +impl ProcessControl { + pub fn new() -> Self { + Self { + decoder_term: Arc::new(Mutex::new(None)), + encoder_term: Arc::new(Mutex::new(None)), + server_term: Arc::new(Mutex::new(None)), + server_is_running: Arc::new(Mutex::new(false)), + rpc_handle: Arc::new(Mutex::new(None)), + is_terminated: Arc::new(Mutex::new(false)), + is_alive: Arc::new(RwLock::new(true)), + current_media: Arc::new(Mutex::new(None)), + } + } +} + +impl ProcessControl { + pub fn kill_all(&mut self) { + *self.is_terminated.lock().unwrap() = true; + + if *self.is_alive.read().unwrap() { + *self.is_alive.write().unwrap() = false; + + if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { + rpc.clone().close() + }; + + if let Some(server) = &*self.server_term.lock().unwrap() { + unsafe { + if let Err(e)= server.terminate() { + error!("Ingest server: {:?}", e); + } + } + }; + + if let Some(decoder) = &*self.decoder_term.lock().unwrap() { + unsafe { + if let Err(e) = decoder.terminate() { + error!("Decoder: {:?}", e); + } + } + }; + + if let Some(encoder) = &*self.encoder_term.lock().unwrap() { + unsafe { + if let Err(e) = encoder.terminate() { + error!("Encoder: {:?}", e); + } + } + }; + } + } +} + +impl Drop for ProcessControl { + fn drop(&mut self) { + self.kill_all() + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { pub begin: Option, @@ -42,8 +118,8 @@ pub struct Media { pub cmd: Option>, pub filter: Option>, pub probe: Option, - pub last: Option>, - pub next: Option>, + pub last_ad: Option, + pub next_ad: Option, pub process: Option, } @@ -72,8 +148,8 @@ impl Media { cmd: Some(vec!["-i".to_string(), src]), filter: Some(vec![]), probe: probe, - last: None, - next: None, + last_ad: Some(false), + next_ad: Some(false), process: Some(true), } } diff --git a/src/utils/rpc_server.rs b/src/utils/rpc_server.rs new file mode 100644 index 00000000..8d5ec0ea --- /dev/null +++ b/src/utils/rpc_server.rs @@ -0,0 +1,74 @@ +use serde_json::{Map, Number}; + +use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; +use jsonrpc_http_server::{ + hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, +}; +use simplelog::*; + +use crate::utils::{GlobalConfig, ProcessControl}; + +pub async fn run_rpc(proc_control: ProcessControl) { + let config = GlobalConfig::global(); + let mut io = IoHandler::default(); + let proc = proc_control.clone(); + + io.add_sync_method("player", move |params: Params| { + match params { + Params::Map(map) => { + if map.contains_key("control") && map["control"] == "next".to_string() { + if let Some(decoder) = &*proc.decoder_term.lock().unwrap() { + unsafe { + if let Ok(_) = decoder.terminate() { + info!("Skip current clip"); + return Ok(Value::String(format!("Skip current clip"))); + } + } + } + } + + if map.contains_key("media") && map["media"] == "current".to_string() { + if let Some(media) = proc.current_media.lock().unwrap().clone() { + let mut media_map = Map::new(); + media_map.insert( + "begin".to_string(), + Value::Number(Number::from_f64(media.begin.unwrap_or(0.0)).unwrap()), + ); + media_map.insert("source".to_string(), Value::String(media.source)); + + return Ok(Value::Object(media_map)); + }; + } + } + _ => return Ok(Value::String(format!("Wrong parameters..."))), + } + + Ok(Value::String(format!("no parameters set..."))) + }); + + let server = ServerBuilder::new(io) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Null, + ])) + .request_middleware(|request: hyper::Request| { + if request.headers().contains_key("authorization") + && request.headers()["authorization"] == config.rpc_server.authorization + { + if request.uri() == "/status" { + println!("{:?}", request.headers().contains_key("authorization")); + Response::ok("Server running OK.").into() + } else { + request.into() + } + } else { + Response::bad_request("No authorization header or valid key found!").into() + } + }) + .rest_api(RestApi::Secure) + .start_http(&config.rpc_server.address.parse().unwrap()) + .expect("Unable to start RPC server"); + + *proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone()); + + server.wait(); +}