Merge pull request #9 from jb-alvarado/main

JSON RPC
This commit is contained in:
jb-alvarado 2022-04-07 20:45:49 +02:00 committed by GitHub
commit 0bfedc90fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1132 additions and 293 deletions

390
Cargo.lock generated
View File

@ -46,6 +46,21 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"memchr",
]
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cc"
version = "1.0.73"
@ -152,12 +167,13 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.8.1"
version = "0.9.0"
dependencies = [
"chrono",
"clap",
"ffprobe",
"file-rotate",
"jsonrpc-http-server",
"lettre",
"log",
"notify",
@ -219,6 +235,12 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -269,18 +291,71 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "futures-executor"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-macro"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
version = "0.3.21"
@ -293,8 +368,11 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
@ -310,7 +388,20 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "globset"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10463d9ff00a2a068db14231982f5132edebad0d7660cd956a1c30292dbcbfbd"
dependencies = [
"aho-corasick",
"bstr",
"fnv",
"log",
"regex",
]
[[package]]
@ -345,12 +436,63 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "http"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4"
[[package]]
name = "httpdate"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "idna"
version = "0.2.3"
@ -416,6 +558,55 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jsonrpc-core"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb"
dependencies = [
"futures",
"futures-executor",
"futures-util",
"log",
"serde",
"serde_derive",
"serde_json",
]
[[package]]
name = "jsonrpc-http-server"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
dependencies = [
"futures",
"hyper",
"jsonrpc-core",
"jsonrpc-server-utils",
"log",
"net2",
"parking_lot",
"unicase",
]
[[package]]
name = "jsonrpc-server-utils"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4"
dependencies = [
"bytes",
"futures",
"globset",
"jsonrpc-core",
"lazy_static",
"log",
"tokio",
"tokio-stream",
"tokio-util",
"unicase",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
@ -461,9 +652,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.121"
version = "0.2.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
checksum = "ec647867e2bf0772e28c8bcde4f0d19a9216916e890543b5a03ed8ef27b8f259"
[[package]]
name = "linked-hash-map"
@ -471,6 +662,16 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.16"
@ -533,12 +734,26 @@ dependencies = [
"kernel32-sys",
"libc",
"log",
"miow",
"miow 0.2.2",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
dependencies = [
"libc",
"log",
"miow 0.3.7",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
@ -547,7 +762,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio",
"mio 0.6.23",
"slab",
]
@ -563,6 +778,15 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@ -614,12 +838,21 @@ dependencies = [
"fsevent-sys",
"inotify",
"libc",
"mio",
"mio 0.6.23",
"mio-extras",
"walkdir",
"winapi 0.3.9",
]
[[package]]
name = "ntapi"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -713,6 +946,31 @@ version = "1.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "pin-project-lite"
version = "0.2.8"
@ -763,9 +1021,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
checksum = "ec757218438d5fda206afc041538b2f6d889286160d649a86a24d37e1235afd1"
dependencies = [
"unicode-xid",
]
@ -885,6 +1143,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "security-framework"
version = "2.6.1"
@ -975,6 +1239,22 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
[[package]]
name = "smallvec"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "strsim"
version = "0.10.0"
@ -983,9 +1263,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "1.0.90"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704df27628939572cd88d33f171cd6f896f4eaca85252c6e0a72d8d8287ee86f"
checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d"
dependencies = [
"proc-macro2",
"quote",
@ -1052,8 +1332,80 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.2",
"num_cpus",
"pin-project-lite",
"socket2",
"winapi 0.3.9",
]
[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee"
dependencies = [
"lazy_static",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
@ -1100,12 +1452,28 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.2.8"

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.8.1"
version = "0.9.0"
edition = "2021"
[dependencies]
@ -8,6 +8,7 @@ chrono = "0.4"
clap = { version = "3.1", features = ["derive"] }
ffprobe = "0.3"
file-rotate = "0.6"
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.5"
log = "0.4"
notify = "4.0"

View File

@ -1,5 +1,5 @@
general:
helptext: Sometimes it can happen, that a file is corrupt but still playable,
help_text: Sometimes it can happen, that a file is corrupt but still playable,
this can produce an streaming error over all following files. The only way
in this case is, to stop ffplayout and start it again. Here we only say when
it stops, the starting process is in your hand. Best way is a systemd service
@ -7,10 +7,17 @@ general:
value. A number below 3 can cause unexpected errors.
stop_threshold: 11
rpc_server:
help_text: Run a JSON RPC server, for getting infos about current playing, and
control for some functions.
enable: true
address: 127.0.0.1:7070
authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs
mail:
helptext: Send error messages to email address, like missing playlist; invalid
help_text: Send error messages to email address, like missing playlist; invalid
json format; missing clip path. Leave recipient blank, if you don't need this.
'mail_level' can be WARNING or ERROR.
'mail_level' can be INFO, WARNING or ERROR. 'interval' means seconds until a new mail will be sended.
subject: "Playout Error"
smtp_server: "mail.example.org"
starttls: true
@ -18,9 +25,10 @@ mail:
sender_pass: "abc123"
recipient:
mail_level: "ERROR"
interval: 30
logging:
helptext: Logging to file, if 'log_to_file' false log to console. 'backup_count'
help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count'
says how long log files will be saved in days. 'local_time' to false will set
log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.
'log_level' can be DEBUG, INFO, WARNING, ERROR. 'ffmpeg_level' can be info,
@ -34,7 +42,7 @@ logging:
ffmpeg_level: "error"
processing:
helptext: Default processing, for all clips that they get prepared in that way,
help_text: Default processing, for all clips that they get prepared in that way,
so the output is unique. Set playing mode, like playlist, or folder.
'aspect' must be a float number. 'logo' is only used if the path exist.
'logo_scale' scale the logo to target size, leave it blank when no scaling
@ -60,7 +68,7 @@ processing:
volume: 1
ingest:
helptext: Works not with direct hls output, it always needs full processing! Run a server
help_text: Works not with direct hls output, it always needs full processing! Run a server
for a ingest stream. This stream will override the normal streaming until is done.
There is no authentication, this is up to you. The recommend way is to set address to
localhost, stream to a local server with authentication and from there stream to this app.
@ -68,7 +76,7 @@ ingest:
input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream
playlist:
helptext: >
help_text: >
'path' can be a path to a single file, or a directory. For directory put
only the root folder, for example '/playlists', subdirectories are read by the
script. Subdirectories needs this structure '/playlists/2018/01'. 'day_start'
@ -82,7 +90,7 @@ playlist:
infinit: false
storage:
helptext: Play ordered or randomly files from path. 'filler_clip' is for fill
help_text: Play ordered or randomly files from path. 'filler_clip' is for fill
the end to reach 24 hours, it will loop when is necessary. 'extensions' search
only files with this extension. Set 'shuffle' to 'True' to pick files randomly.
path: "/mediaStorage"
@ -93,7 +101,7 @@ storage:
shuffle: true
text:
helptext: Overlay text in combination with libzmq for remote text manipulation.
help_text: Overlay text in combination with libzmq for remote text manipulation.
On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'.
In a standard environment the filter drawtext node is Parsed_drawtext_2.
'over_pre' if True text will be overlay in pre processing. Continue same text
@ -110,7 +118,7 @@ text:
regex: ^.+[/\\](.*)(.mp4|.mkv)$
out:
helptext: The final playout compression. Set the settings to your needs.
help_text: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'preview' works only in streaming output and creates a separate preview stream.

View File

@ -122,7 +122,7 @@ fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &Glo
}
fn fade(node: &mut Media, chain: &mut Filters, codec_type: String) {
let mut t = "".to_string();
let mut t = String::new();
if codec_type == "audio".to_string() {
t = "a".to_string()
@ -291,9 +291,9 @@ fn realtime_filter(
config: &GlobalConfig,
codec_type: String,
) {
//this realtime filter is important for HLS output to stay in sync
// this realtime filter is important for HLS output to stay in sync
let mut t = "".to_string();
let mut t = String::new();
if codec_type == "audio".to_string() {
t = "a".to_string()
@ -354,15 +354,15 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
realtime_filter(node, &mut filters, &config, "video".into());
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
realtime_filter(node, &mut filters, &config, "audio".into());
let mut filter_cmd = vec![];
let mut filter_str: String = "".to_string();
let mut filter_str: String = String::new();
let mut filter_map: Vec<String> = vec![];
if filters.video_chain.is_some() {

View File

@ -12,19 +12,24 @@ use std::{
use walkdir::WalkDir;
use crate::utils::{GlobalConfig, Media};
use crate::utils::{get_sec, GlobalConfig, Media};
#[derive(Debug, Clone)]
pub struct Source {
config: GlobalConfig,
pub nodes: Arc<Mutex<Vec<String>>>,
index: usize,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: Arc<Mutex<usize>>,
}
impl Source {
pub fn new() -> Self {
pub fn new(
current_list: Arc<Mutex<Vec<Media>>>,
global_index: Arc<Mutex<usize>>,
) -> Self {
let config = GlobalConfig::global();
let mut file_list = vec![];
let mut media_list = vec![];
let mut index: usize = 0;
for entry in WalkDir::new(config.storage.path.clone())
.into_iter()
@ -40,7 +45,8 @@ impl Source {
.clone()
.contains(&ext.unwrap().to_lowercase())
{
file_list.push(entry.path().display().to_string());
let media = Media::new(0, entry.path().display().to_string(), false);
media_list.push(media);
}
}
}
@ -48,25 +54,51 @@ impl Source {
if config.storage.shuffle {
info!("Shuffle files");
let mut rng = thread_rng();
file_list.shuffle(&mut rng);
media_list.shuffle(&mut rng);
} else {
file_list.sort();
media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source));
}
for item in media_list.iter_mut() {
item.index = Some(index);
index += 1;
}
*current_list.lock().unwrap() = media_list;
Self {
config: config.clone(),
nodes: Arc::new(Mutex::new(file_list)),
index: 0,
nodes: current_list,
current_node: Media::new(0, String::new(), false),
index: global_index,
}
}
fn shuffle(&mut self) {
let mut rng = thread_rng();
self.nodes.lock().unwrap().shuffle(&mut rng);
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
fn sort(&mut self) {
self.nodes.lock().unwrap().sort();
self.nodes
.lock()
.unwrap()
.sort_by(|d1, d2| d1.source.cmp(&d2.source));
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
}
@ -74,14 +106,16 @@ impl Iterator for Source {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.nodes.lock().unwrap().len() {
let current_file = self.nodes.lock().unwrap()[self.index].clone();
let mut media = Media::new(self.index, current_file);
media.add_probe();
media.add_filter();
self.index += 1;
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
let i = *self.index.lock().unwrap();
self.current_node = self.nodes.lock().unwrap()[i].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
Some(media)
*self.index.lock().unwrap() += 1;
Some(self.current_node.clone())
} else {
if self.config.storage.shuffle {
info!("Shuffle files");
@ -91,13 +125,14 @@ impl Iterator for Source {
self.sort();
}
let current_file = self.nodes.lock().unwrap()[0].clone();
let mut media = Media::new(self.index, current_file);
media.add_probe();
media.add_filter();
self.index = 1;
self.current_node = self.nodes.lock().unwrap()[0].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
Some(media)
*self.index.lock().unwrap() = 1;
Some(self.current_node.clone())
}
}
}
@ -106,31 +141,37 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
pub async fn watch_folder(
pub async fn file_worker(
receiver: Receiver<notify::DebouncedEvent>,
sources: Arc<Mutex<Vec<String>>>,
sources: Arc<Mutex<Vec<Media>>>,
) {
while let Ok(res) = receiver.recv() {
match res {
Create(new_path) => {
sources.lock().unwrap().push(new_path.display().to_string());
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: {:?}", new_path);
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x != &old_path.display().to_string());
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: {:?}", old_path);
}
Rename(old_path, new_path) => {
let i = sources
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x == old_path.display().to_string())
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
sources.lock().unwrap()[i] = new_path.display().to_string();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: {:?} to {:?}", old_path, new_path);
}
_ => (),

View File

@ -2,16 +2,16 @@ use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::{mpsc::SyncSender, Arc, Mutex},
sync::{mpsc::SyncSender},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use process_control::ChildExt;
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{stderr_reader, GlobalConfig};
use crate::utils::{stderr_reader, GlobalConfig, ProcessControl};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
@ -57,9 +57,7 @@ pub async fn ingest_server(
log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
rt_handle: Handle,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
proc_control: ProcessControl,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
@ -101,7 +99,7 @@ pub async fn ingest_server(
debug!("Server CMD: <bright-blue>\"ffmpeg {}\"</>", server_cmd.join(" "));
loop {
if *is_terminated.lock().unwrap() {
if *proc_control.is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
@ -118,7 +116,7 @@ pub async fn ingest_server(
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
*proc_control.server_term.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
@ -139,7 +137,7 @@ pub async fn ingest_server(
};
if !is_running {
*server_is_running.lock().unwrap() = true;
*proc_control.server_is_running.lock().unwrap() = true;
is_running = true;
}
@ -147,7 +145,7 @@ pub async fn ingest_server(
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
*proc_control.is_terminated.lock().unwrap() = true;
break;
}
} else {
@ -155,7 +153,7 @@ pub async fn ingest_server(
}
}
*server_is_running.lock().unwrap() = false;
*proc_control.server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));

View File

@ -3,5 +3,5 @@ pub mod ingest;
pub mod playlist;
pub use ingest::ingest_server;
pub use folder::{watch_folder, Source};
pub use folder::{file_worker, Source};
pub use playlist::CurrentProgram;

View File

@ -1,14 +1,18 @@
use std::{
fs,
path::Path,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
};
use serde_json::json;
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json,
modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN,
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN,
};
#[derive(Debug)]
@ -18,31 +22,50 @@ pub struct CurrentProgram {
json_mod: Option<String>,
json_path: Option<String>,
json_date: String,
nodes: Vec<Media>,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
pub init: Arc<Mutex<bool>>,
index: usize,
index: Arc<Mutex<usize>>,
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
playout_stat: PlayoutStatus,
}
impl CurrentProgram {
pub fn new(rt_handle: Handle, is_terminated: Arc<Mutex<bool>>) -> Self {
pub fn new(
rt_handle: Handle,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
current_list: Arc<Mutex<Vec<Media>>>,
global_index: Arc<Mutex<usize>>,
) -> Self {
let config = GlobalConfig::global();
let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0);
*current_list.lock().unwrap() = json.program;
*playout_stat.current_date.lock().unwrap() = json.date.clone();
if *playout_stat.date.lock().unwrap() != json.date {
let data = json!({
"time_shift": 0.0,
"date": json.date,
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
}
Self {
config: config.clone(),
start_sec: json.start_sec.unwrap(),
json_mod: json.modified,
json_path: json.current_file,
json_date: json.date,
nodes: json.program,
current_node: Media::new(0, "".to_string()),
init: Arc::new(Mutex::new(true)),
index: 0,
nodes: current_list,
current_node: Media::new(0, String::new(), false),
index: global_index,
rt_handle,
is_terminated,
playout_stat,
}
}
@ -58,7 +81,7 @@ impl CurrentProgram {
self.json_path = json.current_file;
self.json_mod = json.modified;
self.nodes = json.program;
*self.nodes.lock().unwrap() = json.program;
} else if Path::new(&self.json_path.clone().unwrap()).is_file() {
let mod_time = modified_time(&self.json_path.clone().unwrap());
@ -82,26 +105,26 @@ impl CurrentProgram {
);
self.json_mod = json.modified;
self.nodes = json.program;
*self.nodes.lock().unwrap() = json.program;
self.get_current_clip();
self.index += 1;
*self.index.lock().unwrap() += 1;
}
} else {
error!(
"Playlist <b><magenta>{}</></b> not exists!",
self.json_path.clone().unwrap()
);
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, String::new(), false);
media.begin = Some(get_sec());
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
self.json_path = None;
self.nodes = vec![media.clone()];
*self.nodes.lock().unwrap() = vec![media.clone()];
self.current_node = media;
*self.init.lock().unwrap() = true;
self.index = 0;
*self.playout_stat.list_init.lock().unwrap() = true;
*self.index.lock().unwrap() = 0;
}
}
@ -130,35 +153,44 @@ impl CurrentProgram {
next_start,
);
let data = json!({
"time_shift": 0.0,
"date": json.date,
});
*self.playout_stat.current_date.lock().unwrap() = json.date.clone();
let status_data: String =
serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(self.config.general.stat_file.clone(), &status_data)
.expect("Unable to write file");
self.json_path = json.current_file.clone();
self.json_mod = json.modified;
self.json_date = json.date;
self.nodes = json.program;
self.index = 0;
*self.nodes.lock().unwrap() = json.program;
*self.index.lock().unwrap() = 0;
if json.current_file.is_none() {
*self.init.lock().unwrap() = true;
*self.playout_stat.list_init.lock().unwrap() = true;
}
}
}
fn is_ad(&mut self, i: usize, next: bool) -> Option<bool> {
if next {
if i + 1 < self.nodes.len() && self.nodes[i + 1].category == "advertisement".to_string()
{
return Some(true);
} else {
return Some(false);
}
} else {
if i > 0
&& i < self.nodes.len()
&& self.nodes[i - 1].category == "advertisement".to_string()
{
return Some(true);
} else {
return Some(false);
}
fn last_next_ad(&mut self) {
let index = *self.index.lock().unwrap();
let current_list = self.nodes.lock().unwrap();
if index + 1 < current_list.len()
&& current_list[index + 1].category == "advertisement".to_string()
{
self.current_node.next_ad = Some(true);
}
if index > 0
&& index < current_list.len()
&& current_list[index - 1].category == "advertisement".to_string()
{
self.current_node.last_ad = Some(true);
}
}
@ -173,12 +205,20 @@ impl CurrentProgram {
}
fn get_current_clip(&mut self) {
let time_sec = self.get_current_time();
let mut time_sec = self.get_current_time();
for (i, item) in self.nodes.iter_mut().enumerate() {
if *self.playout_stat.current_date.lock().unwrap() == *self.playout_stat.date.lock().unwrap()
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
{
let shift = *self.playout_stat.time_shift.lock().unwrap();
info!("Shift playlist start for <yellow>{shift}</> seconds");
time_sec += shift;
}
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
if item.begin.unwrap() + item.out - item.seek > time_sec {
*self.init.lock().unwrap() = false;
self.index = i;
*self.playout_stat.list_init.lock().unwrap() = false;
*self.index.lock().unwrap() = i;
break;
}
@ -188,12 +228,13 @@ impl CurrentProgram {
fn init_clip(&mut self) {
self.get_current_clip();
if !*self.init.lock().unwrap() {
if !*self.playout_stat.list_init.lock().unwrap() {
let time_sec = self.get_current_time();
let index = *self.index.lock().unwrap();
// de-instance node to preserve original values in list
let mut node_clone = self.nodes[self.index].clone();
self.index += 1;
let mut node_clone = self.nodes.lock().unwrap()[index].clone();
*self.index.lock().unwrap() += 1;
node_clone.seek = time_sec - node_clone.begin.unwrap();
self.current_node = handle_list_init(node_clone);
@ -205,7 +246,7 @@ impl Iterator for CurrentProgram {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if *self.init.lock().unwrap() {
if *self.playout_stat.list_init.lock().unwrap() {
debug!("Playlist init");
self.check_update(true);
@ -213,14 +254,15 @@ impl Iterator for CurrentProgram {
self.init_clip();
}
if *self.init.lock().unwrap() {
if *self.playout_stat.list_init.lock().unwrap() {
// on init load playlist, could be not long enough,
// so we check if we can take the next playlist already,
// or we fill the gap with a dummy.
self.current_node = self.nodes[self.nodes.len() - 1].clone();
let list_length = self.nodes.lock().unwrap().len();
self.current_node = self.nodes.lock().unwrap()[list_length - 1].clone();
self.check_for_next_playlist();
let new_node = self.nodes[self.nodes.len() - 1].clone();
let new_node = self.nodes.lock().unwrap()[list_length - 1].clone();
let new_length = new_node.begin.unwrap() + new_node.duration;
if new_length
@ -235,41 +277,45 @@ impl Iterator for CurrentProgram {
if DUMMY_LEN > total_delta {
duration = total_delta;
*self.init.lock().unwrap() = false;
*self.playout_stat.list_init.lock().unwrap() = false;
}
if self.config.playlist.start_sec.unwrap() > current_time {
current_time += self.config.playlist.length_sec.unwrap() + 1.0;
}
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, String::new(), false);
media.begin = Some(current_time);
media.duration = duration;
media.out = duration;
self.current_node = gen_source(media);
self.nodes.push(self.current_node.clone());
self.index = self.nodes.len();
self.nodes.lock().unwrap().push(self.current_node.clone());
*self.index.lock().unwrap() = self.nodes.lock().unwrap().len();
}
}
self.current_node.last_ad = self.is_ad(self.index, false);
self.current_node.next_ad = self.is_ad(self.index, true);
self.last_next_ad();
return Some(self.current_node.clone());
}
if self.index < self.nodes.len() {
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
self.check_for_next_playlist();
let mut is_last = false;
let index = *self.index.lock().unwrap();
if self.index == self.nodes.len() - 1 {
if index == self.nodes.lock().unwrap().len() - 1 {
is_last = true
}
self.current_node = timed_source(self.nodes[self.index].clone(), &self.config, is_last);
self.current_node.last_ad = self.is_ad(self.index, false);
self.current_node.next_ad = self.is_ad(self.index, true);
self.index += 1;
self.current_node = timed_source(
self.nodes.lock().unwrap()[index].clone(),
&self.config,
is_last,
&self.playout_stat,
);
self.last_next_ad();
*self.index.lock().unwrap() += 1;
// update playlist should happen after current clip,
// to prevent unknown behaviors.
@ -277,17 +323,17 @@ impl Iterator for CurrentProgram {
Some(self.current_node.clone())
} else {
let last_playlist = self.json_path.clone();
let last_ad = self.current_node.last_ad.clone();
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
let mut last_ad = self.is_ad(self.index, false);
if last_playlist == self.json_path
&& total_delta.abs() > self.config.general.stop_threshold
{
// Test if playlist is to early finish,
// and if we have to fill it with a placeholder.
self.index += 1;
self.current_node = Media::new(self.index, "".to_string());
let index = *self.index.lock().unwrap();
self.current_node = Media::new(index, String::new(), false);
self.current_node.begin = Some(get_sec());
let mut duration = total_delta.abs();
@ -297,39 +343,60 @@ impl Iterator for CurrentProgram {
self.current_node.duration = duration;
self.current_node.out = duration;
self.current_node = gen_source(self.current_node.clone());
self.nodes.push(self.current_node.clone());
self.nodes.lock().unwrap().push(self.current_node.clone());
self.last_next_ad();
last_ad = self.is_ad(self.index, false);
self.current_node.last_ad = last_ad;
self.current_node.add_filter();
*self.index.lock().unwrap() += 1;
return Some(self.current_node.clone());
}
self.current_node = gen_source(self.nodes[0].clone());
*self.index.lock().unwrap() = 0;
self.current_node =
gen_source(self.nodes.lock().unwrap()[0].clone());
self.last_next_ad();
self.current_node.last_ad = last_ad;
self.current_node.next_ad = self.is_ad(0, true);
self.index = 1;
*self.index.lock().unwrap() = 1;
Some(self.current_node.clone())
}
}
}
fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media {
fn timed_source(
node: Media,
config: &GlobalConfig,
last: bool,
playout_stat: &PlayoutStatus,
) -> Media {
// prepare input clip
// check begin and length from clip
// return clip only if we are in 24 hours time range
let (delta, total_delta) = get_delta(&node.begin.unwrap());
let mut shifted_delta = delta;
let mut new_node = node.clone();
new_node.process = Some(false);
if config.playlist.length.contains(":") {
debug!("Delta: <yellow>{delta:.3}</>");
debug!("Total delta: <yellow>{total_delta:.3}</>");
let sync = check_sync(delta);
if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap()
&& *playout_stat.time_shift.lock().unwrap() != 0.0
{
sleep(Duration::from_millis(300));
shifted_delta = delta - *playout_stat.time_shift.lock().unwrap();
debug!("Delta: <yellow>{shifted_delta:.3}</>, shifted: <yellow>{delta:.3}</>");
} else {
debug!("Delta: <yellow>{shifted_delta:.3}</>");
}
debug!("Total time remaining: <yellow>{total_delta:.3}</>");
let sync = check_sync(shifted_delta);
if !sync {
new_node.cmd = None;

View File

@ -1,35 +1,83 @@
use std::{
path::PathBuf,
{fs, fs::File},
};
extern crate log;
extern crate simplelog;
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
use tokio::runtime::Builder;
mod filter;
mod input;
mod output;
mod utils;
use simplelog::*;
use tokio::runtime::Builder;
use crate::output::{player, write_hls};
use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig};
use crate::utils::{
init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
};
#[derive(Serialize, Deserialize)]
struct StatusData {
time_shift: f64,
date: String,
}
fn main() {
init_config();
let config = GlobalConfig::global();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
if !PathBuf::from(config.general.stat_file.clone()).exists() {
let data = json!({
"time_shift": 0.0,
"date": String::new(),
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
} else {
let stat_file = File::options()
.read(true)
.write(false)
.open(&config.general.stat_file)
.expect("Could not open status file");
let data: StatusData =
serde_json::from_reader(stat_file).expect("Could not read status file.");
*playout_stat.time_shift.lock().unwrap() = data.time_shift;
*playout_stat.date.lock().unwrap() = data.date;
}
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let logging = init_logging(rt_handle.clone(), is_terminated.clone());
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone());
CombinedLogger::init(logging).unwrap();
validate_ffmpeg();
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, is_terminated);
} else {
player(rt_handle, is_terminated);
if config.rpc_server.enable {
rt_handle.spawn(run_rpc(
play_control.clone(),
playout_stat.clone(),
proc_control.clone(),
));
}
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, play_control, playout_stat, proc_control);
} else {
player(rt_handle, play_control, playout_stat, proc_control);
}
info!("Playout done...");
}

View File

@ -29,7 +29,7 @@ pub fn output(log_format: String) -> process::Child {
);
let mut filter: String = "null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str());
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str());
enc_filter = vec!["-vf".to_string(), filter];
}

View File

@ -1,16 +1,3 @@
use std::{
process::{Command, Stdio},
sync::{
Arc, Mutex,
},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig};
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
@ -30,18 +17,38 @@ out:
*/
pub fn write_hls(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
use std::process::{Command, Stdio};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
};
pub fn write_hls(
rt_handle: &Handle,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let (get_source, _) = source_generator(
let get_source = source_generator(
rt_handle,
config.clone(),
is_terminated.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
playout_stat,
proc_control.is_terminated.clone(),
);
for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,

View File

@ -3,7 +3,7 @@ use std::{
io::{prelude::*, BufReader, BufWriter, Read},
path::Path,
process,
process::{Child, Command, Stdio},
process::{Command, Stdio},
sync::{
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc, Mutex,
@ -12,7 +12,7 @@ use std::{
time::Duration,
};
use process_control::Terminator;
use process_control::ChildExt;
use simplelog::*;
use tokio::runtime::Handle;
@ -22,71 +22,19 @@ mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
#[derive(Debug)]
struct ProcessCleanup {
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
enc_proc: Child,
is_alive: bool,
}
impl ProcessCleanup {
fn new(
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
enc_proc: Child,
) -> Self {
Self {
server_term,
is_terminated,
enc_proc,
is_alive: true,
}
}
}
impl ProcessCleanup {
fn kill(&mut self) {
*self.is_terminated.lock().unwrap() = true;
if self.is_alive {
if let Some(server) = &*self.server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server done");
}
}
};
self.is_alive = false;
}
if let Ok(_) = self.enc_proc.kill() {
info!("Playout done...")
}
if let Err(e) = self.enc_proc.wait() {
error!("Encoder: {e}")
};
}
}
impl Drop for ProcessCleanup {
fn drop(&mut self) {
self.kill()
}
}
use crate::input::{file_worker, ingest_server, CurrentProgram, Source};
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl,
};
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
) -> (Box<dyn Iterator<Item = Media>>, Arc<Mutex<bool>>) {
let mut init_playlist: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
) -> Box<dyn Iterator<Item = Media>> {
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
let path = config.storage.path.clone();
@ -97,24 +45,29 @@ pub fn source_generator(
info!("Playout in folder mode.");
let folder_source = Source::new();
let (sender, receiver) = channel();
let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
let folder_source = Source::new(current_list, index);
watcher
let (sender, receiver) = channel();
let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap();
watchman
.watch(path.clone(), RecursiveMode::Recursive)
.unwrap();
debug!("Monitor folder: <b><magenta>{}</></b>", path);
rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes)));
rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone());
init_playlist = program.init.clone();
let program = CurrentProgram::new(
rt_handle.clone(),
playout_stat,
is_terminated.clone(),
current_list,
index,
);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
@ -124,20 +77,30 @@ pub fn source_generator(
}
};
(get_source, init_playlist)
get_source
}
pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
pub fn player(
rt_handle: &Handle,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
let (get_source, init_playlist) =
source_generator(rt_handle, config.clone(), is_terminated.clone());
let get_source = source_generator(
rt_handle,
config.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
playout_stat,
proc_control.is_terminated.clone(),
);
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
@ -162,16 +125,13 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
ff_log_format.clone(),
ingest_sender,
rt_handle.clone(),
server_term.clone(),
is_terminated.clone(),
server_is_running.clone(),
proc_control.clone(),
));
}
let mut proc_cleanup =
ProcessCleanup::new(server_term.clone(), is_terminated.clone(), enc_proc);
'source_iter: for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
@ -222,8 +182,12 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
"Decoder".to_string(),
));
if let Ok(dec_terminator) = dec_proc.terminator() {
*proc_control.decoder_term.lock().unwrap() = Some(dec_terminator);
};
loop {
if *server_is_running.lock().unwrap() {
if *proc_control.server_is_running.lock().unwrap() {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
@ -241,7 +205,7 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
live_on = true;
*init_playlist.lock().unwrap() = true;
*playlist_init.lock().unwrap() = true;
}
if let Ok(receive) = ingest_receiver.try_recv() {
@ -290,5 +254,11 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
sleep(Duration::from_secs(1));
proc_cleanup.kill();
if let Err(e) = enc_proc.kill() {
panic!("Encoder error: {:?}", e)
};
if let Err(e) = enc_proc.wait() {
panic!("Encoder error: {:?}", e)
};
}

View File

@ -32,7 +32,7 @@ pub fn output(log_format: String) -> process::Child {
);
let mut filter: String = "[0:v]null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str());
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str());
if config.out.preview {
filter.push_str(",split=2[v_out1][v_out2]");

View File

@ -15,6 +15,7 @@ use crate::utils::{get_args, time_to_sec};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GlobalConfig {
pub general: General,
pub rpc_server: RpcServer,
pub mail: Mail,
pub logging: Logging,
pub processing: Processing,
@ -28,6 +29,16 @@ pub struct GlobalConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct General {
pub stop_threshold: f64,
#[serde(skip_serializing, skip_deserializing)]
pub stat_file: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RpcServer {
pub enable: bool,
pub address: String,
pub authorization: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -39,6 +50,7 @@ pub struct Mail {
pub sender_pass: String,
pub recipient: String,
pub mail_level: String,
pub interval: i32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -118,8 +130,6 @@ pub struct Out {
pub output_cmd: Option<Vec<String>>,
}
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
impl GlobalConfig {
fn new() -> Self {
let args = get_args();
@ -139,7 +149,8 @@ impl GlobalConfig {
Err(err) => {
println!(
"{:?} doesn't exists!\n{}\n\nSystem error: {err}",
config_path, "Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
config_path,
"Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
);
process::exit(0x0100);
}
@ -147,6 +158,10 @@ impl GlobalConfig {
let mut config: GlobalConfig =
serde_yaml::from_reader(f).expect("Could not read config file.");
config.general.stat_file = env::temp_dir()
.join("ffplayout_status.json")
.display()
.to_string();
let fps = config.processing.fps.to_string();
let bitrate = config.processing.width * config.processing.height / 10;
config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start));
@ -244,6 +259,8 @@ impl GlobalConfig {
}
}
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> {
// when add_loudnorm is False we use a different audio encoder,
// s302m has higher quality, but is experimental

View File

@ -23,7 +23,7 @@ pub struct Playlist {
impl Playlist {
fn new(date: String, start: f64) -> Self {
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, String::new(), false);
media.begin = Some(start);
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
@ -31,7 +31,7 @@ impl Playlist {
date,
start_sec: Some(start),
current_file: None,
modified: Some("".to_string()),
modified: Some(String::new()),
program: vec![media],
}
}

View File

@ -1,6 +1,7 @@
extern crate log;
extern crate simplelog;
use chrono::prelude::*;
use regex::Regex;
use std::{
path::Path,
@ -51,11 +52,15 @@ fn send_mail(msg: String) {
}
}
async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<bool>>) {
let mut count = 0;
async fn mail_queue(
messages: Arc<Mutex<Vec<String>>>,
is_terminated: Arc<Mutex<bool>>,
interval: i32,
) {
let mut count: i32 = 0;
loop {
if *is_terminated.lock().unwrap() || count == 60 {
if *is_terminated.lock().unwrap() || count == interval {
// check every 30 seconds for messages and send them
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
@ -71,14 +76,14 @@ async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<
break;
}
sleep(Duration::from_millis(500));
sleep(Duration::from_secs(1));
count += 1;
}
}
pub struct LogMailer {
level: LevelFilter,
config: Config,
pub config: Config,
messages: Arc<Mutex<Vec<String>>>,
}
@ -103,21 +108,13 @@ impl Log for LogMailer {
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
match record.level() {
Level::Error => {
self.messages
.lock()
.unwrap()
.push(record.args().to_string());
}
Level::Warn => {
self.messages
.lock()
.unwrap()
.push(record.args().to_string());
}
_ => (),
}
let local: DateTime<Local> = Local::now();
let time_stamp: String = local.format("[%Y-%m-%d %H:%M:%S%.3f]").to_string();
let level = record.level().to_string().to_uppercase();
let rec = record.args().to_string();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
self.messages.lock().unwrap().push(full_line);
}
}
@ -212,19 +209,24 @@ pub fn init_logging(
}
if config.mail.recipient.len() > 3 {
let mut filter = LevelFilter::Error;
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let interval = config.mail.interval.clone();
rt_handle.spawn(mail_queue(messages.clone(), is_terminated.clone()));
rt_handle.spawn(mail_queue(
messages.clone(),
is_terminated.clone(),
interval,
));
let mail_config = log_config
.clone()
.set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]")
.build();
if config.mail.mail_level.to_lowercase() == "warning".to_string() {
filter = LevelFilter::Warn
}
let filter = match config.mail.mail_level.to_lowercase().as_str() {
"info" => LevelFilter::Info,
"warning" => LevelFilter::Warn,
_ => LevelFilter::Error,
};
app_logger.push(LogMailer::new(filter, mail_config, messages));
}

View File

@ -1,34 +1,146 @@
use chrono::prelude::*;
use chrono::Duration;
use ffprobe::{ffprobe, Format, Stream};
use serde::{Deserialize, Serialize};
use std::{
fs,
fs::metadata,
io::{BufRead, BufReader, Error},
path::Path,
process::exit,
process::{ChildStderr, Command, Stdio},
sync::{Arc, Mutex, RwLock},
time,
time::UNIX_EPOCH,
};
use jsonrpc_http_server::CloseHandle;
use process_control::Terminator;
use regex::Regex;
use simplelog::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
mod arg_parse;
mod config;
pub mod json_reader;
mod json_validate;
mod logging;
mod rpc_server;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use json_reader::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use rpc_server::run_rpc;
use crate::filter::filter_chains;
#[derive(Clone)]
pub struct ProcessControl {
pub decoder_term: Arc<Mutex<Option<Terminator>>>,
pub encoder_term: Arc<Mutex<Option<Terminator>>>,
pub server_term: Arc<Mutex<Option<Terminator>>>,
pub server_is_running: Arc<Mutex<bool>>,
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
pub is_terminated: Arc<Mutex<bool>>,
pub is_alive: Arc<RwLock<bool>>,
}
impl ProcessControl {
pub fn new() -> Self {
Self {
decoder_term: Arc::new(Mutex::new(None)),
encoder_term: Arc::new(Mutex::new(None)),
server_term: Arc::new(Mutex::new(None)),
server_is_running: Arc::new(Mutex::new(false)),
rpc_handle: Arc::new(Mutex::new(None)),
is_terminated: Arc::new(Mutex::new(false)),
is_alive: Arc::new(RwLock::new(true)),
}
}
}
impl ProcessControl {
pub fn kill_all(&mut self) {
*self.is_terminated.lock().unwrap() = true;
if *self.is_alive.read().unwrap() {
*self.is_alive.write().unwrap() = false;
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
rpc.clone().close()
};
if let Some(server) = &*self.server_term.lock().unwrap() {
unsafe {
if let Err(e) = server.terminate() {
error!("Ingest server: {:?}", e);
}
}
};
if let Some(decoder) = &*self.decoder_term.lock().unwrap() {
unsafe {
if let Err(e) = decoder.terminate() {
error!("Decoder: {:?}", e);
}
}
};
if let Some(encoder) = &*self.encoder_term.lock().unwrap() {
unsafe {
if let Err(e) = encoder.terminate() {
error!("Encoder: {:?}", e);
}
}
};
}
}
}
impl Drop for ProcessControl {
fn drop(&mut self) {
self.kill_all()
}
}
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
pub time_shift: Arc<Mutex<f64>>,
pub date: Arc<Mutex<String>>,
pub current_date: Arc<Mutex<String>>,
pub list_init: Arc<Mutex<bool>>,
}
impl PlayoutStatus {
pub fn new() -> Self {
Self {
time_shift: Arc::new(Mutex::new(0.0)),
date: Arc::new(Mutex::new(String::new())),
current_date: Arc::new(Mutex::new(String::new())),
list_init: Arc::new(Mutex::new(true)),
}
}
}
#[derive(Clone)]
pub struct PlayerControl {
pub current_media: Arc<Mutex<Option<Media>>>,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub index: Arc<Mutex<usize>>,
}
impl PlayerControl {
pub fn new() -> Self {
Self {
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])),
index: Arc::new(Mutex::new(0)),
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Media {
pub begin: Option<f64>,
@ -48,11 +160,11 @@ pub struct Media {
}
impl Media {
pub fn new(index: usize, src: String) -> Self {
pub fn new(index: usize, src: String, do_probe: bool) -> Self {
let mut duration: f64 = 0.0;
let mut probe = None;
if Path::new(&src).is_file() {
if do_probe && Path::new(&src).is_file() {
probe = Some(MediaProbe::new(src.clone()));
duration = match probe.clone().unwrap().format.unwrap().duration {
@ -67,7 +179,7 @@ impl Media {
seek: 0.0,
out: duration,
duration: duration,
category: "".to_string(),
category: String::new(),
source: src.clone(),
cmd: Some(vec!["-i".to_string(), src]),
filter: Some(vec![]),
@ -79,7 +191,18 @@ impl Media {
}
pub fn add_probe(&mut self) {
self.probe = Some(MediaProbe::new(self.source.clone()))
let probe = MediaProbe::new(self.source.clone());
self.probe = Some(probe.clone());
if self.duration == 0.0 {
let duration = match probe.format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
self.out = duration;
self.duration = duration;
}
}
pub fn add_filter(&mut self) {
@ -151,6 +274,21 @@ impl MediaProbe {
}
}
pub fn write_status(date: String, shift: f64) {
let config = GlobalConfig::global();
let stat_file = config.general.stat_file.clone();
let data = json!({
"time_shift": shift,
"date": date,
});
let status_data: String = serde_json::to_string(&data)
.expect("Serialize status data failed");
fs::write(stat_file, &status_data)
.expect("Unable to write file");
}
// pub fn get_timestamp() -> i64 {
// let local: DateTime<Local> = Local::now();
@ -308,7 +446,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
}
pub async fn stderr_reader(std_errors: ChildStderr, suffix: String) -> Result<(), Error> {
// read ffmpeg stderr decoder and encoder instance
// read ffmpeg stderr decoder, encoder and server instance
// and log the output
fn format_line(line: String, level: String) -> String {

174
src/utils/rpc_server.rs Normal file
View File

@ -0,0 +1,174 @@
use std::sync::{Arc, Mutex};
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use process_control::Terminator;
use serde_json::{json, Map};
use simplelog::*;
use crate::utils::{
get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl,
PlayoutStatus, ProcessControl,
};
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
fn kill_decoder(terminator: Arc<Mutex<Option<Terminator>>>) -> Result<(), String> {
match &*terminator.lock().unwrap() {
Some(decoder) => unsafe {
if let Err(e) = decoder.terminate() {
return Err(format!("Terminate decoder: {e}"));
}
},
None => return Err("No decoder terminator found".to_string()),
}
Ok(())
}
pub async fn run_rpc(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let play = play_control.clone();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
match params {
Params::Map(map) => {
if map.contains_key("control") && map["control"] == "next".to_string() {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
info!("Move to next clip");
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*playout_stat.time_shift.lock().unwrap() = delta;
write_status(playout_stat.current_date.lock().unwrap().clone(), delta);
data_map.insert("operation".to_string(), json!("Move to next clip"));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && map["control"] == "back".to_string() {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
let index = *play.index.lock().unwrap();
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
*play.index.lock().unwrap() = index - 2;
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*playout_stat.time_shift.lock().unwrap() = delta;
write_status(playout_stat.current_date.lock().unwrap().clone(), delta);
data_map.insert("operation".to_string(), json!("Move to last clip"));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && map["control"] == "reset".to_string() {
*playout_stat.date.lock().unwrap() = String::new();
*playout_stat.time_shift.lock().unwrap() = 0.0;
*playout_stat.list_init.lock().unwrap() = true;
write_status(String::new().clone(), 0.0);
if let Err(e) = kill_decoder(proc.decoder_term.clone()) {
error!("{e}");
}
return Ok(Value::String("Reset playout to original state".to_string()));
}
if map.contains_key("media") && map["media"] == "current".to_string() {
if let Some(media) = play.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
};
}
}
_ => return Ok(Value::String(format!("Wrong parameters..."))),
}
Ok(Value::String(format!("no parameters set...")))
});
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
server.wait();
}