work on json rpc server

This commit is contained in:
jb-alvarado 2022-04-05 17:07:34 +02:00
parent 1f9d9fd6d5
commit 8a68c9bd86
13 changed files with 648 additions and 176 deletions

378
Cargo.lock generated
View File

@ -46,6 +46,21 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"memchr",
]
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cc"
version = "1.0.73"
@ -152,12 +167,13 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.8.1"
version = "0.9.0"
dependencies = [
"chrono",
"clap",
"ffprobe",
"file-rotate",
"jsonrpc-http-server",
"lettre",
"log",
"notify",
@ -219,6 +235,12 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -269,18 +291,71 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "futures-executor"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-macro"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
version = "0.3.21"
@ -293,8 +368,11 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
@ -310,7 +388,20 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if 1.0.0",
"libc",
"wasi",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "globset"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10463d9ff00a2a068db14231982f5132edebad0d7660cd956a1c30292dbcbfbd"
dependencies = [
"aho-corasick",
"bstr",
"fnv",
"log",
"regex",
]
[[package]]
@ -345,12 +436,63 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "http"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9100414882e15fb7feccb4897e5f0ff0ff1ca7d1a86a23208ada4d7a18e6c6c4"
[[package]]
name = "httpdate"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "idna"
version = "0.2.3"
@ -416,6 +558,55 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "jsonrpc-core"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f7f76aef2d054868398427f6c54943cf3d1caa9a7ec7d0c38d69df97a965eb"
dependencies = [
"futures",
"futures-executor",
"futures-util",
"log",
"serde",
"serde_derive",
"serde_json",
]
[[package]]
name = "jsonrpc-http-server"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1dea6e07251d9ce6a552abfb5d7ad6bc290a4596c8dcc3d795fae2bbdc1f3ff"
dependencies = [
"futures",
"hyper",
"jsonrpc-core",
"jsonrpc-server-utils",
"log",
"net2",
"parking_lot",
"unicase",
]
[[package]]
name = "jsonrpc-server-utils"
version = "18.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4fdea130485b572c39a460d50888beb00afb3e35de23ccd7fad8ff19f0e0d4"
dependencies = [
"bytes",
"futures",
"globset",
"jsonrpc-core",
"lazy_static",
"log",
"tokio",
"tokio-stream",
"tokio-util",
"unicase",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
@ -471,6 +662,16 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.16"
@ -533,12 +734,26 @@ dependencies = [
"kernel32-sys",
"libc",
"log",
"miow",
"miow 0.2.2",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
dependencies = [
"libc",
"log",
"miow 0.3.7",
"ntapi",
"wasi 0.11.0+wasi-snapshot-preview1",
"winapi 0.3.9",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
@ -547,7 +762,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio",
"mio 0.6.23",
"slab",
]
@ -563,6 +778,15 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@ -614,12 +838,21 @@ dependencies = [
"fsevent-sys",
"inotify",
"libc",
"mio",
"mio 0.6.23",
"mio-extras",
"walkdir",
"winapi 0.3.9",
]
[[package]]
name = "ntapi"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "num-integer"
version = "0.1.44"
@ -713,6 +946,31 @@ version = "1.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c69d19a208bba8b94bd27d4b7a06ad153cddc6b88cb2149a668e23ce7bdb67d5"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
]
[[package]]
name = "pin-project-lite"
version = "0.2.8"
@ -885,6 +1143,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "security-framework"
version = "2.6.1"
@ -975,6 +1239,22 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32"
[[package]]
name = "smallvec"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "strsim"
version = "0.10.0"
@ -1052,8 +1332,80 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.2",
"num_cpus",
"pin-project-lite",
"socket2",
"winapi 0.3.9",
]
[[package]]
name = "tokio-stream"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90442985ee2f57c9e1b548ee72ae842f4a9a20e3f417cc38dbc5dc684d9bb4ee"
dependencies = [
"lazy_static",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
@ -1100,12 +1452,28 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.2.8"

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.8.1"
version = "0.9.0"
edition = "2021"
[dependencies]
@ -8,6 +8,7 @@ chrono = "0.4"
clap = { version = "3.1", features = ["derive"] }
ffprobe = "0.3"
file-rotate = "0.6"
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.5"
log = "0.4"
notify = "4.0"

View File

@ -1,5 +1,5 @@
general:
helptext: Sometimes it can happen, that a file is corrupt but still playable,
help_text: Sometimes it can happen, that a file is corrupt but still playable,
this can produce an streaming error over all following files. The only way
in this case is, to stop ffplayout and start it again. Here we only say when
it stops, the starting process is in your hand. Best way is a systemd service
@ -7,10 +7,17 @@ general:
value. A number below 3 can cause unexpected errors.
stop_threshold: 11
rpc_server:
help_text: Run a JSON RPC server, for getting infos about current playing, and
control for some functions.
enable: true
address: 127.0.0.1:7070
authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs
mail:
helptext: Send error messages to email address, like missing playlist; invalid
help_text: Send error messages to email address, like missing playlist; invalid
json format; missing clip path. Leave recipient blank, if you don't need this.
'mail_level' can be WARNING or ERROR.
'mail_level' can be INFO, WARNING or ERROR. 'interval' means seconds until a new mail will be sended.
subject: "Playout Error"
smtp_server: "mail.example.org"
starttls: true
@ -20,7 +27,7 @@ mail:
mail_level: "ERROR"
logging:
helptext: Logging to file, if 'log_to_file' false log to console. 'backup_count'
help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count'
says how long log files will be saved in days. 'local_time' to false will set
log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.
'log_level' can be DEBUG, INFO, WARNING, ERROR. 'ffmpeg_level' can be info,
@ -34,7 +41,7 @@ logging:
ffmpeg_level: "error"
processing:
helptext: Default processing, for all clips that they get prepared in that way,
help_text: Default processing, for all clips that they get prepared in that way,
so the output is unique. Set playing mode, like playlist, or folder.
'aspect' must be a float number. 'logo' is only used if the path exist.
'logo_scale' scale the logo to target size, leave it blank when no scaling
@ -60,7 +67,7 @@ processing:
volume: 1
ingest:
helptext: Works not with direct hls output, it always needs full processing! Run a server
help_text: Works not with direct hls output, it always needs full processing! Run a server
for a ingest stream. This stream will override the normal streaming until is done.
There is no authentication, this is up to you. The recommend way is to set address to
localhost, stream to a local server with authentication and from there stream to this app.
@ -68,7 +75,7 @@ ingest:
input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream
playlist:
helptext: >
help_text: >
'path' can be a path to a single file, or a directory. For directory put
only the root folder, for example '/playlists', subdirectories are read by the
script. Subdirectories needs this structure '/playlists/2018/01'. 'day_start'
@ -82,7 +89,7 @@ playlist:
infinit: false
storage:
helptext: Play ordered or randomly files from path. 'filler_clip' is for fill
help_text: Play ordered or randomly files from path. 'filler_clip' is for fill
the end to reach 24 hours, it will loop when is necessary. 'extensions' search
only files with this extension. Set 'shuffle' to 'True' to pick files randomly.
path: "/mediaStorage"
@ -93,7 +100,7 @@ storage:
shuffle: true
text:
helptext: Overlay text in combination with libzmq for remote text manipulation.
help_text: Overlay text in combination with libzmq for remote text manipulation.
On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'.
In a standard environment the filter drawtext node is Parsed_drawtext_2.
'over_pre' if True text will be overlay in pre processing. Continue same text
@ -110,7 +117,7 @@ text:
regex: ^.+[/\\](.*)(.mp4|.mkv)$
out:
helptext: The final playout compression. Set the settings to your needs.
help_text: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'preview' works only in streaming output and creates a separate preview stream.

View File

@ -155,18 +155,14 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
config.processing.logo
);
if let Some(last) = &node.last {
if last.category == "advertisement" {
logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1")
}
if node.last_ad.unwrap() {
logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1")
}
if let Some(next) = &node.next {
if next.category == "advertisement" {
logo_chain.push_str(
format!(",fade=out:st={}:d=1.0:alpha=1", node.out - node.seek - 1.0).as_str(),
)
}
if node.next_ad.unwrap() {
logo_chain.push_str(
format!(",fade=out:st={}:d=1.0:alpha=1", node.out - node.seek - 1.0).as_str(),
)
}
logo_chain

View File

@ -70,18 +70,6 @@ impl Source {
fn sort(&mut self) {
self.nodes.lock().unwrap().sort();
}
fn last_next_node(&mut self) {
if self.index + 1 < self.nodes.lock().unwrap().len() {
let next_node = self.nodes.lock().unwrap()[self.index + 1].clone();
self.current_node.next = Some(Box::new(Media::new(self.index + 1, next_node)));
}
if self.index > 0 && self.index < self.nodes.lock().unwrap().len() {
let last_node = self.nodes.lock().unwrap()[self.index - 1].clone();
self.current_node.last = Some(Box::new(Media::new(self.index - 1, last_node)));
}
}
}
impl Iterator for Source {
@ -93,14 +81,11 @@ impl Iterator for Source {
self.current_node = Media::new(self.index, current_file);
self.current_node.add_probe();
self.current_node.add_filter();
self.last_next_node();
self.index += 1;
Some(self.current_node.clone())
} else {
let last = self.current_node.clone();
if self.config.storage.shuffle {
info!("Shuffle files");
self.shuffle();
@ -113,8 +98,6 @@ impl Iterator for Source {
self.current_node = Media::new(self.index, current_file);
self.current_node.add_probe();
self.current_node.add_filter();
self.last_next_node();
self.current_node.last = Some(Box::new(last));
self.index = 1;

View File

@ -2,16 +2,16 @@ use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::{mpsc::SyncSender, Arc, Mutex},
sync::{mpsc::SyncSender},
thread::sleep,
time::Duration,
};
use process_control::{ChildExt, Terminator};
use process_control::ChildExt;
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{stderr_reader, GlobalConfig};
use crate::utils::{stderr_reader, GlobalConfig, ProcessControl};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
@ -57,9 +57,7 @@ pub async fn ingest_server(
log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
rt_handle: Handle,
proc_terminator: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
server_is_running: Arc<Mutex<bool>>,
proc_control: ProcessControl,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
@ -101,7 +99,7 @@ pub async fn ingest_server(
debug!("Server CMD: <bright-blue>\"ffmpeg {}\"</>", server_cmd.join(" "));
loop {
if *is_terminated.lock().unwrap() {
if *proc_control.is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
@ -118,7 +116,7 @@ pub async fn ingest_server(
};
let serv_terminator = server_proc.terminator()?;
*proc_terminator.lock().unwrap() = Some(serv_terminator);
*proc_control.server_term.lock().unwrap() = Some(serv_terminator);
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
@ -139,7 +137,7 @@ pub async fn ingest_server(
};
if !is_running {
*server_is_running.lock().unwrap() = true;
*proc_control.server_is_running.lock().unwrap() = true;
is_running = true;
}
@ -147,7 +145,7 @@ pub async fn ingest_server(
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {:?}", e);
*is_terminated.lock().unwrap() = true;
*proc_control.is_terminated.lock().unwrap() = true;
break;
}
} else {
@ -155,7 +153,7 @@ pub async fn ingest_server(
}
}
*server_is_running.lock().unwrap() = false;
*proc_control.server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));

View File

@ -142,13 +142,18 @@ impl CurrentProgram {
}
}
fn last_next_node(&mut self) {
if self.index + 1 < self.nodes.len() {
self.current_node.next = Some(Box::new(self.nodes[self.index + 1].clone()));
fn last_next_ad(&mut self) {
if self.index + 1 < self.nodes.len()
&& self.nodes[self.index + 1].category == "advertisement".to_string()
{
self.current_node.next_ad = Some(true);
}
if self.index > 0 && self.index < self.nodes.len() {
self.current_node.last = Some(Box::new(self.nodes[self.index - 1].clone()));
if self.index > 0
&& self.index < self.nodes.len()
&& self.nodes[self.index - 1].category == "advertisement".to_string()
{
self.current_node.last_ad = Some(true);
}
}
@ -242,7 +247,7 @@ impl Iterator for CurrentProgram {
}
}
self.last_next_node();
self.last_next_ad();
return Some(self.current_node.clone());
}
@ -256,7 +261,7 @@ impl Iterator for CurrentProgram {
}
self.current_node = timed_source(self.nodes[self.index].clone(), &self.config, is_last);
self.last_next_node();
self.last_next_ad();
self.index += 1;
// update playlist should happen after current clip,
@ -265,7 +270,7 @@ impl Iterator for CurrentProgram {
Some(self.current_node.clone())
} else {
let last_playlist = self.json_path.clone();
let last = self.current_node.clone();
let last_ad = self.current_node.last_ad.clone();
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
@ -274,7 +279,6 @@ impl Iterator for CurrentProgram {
{
// Test if playlist is to early finish,
// and if we have to fill it with a placeholder.
self.index += 1;
self.current_node = Media::new(self.index, "".to_string());
self.current_node.begin = Some(get_sec());
let mut duration = total_delta.abs();
@ -286,17 +290,20 @@ impl Iterator for CurrentProgram {
self.current_node.out = duration;
self.current_node = gen_source(self.current_node.clone());
self.nodes.push(self.current_node.clone());
self.last_next_ad();
self.current_node.last = Some(Box::new(last));
self.current_node.last_ad = last_ad;
self.current_node.add_filter();
self.index += 1;
return Some(self.current_node.clone());
}
self.index = 0;
self.current_node = gen_source(self.nodes[self.index].clone());
self.last_next_node();
self.current_node.last = Some(Box::new(last));
self.last_next_ad();
self.current_node.last_ad = last_ad;
self.index = 1;

View File

@ -1,35 +1,39 @@
extern crate log;
extern crate simplelog;
use std::sync::{Arc, Mutex};
use simplelog::*;
use tokio::runtime::Builder;
mod filter;
mod input;
mod output;
mod utils;
use simplelog::*;
use tokio::runtime::Builder;
use crate::output::{player, write_hls};
use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig};
use crate::utils::{init_config, init_logging, validate_ffmpeg, run_rpc, GlobalConfig, ProcessControl};
fn main() {
init_config();
let config = GlobalConfig::global();
let proc_control = ProcessControl::new();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let logging = init_logging(rt_handle.clone(), is_terminated.clone());
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone());
CombinedLogger::init(logging).unwrap();
validate_ffmpeg();
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, is_terminated);
} else {
player(rt_handle, is_terminated);
if config.rpc_server.enable {
rt_handle.spawn(run_rpc(proc_control.clone()));
}
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, proc_control.is_terminated.clone());
} else {
player(rt_handle, proc_control);
}
info!("Playout done...");
}

View File

@ -3,7 +3,7 @@ use std::{
io::{prelude::*, BufReader, BufWriter, Read},
path::Path,
process,
process::{Child, Command, Stdio},
process::{Command, Stdio},
sync::{
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc, Mutex,
@ -12,7 +12,7 @@ use std::{
time::Duration,
};
use process_control::Terminator;
use process_control::ChildExt;
use simplelog::*;
use tokio::runtime::Handle;
@ -23,62 +23,7 @@ mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
#[derive(Debug)]
struct ProcessCleanup {
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
enc_proc: Child,
is_alive: bool,
}
impl ProcessCleanup {
fn new(
server_term: Arc<Mutex<Option<Terminator>>>,
is_terminated: Arc<Mutex<bool>>,
enc_proc: Child,
) -> Self {
Self {
server_term,
is_terminated,
enc_proc,
is_alive: true,
}
}
}
impl ProcessCleanup {
fn kill(&mut self) {
*self.is_terminated.lock().unwrap() = true;
if self.is_alive {
if let Some(server) = &*self.server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server done");
}
}
};
self.is_alive = false;
}
if let Ok(_) = self.enc_proc.kill() {
info!("Playout done...")
}
if let Err(e) = self.enc_proc.wait() {
error!("Encoder: {e}")
};
}
}
impl Drop for ProcessCleanup {
fn drop(&mut self) {
self.kill()
}
}
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media, ProcessControl};
pub fn source_generator(
rt_handle: &Handle,
@ -127,17 +72,17 @@ pub fn source_generator(
(get_source, init_playlist)
}
pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
pub fn player(rt_handle: &Handle, proc_control: ProcessControl) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let (get_source, init_playlist) =
source_generator(rt_handle, config.clone(), is_terminated.clone());
source_generator(rt_handle, config.clone(), proc_control.is_terminated.clone());
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
@ -162,17 +107,13 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
ff_log_format.clone(),
ingest_sender,
rt_handle.clone(),
server_term.clone(),
is_terminated.clone(),
server_is_running.clone(),
proc_control.clone(),
));
}
let mut proc_cleanup =
ProcessCleanup::new(server_term.clone(), is_terminated.clone(), enc_proc);
'source_iter: for node in get_source {
println!("{:?}", &node.clone());
*proc_control.current_media.lock().unwrap() = Some(node.clone());
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
@ -223,6 +164,10 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
"Decoder".to_string(),
));
if let Ok(dec_terminator) = dec_proc.terminator() {
*proc_control.decoder_term.lock().unwrap() = Some(dec_terminator);
};
loop {
if *server_is_running.lock().unwrap() {
if !live_on {
@ -291,5 +236,7 @@ pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
sleep(Duration::from_secs(1));
proc_cleanup.kill();
if let Err(e) = enc_proc.wait() {
panic!("Encoder error: {:?}", e)
};
}

View File

@ -15,6 +15,7 @@ use crate::utils::{get_args, time_to_sec};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GlobalConfig {
pub general: General,
pub rpc_server: RpcServer,
pub mail: Mail,
pub logging: Logging,
pub processing: Processing,
@ -30,6 +31,13 @@ pub struct General {
pub stop_threshold: f64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RpcServer {
pub enable: bool,
pub address: String,
pub authorization: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Mail {
pub subject: String,
@ -39,6 +47,7 @@ pub struct Mail {
pub sender_pass: String,
pub recipient: String,
pub mail_level: String,
pub interval: i32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]

View File

@ -1,6 +1,7 @@
extern crate log;
extern crate simplelog;
use chrono::prelude::*;
use regex::Regex;
use std::{
path::Path,
@ -51,11 +52,15 @@ fn send_mail(msg: String) {
}
}
async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<bool>>) {
let mut count = 0;
async fn mail_queue(
messages: Arc<Mutex<Vec<String>>>,
is_terminated: Arc<Mutex<bool>>,
interval: i32,
) {
let mut count: i32 = 0;
loop {
if *is_terminated.lock().unwrap() || count == 60 {
if *is_terminated.lock().unwrap() || count == interval {
// check every 30 seconds for messages and send them
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
@ -71,14 +76,14 @@ async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<
break;
}
sleep(Duration::from_millis(500));
sleep(Duration::from_secs(1));
count += 1;
}
}
pub struct LogMailer {
level: LevelFilter,
config: Config,
pub config: Config,
messages: Arc<Mutex<Vec<String>>>,
}
@ -103,21 +108,13 @@ impl Log for LogMailer {
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
match record.level() {
Level::Error => {
self.messages
.lock()
.unwrap()
.push(record.args().to_string());
}
Level::Warn => {
self.messages
.lock()
.unwrap()
.push(record.args().to_string());
}
_ => (),
}
let local: DateTime<Local> = Local::now();
let time_stamp: String = local.format("[%Y-%m-%d %H:%M:%S%.3f]").to_string();
let level = record.level().to_string().to_uppercase();
let rec = record.args().to_string();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
self.messages.lock().unwrap().push(full_line);
}
}
@ -212,19 +209,24 @@ pub fn init_logging(
}
if config.mail.recipient.len() > 3 {
let mut filter = LevelFilter::Error;
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let interval = config.mail.interval.clone();
rt_handle.spawn(mail_queue(messages.clone(), is_terminated.clone()));
rt_handle.spawn(mail_queue(
messages.clone(),
is_terminated.clone(),
interval,
));
let mail_config = log_config
.clone()
.set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]")
.build();
if config.mail.mail_level.to_lowercase() == "warning".to_string() {
filter = LevelFilter::Warn
}
let filter = match config.mail.mail_level.to_lowercase().as_str() {
"info" => LevelFilter::Info,
"warning" => LevelFilter::Warn,
_ => LevelFilter::Error,
};
app_logger.push(LogMailer::new(filter, mail_config, messages));
}

View File

@ -8,10 +8,13 @@ use std::{
path::Path,
process::exit,
process::{ChildStderr, Command, Stdio},
sync::{Arc, Mutex, RwLock},
time,
time::UNIX_EPOCH,
};
use jsonrpc_http_server::CloseHandle;
use process_control::Terminator;
use regex::Regex;
use simplelog::*;
@ -20,15 +23,88 @@ mod config;
pub mod json_reader;
mod json_validate;
mod logging;
mod rpc_server;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use json_reader::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use rpc_server::run_rpc;
use crate::filter::filter_chains;
#[derive(Clone)]
pub struct ProcessControl {
pub decoder_term: Arc<Mutex<Option<Terminator>>>,
pub encoder_term: Arc<Mutex<Option<Terminator>>>,
pub server_term: Arc<Mutex<Option<Terminator>>>,
pub server_is_running: Arc<Mutex<bool>>,
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
pub is_terminated: Arc<Mutex<bool>>,
pub is_alive: Arc<RwLock<bool>>,
pub current_media: Arc<Mutex<Option<Media>>>,
}
impl ProcessControl {
pub fn new() -> Self {
Self {
decoder_term: Arc::new(Mutex::new(None)),
encoder_term: Arc::new(Mutex::new(None)),
server_term: Arc::new(Mutex::new(None)),
server_is_running: Arc::new(Mutex::new(false)),
rpc_handle: Arc::new(Mutex::new(None)),
is_terminated: Arc::new(Mutex::new(false)),
is_alive: Arc::new(RwLock::new(true)),
current_media: Arc::new(Mutex::new(None)),
}
}
}
impl ProcessControl {
pub fn kill_all(&mut self) {
*self.is_terminated.lock().unwrap() = true;
if *self.is_alive.read().unwrap() {
*self.is_alive.write().unwrap() = false;
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
rpc.clone().close()
};
if let Some(server) = &*self.server_term.lock().unwrap() {
unsafe {
if let Err(e)= server.terminate() {
error!("Ingest server: {:?}", e);
}
}
};
if let Some(decoder) = &*self.decoder_term.lock().unwrap() {
unsafe {
if let Err(e) = decoder.terminate() {
error!("Decoder: {:?}", e);
}
}
};
if let Some(encoder) = &*self.encoder_term.lock().unwrap() {
unsafe {
if let Err(e) = encoder.terminate() {
error!("Encoder: {:?}", e);
}
}
};
}
}
}
impl Drop for ProcessControl {
fn drop(&mut self) {
self.kill_all()
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Media {
pub begin: Option<f64>,
@ -42,8 +118,8 @@ pub struct Media {
pub cmd: Option<Vec<String>>,
pub filter: Option<Vec<String>>,
pub probe: Option<MediaProbe>,
pub last: Option<Box<Media>>,
pub next: Option<Box<Media>>,
pub last_ad: Option<bool>,
pub next_ad: Option<bool>,
pub process: Option<bool>,
}
@ -72,8 +148,8 @@ impl Media {
cmd: Some(vec!["-i".to_string(), src]),
filter: Some(vec![]),
probe: probe,
last: None,
next: None,
last_ad: Some(false),
next_ad: Some(false),
process: Some(true),
}
}

74
src/utils/rpc_server.rs Normal file
View File

@ -0,0 +1,74 @@
use serde_json::{Map, Number};
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use simplelog::*;
use crate::utils::{GlobalConfig, ProcessControl};
pub async fn run_rpc(proc_control: ProcessControl) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
match params {
Params::Map(map) => {
if map.contains_key("control") && map["control"] == "next".to_string() {
if let Some(decoder) = &*proc.decoder_term.lock().unwrap() {
unsafe {
if let Ok(_) = decoder.terminate() {
info!("Skip current clip");
return Ok(Value::String(format!("Skip current clip")));
}
}
}
}
if map.contains_key("media") && map["media"] == "current".to_string() {
if let Some(media) = proc.current_media.lock().unwrap().clone() {
let mut media_map = Map::new();
media_map.insert(
"begin".to_string(),
Value::Number(Number::from_f64(media.begin.unwrap_or(0.0)).unwrap()),
);
media_map.insert("source".to_string(), Value::String(media.source));
return Ok(Value::Object(media_map));
};
}
}
_ => return Ok(Value::String(format!("Wrong parameters..."))),
}
Ok(Value::String(format!("no parameters set...")))
});
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
server.wait();
}