diff --git a/Cargo.lock b/Cargo.lock index a3f24df9..6ac29492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,21 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "memchr", +] + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + [[package]] name = "cc" version = "1.0.73" @@ -152,12 +167,13 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.8.1" +version = "0.9.0" dependencies = [ "chrono", "clap", "ffprobe", "file-rotate", + "jsonrpc-http-server", "lettre", "log", "notify", @@ -219,6 +235,12 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foreign-types" version = "0.3.2" @@ -269,18 +291,71 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +[[package]] +name = "futures-macro" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "futures-task" version = "0.3.21" @@ -293,8 +368,11 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -310,7 +388,20 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", +] + +[[package]] +name = "globset" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10463d9ff00a2a068db14231982f5132edebad0d7660cd956a1c30292dbcbfbd" +dependencies = [ + "aho-corasick", + "bstr", + "fnv", + "log", + "regex", ] [[package]] @@ -345,12 +436,63 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "http" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4" + [[package]] name = "httpdate" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -416,6 +558,55 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +[[package]] +name = "jsonrpc-core" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb" +dependencies = [ + "futures", + "futures-executor", + "futures-util", + "log", + "serde", + "serde_derive", + "serde_json", +] + +[[package]] +name = "jsonrpc-http-server" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff" +dependencies = [ + "futures", + "hyper", + "jsonrpc-core", + "jsonrpc-server-utils", + "log", + "net2", + "parking_lot", + "unicase", +] + +[[package]] +name = "jsonrpc-server-utils" +version = "18.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4" +dependencies = [ + "bytes", + "futures", + "globset", + "jsonrpc-core", + "lazy_static", + "log", + "tokio", + "tokio-stream", + "tokio-util", + "unicase", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -461,9 +652,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.121" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" +checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259" [[package]] name = "linked-hash-map" @@ -471,6 +662,16 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.16" @@ -533,12 +734,26 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "wasi 0.11.0+wasi-snapshot-preview1", + "winapi 0.3.9", +] + [[package]] name = "mio-extras" version = "2.0.6" @@ -547,7 +762,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" dependencies = [ "lazycell", "log", - "mio", + "mio 0.6.23", "slab", ] @@ -563,6 +778,15 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -614,12 +838,21 @@ dependencies = [ "fsevent-sys", "inotify", "libc", - "mio", + "mio 0.6.23", "mio-extras", "walkdir", "winapi 0.3.9", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -713,6 +946,31 @@ version = "1.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" +dependencies = [ + "cfg-if 1.0.0", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi 0.3.9", +] + [[package]] name = "pin-project-lite" version = "0.2.8" @@ -763,9 +1021,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1" dependencies = [ "unicode-xid", ] @@ -885,6 +1143,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "security-framework" version = "2.6.1" @@ -975,6 +1239,22 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "strsim" version = "0.10.0" @@ -983,9 +1263,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.90" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f" +checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" dependencies = [ "proc-macro2", "quote", @@ -1052,8 +1332,80 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ + "bytes", + "libc", + "memchr", + "mio 0.8.2", "num_cpus", "pin-project-lite", + "socket2", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +dependencies = [ + "cfg-if 1.0.0", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", ] [[package]] @@ -1100,12 +1452,28 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 8faa94b8..9fc37cc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.8.1" +version = "0.9.0" edition = "2021" [dependencies] @@ -8,6 +8,7 @@ chrono = "0.4" clap = { version = "3.1", features = ["derive"] } ffprobe = "0.3" file-rotate = "0.6" +jsonrpc-http-server = "18.0" lettre = "0.10.0-rc.5" log = "0.4" notify = "4.0" diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 7ba7df0e..52be5b88 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -1,5 +1,5 @@ general: - helptext: Sometimes it can happen, that a file is corrupt but still playable, + help_text: Sometimes it can happen, that a file is corrupt but still playable, this can produce an streaming error over all following files. The only way in this case is, to stop ffplayout and start it again. Here we only say when it stops, the starting process is in your hand. Best way is a systemd service @@ -7,10 +7,17 @@ general: value. A number below 3 can cause unexpected errors. stop_threshold: 11 +rpc_server: + help_text: Run a JSON RPC server, for getting infos about current playing, and + control for some functions. + enable: true + address: 127.0.0.1:7070 + authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs + mail: - helptext: Send error messages to email address, like missing playlist; invalid + help_text: Send error messages to email address, like missing playlist; invalid json format; missing clip path. Leave recipient blank, if you don't need this. - 'mail_level' can be WARNING or ERROR. + 'mail_level' can be INFO, WARNING or ERROR. 'interval' means seconds until a new mail will be sended. subject: "Playout Error" smtp_server: "mail.example.org" starttls: true @@ -18,9 +25,10 @@ mail: sender_pass: "abc123" recipient: mail_level: "ERROR" + interval: 30 logging: - helptext: Logging to file, if 'log_to_file' false log to console. 'backup_count' + help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count' says how long log files will be saved in days. 'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon. 'log_level' can be DEBUG, INFO, WARNING, ERROR. 'ffmpeg_level' can be info, @@ -34,7 +42,7 @@ logging: ffmpeg_level: "error" processing: - helptext: Default processing, for all clips that they get prepared in that way, + help_text: Default processing, for all clips that they get prepared in that way, so the output is unique. Set playing mode, like playlist, or folder. 'aspect' must be a float number. 'logo' is only used if the path exist. 'logo_scale' scale the logo to target size, leave it blank when no scaling @@ -60,7 +68,7 @@ processing: volume: 1 ingest: - helptext: Works not with direct hls output, it always needs full processing! Run a server + help_text: Works not with direct hls output, it always needs full processing! Run a server for a ingest stream. This stream will override the normal streaming until is done. There is no authentication, this is up to you. The recommend way is to set address to localhost, stream to a local server with authentication and from there stream to this app. @@ -68,7 +76,7 @@ ingest: input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream playlist: - helptext: > + help_text: > 'path' can be a path to a single file, or a directory. For directory put only the root folder, for example '/playlists', subdirectories are read by the script. Subdirectories needs this structure '/playlists/2018/01'. 'day_start' @@ -82,7 +90,7 @@ playlist: infinit: false storage: - helptext: Play ordered or randomly files from path. 'filler_clip' is for fill + help_text: Play ordered or randomly files from path. 'filler_clip' is for fill the end to reach 24 hours, it will loop when is necessary. 'extensions' search only files with this extension. Set 'shuffle' to 'True' to pick files randomly. path: "/mediaStorage" @@ -93,7 +101,7 @@ storage: shuffle: true text: - helptext: Overlay text in combination with libzmq for remote text manipulation. + help_text: Overlay text in combination with libzmq for remote text manipulation. On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'. In a standard environment the filter drawtext node is Parsed_drawtext_2. 'over_pre' if True text will be overlay in pre processing. Continue same text @@ -110,7 +118,7 @@ text: regex: ^.+[/\\](.*)(.mp4|.mkv)$ out: - helptext: The final playout compression. Set the settings to your needs. + help_text: The final playout compression. Set the settings to your needs. 'mode' has the standard options 'desktop', 'hls', 'stream'. Self made outputs can be define, by adding script in output folder with an 'output' function inside. 'preview' works only in streaming output and creates a separate preview stream. diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 97006783..be7ed418 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -122,7 +122,7 @@ fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &Glo } fn fade(node: &mut Media, chain: &mut Filters, codec_type: String) { - let mut t = "".to_string(); + let mut t = String::new(); if codec_type == "audio".to_string() { t = "a".to_string() @@ -291,9 +291,9 @@ fn realtime_filter( config: &GlobalConfig, codec_type: String, ) { - //this realtime filter is important for HLS output to stay in sync + // this realtime filter is important for HLS output to stay in sync - let mut t = "".to_string(); + let mut t = String::new(); if codec_type == "audio".to_string() { t = "a".to_string() @@ -354,15 +354,15 @@ pub fn filter_chains(node: &mut Media) -> Vec { add_text(node, &mut filters, &config); fade(node, &mut filters, "video".into()); overlay(node, &mut filters, &config); - realtime_filter(node, &mut filters, &config, "video".into()); + realtime_filter(node, &mut filters, &config, "video".into()); add_loudnorm(node, &mut filters, &config); fade(node, &mut filters, "audio".into()); audio_volume(&mut filters, &config); - realtime_filter(node, &mut filters, &config, "audio".into()); + realtime_filter(node, &mut filters, &config, "audio".into()); let mut filter_cmd = vec![]; - let mut filter_str: String = "".to_string(); + let mut filter_str: String = String::new(); let mut filter_map: Vec = vec![]; if filters.video_chain.is_some() { diff --git a/src/input/folder.rs b/src/input/folder.rs index b2b7f3a6..737e9cc3 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -12,19 +12,24 @@ use std::{ use walkdir::WalkDir; -use crate::utils::{GlobalConfig, Media}; +use crate::utils::{get_sec, GlobalConfig, Media}; #[derive(Debug, Clone)] pub struct Source { config: GlobalConfig, - pub nodes: Arc>>, - index: usize, + pub nodes: Arc>>, + current_node: Media, + index: Arc>, } impl Source { - pub fn new() -> Self { + pub fn new( + current_list: Arc>>, + global_index: Arc>, + ) -> Self { let config = GlobalConfig::global(); - let mut file_list = vec![]; + let mut media_list = vec![]; + let mut index: usize = 0; for entry in WalkDir::new(config.storage.path.clone()) .into_iter() @@ -40,7 +45,8 @@ impl Source { .clone() .contains(&ext.unwrap().to_lowercase()) { - file_list.push(entry.path().display().to_string()); + let media = Media::new(0, entry.path().display().to_string(), false); + media_list.push(media); } } } @@ -48,25 +54,51 @@ impl Source { if config.storage.shuffle { info!("Shuffle files"); let mut rng = thread_rng(); - file_list.shuffle(&mut rng); + media_list.shuffle(&mut rng); } else { - file_list.sort(); + media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source)); } + for item in media_list.iter_mut() { + item.index = Some(index); + + index += 1; + } + + *current_list.lock().unwrap() = media_list; + Self { config: config.clone(), - nodes: Arc::new(Mutex::new(file_list)), - index: 0, + nodes: current_list, + current_node: Media::new(0, String::new(), false), + index: global_index, } } fn shuffle(&mut self) { let mut rng = thread_rng(); self.nodes.lock().unwrap().shuffle(&mut rng); + let mut index: usize = 0; + + for item in self.nodes.lock().unwrap().iter_mut() { + item.index = Some(index); + + index += 1; + } } fn sort(&mut self) { - self.nodes.lock().unwrap().sort(); + self.nodes + .lock() + .unwrap() + .sort_by(|d1, d2| d1.source.cmp(&d2.source)); + let mut index: usize = 0; + + for item in self.nodes.lock().unwrap().iter_mut() { + item.index = Some(index); + + index += 1; + } } } @@ -74,14 +106,16 @@ impl Iterator for Source { type Item = Media; fn next(&mut self) -> Option { - if self.index < self.nodes.lock().unwrap().len() { - let current_file = self.nodes.lock().unwrap()[self.index].clone(); - let mut media = Media::new(self.index, current_file); - media.add_probe(); - media.add_filter(); - self.index += 1; + if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { + let i = *self.index.lock().unwrap(); + self.current_node = self.nodes.lock().unwrap()[i].clone(); + self.current_node.add_probe(); + self.current_node.add_filter(); + self.current_node.begin = Some(get_sec()); - Some(media) + *self.index.lock().unwrap() += 1; + + Some(self.current_node.clone()) } else { if self.config.storage.shuffle { info!("Shuffle files"); @@ -91,13 +125,14 @@ impl Iterator for Source { self.sort(); } - let current_file = self.nodes.lock().unwrap()[0].clone(); - let mut media = Media::new(self.index, current_file); - media.add_probe(); - media.add_filter(); - self.index = 1; + self.current_node = self.nodes.lock().unwrap()[0].clone(); + self.current_node.add_probe(); + self.current_node.add_filter(); + self.current_node.begin = Some(get_sec()); - Some(media) + *self.index.lock().unwrap() = 1; + + Some(self.current_node.clone()) } } } @@ -106,31 +141,37 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watch_folder( +pub async fn file_worker( receiver: Receiver, - sources: Arc>>, + sources: Arc>>, ) { while let Ok(res) = receiver.recv() { match res { Create(new_path) => { - sources.lock().unwrap().push(new_path.display().to_string()); + let index = sources.lock().unwrap().len(); + let media = Media::new(index, new_path.display().to_string(), false); + + sources.lock().unwrap().push(media); info!("Create new file: {:?}", new_path); } Remove(old_path) => { sources .lock() .unwrap() - .retain(|x| x != &old_path.display().to_string()); + .retain(|x| x.source != old_path.display().to_string()); info!("Remove file: {:?}", old_path); } Rename(old_path, new_path) => { - let i = sources + let index = sources .lock() .unwrap() .iter() - .position(|x| *x == old_path.display().to_string()) + .position(|x| *x.source == old_path.display().to_string()) .unwrap(); - sources.lock().unwrap()[i] = new_path.display().to_string(); + + let media = Media::new(index, new_path.display().to_string(), false); + sources.lock().unwrap()[index] = media; + info!("Rename file: {:?} to {:?}", old_path, new_path); } _ => (), diff --git a/src/input/ingest.rs b/src/input/ingest.rs index b14bcd27..b411d25f 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,16 +2,16 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::{mpsc::SyncSender, Arc, Mutex}, + sync::{mpsc::SyncSender}, thread::sleep, time::Duration, }; -use process_control::{ChildExt, Terminator}; +use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; -use crate::utils::{stderr_reader, GlobalConfig}; +use crate::utils::{stderr_reader, GlobalConfig, ProcessControl}; fn overlay(config: &GlobalConfig) -> String { let mut logo_chain = String::new(); @@ -57,9 +57,7 @@ pub async fn ingest_server( log_format: String, ingest_sender: SyncSender<(usize, [u8; 65088])>, rt_handle: Handle, - proc_terminator: Arc>>, - is_terminated: Arc>, - server_is_running: Arc>, + proc_control: ProcessControl, ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; @@ -101,7 +99,7 @@ pub async fn ingest_server( debug!("Server CMD: \"ffmpeg {}\"", server_cmd.join(" ")); loop { - if *is_terminated.lock().unwrap() { + if *proc_control.is_terminated.lock().unwrap() { break; } let mut server_proc = match Command::new("ffmpeg") @@ -118,7 +116,7 @@ pub async fn ingest_server( }; let serv_terminator = server_proc.terminator()?; - *proc_terminator.lock().unwrap() = Some(serv_terminator); + *proc_control.server_term.lock().unwrap() = Some(serv_terminator); rt_handle.spawn(stderr_reader( server_proc.stderr.take().unwrap(), @@ -139,7 +137,7 @@ pub async fn ingest_server( }; if !is_running { - *server_is_running.lock().unwrap() = true; + *proc_control.server_is_running.lock().unwrap() = true; is_running = true; } @@ -147,7 +145,7 @@ pub async fn ingest_server( if let Err(e) = ingest_sender.send((bytes_len, buffer)) { error!("Ingest server write error: {:?}", e); - *is_terminated.lock().unwrap() = true; + *proc_control.is_terminated.lock().unwrap() = true; break; } } else { @@ -155,7 +153,7 @@ pub async fn ingest_server( } } - *server_is_running.lock().unwrap() = false; + *proc_control.server_is_running.lock().unwrap() = false; sleep(Duration::from_secs(1)); diff --git a/src/input/mod.rs b/src/input/mod.rs index ccdf5271..c5273137 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -3,5 +3,5 @@ pub mod ingest; pub mod playlist; pub use ingest::ingest_server; -pub use folder::{watch_folder, Source}; +pub use folder::{file_worker, Source}; pub use playlist::CurrentProgram; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 73333967..0736d954 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -1,14 +1,18 @@ use std::{ + fs, path::Path, sync::{Arc, Mutex}, + thread::sleep, + time::Duration, }; +use serde_json::json; use simplelog::*; use tokio::runtime::Handle; use crate::utils::{ - check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, - modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN, + check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, + seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN, }; #[derive(Debug)] @@ -18,31 +22,50 @@ pub struct CurrentProgram { json_mod: Option, json_path: Option, json_date: String, - nodes: Vec, + pub nodes: Arc>>, current_node: Media, - pub init: Arc>, - index: usize, + index: Arc>, rt_handle: Handle, is_terminated: Arc>, + playout_stat: PlayoutStatus, } impl CurrentProgram { - pub fn new(rt_handle: Handle, is_terminated: Arc>) -> Self { + pub fn new( + rt_handle: Handle, + playout_stat: PlayoutStatus, + is_terminated: Arc>, + current_list: Arc>>, + global_index: Arc>, + ) -> Self { let config = GlobalConfig::global(); let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0); + *current_list.lock().unwrap() = json.program; + *playout_stat.current_date.lock().unwrap() = json.date.clone(); + + if *playout_stat.date.lock().unwrap() != json.date { + let data = json!({ + "time_shift": 0.0, + "date": json.date, + }); + + let json: String = serde_json::to_string(&data).expect("Serialize status data failed"); + fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file"); + } + Self { config: config.clone(), start_sec: json.start_sec.unwrap(), json_mod: json.modified, json_path: json.current_file, json_date: json.date, - nodes: json.program, - current_node: Media::new(0, "".to_string()), - init: Arc::new(Mutex::new(true)), - index: 0, + nodes: current_list, + current_node: Media::new(0, String::new(), false), + index: global_index, rt_handle, is_terminated, + playout_stat, } } @@ -58,7 +81,7 @@ impl CurrentProgram { self.json_path = json.current_file; self.json_mod = json.modified; - self.nodes = json.program; + *self.nodes.lock().unwrap() = json.program; } else if Path::new(&self.json_path.clone().unwrap()).is_file() { let mod_time = modified_time(&self.json_path.clone().unwrap()); @@ -82,26 +105,26 @@ impl CurrentProgram { ); self.json_mod = json.modified; - self.nodes = json.program; + *self.nodes.lock().unwrap() = json.program; self.get_current_clip(); - self.index += 1; + *self.index.lock().unwrap() += 1; } } else { error!( "Playlist {} not exists!", self.json_path.clone().unwrap() ); - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, String::new(), false); media.begin = Some(get_sec()); media.duration = DUMMY_LEN; media.out = DUMMY_LEN; self.json_path = None; - self.nodes = vec![media.clone()]; + *self.nodes.lock().unwrap() = vec![media.clone()]; self.current_node = media; - *self.init.lock().unwrap() = true; - self.index = 0; + *self.playout_stat.list_init.lock().unwrap() = true; + *self.index.lock().unwrap() = 0; } } @@ -130,35 +153,44 @@ impl CurrentProgram { next_start, ); + let data = json!({ + "time_shift": 0.0, + "date": json.date, + }); + + *self.playout_stat.current_date.lock().unwrap() = json.date.clone(); + let status_data: String = + serde_json::to_string(&data).expect("Serialize status data failed"); + fs::write(self.config.general.stat_file.clone(), &status_data) + .expect("Unable to write file"); + self.json_path = json.current_file.clone(); self.json_mod = json.modified; self.json_date = json.date; - self.nodes = json.program; - self.index = 0; + *self.nodes.lock().unwrap() = json.program; + *self.index.lock().unwrap() = 0; if json.current_file.is_none() { - *self.init.lock().unwrap() = true; + *self.playout_stat.list_init.lock().unwrap() = true; } } } - fn is_ad(&mut self, i: usize, next: bool) -> Option { - if next { - if i + 1 < self.nodes.len() && self.nodes[i + 1].category == "advertisement".to_string() - { - return Some(true); - } else { - return Some(false); - } - } else { - if i > 0 - && i < self.nodes.len() - && self.nodes[i - 1].category == "advertisement".to_string() - { - return Some(true); - } else { - return Some(false); - } + fn last_next_ad(&mut self) { + let index = *self.index.lock().unwrap(); + let current_list = self.nodes.lock().unwrap(); + + if index + 1 < current_list.len() + && current_list[index + 1].category == "advertisement".to_string() + { + self.current_node.next_ad = Some(true); + } + + if index > 0 + && index < current_list.len() + && current_list[index - 1].category == "advertisement".to_string() + { + self.current_node.last_ad = Some(true); } } @@ -173,12 +205,20 @@ impl CurrentProgram { } fn get_current_clip(&mut self) { - let time_sec = self.get_current_time(); + let mut time_sec = self.get_current_time(); - for (i, item) in self.nodes.iter_mut().enumerate() { + if *self.playout_stat.current_date.lock().unwrap() == *self.playout_stat.date.lock().unwrap() + && *self.playout_stat.time_shift.lock().unwrap() != 0.0 + { + let shift = *self.playout_stat.time_shift.lock().unwrap(); + info!("Shift playlist start for {shift} seconds"); + time_sec += shift; + } + + for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { if item.begin.unwrap() + item.out - item.seek > time_sec { - *self.init.lock().unwrap() = false; - self.index = i; + *self.playout_stat.list_init.lock().unwrap() = false; + *self.index.lock().unwrap() = i; break; } @@ -188,12 +228,13 @@ impl CurrentProgram { fn init_clip(&mut self) { self.get_current_clip(); - if !*self.init.lock().unwrap() { + if !*self.playout_stat.list_init.lock().unwrap() { let time_sec = self.get_current_time(); + let index = *self.index.lock().unwrap(); // de-instance node to preserve original values in list - let mut node_clone = self.nodes[self.index].clone(); - self.index += 1; + let mut node_clone = self.nodes.lock().unwrap()[index].clone(); + *self.index.lock().unwrap() += 1; node_clone.seek = time_sec - node_clone.begin.unwrap(); self.current_node = handle_list_init(node_clone); @@ -205,7 +246,7 @@ impl Iterator for CurrentProgram { type Item = Media; fn next(&mut self) -> Option { - if *self.init.lock().unwrap() { + if *self.playout_stat.list_init.lock().unwrap() { debug!("Playlist init"); self.check_update(true); @@ -213,14 +254,15 @@ impl Iterator for CurrentProgram { self.init_clip(); } - if *self.init.lock().unwrap() { + if *self.playout_stat.list_init.lock().unwrap() { // on init load playlist, could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. - self.current_node = self.nodes[self.nodes.len() - 1].clone(); + let list_length = self.nodes.lock().unwrap().len(); + self.current_node = self.nodes.lock().unwrap()[list_length - 1].clone(); self.check_for_next_playlist(); - let new_node = self.nodes[self.nodes.len() - 1].clone(); + let new_node = self.nodes.lock().unwrap()[list_length - 1].clone(); let new_length = new_node.begin.unwrap() + new_node.duration; if new_length @@ -235,41 +277,45 @@ impl Iterator for CurrentProgram { if DUMMY_LEN > total_delta { duration = total_delta; - *self.init.lock().unwrap() = false; + *self.playout_stat.list_init.lock().unwrap() = false; } if self.config.playlist.start_sec.unwrap() > current_time { current_time += self.config.playlist.length_sec.unwrap() + 1.0; } - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, String::new(), false); media.begin = Some(current_time); media.duration = duration; media.out = duration; self.current_node = gen_source(media); - self.nodes.push(self.current_node.clone()); - self.index = self.nodes.len(); + self.nodes.lock().unwrap().push(self.current_node.clone()); + *self.index.lock().unwrap() = self.nodes.lock().unwrap().len(); } } - self.current_node.last_ad = self.is_ad(self.index, false); - self.current_node.next_ad = self.is_ad(self.index, true); + self.last_next_ad(); return Some(self.current_node.clone()); } - if self.index < self.nodes.len() { + if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { self.check_for_next_playlist(); let mut is_last = false; + let index = *self.index.lock().unwrap(); - if self.index == self.nodes.len() - 1 { + if index == self.nodes.lock().unwrap().len() - 1 { is_last = true } - self.current_node = timed_source(self.nodes[self.index].clone(), &self.config, is_last); - self.current_node.last_ad = self.is_ad(self.index, false); - self.current_node.next_ad = self.is_ad(self.index, true); - self.index += 1; + self.current_node = timed_source( + self.nodes.lock().unwrap()[index].clone(), + &self.config, + is_last, + &self.playout_stat, + ); + self.last_next_ad(); + *self.index.lock().unwrap() += 1; // update playlist should happen after current clip, // to prevent unknown behaviors. @@ -277,17 +323,17 @@ impl Iterator for CurrentProgram { Some(self.current_node.clone()) } else { let last_playlist = self.json_path.clone(); + let last_ad = self.current_node.last_ad.clone(); self.check_for_next_playlist(); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); - let mut last_ad = self.is_ad(self.index, false); if last_playlist == self.json_path && total_delta.abs() > self.config.general.stop_threshold { // Test if playlist is to early finish, // and if we have to fill it with a placeholder. - self.index += 1; - self.current_node = Media::new(self.index, "".to_string()); + let index = *self.index.lock().unwrap(); + self.current_node = Media::new(index, String::new(), false); self.current_node.begin = Some(get_sec()); let mut duration = total_delta.abs(); @@ -297,39 +343,60 @@ impl Iterator for CurrentProgram { self.current_node.duration = duration; self.current_node.out = duration; self.current_node = gen_source(self.current_node.clone()); - self.nodes.push(self.current_node.clone()); + self.nodes.lock().unwrap().push(self.current_node.clone()); + self.last_next_ad(); - last_ad = self.is_ad(self.index, false); self.current_node.last_ad = last_ad; self.current_node.add_filter(); + *self.index.lock().unwrap() += 1; + return Some(self.current_node.clone()); } - self.current_node = gen_source(self.nodes[0].clone()); + *self.index.lock().unwrap() = 0; + self.current_node = + gen_source(self.nodes.lock().unwrap()[0].clone()); + self.last_next_ad(); self.current_node.last_ad = last_ad; - self.current_node.next_ad = self.is_ad(0, true); - self.index = 1; + *self.index.lock().unwrap() = 1; Some(self.current_node.clone()) } } } -fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media { +fn timed_source( + node: Media, + config: &GlobalConfig, + last: bool, + playout_stat: &PlayoutStatus, +) -> Media { // prepare input clip // check begin and length from clip // return clip only if we are in 24 hours time range let (delta, total_delta) = get_delta(&node.begin.unwrap()); + let mut shifted_delta = delta; let mut new_node = node.clone(); new_node.process = Some(false); if config.playlist.length.contains(":") { - debug!("Delta: {delta:.3}"); - debug!("Total delta: {total_delta:.3}"); - let sync = check_sync(delta); + if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap() + && *playout_stat.time_shift.lock().unwrap() != 0.0 + { + sleep(Duration::from_millis(300)); + shifted_delta = delta - *playout_stat.time_shift.lock().unwrap(); + + debug!("Delta: {shifted_delta:.3}, shifted: {delta:.3}"); + } else { + debug!("Delta: {shifted_delta:.3}"); + } + + debug!("Total time remaining: {total_delta:.3}"); + + let sync = check_sync(shifted_delta); if !sync { new_node.cmd = None; diff --git a/src/main.rs b/src/main.rs index 4d5372f8..cc4dd126 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,83 @@ +use std::{ + path::PathBuf, + {fs, fs::File}, +}; + extern crate log; extern crate simplelog; -use std::sync::{Arc, Mutex}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use simplelog::*; +use tokio::runtime::Builder; mod filter; mod input; mod output; mod utils; -use simplelog::*; -use tokio::runtime::Builder; - use crate::output::{player, write_hls}; -use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig}; +use crate::utils::{ + init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl, + PlayoutStatus, ProcessControl, +}; + +#[derive(Serialize, Deserialize)] +struct StatusData { + time_shift: f64, + date: String, +} fn main() { init_config(); let config = GlobalConfig::global(); + let play_control = PlayerControl::new(); + let playout_stat = PlayoutStatus::new(); + let proc_control = ProcessControl::new(); + + if !PathBuf::from(config.general.stat_file.clone()).exists() { + let data = json!({ + "time_shift": 0.0, + "date": String::new(), + }); + + let json: String = serde_json::to_string(&data).expect("Serialize status data failed"); + fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file"); + } else { + let stat_file = File::options() + .read(true) + .write(false) + .open(&config.general.stat_file) + .expect("Could not open status file"); + + let data: StatusData = + serde_json::from_reader(stat_file).expect("Could not read status file."); + + *playout_stat.time_shift.lock().unwrap() = data.time_shift; + *playout_stat.date.lock().unwrap() = data.date; + } let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let rt_handle = runtime.handle(); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); - let logging = init_logging(rt_handle.clone(), is_terminated.clone()); + let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); - if config.out.mode.to_lowercase() == "hls".to_string() { - write_hls(rt_handle, is_terminated); - } else { - player(rt_handle, is_terminated); + if config.rpc_server.enable { + rt_handle.spawn(run_rpc( + play_control.clone(), + playout_stat.clone(), + proc_control.clone(), + )); } + + if config.out.mode.to_lowercase() == "hls".to_string() { + write_hls(rt_handle, play_control, playout_stat, proc_control); + } else { + player(rt_handle, play_control, playout_stat, proc_control); + } + + info!("Playout done..."); } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 59c214bb..f5523b1f 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -29,7 +29,7 @@ pub fn output(log_format: String) -> process::Child { ); let mut filter: String = "null,".to_string(); - filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str()); + filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str()); enc_filter = vec!["-vf".to_string(), filter]; } diff --git a/src/output/hls.rs b/src/output/hls.rs index 8186322a..2d9eeb46 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -1,16 +1,3 @@ -use std::{ - process::{Command, Stdio}, - sync::{ - Arc, Mutex, - }, -}; - -use simplelog::*; -use tokio::runtime::Handle; - -use crate::output::source_generator; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig}; - /* This module write the files compression directly to a hls (m3u8) playlist, without pre- and post-processing. @@ -30,18 +17,38 @@ out: */ -pub fn write_hls(rt_handle: &Handle, is_terminated: Arc>) { +use std::process::{Command, Stdio}; + +use simplelog::*; +use tokio::runtime::Handle; + +use crate::output::source_generator; +use crate::utils::{ + sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, +}; + +pub fn write_hls( + rt_handle: &Handle, + play_control: PlayerControl, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let (get_source, _) = source_generator( + let get_source = source_generator( rt_handle, config.clone(), - is_terminated.clone(), + play_control.current_list.clone(), + play_control.index.clone(), + playout_stat, + proc_control.is_terminated.clone(), ); for node in get_source { + *play_control.current_media.lock().unwrap() = Some(node.clone()); + let cmd = match node.cmd { Some(cmd) => cmd, None => break, diff --git a/src/output/mod.rs b/src/output/mod.rs index 0ba5a797..b3ba158d 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -3,7 +3,7 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, path::Path, process, - process::{Child, Command, Stdio}, + process::{Command, Stdio}, sync::{ mpsc::{channel, sync_channel, Receiver, SyncSender}, Arc, Mutex, @@ -12,7 +12,7 @@ use std::{ time::Duration, }; -use process_control::Terminator; +use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; @@ -22,71 +22,19 @@ mod stream; pub use hls::write_hls; -use crate::input::{ingest_server, watch_folder, CurrentProgram, Source}; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media}; - -#[derive(Debug)] -struct ProcessCleanup { - server_term: Arc>>, - is_terminated: Arc>, - enc_proc: Child, - is_alive: bool, -} - -impl ProcessCleanup { - fn new( - server_term: Arc>>, - is_terminated: Arc>, - enc_proc: Child, - ) -> Self { - Self { - server_term, - is_terminated, - enc_proc, - is_alive: true, - } - } -} - -impl ProcessCleanup { - fn kill(&mut self) { - *self.is_terminated.lock().unwrap() = true; - - if self.is_alive { - if let Some(server) = &*self.server_term.lock().unwrap() { - unsafe { - if let Ok(_) = server.terminate() { - info!("Terminate ingest server done"); - } - } - }; - - self.is_alive = false; - } - - if let Ok(_) = self.enc_proc.kill() { - info!("Playout done...") - } - - if let Err(e) = self.enc_proc.wait() { - error!("Encoder: {e}") - }; - } -} - -impl Drop for ProcessCleanup { - fn drop(&mut self) { - self.kill() - } -} +use crate::input::{file_worker, ingest_server, CurrentProgram, Source}; +use crate::utils::{ + sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl, +}; pub fn source_generator( rt_handle: &Handle, config: GlobalConfig, + current_list: Arc>>, + index: Arc>, + playout_stat: PlayoutStatus, is_terminated: Arc>, -) -> (Box>, Arc>) { - let mut init_playlist: Arc> = Arc::new(Mutex::new(false)); - +) -> Box> { let get_source = match config.processing.clone().mode.as_str() { "folder" => { let path = config.storage.path.clone(); @@ -97,24 +45,29 @@ pub fn source_generator( info!("Playout in folder mode."); - let folder_source = Source::new(); - let (sender, receiver) = channel(); - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); + let folder_source = Source::new(current_list, index); - watcher + let (sender, receiver) = channel(); + let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap(); + watchman .watch(path.clone(), RecursiveMode::Recursive) .unwrap(); debug!("Monitor folder: {}", path); - rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); + rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone())); Box::new(folder_source) as Box> } "playlist" => { info!("Playout in playlist mode"); - let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone()); - init_playlist = program.init.clone(); + let program = CurrentProgram::new( + rt_handle.clone(), + playout_stat, + is_terminated.clone(), + current_list, + index, + ); Box::new(program) as Box> } @@ -124,20 +77,30 @@ pub fn source_generator( } }; - (get_source, init_playlist) + get_source } -pub fn player(rt_handle: &Handle, is_terminated: Arc>) { +pub fn player( + rt_handle: &Handle, + play_control: PlayerControl, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let server_term: Arc>> = Arc::new(Mutex::new(None)); - let server_is_running: Arc> = Arc::new(Mutex::new(false)); let mut buffer: [u8; 65088] = [0; 65088]; let mut live_on = false; + let playlist_init = playout_stat.list_init.clone(); - let (get_source, init_playlist) = - source_generator(rt_handle, config.clone(), is_terminated.clone()); + let get_source = source_generator( + rt_handle, + config.clone(), + play_control.current_list.clone(), + play_control.index.clone(), + playout_stat, + proc_control.is_terminated.clone(), + ); let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(ff_log_format.clone()), @@ -162,16 +125,13 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { ff_log_format.clone(), ingest_sender, rt_handle.clone(), - server_term.clone(), - is_terminated.clone(), - server_is_running.clone(), + proc_control.clone(), )); } - let mut proc_cleanup = - ProcessCleanup::new(server_term.clone(), is_terminated.clone(), enc_proc); - 'source_iter: for node in get_source { + *play_control.current_media.lock().unwrap() = Some(node.clone()); + let cmd = match node.cmd { Some(cmd) => cmd, None => break, @@ -222,8 +182,12 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { "Decoder".to_string(), )); + if let Ok(dec_terminator) = dec_proc.terminator() { + *proc_control.decoder_term.lock().unwrap() = Some(dec_terminator); + }; + loop { - if *server_is_running.lock().unwrap() { + if *proc_control.server_is_running.lock().unwrap() { if !live_on { info!("Switch from {} to live ingest", config.processing.mode); @@ -241,7 +205,7 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { live_on = true; - *init_playlist.lock().unwrap() = true; + *playlist_init.lock().unwrap() = true; } if let Ok(receive) = ingest_receiver.try_recv() { @@ -290,5 +254,11 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc>) { sleep(Duration::from_secs(1)); - proc_cleanup.kill(); + if let Err(e) = enc_proc.kill() { + panic!("Encoder error: {:?}", e) + }; + + if let Err(e) = enc_proc.wait() { + panic!("Encoder error: {:?}", e) + }; } diff --git a/src/output/stream.rs b/src/output/stream.rs index 39328788..aaee2a89 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -32,7 +32,7 @@ pub fn output(log_format: String) -> process::Child { ); let mut filter: String = "[0:v]null,".to_string(); - filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str()); + filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str()); if config.out.preview { filter.push_str(",split=2[v_out1][v_out2]"); diff --git a/src/utils/config.rs b/src/utils/config.rs index c0dd36f6..af56df6e 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -15,6 +15,7 @@ use crate::utils::{get_args, time_to_sec}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct GlobalConfig { pub general: General, + pub rpc_server: RpcServer, pub mail: Mail, pub logging: Logging, pub processing: Processing, @@ -28,6 +29,16 @@ pub struct GlobalConfig { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct General { pub stop_threshold: f64, + + #[serde(skip_serializing, skip_deserializing)] + pub stat_file: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RpcServer { + pub enable: bool, + pub address: String, + pub authorization: String, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -39,6 +50,7 @@ pub struct Mail { pub sender_pass: String, pub recipient: String, pub mail_level: String, + pub interval: i32, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -118,8 +130,6 @@ pub struct Out { pub output_cmd: Option>, } -static INSTANCE: OnceCell = OnceCell::new(); - impl GlobalConfig { fn new() -> Self { let args = get_args(); @@ -139,7 +149,8 @@ impl GlobalConfig { Err(err) => { println!( "{:?} doesn't exists!\n{}\n\nSystem error: {err}", - config_path, "Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!" + config_path, + "Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!" ); process::exit(0x0100); } @@ -147,6 +158,10 @@ impl GlobalConfig { let mut config: GlobalConfig = serde_yaml::from_reader(f).expect("Could not read config file."); + config.general.stat_file = env::temp_dir() + .join("ffplayout_status.json") + .display() + .to_string(); let fps = config.processing.fps.to_string(); let bitrate = config.processing.width * config.processing.height / 10; config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start)); @@ -244,6 +259,8 @@ impl GlobalConfig { } } +static INSTANCE: OnceCell = OnceCell::new(); + fn pre_audio_codec(add_loudnorm: bool) -> Vec { // when add_loudnorm is False we use a different audio encoder, // s302m has higher quality, but is experimental diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index 8fcd8697..6cf66ae4 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -23,7 +23,7 @@ pub struct Playlist { impl Playlist { fn new(date: String, start: f64) -> Self { - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, String::new(), false); media.begin = Some(start); media.duration = DUMMY_LEN; media.out = DUMMY_LEN; @@ -31,7 +31,7 @@ impl Playlist { date, start_sec: Some(start), current_file: None, - modified: Some("".to_string()), + modified: Some(String::new()), program: vec![media], } } diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 27654b85..6062a081 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -1,6 +1,7 @@ extern crate log; extern crate simplelog; +use chrono::prelude::*; use regex::Regex; use std::{ path::Path, @@ -51,11 +52,15 @@ fn send_mail(msg: String) { } } -async fn mail_queue(messages: Arc>>, is_terminated: Arc>) { - let mut count = 0; +async fn mail_queue( + messages: Arc>>, + is_terminated: Arc>, + interval: i32, +) { + let mut count: i32 = 0; loop { - if *is_terminated.lock().unwrap() || count == 60 { + if *is_terminated.lock().unwrap() || count == interval { // check every 30 seconds for messages and send them if messages.lock().unwrap().len() > 0 { let msg = messages.lock().unwrap().join("\n"); @@ -71,14 +76,14 @@ async fn mail_queue(messages: Arc>>, is_terminated: Arc>>, } @@ -103,21 +108,13 @@ impl Log for LogMailer { fn log(&self, record: &Record<'_>) { if self.enabled(record.metadata()) { - match record.level() { - Level::Error => { - self.messages - .lock() - .unwrap() - .push(record.args().to_string()); - } - Level::Warn => { - self.messages - .lock() - .unwrap() - .push(record.args().to_string()); - } - _ => (), - } + let local: DateTime = Local::now(); + let time_stamp: String = local.format("[%Y-%m-%d %H:%M:%S%.3f]").to_string(); + let level = record.level().to_string().to_uppercase(); + let rec = record.args().to_string(); + let full_line: String = format!("{time_stamp} [{level: >5}] {rec}"); + + self.messages.lock().unwrap().push(full_line); } } @@ -212,19 +209,24 @@ pub fn init_logging( } if config.mail.recipient.len() > 3 { - let mut filter = LevelFilter::Error; let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + let interval = config.mail.interval.clone(); - rt_handle.spawn(mail_queue(messages.clone(), is_terminated.clone())); + rt_handle.spawn(mail_queue( + messages.clone(), + is_terminated.clone(), + interval, + )); let mail_config = log_config .clone() - .set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]") .build(); - if config.mail.mail_level.to_lowercase() == "warning".to_string() { - filter = LevelFilter::Warn - } + let filter = match config.mail.mail_level.to_lowercase().as_str() { + "info" => LevelFilter::Info, + "warning" => LevelFilter::Warn, + _ => LevelFilter::Error, + }; app_logger.push(LogMailer::new(filter, mail_config, messages)); } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 69cd733c..c3978a59 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,34 +1,146 @@ use chrono::prelude::*; use chrono::Duration; use ffprobe::{ffprobe, Format, Stream}; -use serde::{Deserialize, Serialize}; use std::{ + fs, fs::metadata, io::{BufRead, BufReader, Error}, path::Path, process::exit, process::{ChildStderr, Command, Stdio}, + sync::{Arc, Mutex, RwLock}, time, time::UNIX_EPOCH, }; +use jsonrpc_http_server::CloseHandle; +use process_control::Terminator; use regex::Regex; use simplelog::*; +use serde::{Deserialize, Serialize}; +use serde_json::json; mod arg_parse; mod config; pub mod json_reader; mod json_validate; mod logging; +mod rpc_server; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; pub use json_reader::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::init_logging; +pub use rpc_server::run_rpc; use crate::filter::filter_chains; +#[derive(Clone)] +pub struct ProcessControl { + pub decoder_term: Arc>>, + pub encoder_term: Arc>>, + pub server_term: Arc>>, + pub server_is_running: Arc>, + pub rpc_handle: Arc>>, + pub is_terminated: Arc>, + pub is_alive: Arc>, +} + +impl ProcessControl { + pub fn new() -> Self { + Self { + decoder_term: Arc::new(Mutex::new(None)), + encoder_term: Arc::new(Mutex::new(None)), + server_term: Arc::new(Mutex::new(None)), + server_is_running: Arc::new(Mutex::new(false)), + rpc_handle: Arc::new(Mutex::new(None)), + is_terminated: Arc::new(Mutex::new(false)), + is_alive: Arc::new(RwLock::new(true)), + } + } +} + +impl ProcessControl { + pub fn kill_all(&mut self) { + *self.is_terminated.lock().unwrap() = true; + + if *self.is_alive.read().unwrap() { + *self.is_alive.write().unwrap() = false; + + if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { + rpc.clone().close() + }; + + if let Some(server) = &*self.server_term.lock().unwrap() { + unsafe { + if let Err(e) = server.terminate() { + error!("Ingest server: {:?}", e); + } + } + }; + + if let Some(decoder) = &*self.decoder_term.lock().unwrap() { + unsafe { + if let Err(e) = decoder.terminate() { + error!("Decoder: {:?}", e); + } + } + }; + + if let Some(encoder) = &*self.encoder_term.lock().unwrap() { + unsafe { + if let Err(e) = encoder.terminate() { + error!("Encoder: {:?}", e); + } + } + }; + } + } +} + +impl Drop for ProcessControl { + fn drop(&mut self) { + self.kill_all() + } +} + +#[derive(Clone, Debug)] +pub struct PlayoutStatus { + pub time_shift: Arc>, + pub date: Arc>, + pub current_date: Arc>, + pub list_init: Arc>, +} + +impl PlayoutStatus { + pub fn new() -> Self { + Self { + time_shift: Arc::new(Mutex::new(0.0)), + date: Arc::new(Mutex::new(String::new())), + current_date: Arc::new(Mutex::new(String::new())), + list_init: Arc::new(Mutex::new(true)), + } + } +} + +#[derive(Clone)] +pub struct PlayerControl { + pub current_media: Arc>>, + pub current_list: Arc>>, + pub index: Arc>, +} + +impl PlayerControl { + pub fn new() -> Self { + Self { + current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), + index: Arc::new(Mutex::new(0)), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { pub begin: Option, @@ -48,11 +160,11 @@ pub struct Media { } impl Media { - pub fn new(index: usize, src: String) -> Self { + pub fn new(index: usize, src: String, do_probe: bool) -> Self { let mut duration: f64 = 0.0; let mut probe = None; - if Path::new(&src).is_file() { + if do_probe && Path::new(&src).is_file() { probe = Some(MediaProbe::new(src.clone())); duration = match probe.clone().unwrap().format.unwrap().duration { @@ -67,7 +179,7 @@ impl Media { seek: 0.0, out: duration, duration: duration, - category: "".to_string(), + category: String::new(), source: src.clone(), cmd: Some(vec!["-i".to_string(), src]), filter: Some(vec![]), @@ -79,7 +191,18 @@ impl Media { } pub fn add_probe(&mut self) { - self.probe = Some(MediaProbe::new(self.source.clone())) + let probe = MediaProbe::new(self.source.clone()); + self.probe = Some(probe.clone()); + + if self.duration == 0.0 { + let duration = match probe.format.unwrap().duration { + Some(dur) => dur.parse().unwrap(), + None => 0.0, + }; + + self.out = duration; + self.duration = duration; + } } pub fn add_filter(&mut self) { @@ -151,6 +274,21 @@ impl MediaProbe { } } +pub fn write_status(date: String, shift: f64) { + let config = GlobalConfig::global(); + let stat_file = config.general.stat_file.clone(); + + let data = json!({ + "time_shift": shift, + "date": date, + }); + + let status_data: String = serde_json::to_string(&data) + .expect("Serialize status data failed"); + fs::write(stat_file, &status_data) + .expect("Unable to write file"); +} + // pub fn get_timestamp() -> i64 { // let local: DateTime = Local::now(); @@ -308,7 +446,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec Result<(), Error> { - // read ffmpeg stderr decoder and encoder instance + // read ffmpeg stderr decoder, encoder and server instance // and log the output fn format_line(line: String, level: String) -> String { diff --git a/src/utils/rpc_server.rs b/src/utils/rpc_server.rs new file mode 100644 index 00000000..d5c4d34c --- /dev/null +++ b/src/utils/rpc_server.rs @@ -0,0 +1,174 @@ +use std::sync::{Arc, Mutex}; + +use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; +use jsonrpc_http_server::{ + hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, +}; +use process_control::Terminator; +use serde_json::{json, Map}; +use simplelog::*; + +use crate::utils::{ + get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl, + PlayoutStatus, ProcessControl, +}; + +fn get_media_map(media: Media) -> Value { + json!({ + "seek": media.seek, + "out": media.out, + "duration": media.duration, + "category": media.category, + "source": media.source, + }) +} + +fn get_data_map(config: &GlobalConfig, media: Media) -> Map { + let mut data_map = Map::new(); + let begin = media.begin.unwrap_or(0.0); + + data_map.insert("play_mode".to_string(), json!(config.processing.mode)); + data_map.insert("index".to_string(), json!(media.index)); + data_map.insert("start_sec".to_string(), json!(begin)); + + if begin > 0.0 { + let played_time = get_sec() - begin; + let remaining_time = media.out - played_time; + + data_map.insert("start_time".to_string(), json!(sec_to_time(begin))); + data_map.insert("played_sec".to_string(), json!(played_time)); + data_map.insert("remaining_sec".to_string(), json!(remaining_time)); + } + + data_map.insert("current_media".to_string(), get_media_map(media)); + + data_map +} + +fn kill_decoder(terminator: Arc>>) -> Result<(), String> { + match &*terminator.lock().unwrap() { + Some(decoder) => unsafe { + if let Err(e) = decoder.terminate() { + return Err(format!("Terminate decoder: {e}")); + } + }, + None => return Err("No decoder terminator found".to_string()), + } + + Ok(()) +} + +pub async fn run_rpc( + play_control: PlayerControl, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) { + let config = GlobalConfig::global(); + let mut io = IoHandler::default(); + let play = play_control.clone(); + let proc = proc_control.clone(); + + io.add_sync_method("player", move |params: Params| { + match params { + Params::Map(map) => { + if map.contains_key("control") && map["control"] == "next".to_string() { + if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + info!("Move to next clip"); + let index = *play.index.lock().unwrap(); + + if index < play.current_list.lock().unwrap().len() { + let mut data_map = Map::new(); + let mut media = play.current_list.lock().unwrap()[index].clone(); + media.add_probe(); + + let (delta, _) = get_delta(&media.begin.unwrap_or(0.0)); + *playout_stat.time_shift.lock().unwrap() = delta; + write_status(playout_stat.current_date.lock().unwrap().clone(), delta); + + data_map.insert("operation".to_string(), json!("Move to next clip")); + data_map.insert("media".to_string(), get_media_map(media)); + + return Ok(Value::Object(data_map)); + } + } + return Ok(Value::String("Move failed".to_string())); + } + + if map.contains_key("control") && map["control"] == "back".to_string() { + if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + let index = *play.index.lock().unwrap(); + + if index > 1 && play.current_list.lock().unwrap().len() > 1 { + info!("Move to last clip"); + let mut data_map = Map::new(); + let mut media = play.current_list.lock().unwrap()[index - 2].clone(); + *play.index.lock().unwrap() = index - 2; + media.add_probe(); + + let (delta, _) = get_delta(&media.begin.unwrap_or(0.0)); + *playout_stat.time_shift.lock().unwrap() = delta; + write_status(playout_stat.current_date.lock().unwrap().clone(), delta); + + data_map.insert("operation".to_string(), json!("Move to last clip")); + data_map.insert("media".to_string(), get_media_map(media)); + + return Ok(Value::Object(data_map)); + } + } + return Ok(Value::String("Move failed".to_string())); + } + + if map.contains_key("control") && map["control"] == "reset".to_string() { + *playout_stat.date.lock().unwrap() = String::new(); + *playout_stat.time_shift.lock().unwrap() = 0.0; + *playout_stat.list_init.lock().unwrap() = true; + + write_status(String::new().clone(), 0.0); + + if let Err(e) = kill_decoder(proc.decoder_term.clone()) { + error!("{e}"); + } + + return Ok(Value::String("Reset playout to original state".to_string())); + } + + if map.contains_key("media") && map["media"] == "current".to_string() { + if let Some(media) = play.current_media.lock().unwrap().clone() { + let data_map = get_data_map(config, media); + + return Ok(Value::Object(data_map)); + }; + } + } + _ => return Ok(Value::String(format!("Wrong parameters..."))), + } + + Ok(Value::String(format!("no parameters set..."))) + }); + + let server = ServerBuilder::new(io) + .cors(DomainsValidation::AllowOnly(vec![ + AccessControlAllowOrigin::Null, + ])) + .request_middleware(|request: hyper::Request| { + if request.headers().contains_key("authorization") + && request.headers()["authorization"] == config.rpc_server.authorization + { + if request.uri() == "/status" { + println!("{:?}", request.headers().contains_key("authorization")); + Response::ok("Server running OK.").into() + } else { + request.into() + } + } else { + Response::bad_request("No authorization header or valid key found!").into() + } + }) + .rest_api(RestApi::Secure) + .start_http(&config.rpc_server.address.parse().unwrap()) + .expect("Unable to start RPC server"); + + *proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone()); + + server.wait(); +}