Merge remote-tracking branch 'ffplayout-rs/main'

This commit is contained in:
jb-alvarado 2022-04-14 16:46:00 +02:00
commit c2ee7965c5
34 changed files with 5827 additions and 69 deletions

22
.github/workflows/rust.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: Rust
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose

34
.gitignore vendored
View File

@ -1,13 +1,21 @@
.ropeproject # Generated by Cargo
**temp # will have compiled files and executables
**playlists /target/
*.log*
.DS_Store # These are backup files generated by rustfmt
__pycache__/ **/*.rs.bk
*-orig.*
*.json # exclude binarys in examples folder
test/ /examples/*
.pytest_cache/ !/examples/*.rs
venv/
log/ # exlcude logging
.mypy_cache/ *.log
/logs/
*.zip
*tar.gz
*.deb
*.rpm
.vscode/

1525
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

70
Cargo.toml Normal file
View File

@ -0,0 +1,70 @@
[package]
name = "ffplayout-rs"
description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.3"
edition = "2021"
[dependencies]
chrono = "0.4"
clap = { version = "3.1", features = ["derive"] }
ffprobe = "0.3"
file-rotate = "0.6"
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.5"
log = "0.4"
notify = "4.0"
once_cell = "1.10"
rand = "0.8"
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.11", features = ["paris"] }
tokio = { version = "1.16", features = ["rt-multi-thread"] }
walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
[[bin]]
name = "ffplayout"
path = "src/main.rs"
[profile.release]
opt-level = 3
strip = true
lto = true
# DEBIAN DEB PACKAGE
[package.metadata.deb]
name = "ffplayout-engine"
priority = "optional"
section = "net"
license-file = ["LICENSE", "0"]
depends = ""
suggests = "ffmpeg"
copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved."
assets = [
["target/x86_64-unknown-linux-musl/release/ffplayout", "/usr/bin/ffplayout", "755"],
["assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"],
["assets/logo.png", "/usr/share/ffplayout/logo.png", "644"],
["README.md", "/usr/share/doc/ffplayout-engine/README", "644"],
]
systemd-units = { unit-name = "ffplayout-engine", unit-scripts = "assets", enable = false }
# REHL RPM PACKAGE
[package.metadata.generate-rpm]
name = "ffplayout-engine"
license = "GPL-3.0"
assets = [
{ source = "target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" },
{ source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644" },
{ source = "assets/ffplayout-engine.service", dest = "/lib/systemd/system/ffplayout-engine.service", mode = "644" },
{ source = "README.md", dest = "/usr/share/doc/ffplayout-engine/README", mode = "644", doc = true },
{ source = "LICENSE", dest = "/usr/share/doc/ffplayout-engine/LICENSE", mode = "644" },
{ source = "assets/logo.png", dest = "/usr/share/ffplayout/logo.png", mode = "644" },
]

159
README.md
View File

@ -1,16 +1,9 @@
**ffplayout_engine** **ffplayout-rs**
================ ================
[![made-with-python](https://img.shields.io/badge/Made%20with-Python-1f425f.svg)](https://www.python.org/)
[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) [![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0)
## Attention: Version 4.0 will be the last release in Python The main purpose of ffplayout is to provide a 24/7 broadcasting solution that plays a *json* playlist for every day, while keeping the current playlist editable.
After that the code base will be changed to Rust.
-----
The purpose with ffplayout is to provide a 24/7 broadcasting solution that plays a *json* playlist for every day, while keeping the current playlist editable.
**Check [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend): web-based GUI for ffplayout** **Check [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend): web-based GUI for ffplayout**
@ -20,20 +13,19 @@ The purpose with ffplayout is to provide a 24/7 broadcasting solution that plays
- have all values in a separate config file - have all values in a separate config file
- dynamic playlist - dynamic playlist
- replace missing playlist or clip with a dummy clip - replace missing playlist or clip with a dummy clip
- playing clips from [watched folder](https://github.com/ffplayout/ffplayout_engine/wiki/Watch-Folder) - playing clips in [watched](/docs/folder_mode.md) folder mode
- send emails with error message - send emails with error message
- overlay a logo - overlay a logo
- overlay text, controllable through [messenger](https://github.com/ffplayout/messenger) or [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend) (needs ffmpeg with libzmq) - overlay text, controllable through [messenger](https://github.com/ffplayout/messenger) or [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend) (needs ffmpeg with libzmq)
- **EBU R128 loudness** normalization (single pass) (experimental) - EBU R128 loudness normalization (single pass)
- loop clip in playlist which `out` value is higher then its `duration`, see also [Loop Clip](https://github.com/ffplayout/ffplayout_engine/wiki/Loop-Clip)
- loop playlist infinitely - loop playlist infinitely
- [remote source](/docs/remote_source.md)
- trim and fade the last clip, to get full 24 hours - trim and fade the last clip, to get full 24 hours
- when playlist is not 24 hours long, loop filler clip until time is full - when playlist is not 24 hours long, loop filler clip until time is full
- set custom day start, so you can have playlist for example: from 6am to 6am, instate of 0am to 12pm - set custom day start, so you can have playlist for example: from 6am to 6am, instate of 0am to 12pm
- normal system requirements and no special tools - normal system requirements and no special tools
- no GPU power is needed - no GPU power is needed
- stream to server or play on desktop - stream to server or play on desktop
- on posix systems ffplayout can reload config with *SIGHUP*
- logging to files, or colored output to console - logging to files, or colored output to console
- add filters to input, if is necessary to match output stream: - add filters to input, if is necessary to match output stream:
- **yadif** (deinterlacing) - **yadif** (deinterlacing)
@ -43,28 +35,21 @@ The purpose with ffplayout is to provide a 24/7 broadcasting solution that plays
- **aevalsrc** (if video have no audio) - **aevalsrc** (if video have no audio)
- **apad** (add silence if audio duration is to short) - **apad** (add silence if audio duration is to short)
- **tpad** (add black frames if video duration is to short) - **tpad** (add black frames if video duration is to short)
- Live ingest (experimental) - [output](/docs/output.md):
- add custom [filters](https://github.com/ffplayout/ffplayout_engine/tree/master/ffplayout/filters)
- add custom [arguments](https://github.com/ffplayout/ffplayout_engine/tree/master/ffplayout/conf.d)
- different [play modes](https://github.com/ffplayout/ffplayout_engine/tree/master/ffplayout/player):
- different types of [output](https://github.com/ffplayout/ffplayout_engine/tree/master/ffplayout/output):
- **stream** - **stream**
- **desktop** - **desktop**
- **live_switch** - **HLS**
- **hls** - JSON RPC server, for getting infos about current playing and controlling
- **custom** - [live ingest](/docs/live_ingest.md)
- Multi channel
Requirements Requirements
----- -----
- python version 3.7+, dev version 3.9
- python module **watchdog** (only for folder mode)
- python module **colorama** if you are on windows
- python modules **PyYAML**, **requests**, **supervisor**
- **ffmpeg v4.2+** and **ffprobe** (**ffplay** if you want to play on desktop)
- if you want to overlay text, ffmpeg needs to have **libzmq**
- RAM and CPU depends on video resolution, minimum 4 threads and 3GB RAM for 720p are recommend - RAM and CPU depends on video resolution, minimum 4 threads and 3GB RAM for 720p are recommend
- **ffmpeg** v4.2+ and **ffprobe** (**ffplay** if you want to play on desktop)
- if you want to overlay text, ffmpeg needs to have **libzmq**
-----
JSON Playlist Example JSON Playlist Example
----- -----
@ -93,62 +78,124 @@ JSON Playlist Example
"in": 0, "in": 0,
"out": 2531.36, "out": 2531.36,
"duration": 2531.36, "duration": 2531.36,
"source": "/Media/clip4.mp4", "source": "https://example.org/big_buck_bunny.webm",
"category": "" "category": ""
} }
] ]
} }
``` ```
**If you need a simple playlist generator check:** [playlist-generator](https://github.com/ffplayout/playlist-generator)
The playlist can be extend, to use custom attributes in your [filters](/ffplayout/filters/).
**Warning** **Warning**
----- -----
(Endless) streaming over multiple days will only work when config have **day_start** value and the **length** value is **24 hours**. If you need only some hours for every day, use a *cron* job, or something similar. (Endless) streaming over multiple days will only work when config have **day_start** value and the **length** value is **24 hours**. If you need only some hours for every day, use a *cron* job, or something similar.
Remote source from URL
----- -----
You can use sources from remote URL in that way: HLS output
-----
```json For outputting to HLS, output parameters should look like:
{
"in": 0, ```yaml
"out": 149, out:
"duration": 149, ...
"source": "https://example.org/big_buck_bunny.webm"
} output_param: >-
...
-flags +cgop
-f hls
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
``` ```
But be careful with it, better test it multiple times! -----
More informations in [Wiki](https://github.com/ffplayout/ffplayout_engine/wiki/Remote-URL-Source) JSON RPC
-----
The ffplayout engine can run a JSON RPC server. A request show look like:
```Bash
curl -X POST -H "Content-Type: application/json" -H "Authorization: ---auth-key---" \
-d '{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":1 }' \
127.0.0.1:7070
```
At the moment this comments are possible:
```Bash
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"current"}, "id":1 }' # get infos about current clip
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"next"}, "id":2 }' # get infos about next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"last"}, "id":3 }' # get infos about last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":4 }' # jump to next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"back"}, "id":5 }' # jump to last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"reset"}, "id":6 }' # reset playlist to old state
```
Output from `{"media":"current"}` show:
```JSON
{
"jsonrpc": "2.0",
"result": {
"current_media": {
"category": "",
"duration": 154.2,
"out": 154.2,
"seek": 0.0,
"source": "/opt/tv-media/clip.mp4"
},
"index": 39,
"play_mode": "playlist",
"played_sec": 67.80771999300123,
"remaining_sec": 86.39228000699876,
"start_sec": 24713.631999999998,
"start_time": "06:51:53.631"
},
"id": 1
}
```
When you are in playlist mode and jumping forward or backwards in time, the time shift will be saved so the playlist is still in sync. But have in mind, that then maybe your playlist gets to short. When you are not resetting the state, it will reset on the next day automatically.
-----
Installation Installation
----- -----
Check [INSTALL.md](docs/INSTALL.md) Copy the binary to `/usr/local/bin/`
Start with Arguments Start with Arguments
----- -----
ffplayout also allows the passing of parameters: ffplayout also allows the passing of parameters:
- `-c, --config` use given config file ```
- `-f, --folder` use folder for playing OPTIONS:
- `-l, --log` for user-defined log path, *none* for console output -c, --config <CONFIG> File path to ffplayout.conf
- `-i, --loop` loop playlist infinitely -f, --folder <FOLDER> Play folder content
- `-o, --output` set output mode: **desktop**, **hls**, **stream**, ... -g, --generate <YYYY-MM-DD>... Generate playlist for date. Date-range is possible, like:
- `-p, --playlist` for playlist file 2022-01-01 - 2022-01-10.
- `-s, --start` set start time in *hh:mm:ss*, *now* for start at playlist begin -h, --help Print help information
- `-t, --length` set length in *hh:mm:ss*, *none* for no length check -i, --infinit Loop playlist infinitely
- `-pm, --play_mode` playing mode: folder, playlist, custom... -l, --log <LOG> File path for logging
-m, --play-mode <PLAY_MODE> Playing mode: folder, playlist
-o, --output <OUTPUT> Set output mode: desktop, hls, stream
-p, --playlist <PLAYLIST> Path from playlist
-s, --start <START> Start time in 'hh:mm:ss', 'now' for start with first
-t, --length <LENGTH> Set length in 'hh:mm:ss', 'none' for no length check
-v, --volume <VOLUME> Set audio volume
-V, --version Print version information
```
You can run the command like: You can run the command like:
```SHELL ```Bash
./ffplayout.py -l none -p ~/playlist.json -s now -t none -o desktop ./ffplayout -l none -p ~/playlist.json -o desktop
``` ```

View File

@ -0,0 +1,14 @@
[Unit]
Description=Rust based 24/7 playout solution
After=network.target
[Service]
ExecStart= /usr/bin/ffplayout
ExecReload=/bin/kill -1 $MAINPID
Restart=always
RestartSec=1
User=www-data
Group=www-data
[Install]
WantedBy=multi-user.target

157
assets/ffplayout.yml Normal file
View File

@ -0,0 +1,157 @@
general:
help_text: Sometimes it can happen, that a file is corrupt but still playable,
this can produce an streaming error over all following files. The only way
in this case is, to stop ffplayout and start it again. Here we only say when
it stops, the starting process is in your hand. Best way is a systemd service
on linux. 'stop_threshold' stop ffplayout, if it is async in time above this
value. A number below 3 can cause unexpected errors.
stop_threshold: 11
rpc_server:
help_text: Run a JSON RPC server, for getting infos about current playing, and
control for some functions.
enable: true
address: 127.0.0.1:7070
authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs
mail:
help_text: Send error messages to email address, like missing playlist; invalid
json format; missing clip path. Leave recipient blank, if you don't need this.
'mail_level' can be INFO, WARNING or ERROR. 'interval' means seconds until a new mail will be sended.
subject: Playout Error
smtp_server: mail.example.org
starttls: true
sender_addr: ffplayout@example.org
sender_pass: "abc123"
recipient:
mail_level: ERROR
interval: 30
logging:
help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count'
says how long log files will be saved in days. 'local_time' to false will set
log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.
'log_level' can be DEBUG, INFO, WARNING, ERROR. 'ffmpeg_level' can be info,
warning, error.
log_to_file: false
backup_count: 7
local_time: true
timestamp: false
log_path: /var/log/ffplayout/
log_level: DEBUG
ffmpeg_level: error
processing:
help_text: Default processing, for all clips that they get prepared in that way,
so the output is unique. Set playing mode, like playlist, or folder.
'aspect' must be a float number. 'logo' is only used if the path exist.
'logo_scale' scale the logo to target size, leave it blank when no scaling
is needed, format is 'number:number', for example '100:-1' for proportional
scaling. With 'logo_opacity' logo can become transparent. With 'logo_filter'
'overlay=W-w-12:12' you can modify the logo position. With 'use_loudnorm'
you can activate single pass EBU R128 loudness normalization.
'loud_*' can adjust the loudnorm filter.
mode: playlist
width: 1024
height: 576
aspect: 1.778
fps: 25
add_logo: true
logo: /usr/share/ffplayout/logo.png
logo_scale:
logo_opacity: 0.7
logo_filter: overlay=W-w-12:12
add_loudnorm: false
loud_i: -18
loud_tp: -1.5
loud_lra: 11
volume: 1
ingest:
help_text: Works not with direct hls output, it always needs full processing! Run a server
for a ingest stream. This stream will override the normal streaming until is done.
There is no authentication, this is up to you. The recommend way is to set address to
localhost, stream to a local server with authentication and from there stream to this app.
enable: false
input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream
playlist:
help_text: >
'path' can be a path to a single file, or a directory. For directory put
only the root folder, for example '/playlists', subdirectories are read by the
script. Subdirectories needs this structure '/playlists/2018/01'. 'day_start'
means at which time the playlist should start, leave day_start blank when playlist
should always start at the begin. 'length' represent the target length from
playlist, when is blank real length will not consider. 'infinit true' works with
single playlist file and loops it infinitely.
path: /playlists
day_start: "5:59:25"
length: "24:00:00"
infinit: false
storage:
help_text: Play ordered or randomly files from path. 'filler_clip' is for fill
the end to reach 24 hours, it will loop when is necessary. 'extensions' search
only files with this extension. Set 'shuffle' to 'True' to pick files randomly.
path: "/mediaStorage"
filler_clip: "/mediaStorage/filler/filler.mp4"
extensions:
- "mp4"
- "mkv"
shuffle: true
text:
help_text: Overlay text in combination with libzmq for remote text manipulation.
On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'.
In a standard environment the filter drawtext node is Parsed_drawtext_2.
'over_pre' if True text will be overlay in pre processing. Continue same text
over multiple files is in that mode not possible. 'text_from_filename' activate the
extraction from text of a filename. With 'style' you can define the drawtext
parameters like position, color, etc. Post Text over API will override this.
With 'regex' you can format file names, to get a title from it.
add_text: false
over_pre: false
bind_address: "127.0.0.1:5555"
fontfile: "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
text_from_filename: false
style: "x=(w-tw)/2:y=(h-line_h)*0.9:fontsize=24:fontcolor=#ffffff:box=1:boxcolor=#000000:boxborderw=4"
regex: ^.+[/\\](.*)(.mp4|.mkv)$
out:
help_text: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'preview' works only in streaming output and creates a separate preview stream.
mode: 'stream'
preview: false
preview_param: >-
-s 512x288
-c:v libx264
-crf 24
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 800k
-bufsize 1600k
-preset ultrafast
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://preview.local/live/stream
output_param: >-
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 1300k
-bufsize 2600k
-preset faster
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://localhost/live/stream

BIN
assets/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.6 KiB

54
cross_compile_all.sh Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/bash
targets=("x86_64-unknown-linux-musl" "x86_64-pc-windows-gnu" "x86_64-apple-darwin" "aarch64-apple-darwin")
IFS="= "
while read -r name value; do
if [[ $name == "version" ]]; then
version=${value//\"/}
fi
done < Cargo.toml
echo "Compile ffplayout-rs version is: \"$version\""
echo ""
for target in "${targets[@]}"; do
echo "compile static for $target"
echo ""
cargo build --release --target=$target
if [[ $target == "x86_64-pc-windows-gnu" ]]; then
if [[ -f "ffplayout-rs-v${version}_${target}.zip" ]]; then
rm -f "ffplayout-rs-v${version}_${target}.zip"
fi
cp ./target/${target}/release/ffplayout.exe .
zip -r "ffplayout-rs-v${version}_${target}.zip" assets docs LICENSE README.md ffplayout.exe
rm -f ffplayout.exe
else
if [[ -f "ffplayout-rs-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-rs-v${version}_${target}.tar.gz"
fi
cp ./target/${target}/release/ffplayout .
tar -czvf "ffplayout-rs-v${version}_${target}.tar.gz" assets docs LICENSE README.md ffplayout
rm -f ffplayout
fi
echo ""
done
echo "Create debian package"
echo ""
cargo deb --target=x86_64-unknown-linux-musl
mv ./target/x86_64-unknown-linux-musl/debian/ffplayout-engine_${version}_amd64.deb .
echo ""
echo "Create rhel package"
echo ""
cargo generate-rpm --target=x86_64-unknown-linux-musl
mv ./target/x86_64-unknown-linux-musl/generate-rpm/ffplayout-engine-${version}-1.x86_64.rpm .

84
docs/developer.md Normal file
View File

@ -0,0 +1,84 @@
### Cross Compile
For cross compiling on fedora linux, you need to install some extra packages:
- mingw compiler:
```
dnf install mingw64-filesystem mingw64-binutils mingw64-gcc{,-c++} mingw64-crt mingw64-headers mingw64-pkg-config mingw64-hamlib mingw64-libpng mingw64-libusbx mingw64-portaudio mingw64-fltk mingw64-libgnurx mingw64-gettext mingw64-winpthreads-static intltool
```
- rust tools:
```
rustup target add x86_64-pc-windows-gnu
```
[Cross](https://github.com/cross-rs/cross#dependencies) could be an option to.
To build, run: `cargo build --release --target=x86_64-pc-windows-gnu`
### Static Linking
Running `cargo build` ends up in a binary which depend on **libc.so**. But you can compile also the binary totally static:
- install musl compiler:
- `dnf install musl-gcc`
- add target:
- `rustup target add x86_64-unknown-linux-musl`
Compile with: `cargo build --release --target=x86_64-unknown-linux-musl`.
This release should run on any Linux distro.
### Compile from Linux for macOS
Add toolchain:
```Bash
# for arm64
rustup target add aarch64-apple-darwin
# for x86_64
rustup target add x86_64-apple-darwin
```
Add linker and ar settings to `~/.cargo/config`:
```Bash
[target.x86_64-apple-darwin]
linker = "x86_64-apple-darwin20.4-clang"
ar = "x86_64-apple-darwin20.4-ar"
[target.aarch64-apple-darwin]
linker = "aarch64-apple-darwin20.4-clang"
ar = "aarch64-apple-darwin20.4-ar"
```
Follow this guide: [rust-cross-compile-linux-to-macos](https://wapl.es/rust/2019/02/17/rust-cross-compile-linux-to-macos.html)
Or setup [osxcross](https://github.com/tpoechtrager/osxcross) correctly.
Add **osxcross/target/bin** to your **PATH** and run cargo with:
```Bash
# for arm64
CC="aarch64-apple-darwin20.4-clang -arch arm64e" cargo build --release --target=aarch64-apple-darwin
# for x86_64
CC="o64-clang" cargo build --release --target=x86_64-apple-darwin
```
### Create debian DEB and RHEL RPM packages
install:
- `cargo install cargo-deb`
- `cargo install cargo-generate-rpm`
And run with:
```Bash
# for debian based systems:
cargo deb --target=x86_64-unknown-linux-musl
# for rhel based systems:
cargo generate-rpm --target=x86_64-unknown-linux-musl
```

10
docs/folder_mode.md Normal file
View File

@ -0,0 +1,10 @@
### Folder Mode
ffplayout can play files from a folder, no playlists are required for this mode. This folder is monitored for changes, and when new files are added or deleted, this is registered and updated accordingly.
You just have to set `mode: folder` in the config under `processing:` and under `storage:` you have to enter the correct folder and the file extensions you want to scan for.
Additionally there is a **shuffle** mode, if this is activated, the files will be played randomly.
If shuffle mode is off, the clips will be played in sorted order.

21
docs/live_ingest.md Normal file
View File

@ -0,0 +1,21 @@
### Live Ingest
With live ingest you have the possibility to switch from playlist, or folder mode to a live stream.
It works in a way, that it crate a ffmpeg instance in _listen_ (_server_) mode. For example when you stream over RTMP to it, you can set the ingest input parameters to:
```
-f live_flv -listen 1 -i rtmp://localhost:1936/live/stream
```
Have in mind, that the ingest mode **can't** pull from a server, it only can act as its own server and listen for income.
When it notice a incoming stream, it will stop the playlist playing and continue the live source. The output will not interrupt, so you have a continuously output stream.
In rare cases it can happen, that for a short moment after switching the image freezes, but then it will continue. Also a short frame flickering can happen.
You need to know, that **ffmpeg in current version has no authentication mechanism and it just listen to the protocol and port (no path or app name).**
For security you should not expose the ingest to the world. You localhost only, with an relay/reverse proxy where you can make your authentication. You could also use a [patch](https://gist.github.com/jb-alvarado/f8ee1e7a3cf5e482e818338f2b62c95f) for ffmpeg, but there is no guarantee if this really works.
In theory you can use every [protocol](https://ffmpeg.org/ffmpeg-protocols.html) from ffmpeg which support a **listen** mode.

57
docs/output.md Normal file
View File

@ -0,0 +1,57 @@
ffplayout supports different types of outputs, let's explain them a bit:
## Stream
The streaming output can be used for ever kind of classical streaming. For example for **rtmp, srt, rtp** etc. Every streaming type, which are supported from ffmpeg should be working
### Multiple Outputs:
If you would like to have multiple outputs, you can add you settings to `output_param:` like:
```yam
...
output_param: >-
...
-flags +global_header
-f flv rtmp://127.0.0.1/live/big
-s 1280x720
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 2400k
-bufsize 4800k
-preset medium
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://127.0.0.1/live/middle
-s 640x360
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 600k
-bufsize 1200k
-preset medium
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://127.0.0.1/live/small
```
## Desktop
In desktop mode you will get your picture on screen. For this you need a desktop system, theoretical all platforms should work here. ffplayout will need for that **ffplay**.
## HLS
In this mode you can output directly to a hls playlist. The nice thing here is, that ffplayout need less resources then in streaming mode.
#### Activating Output
To use one of the outputs you need to edit the **ffplayout.yml** config, here under **out** set your **mode** and use the different **output** options.

17
docs/remote_source.md Normal file
View File

@ -0,0 +1,17 @@
### Video from URL
Videos from URL are videos where you can watch directly in browser or download, for example:
```json
{
"in": 0,
"out": 149,
"duration": 149,
"source": "https://example.org/big_buck_bunny.webm"
}
```
This should work in general, because most time it have a duration information and it is faster playable then a real live stream source. Avoid seeking because it can take to much time.
**Live streams as input in playlist, like rtmp is not supported.**
Be careful with it, better test it multiple times!

388
src/filter/mod.rs Normal file
View File

@ -0,0 +1,388 @@
use std::path::Path;
use simplelog::*;
pub mod v_drawtext;
use crate::utils::{get_delta, is_close, GlobalConfig, Media};
#[derive(Debug, Clone)]
struct Filters {
audio_chain: Option<String>,
video_chain: Option<String>,
audio_map: Option<String>,
video_map: Option<String>,
}
impl Filters {
fn new() -> Self {
Filters {
audio_chain: None,
video_chain: None,
audio_map: Some("0:a".to_string()),
video_map: Some("0:v".to_string()),
}
}
fn add_filter(&mut self, filter: &str, codec_type: &str) {
match codec_type {
"audio" => match &self.audio_chain {
Some(ac) => {
if filter.starts_with(";") || filter.starts_with("[") {
self.audio_chain = Some(format!("{ac}{filter}"))
} else {
self.audio_chain = Some(format!("{ac},{filter}"))
}
}
None => {
if filter.contains("aevalsrc") || filter.contains("anoisesrc") {
self.audio_chain = Some(filter.to_string());
} else {
self.audio_chain =
Some(format!("[{}]{filter}", self.audio_map.clone().unwrap()));
}
self.audio_map = Some("[aout1]".to_string());
}
},
"video" => match &self.video_chain {
Some(vc) => {
if filter.starts_with(";") || filter.starts_with("[") {
self.video_chain = Some(format!("{vc}{filter}"))
} else {
self.video_chain = Some(format!("{vc},{filter}"))
}
}
None => {
self.video_chain = Some(format!("[0:v]{filter}"));
self.video_map = Some("[vout1]".to_string());
}
},
_ => (),
}
}
}
fn deinterlace(field_order: Option<String>, chain: &mut Filters) {
if let Some(order) = field_order {
if &order != "progressive" {
chain.add_filter("yadif=0:-1:0", "video")
}
}
}
fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
if !is_close(aspect, config.processing.aspect, 0.03) {
if aspect < config.processing.aspect {
chain.add_filter(
&format!(
"pad=ih*{}/{}/sar:ih:(ow-iw)/2:(oh-ih)/2",
config.processing.width, config.processing.height
),
"video",
)
} else if aspect > config.processing.aspect {
chain.add_filter(
&format!(
"pad=iw:iw*{}/{}/sar:(ow-iw)/2:(oh-ih)/2",
config.processing.width, config.processing.height
),
"video",
)
}
}
}
fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
if fps != config.processing.fps {
chain.add_filter(
&format!("fps={}", config.processing.fps),
"video",
)
}
}
fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
if width != config.processing.width || height != config.processing.height {
chain.add_filter(
&format!(
"scale={}:{}",
config.processing.width, config.processing.height
),
"video",
)
}
if !is_close(aspect, config.processing.aspect, 0.03) {
chain.add_filter(
&format!("setdar=dar={}", config.processing.aspect),
"video"
)
}
}
fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) {
let mut t = "";
if codec_type == "audio" {
t = "a"
}
if node.seek > 0.0 {
chain.add_filter(&format!("{t}fade=in:st=0:d=0.5"), codec_type)
}
if node.out != node.duration && node.out - node.seek - 1.0 > 0.0 {
chain.add_filter(
&format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0)),
codec_type,
)
}
}
fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
if config.processing.add_logo
&& Path::new(&config.processing.logo).is_file()
&& &node.category.clone().unwrap_or(String::new()) != "advertisement"
{
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
let mut logo_chain = format!(
"null[v];movie={},{logo_loop},{opacity}",
config.processing.logo
);
if node.last_ad.unwrap() {
logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1")
}
if node.next_ad.unwrap() {
logo_chain.push_str(
format!(",fade=out:st={}:d=1.0:alpha=1", node.out - node.seek - 1.0).as_str(),
)
}
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
chain.add_filter(&logo_chain, "video");
}
}
fn extend_video(node: &mut Media, chain: &mut Filters) {
let video_streams = node.probe.clone().unwrap().video_streams.unwrap();
if video_streams.len() > 0 {
if let Some(duration) = &video_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
&format!(
"tpad=stop_mode=add:stop_duration={}",
(node.out - node.seek) - (duration_float - node.seek)
),
"video",
)
}
}
}
}
fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add drawtext filter for lower thirds messages
if config.text.add_text && config.text.over_pre {
let filter = v_drawtext::filter_node(node);
chain.add_filter(&filter, "video");
if let Some(filters) = &chain.video_chain {
for (i, f) in filters.split(",").enumerate() {
if f.contains("drawtext") && !config.text.text_from_filename {
debug!("drawtext node is on index: <yellow>{i}</>");
break;
}
}
}
}
}
fn add_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.len() == 0 {
warn!("Clip: '{}' has no audio!", node.source);
let audio = format!(
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
node.out - node.seek
);
chain.add_filter(&audio, "audio");
}
}
fn extend_audio(node: &mut Media, chain: &mut Filters) {
let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap();
if audio_streams.len() > 0 {
if let Some(duration) = &audio_streams[0].duration {
let duration_float = duration.clone().parse::<f64>().unwrap();
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
&format!("apad=whole_dur={}", node.out - node.seek),
"audio",
)
}
}
}
}
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
// add single pass loudnorm filter to audio line
if node.probe.is_some()
&& node.probe.clone().unwrap().audio_streams.unwrap().len() > 0
&& config.processing.add_loudnorm
{
let loud_filter = format!(
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
);
chain.add_filter(&loud_filter, "audio");
}
}
fn audio_volume(chain: &mut Filters, config: &GlobalConfig) {
if config.processing.volume != 1.0 {
chain.add_filter(
&format!("volume={}", config.processing.volume),
"audio",
)
}
}
fn aspect_calc(aspect_string: String) -> f64 {
let aspect_vec: Vec<&str> = aspect_string.split(':').collect();
let w: f64 = aspect_vec[0].parse().unwrap();
let h: f64 = aspect_vec[1].parse().unwrap();
let source_aspect: f64 = w as f64 / h as f64;
source_aspect
}
fn fps_calc(r_frame_rate: String) -> f64 {
let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect();
let rate: f64 = frame_rate_vec[0].parse().unwrap();
let factor: f64 = frame_rate_vec[1].parse().unwrap();
let fps: f64 = rate / factor;
fps
}
fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &GlobalConfig,
codec_type: &str,
) {
// this realtime filter is important for HLS output to stay in sync
let mut t = "";
if codec_type == "audio" {
t = "a"
}
if &config.out.mode.to_lowercase() == "hls" {
let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(&node.begin.unwrap());
let duration = node.out - node.seek;
if delta < 0.0 {
let speed = duration / (duration + delta);
if speed > 0.0 && speed < 1.1 && delta < config.general.stop_threshold {
speed_filter = format!("{t}realtime=speed={speed}");
}
}
chain.add_filter(&speed_filter, codec_type);
}
}
pub fn filter_chains(node: &mut Media) -> Vec<String> {
let config = GlobalConfig::global();
let mut filters = Filters::new();
let mut audio_map = "1:a".to_string();
filters.audio_map = Some(audio_map);
if let Some(probe) = node.probe.clone() {
if probe.audio_streams.is_some() {
audio_map = "0:a".to_string();
filters.audio_map = Some(audio_map);
}
let v_stream = &probe.video_streams.unwrap()[0];
let aspect = aspect_calc(v_stream.display_aspect_ratio.clone().unwrap());
let frame_per_sec = fps_calc(v_stream.r_frame_rate.clone());
deinterlace(v_stream.field_order.clone(), &mut filters);
pad(aspect, &mut filters, &config);
fps(frame_per_sec, &mut filters, &config);
scale(
v_stream.width.unwrap(),
v_stream.height.unwrap(),
aspect,
&mut filters,
&config,
);
extend_video(node, &mut filters);
add_audio(node, &mut filters);
extend_audio(node, &mut filters);
}
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
let mut filter_cmd = vec![];
let mut filter_str: String = String::new();
let mut filter_map: Vec<String> = vec![];
if let Some(v_filters) = filters.video_chain {
filter_str.push_str(v_filters.as_str());
filter_str.push_str(filters.video_map.clone().unwrap().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.video_map.unwrap()]);
} else {
filter_map.append(&mut vec!["-map".to_string(), "0:v".to_string()]);
}
if let Some(a_filters) = filters.audio_chain {
if filter_str.len() > 10 {
filter_str.push_str(";")
}
filter_str.push_str(a_filters.as_str());
filter_str.push_str(filters.audio_map.clone().unwrap().as_str());
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]);
} else {
filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]);
}
if filter_str.len() > 10 {
filter_cmd.push("-filter_complex".to_string());
filter_cmd.push(filter_str);
}
filter_cmd.append(&mut filter_map);
filter_cmd
}

40
src/filter/v_drawtext.rs Normal file
View File

@ -0,0 +1,40 @@
use std::path::Path;
use regex::Regex;
use crate::utils::{GlobalConfig, Media};
pub fn filter_node(node: &mut Media) -> String {
let config = GlobalConfig::global();
let mut filter = String::new();
let mut font = String::new();
if config.text.add_text {
if Path::new(&config.text.fontfile).is_file() {
font = format!(":fontfile='{}'", config.text.fontfile)
}
if config.text.over_pre && config.text.text_from_filename {
let source = node.source.clone();
let regex: Regex = Regex::new(&config.text.regex).unwrap();
let text: String = match regex.captures(&source) {
Some(t) => t[1].to_string(),
None => source,
};
let escape = text
.replace("'", "'\\\\\\''")
.replace("%", "\\\\\\%")
.replace(":", "\\:");
filter = format!("drawtext=text='{escape}':{}{font}", config.text.style)
} else {
filter = format!(
"zmq=b=tcp\\\\://'{}',drawtext=text=''{font}",
config.text.bind_address.replace(":", "\\:")
)
}
}
filter
}

210
src/input/folder.rs Normal file
View File

@ -0,0 +1,210 @@
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use std::{
ffi::OsStr,
path::Path,
sync::{
mpsc::channel,
{Arc, Mutex},
},
thread::sleep,
time::Duration,
};
use walkdir::WalkDir;
use crate::utils::{get_sec, GlobalConfig, Media};
#[derive(Debug, Clone)]
pub struct Source {
config: GlobalConfig,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: Arc<Mutex<usize>>,
}
impl Source {
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<Mutex<usize>>) -> Self {
let config = GlobalConfig::global();
let mut media_list = vec![];
let mut index: usize = 0;
for entry in WalkDir::new(config.storage.path.clone())
.into_iter()
.filter_map(|e| e.ok())
{
if entry.path().is_file() {
let ext = file_extension(entry.path());
if ext.is_some()
&& config
.storage
.extensions
.clone()
.contains(&ext.unwrap().to_lowercase())
{
let media = Media::new(0, entry.path().display().to_string(), false);
media_list.push(media);
}
}
}
if config.storage.shuffle {
info!("Shuffle files");
let mut rng = thread_rng();
media_list.shuffle(&mut rng);
} else {
media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source));
}
for item in media_list.iter_mut() {
item.index = Some(index);
index += 1;
}
*current_list.lock().unwrap() = media_list;
Self {
config: config.clone(),
nodes: current_list,
current_node: Media::new(0, String::new(), false),
index: global_index,
}
}
fn shuffle(&mut self) {
let mut rng = thread_rng();
self.nodes.lock().unwrap().shuffle(&mut rng);
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
fn sort(&mut self) {
self.nodes
.lock()
.unwrap()
.sort_by(|d1, d2| d1.source.cmp(&d2.source));
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
}
impl Iterator for Source {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
let i = *self.index.lock().unwrap();
self.current_node = self.nodes.lock().unwrap()[i].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
*self.index.lock().unwrap() += 1;
Some(self.current_node.clone())
} else {
if self.config.storage.shuffle {
if self.config.general.generate.is_none() {
info!("Shuffle files");
}
self.shuffle();
} else {
if self.config.general.generate.is_none() {
info!("Sort files");
}
self.sort();
}
self.current_node = self.nodes.lock().unwrap()[0].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
*self.index.lock().unwrap() = 1;
Some(self.current_node.clone())
}
}
}
fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
pub async fn watchman(
sources: Arc<Mutex<Vec<Media>>>,
is_terminated: Arc<Mutex<bool>>,
) {
let config = GlobalConfig::global();
let (tx, rx) = channel();
let path = config.storage.path.clone();
if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'");
panic!("Folder path not exists: '{path}'");
}
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
loop {
if *is_terminated.lock().unwrap() {
break
}
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: <b><magenta>{old_path:?}</></b>");
}
Rename(old_path, new_path) => {
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: <b><magenta>{old_path:?}</></b> to <b><magenta>{new_path:?}</></b>");
}
_ => (),
}
}
sleep(Duration::from_secs(4));
}
}

165
src/input/ingest.rs Normal file
View File

@ -0,0 +1,165 @@
use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::mpsc::SyncSender,
thread::sleep,
time::Duration,
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
fn overlay(config: &GlobalConfig) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {
let opacity = format!(
"format=rgba,colorchannelmixer=aa={}",
config.processing.logo_opacity
);
let logo_loop = "loop=loop=-1:size=1:start=0";
logo_chain = format!("[v];movie={},{logo_loop},{opacity}", config.processing.logo);
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
}
logo_chain
}
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.add_loudnorm {
audio_chain.push_str(
format!(
",loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
)
.as_str(),
);
}
if config.processing.volume != 1.0 {
audio_chain.push_str(format!(",volume={}", config.processing.volume).as_str());
}
audio_chain.push_str("[aout1]");
audio_chain
}
pub async fn ingest_server(
log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
rt_handle: Handle,
mut proc_control: ProcessControl,
) -> Result<(), Error> {
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,
config.processing.aspect
);
filter.push_str(&overlay(&config));
filter.push_str("[vout1]");
filter.push_str(audio_filter(&config).as_str());
let mut filter_list = vec![
"-filter_complex",
&filter,
"-map",
"[vout1]",
"-map",
"[aout1]",
];
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let stream_settings = config.processing.settings.clone().unwrap();
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
server_cmd.append(&mut filter_list);
server_cmd.append(&mut stream_settings.iter().map(String::as_str).collect());
let mut is_running;
info!(
"Start ingest server, listening on: <b><magenta>{}</></b>",
stream_input.last().unwrap()
);
debug!(
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
);
loop {
if *proc_control.is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}")
}
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server"));
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
loop {
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
debug!("Ingest server read {e:?}");
break;
}
};
if !is_running {
*proc_control.server_is_running.lock().unwrap() = true;
is_running = true;
}
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {e:?}");
*proc_control.is_terminated.lock().unwrap() = true;
break;
}
} else {
break;
}
}
drop(ingest_reader);
*proc_control.server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")
}
}
Ok(())
}

56
src/input/mod.rs Normal file
View File

@ -0,0 +1,56 @@
use std::{
process,
sync::{Arc, Mutex},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use folder::{watchman, Source};
pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
) -> Box<dyn Iterator<Item = Media>> {
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
info!("Playout in folder mode");
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
let folder_source = Source::new(current_list, index);
rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(
rt_handle.clone(),
playout_stat,
is_terminated.clone(),
current_list,
index,
);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
process::exit(0x0100);
}
};
get_source
}

521
src/input/playlist.rs Normal file
View File

@ -0,0 +1,521 @@
use std::{
fs,
path::Path,
sync::{Arc, Mutex},
};
use serde_json::json;
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, json_serializer::read_json, modified_time,
seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN,
};
#[derive(Debug)]
pub struct CurrentProgram {
config: GlobalConfig,
start_sec: f64,
json_mod: Option<String>,
json_path: Option<String>,
json_date: String,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: Arc<Mutex<usize>>,
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
playout_stat: PlayoutStatus,
}
impl CurrentProgram {
pub fn new(
rt_handle: Handle,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
current_list: Arc<Mutex<Vec<Media>>>,
global_index: Arc<Mutex<usize>>,
) -> Self {
let config = GlobalConfig::global();
let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0);
*current_list.lock().unwrap() = json.program;
*playout_stat.current_date.lock().unwrap() = json.date.clone();
if *playout_stat.date.lock().unwrap() != json.date {
let data = json!({
"time_shift": 0.0,
"date": json.date,
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
}
Self {
config: config.clone(),
start_sec: json.start_sec.unwrap(),
json_mod: json.modified,
json_path: json.current_file,
json_date: json.date,
nodes: current_list,
current_node: Media::new(0, String::new(), false),
index: global_index,
rt_handle,
is_terminated,
playout_stat,
}
}
fn check_update(&mut self, seek: bool) {
if self.json_path.is_none() {
let json = read_json(
None,
self.rt_handle.clone(),
self.is_terminated.clone(),
seek,
0.0,
);
self.json_path = json.current_file;
self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program;
} else if Path::new(&self.json_path.clone().unwrap()).is_file() {
let mod_time = modified_time(&self.json_path.clone().unwrap());
if !mod_time
.unwrap()
.to_string()
.eq(&self.json_mod.clone().unwrap())
{
// when playlist has changed, reload it
info!(
"Reload playlist <b><magenta>{}</></b>",
self.json_path.clone().unwrap()
);
let json = read_json(
self.json_path.clone(),
self.rt_handle.clone(),
self.is_terminated.clone(),
false,
0.0,
);
self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program;
self.get_current_clip();
*self.index.lock().unwrap() += 1;
}
} else {
error!(
"Playlist <b><magenta>{}</></b> not exists!",
self.json_path.clone().unwrap()
);
let mut media = Media::new(0, String::new(), false);
media.begin = Some(get_sec());
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
self.json_path = None;
*self.nodes.lock().unwrap() = vec![media.clone()];
self.current_node = media;
*self.playout_stat.list_init.lock().unwrap() = true;
*self.index.lock().unwrap() = 0;
}
}
fn check_for_next_playlist(&mut self) {
let current_time = get_sec();
let start_sec = self.config.playlist.start_sec.unwrap();
let target_length = self.config.playlist.length_sec.unwrap();
let (delta, total_delta) = get_delta(&current_time);
let mut duration = self.current_node.out.clone();
if self.current_node.duration > self.current_node.out {
duration = self.current_node.duration.clone()
}
let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta;
if next_start >= target_length
|| is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0)
{
let json = read_json(
None,
self.rt_handle.clone(),
self.is_terminated.clone(),
false,
next_start,
);
let data = json!({
"time_shift": 0.0,
"date": json.date,
});
*self.playout_stat.current_date.lock().unwrap() = json.date.clone();
*self.playout_stat.time_shift.lock().unwrap() = 0.0;
let status_data: String =
serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(self.config.general.stat_file.clone(), &status_data)
.expect("Unable to write file");
self.json_path = json.current_file.clone();
self.json_mod = json.modified;
self.json_date = json.date;
*self.nodes.lock().unwrap() = json.program;
*self.index.lock().unwrap() = 0;
if json.current_file.is_none() {
*self.playout_stat.list_init.lock().unwrap() = true;
}
}
}
fn last_next_ad(&mut self) {
let index = *self.index.lock().unwrap();
let current_list = self.nodes.lock().unwrap();
if index + 1 < current_list.len()
&& &current_list[index + 1].category.clone().unwrap_or(String::new()) == "advertisement"
{
self.current_node.next_ad = Some(true);
}
if index > 0
&& index < current_list.len()
&& &current_list[index - 1].category.clone().unwrap_or(String::new()) == "advertisement"
{
self.current_node.last_ad = Some(true);
}
}
fn get_current_time(&mut self) -> f64 {
let mut time_sec = get_sec();
if time_sec < self.start_sec {
time_sec += self.config.playlist.length_sec.unwrap()
}
time_sec
}
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
if *self.playout_stat.current_date.lock().unwrap()
== *self.playout_stat.date.lock().unwrap()
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
{
let shift = *self.playout_stat.time_shift.lock().unwrap();
info!("Shift playlist start for <yellow>{shift}</> seconds");
time_sec += shift;
}
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
if item.begin.unwrap() + item.out - item.seek > time_sec {
*self.playout_stat.list_init.lock().unwrap() = false;
*self.index.lock().unwrap() = i;
break;
}
}
}
fn init_clip(&mut self) {
self.get_current_clip();
if !*self.playout_stat.list_init.lock().unwrap() {
let time_sec = self.get_current_time();
let index = *self.index.lock().unwrap();
// de-instance node to preserve original values in list
let mut node_clone = self.nodes.lock().unwrap()[index].clone();
*self.index.lock().unwrap() += 1;
node_clone.seek = time_sec - node_clone.begin.unwrap();
self.current_node = handle_list_init(node_clone);
}
}
}
impl Iterator for CurrentProgram {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if *self.playout_stat.list_init.lock().unwrap() {
debug!("Playlist init");
self.check_update(true);
if self.json_path.is_some() {
self.init_clip();
}
if *self.playout_stat.list_init.lock().unwrap() {
// on init load playlist, could be not long enough,
// so we check if we can take the next playlist already,
// or we fill the gap with a dummy.
let list_length = self.nodes.lock().unwrap().len();
self.current_node = self.nodes.lock().unwrap()[list_length - 1].clone();
self.check_for_next_playlist();
let new_node = self.nodes.lock().unwrap()[list_length - 1].clone();
let new_length = new_node.begin.unwrap() + new_node.duration;
if new_length
>= self.config.playlist.length_sec.unwrap()
+ self.config.playlist.start_sec.unwrap()
{
self.init_clip();
} else {
let mut current_time = get_sec();
let (_, total_delta) = get_delta(&current_time);
let mut duration = DUMMY_LEN;
if DUMMY_LEN > total_delta {
duration = total_delta;
*self.playout_stat.list_init.lock().unwrap() = false;
}
if self.config.playlist.start_sec.unwrap() > current_time {
current_time += self.config.playlist.length_sec.unwrap() + 1.0;
}
let mut media = Media::new(0, String::new(), false);
media.begin = Some(current_time);
media.duration = duration;
media.out = duration;
self.current_node = gen_source(media);
self.nodes.lock().unwrap().push(self.current_node.clone());
*self.index.lock().unwrap() = self.nodes.lock().unwrap().len();
}
}
self.last_next_ad();
return Some(self.current_node.clone());
}
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
self.check_for_next_playlist();
let mut is_last = false;
let index = *self.index.lock().unwrap();
if index == self.nodes.lock().unwrap().len() - 1 {
is_last = true
}
self.current_node = timed_source(
self.nodes.lock().unwrap()[index].clone(),
&self.config,
is_last,
&self.playout_stat,
);
self.last_next_ad();
*self.index.lock().unwrap() += 1;
// update playlist should happen after current clip,
// to prevent unknown behaviors.
self.check_update(false);
Some(self.current_node.clone())
} else {
let last_playlist = self.json_path.clone();
let last_ad = self.current_node.last_ad.clone();
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
if last_playlist == self.json_path
&& total_delta.abs() > self.config.general.stop_threshold
{
// Test if playlist is to early finish,
// and if we have to fill it with a placeholder.
let index = *self.index.lock().unwrap();
self.current_node = Media::new(index, String::new(), false);
self.current_node.begin = Some(get_sec());
let mut duration = total_delta.abs();
if duration > DUMMY_LEN {
duration = DUMMY_LEN;
}
self.current_node.duration = duration;
self.current_node.out = duration;
self.current_node = gen_source(self.current_node.clone());
self.nodes.lock().unwrap().push(self.current_node.clone());
self.last_next_ad();
self.current_node.last_ad = last_ad;
self.current_node.add_filter();
*self.index.lock().unwrap() += 1;
return Some(self.current_node.clone());
}
*self.index.lock().unwrap() = 0;
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
self.last_next_ad();
self.current_node.last_ad = last_ad;
*self.index.lock().unwrap() = 1;
Some(self.current_node.clone())
}
}
}
fn timed_source(
node: Media,
config: &GlobalConfig,
last: bool,
playout_stat: &PlayoutStatus,
) -> Media {
// prepare input clip
// check begin and length from clip
// return clip only if we are in 24 hours time range
let (delta, total_delta) = get_delta(&node.begin.unwrap());
let mut shifted_delta = delta;
let mut new_node = node.clone();
new_node.process = Some(false);
if config.playlist.length.contains(":") {
let time_shift = playout_stat.time_shift.lock().unwrap();
if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap()
&& *time_shift != 0.0
{
shifted_delta = delta - *time_shift;
debug!("Delta: <yellow>{shifted_delta:.3}</>, shifted: <yellow>{delta:.3}</>");
} else {
debug!("Delta: <yellow>{shifted_delta:.3}</>");
}
debug!("Total time remaining: <yellow>{total_delta:.3}</>");
let sync = check_sync(shifted_delta);
if !sync {
new_node.cmd = None;
return new_node;
}
}
if (total_delta > node.out - node.seek && !last)
|| node.index.unwrap() < 2
|| !config.playlist.length.contains(":")
{
// when we are in the 24 hour range, get the clip
new_node = gen_source(node);
new_node.process = Some(true);
} else if total_delta <= 0.0 {
info!("Begin is over play time, skip: {}", node.source);
} else if total_delta < node.duration - node.seek || last {
new_node = handle_list_end(node, total_delta);
}
new_node
}
fn gen_source(mut node: Media) -> Media {
if Path::new(&node.source).is_file() {
node.add_probe();
node.cmd = Some(seek_and_length(
node.source.clone(),
node.seek,
node.out,
node.duration,
));
node.add_filter();
} else {
if node.source.chars().count() == 0 {
warn!(
"Generate filler with <yellow>{:.2}</> seconds length!",
node.out - node.seek
);
} else {
error!("File not found: {}", node.source);
}
let (source, cmd) = gen_dummy(node.out - node.seek);
node.source = source;
node.cmd = Some(cmd);
node.add_filter();
}
node
}
fn handle_list_init(mut node: Media) -> Media {
// handle init clip, but this clip can be the last one in playlist,
// this we have to figure out and calculate the right length
let (_, total_delta) = get_delta(&node.begin.unwrap());
let mut out = node.out;
if node.out - node.seek > total_delta {
out = total_delta + node.seek;
}
node.out = out;
let new_node = gen_source(node);
new_node
}
fn handle_list_end(mut node: Media, total_delta: f64) -> Media {
// when we come to last clip in playlist,
// or when we reached total playtime,
// we end up here
debug!("Playlist end");
let mut out = if node.seek > 0.0 {
node.seek + total_delta
} else {
total_delta
};
// prevent looping
if out > node.duration {
out = node.duration
} else {
warn!("Clip length is not in time, new duration is: <yellow>{total_delta:.2}</>")
}
if node.duration > total_delta && total_delta > 1.0 && node.duration - node.seek >= total_delta
{
node.out = out;
} else if node.duration > total_delta && total_delta < 1.0 {
warn!(
"Last clip less then 1 second long, skip: <b><magenta>{}</></b>",
node.source
);
node.out = out;
node.cmd = Some(seek_and_length(
node.source.clone(),
node.seek,
node.out,
node.duration,
));
node.process = Some(false);
return node;
} else {
error!("Playlist is not long enough: <yellow>{total_delta:.2}</> seconds needed");
}
node.process = Some(true);
node.cmd = Some(seek_and_length(
node.source.clone(),
node.seek,
node.out,
node.duration,
));
node
}

92
src/main.rs Normal file
View File

@ -0,0 +1,92 @@
extern crate log;
extern crate simplelog;
use std::{
path::PathBuf,
process::exit,
{fs, fs::File},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
use tokio::runtime::Builder;
mod filter;
mod input;
mod output;
mod rpc;
mod utils;
use crate::output::{player, write_hls};
use crate::utils::{
generate_playlist, init_config, init_logging, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
};
use rpc::json_rpc_server;
#[derive(Serialize, Deserialize)]
struct StatusData {
time_shift: f64,
date: String,
}
fn main() {
init_config();
let config = GlobalConfig::global();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
if !PathBuf::from(config.general.stat_file.clone()).exists() {
let data = json!({
"time_shift": 0.0,
"date": String::new(),
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
} else {
let stat_file = File::options()
.read(true)
.write(false)
.open(&config.general.stat_file)
.expect("Could not open status file");
let data: StatusData =
serde_json::from_reader(stat_file).expect("Could not read status file.");
*playout_stat.time_shift.lock().unwrap() = data.time_shift;
*playout_stat.date.lock().unwrap() = data.date;
}
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone());
CombinedLogger::init(logging).unwrap();
validate_ffmpeg();
if let Some(range) = config.general.generate.clone() {
generate_playlist(range);
exit(0);
}
if config.rpc_server.enable {
rt_handle.spawn(json_rpc_server(
play_control.clone(),
playout_stat.clone(),
proc_control.clone(),
));
}
if &config.out.mode.to_lowercase() == "hls" {
write_hls(rt_handle, play_control, playout_stat, proc_control);
} else {
player(rt_handle, play_control, playout_stat, proc_control);
}
info!("Playout done...");
}

54
src/output/desktop.rs Normal file
View File

@ -0,0 +1,54 @@
use std::{
process,
process::{Command, Stdio},
};
use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();
let mut enc_filter: Vec<String> = vec![];
let mut enc_cmd = vec![
"-hide_banner",
"-nostats",
"-v",
log_format,
"-i",
"pipe:0",
];
if config.text.add_text && !config.text.over_pre {
info!(
"Using drawtext filter, listening on address: <yellow>{}</>",
config.text.bind_address
);
let mut filter: String = "null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str());
enc_filter = vec!["-vf".to_string(), filter];
}
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
debug!("Encoder CMD: <bright-blue>\"ffplay {}\"</>", enc_cmd.join(" "));
let enc_proc = match Command::new("ffplay")
.args(enc_cmd)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};
enc_proc
}

105
src/output/hls.rs Normal file
View File

@ -0,0 +1,105 @@
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
Example config:
out:
output_param: >-
...
-flags +cgop
-f hls
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
*/
use std::{
process::{Command, Stdio},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::input::source_generator;
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
};
pub fn write_hls(
rt_handle: &Handle,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let get_source = source_generator(
rt_handle,
config.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
playout_stat,
proc_control.is_terminated.clone(),
);
for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
};
if !node.process.unwrap() {
continue;
}
info!(
"Play for <yellow>{}</>: <b><magenta>{}</></b>",
sec_to_time(node.out - node.seek),
node.source
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
if filter.len() > 1 {
dec_cmd.append(&mut filter.iter().map(String::as_str).collect());
}
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn decoder process: {e}");
panic!("couldn't spawn decoder process: {e}")
}
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Writer",
));
if let Err(e) = dec_proc.wait() {
error!("Writer: {e}")
};
}
}

187
src/output/mod.rs Normal file
View File

@ -0,0 +1,187 @@
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio},
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::sleep,
time::Duration,
};
use simplelog::*;
use tokio::runtime::Handle;
mod desktop;
mod hls;
mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, source_generator};
use crate::utils::{
sec_to_time, stderr_reader, Decoder, Encoder, GlobalConfig, PlayerControl, PlayoutStatus,
ProcessControl,
};
pub fn player(
rt_handle: &Handle,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
let get_source = source_generator(
rt_handle,
config.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
playout_stat,
proc_control.is_terminated.clone(),
);
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(&ff_log_format),
"stream" => stream::output(&ff_log_format),
_ => panic!("Output mode doesn't exists!"),
};
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(8);
if config.ingest.enable {
rt_handle.spawn(ingest_server(
ff_log_format.clone(),
ingest_sender,
rt_handle.clone(),
proc_control.clone(),
));
}
'source_iter: for node in get_source {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
};
if !node.process.unwrap() {
continue;
}
info!(
"Play for <yellow>{}</>: <b><magenta>{}</></b>",
sec_to_time(node.out - node.seek),
node.source
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
if filter.len() > 1 {
dec_cmd.append(&mut filter.iter().map(String::as_str).collect());
}
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
debug!(
"Decoder CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn decoder process: {}", e);
panic!("couldn't spawn decoder process: {}", e)
}
Ok(proc) => proc,
};
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder"));
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
loop {
if *proc_control.server_is_running.lock().unwrap() {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = enc_writer.flush() {
error!("Encoder error: {e}")
}
if let Err(e) = proc_control.kill(Decoder) {
error!("{e}")
}
live_on = true;
*playlist_init.lock().unwrap() = true;
}
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
error!("Ingest receiver error: {:?}", e);
break 'source_iter;
};
}
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);
if let Err(e) = enc_writer.flush() {
error!("Encoder error: {e}")
}
live_on = false;
}
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
error!("Reading error from decoder: {e:?}");
break 'source_iter;
}
};
if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
error!("Encoder write error: {e:?}");
break 'source_iter;
};
} else {
break;
}
}
}
if let Err(e) = proc_control.wait(Decoder) {
error!("{e}")
}
}
sleep(Duration::from_secs(1));
if let Err(e) = proc_control.kill(Encoder) {
error!("{e}")
}
}

70
src/output/stream.rs Normal file
View File

@ -0,0 +1,70 @@
use std::{
process,
process::{Command, Stdio},
};
use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();
let mut enc_filter: Vec<String> = vec![];
let mut preview: Vec<&str> = vec![];
let preview_cmd = config.out.preview_cmd.as_ref().unwrap().clone();
let output_cmd = config.out.output_cmd.as_ref().unwrap().clone();
let mut enc_cmd = vec![
"-hide_banner",
"-nostats",
"-v",
log_format,
"-re",
"-i",
"pipe:0",
];
if config.text.add_text && !config.text.over_pre {
info!(
"Using drawtext filter, listening on address: <yellow>{}</>",
config.text.bind_address
);
let mut filter: String = "[0:v]null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, String::new(), false)).as_str());
if config.out.preview {
filter.push_str(",split=2[v_out1][v_out2]");
preview = vec!["-map", "[v_out1]", "-map", "0:a"];
preview.append(&mut preview_cmd.iter().map(String::as_str).collect());
preview.append(&mut vec!["-map", "[v_out2]", "-map", "0:a"]);
}
enc_filter = vec!["-filter_complex".to_string(), filter];
} else if config.out.preview {
preview = preview_cmd.iter().map(String::as_str).collect()
}
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
enc_cmd.append(&mut preview);
enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect());
debug!("Encoder CMD: <bright-blue>\"ffmpeg {}\"</>", enc_cmd.join(" "));
let enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};
enc_proc
}

226
src/rpc/mod.rs Normal file
View File

@ -0,0 +1,226 @@
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use serde_json::{json, Map};
use simplelog::*;
use crate::utils::{
get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl,
PlayoutStatus, ProcessControl,
};
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
pub async fn json_rpc_server(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let play = play_control.clone();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
if let Params::Map(map) = params {
let mut time_shift = playout_stat.time_shift.lock().unwrap();
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
if map.contains_key("control") && &map["control"] == "next" {
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_next"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Move failed".to_string()));
}
return Ok(Value::String("Last clip can not be skipped".to_string()));
}
if map.contains_key("control") && &map["control"] == "back" {
let index = *play.index.lock().unwrap();
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
*play.index.lock().unwrap() = index - 2;
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_last"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Move failed".to_string()));
}
return Ok(Value::String("Clip index out of range".to_string()));
}
if map.contains_key("control") && &map["control"] == "reset" {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Reset playout to original state");
let mut data_map = Map::new();
*time_shift = 0.0;
*date = current_date.clone();
*playout_stat.list_init.lock().unwrap() = true;
write_status(&current_date, 0.0);
data_map.insert("operation".to_string(), json!("reset_playout_state"));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Reset playout state failed".to_string()));
}
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
};
}
if map.contains_key("media") && &map["media"] == "next" {
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index].clone();
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no next clip".to_string()));
}
if map.contains_key("media") && &map["media"] == "last" {
let index = *play.index.lock().unwrap();
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index - 2].clone();
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no last clip".to_string()));
}
}
Ok(Value::String("No, or wrong parameters set!".to_string()))
});
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
server.wait();
}

60
src/utils/arg_parse.rs Normal file
View File

@ -0,0 +1,60 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[clap(version,
about = "ffplayout, Rust based 24/7 playout solution.\n\nRun without any command to use config file only, or with commands to override parameters.",
long_about = None)]
pub struct Args {
#[clap(short, long, help = "File path to ffplayout.conf")]
pub config: Option<String>,
#[clap(short, long, help = "File path for logging")]
pub log: Option<String>,
#[clap(
short,
long,
help = "Generate playlist for date. Date-range is possible, like: 2022-01-01 - 2022-01-10.",
name = "YYYY-MM-DD",
multiple_values=true
)]
pub generate: Option<Vec<String>>,
#[clap(short = 'm', long, help = "Playing mode: folder, playlist")]
pub play_mode: Option<String>,
#[clap(short, long, help = "Play folder content")]
pub folder: Option<String>,
#[clap(short, long, help = "Path from playlist")]
pub playlist: Option<String>,
#[clap(
short,
long,
help = "Start time in 'hh:mm:ss', 'now' for start with first"
)]
pub start: Option<String>,
#[clap(
short = 't',
long,
help = "Set length in 'hh:mm:ss', 'none' for no length check"
)]
pub length: Option<String>,
#[clap(short, long, help = "Loop playlist infinitely")]
pub infinit: bool,
#[clap(short, long, help = "Set output mode: desktop, hls, stream")]
pub output: Option<String>,
#[clap(short, long, help = "Set audio volume")]
pub volume: Option<f64>,
}
pub fn get_args() -> Args {
let args = Args::parse();
args
}

286
src/utils/config.rs Normal file
View File

@ -0,0 +1,286 @@
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_yaml::{self};
use std::{
env,
fs::File,
path::{Path, PathBuf},
process,
};
use shlex::split;
use crate::utils::{get_args, time_to_sec};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GlobalConfig {
pub general: General,
pub rpc_server: RpcServer,
pub mail: Mail,
pub logging: Logging,
pub processing: Processing,
pub ingest: Ingest,
pub playlist: Playlist,
pub storage: Storage,
pub text: Text,
pub out: Out,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct General {
pub stop_threshold: f64,
pub generate: Option<Vec<String>>,
#[serde(skip_serializing, skip_deserializing)]
pub stat_file: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RpcServer {
pub enable: bool,
pub address: String,
pub authorization: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Mail {
pub subject: String,
pub smtp_server: String,
pub starttls: bool,
pub sender_addr: String,
pub sender_pass: String,
pub recipient: String,
pub mail_level: String,
pub interval: i32,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Logging {
pub log_to_file: bool,
pub backup_count: usize,
pub local_time: bool,
pub timestamp: bool,
pub log_path: String,
pub log_level: String,
pub ffmpeg_level: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Processing {
pub mode: String,
pub width: i64,
pub height: i64,
pub aspect: f64,
pub fps: f64,
pub add_logo: bool,
pub logo: String,
pub logo_scale: String,
pub logo_opacity: f32,
pub logo_filter: String,
pub add_loudnorm: bool,
pub loud_i: f32,
pub loud_tp: f32,
pub loud_lra: f32,
pub volume: f64,
pub settings: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Ingest {
pub enable: bool,
input_param: String,
pub input_cmd: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist {
pub path: String,
pub day_start: String,
pub start_sec: Option<f64>,
pub length: String,
pub length_sec: Option<f64>,
pub infinit: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Storage {
pub path: String,
pub filler_clip: String,
pub extensions: Vec<String>,
pub shuffle: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Text {
pub add_text: bool,
pub over_pre: bool,
pub bind_address: String,
pub fontfile: String,
pub text_from_filename: bool,
pub style: String,
pub regex: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Out {
pub mode: String,
pub preview: bool,
preview_param: String,
pub preview_cmd: Option<Vec<String>>,
output_param: String,
pub output_cmd: Option<Vec<String>>,
}
impl GlobalConfig {
fn new() -> Self {
let args = get_args();
let mut config_path = match env::current_exe() {
Ok(path) => path.parent().unwrap().join("ffplayout.yml"),
Err(_) => PathBuf::from("./ffplayout.yml"),
};
if let Some(cfg) = args.config {
config_path = PathBuf::from(cfg);
} else if Path::new("/etc/ffplayout/ffplayout.yml").is_file() {
config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml");
}
let f = match File::open(&config_path) {
Ok(file) => file,
Err(err) => {
println!(
"{config_path:?} doesn't exists!\n{}\n\nSystem error: {err}",
"Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
);
process::exit(0x0100);
}
};
let mut config: GlobalConfig =
serde_yaml::from_reader(f).expect("Could not read config file.");
config.general.generate = None;
config.general.stat_file = env::temp_dir()
.join("ffplayout_status.json")
.display()
.to_string();
let fps = config.processing.fps.to_string();
let bitrate = config.processing.width * config.processing.height / 10;
config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start));
if config.playlist.length.contains(":") {
config.playlist.length_sec = Some(time_to_sec(&config.playlist.length));
} else {
config.playlist.length_sec = Some(86400.0);
}
let mut settings: Vec<String> = vec![
"-pix_fmt",
"yuv420p",
"-r",
&fps,
"-c:v",
"mpeg2video",
"-g",
"1",
"-b:v",
format!("{bitrate}k").as_str(),
"-minrate",
format!("{bitrate}k").as_str(),
"-maxrate",
format!("{bitrate}k").as_str(),
"-bufsize",
format!("{}k", bitrate / 2).as_str(),
]
.iter()
.map(|&s| s.into())
.collect();
settings.append(&mut pre_audio_codec(config.processing.add_loudnorm));
settings.append(
&mut vec!["-ar", "48000", "-ac", "2", "-f", "mpegts", "-"]
.iter()
.map(|&s| s.into())
.collect(),
);
config.processing.settings = Some(settings);
config.ingest.input_cmd = split(config.ingest.input_param.as_str());
config.out.preview_cmd = split(config.out.preview_param.as_str());
config.out.output_cmd = split(config.out.output_param.as_str());
if let Some(gen) = args.generate {
config.general.generate = Some(gen);
}
if let Some(log_path) = args.log {
config.logging.log_path = log_path;
}
if let Some(playlist) = args.playlist {
config.playlist.path = playlist;
}
if let Some(mode) = args.play_mode {
config.processing.mode = mode;
}
if let Some(folder) = args.folder {
config.storage.path = folder;
}
if let Some(start) = args.start {
config.playlist.day_start = start.clone();
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(length) = args.length {
config.playlist.length = length.clone();
if length.contains(":") {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
}
}
if args.infinit {
config.playlist.infinit = args.infinit;
}
if let Some(output) = args.output {
config.out.mode = output;
}
if let Some(volume) = args.volume {
config.processing.volume = volume;
}
config
}
pub fn global() -> &'static GlobalConfig {
INSTANCE.get().expect("Config is not initialized")
}
}
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> {
// when add_loudnorm is False we use a different audio encoder,
// s302m has higher quality, but is experimental
// and works not well together with the loudnorm filter
let mut codec = vec!["-c:a", "s302m", "-strict", "-2"];
if add_loudnorm {
codec = vec!["-c:a", "mp2", "-b:a", "384k"];
}
codec.iter().map(|&s| s.into()).collect()
}
pub fn init_config() {
let config = GlobalConfig::new();
INSTANCE.set(config).unwrap();
}

181
src/utils/controller.rs Normal file
View File

@ -0,0 +1,181 @@
use std::{
fmt,
process::Child,
sync::{Arc, Mutex, RwLock},
};
use jsonrpc_http_server::CloseHandle;
use simplelog::*;
use crate::utils::Media;
pub enum ProcessUnit {
Decoder,
Encoder,
Ingest,
}
impl fmt::Display for ProcessUnit {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ProcessUnit::Decoder => write!(f, "Decoder"),
ProcessUnit::Encoder => write!(f, "Encoder"),
ProcessUnit::Ingest => write!(f, "Ingest"),
}
}
}
use ProcessUnit::*;
#[derive(Clone)]
pub struct ProcessControl {
pub decoder_term: Arc<Mutex<Option<Child>>>,
pub encoder_term: Arc<Mutex<Option<Child>>>,
pub server_term: Arc<Mutex<Option<Child>>>,
pub server_is_running: Arc<Mutex<bool>>,
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
pub is_terminated: Arc<Mutex<bool>>,
pub is_alive: Arc<RwLock<bool>>,
}
impl ProcessControl {
pub fn new() -> Self {
Self {
decoder_term: Arc::new(Mutex::new(None)),
encoder_term: Arc::new(Mutex::new(None)),
server_term: Arc::new(Mutex::new(None)),
server_is_running: Arc::new(Mutex::new(false)),
rpc_handle: Arc::new(Mutex::new(None)),
is_terminated: Arc::new(Mutex::new(false)),
is_alive: Arc::new(RwLock::new(true)),
}
}
}
impl ProcessControl {
pub fn kill(&mut self, proc: ProcessUnit) -> Result<(), String> {
match proc {
Decoder => {
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
return Err(format!("Decoder {e:?}"));
};
}
}
Encoder => {
if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
return Err(format!("Encoder {e:?}"));
};
}
}
Ingest => {
if let Some(proc) = self.server_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
return Err(format!("Ingest server {e:?}"));
};
}
}
}
if let Err(e) = self.wait(proc) {
return Err(e);
};
Ok(())
}
pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> {
match proc {
Decoder => {
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.wait() {
return Err(format!("Decoder {e:?}"));
};
}
}
Encoder => {
if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.wait() {
return Err(format!("Encoder {e:?}"));
};
}
}
Ingest => {
if let Some(proc) = self.server_term.lock().unwrap().as_mut() {
if let Err(e) = proc.wait() {
return Err(format!("Ingest server {e:?}"));
};
}
}
}
Ok(())
}
pub fn kill_all(&mut self) {
*self.is_terminated.lock().unwrap() = true;
if *self.is_alive.read().unwrap() {
*self.is_alive.write().unwrap() = false;
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
rpc.clone().close()
};
for unit in [
Decoder,
Encoder,
Ingest,
] {
if let Err(e) = self.kill(unit) {
error!("{e}")
}
}
}
}
}
impl Drop for ProcessControl {
fn drop(&mut self) {
self.kill_all()
}
}
#[derive(Clone)]
pub struct PlayerControl {
pub current_media: Arc<Mutex<Option<Media>>>,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub index: Arc<Mutex<usize>>,
}
impl PlayerControl {
pub fn new() -> Self {
Self {
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])),
index: Arc::new(Mutex::new(0)),
}
}
}
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
pub time_shift: Arc<Mutex<f64>>,
pub date: Arc<Mutex<String>>,
pub current_date: Arc<Mutex<String>>,
pub list_init: Arc<Mutex<bool>>,
}
impl PlayoutStatus {
pub fn new() -> Self {
Self {
time_shift: Arc::new(Mutex::new(0.0)),
date: Arc::new(Mutex::new(String::new())),
current_date: Arc::new(Mutex::new(String::new())),
list_init: Arc::new(Mutex::new(true)),
}
}
}

143
src/utils/generator.rs Normal file
View File

@ -0,0 +1,143 @@
use std::{
fs::{create_dir_all, write},
path::Path,
process::exit,
sync::{Arc, Mutex},
};
use chrono::{Duration, NaiveDate};
use simplelog::*;
use crate::input::Source;
use crate::utils::{json_serializer::Playlist, GlobalConfig, Media};
fn get_date_range(date_range: &Vec<String>) -> Vec<String> {
let mut range = vec![];
let start;
let end;
match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") {
Ok(s) => {
start = s;
}
Err(_) => {
error!("date format error in: <yellow>{:?}</>", date_range[0]);
exit(1);
}
}
match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") {
Ok(e) => {
end = e;
}
Err(_) => {
error!("date format error in: <yellow>{:?}</>", date_range[2]);
exit(1);
}
}
let duration = end.signed_duration_since(start);
let days = duration.num_days() + 1;
for day in 0..days {
range.push((start + Duration::days(day)).format("%Y-%m-%d").to_string());
}
range
}
pub fn generate_playlist(mut date_range: Vec<String>) {
let config = GlobalConfig::global();
let total_length = config.playlist.length_sec.unwrap().clone();
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
let index = Arc::new(Mutex::new(0));
let playlist_root = Path::new(&config.playlist.path);
if !playlist_root.is_dir() {
error!(
"Playlist folder <b><magenta>{}</></b> not exists!",
&config.playlist.path
);
exit(1);
}
if date_range.contains(&"-".to_string()) && date_range.len() == 3 {
date_range = get_date_range(&date_range)
}
let media_list = Source::new(current_list, index);
let list_length = media_list.nodes.lock().unwrap().len();
for date in date_range {
let d: Vec<&str> = date.split('-').collect();
let year = d[0];
let month = d[1];
let playlist_path = playlist_root.join(year).join(month);
let playlist_file = &playlist_path.join(format!("{date}.json"));
if let Err(e) = create_dir_all(playlist_path) {
error!("Create folder failed: {e:?}");
exit(1);
}
if playlist_file.is_file() {
warn!(
"Playlist exists, skip: <b><magenta>{}</></b>",
playlist_file.display()
);
continue;
}
info!(
"Generate playlist: <b><magenta>{}</></b>",
playlist_file.display()
);
let mut filler = Media::new(0, config.storage.filler_clip.clone(), true);
let filler_length = filler.duration.clone();
let mut length = 0.0;
let mut round = 0;
let mut playlist = Playlist {
date,
current_file: None,
start_sec: None,
modified: None,
program: vec![],
};
for item in media_list.clone() {
let duration = item.duration.clone();
if total_length > length + duration {
playlist.program.push(item);
length += duration;
} else if filler_length > 0.0 && filler_length > total_length - length {
filler.out = filler_length - (total_length - length);
playlist.program.push(filler);
break;
} else if round == list_length - 1 {
break;
} else {
round += 1;
}
}
let json: String = match serde_json::to_string_pretty(&playlist) {
Ok(j) => j,
Err(e) => {
error!("Unable to serialize data: {e:?}");
exit(0);
}
};
if let Err(e) = write(playlist_file, &json) {
error!("Unable to write playlist: {e:?}");
exit(1)
};
}
}

View File

@ -0,0 +1,118 @@
use serde::{Deserialize, Serialize};
use std::{
fs::File,
path::Path,
sync::{Arc, Mutex},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media};
pub const DUMMY_LEN: f64 = 60.0;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist {
pub date: String,
#[serde(skip_serializing, skip_deserializing)]
pub start_sec: Option<f64>,
#[serde(skip_serializing, skip_deserializing)]
pub current_file: Option<String>,
#[serde(skip_serializing, skip_deserializing)]
pub modified: Option<String>,
pub program: Vec<Media>,
}
impl Playlist {
fn new(date: String, start: f64) -> Self {
let mut media = Media::new(0, String::new(), false);
media.begin = Some(start);
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
Self {
date,
start_sec: Some(start),
current_file: None,
modified: Some(String::new()),
program: vec![media],
}
}
}
pub fn read_json(
path: Option<String>,
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
seek: bool,
next_start: f64,
) -> Playlist {
let config = GlobalConfig::global();
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
let mut start_sec = config.playlist.start_sec.unwrap();
let date = get_date(seek, start_sec, next_start);
if playlist_path.is_dir() {
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
}
let mut current_file: String = playlist_path.as_path().display().to_string();
if let Some(p) = path {
playlist_path = Path::new(&p).to_owned();
current_file = p
}
if !playlist_path.is_file() {
error!("Playlist <b><magenta>{current_file}</></b> not exists!");
return Playlist::new(date, start_sec);
}
info!("Read Playlist: <b><magenta>{current_file}</></b>");
let f = File::options()
.read(true)
.write(false)
.open(&current_file)
.expect("Could not open json playlist file.");
let mut playlist: Playlist =
serde_json::from_reader(f).expect("Could not read json playlist file.");
playlist.current_file = Some(current_file.clone());
playlist.start_sec = Some(start_sec.clone());
let modify = modified_time(&current_file);
if let Some(modi) = modify {
playlist.modified = Some(modi.to_string());
}
for (i, item) in playlist.program.iter_mut().enumerate() {
item.begin = Some(start_sec);
item.index = Some(i);
item.last_ad = Some(false);
item.next_ad = Some(false);
item.process = Some(true);
item.filter = Some(vec![]);
start_sec += item.out - item.seek;
}
rt_handle.spawn(validate_playlist(
playlist.clone(),
is_terminated,
config.clone(),
));
playlist
}

View File

@ -0,0 +1,48 @@
use std::{path::Path, sync::{Arc, Mutex},};
use simplelog::*;
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
let date = playlist.date;
let mut length = config.playlist.length_sec.unwrap();
let mut begin = config.playlist.start_sec.unwrap();
length += begin;
debug!("validate playlist from: <yellow>{date}</>");
for item in playlist.program.iter() {
if *is_terminated.lock().unwrap() {
return
}
if Path::new(&item.source).is_file() {
let probe = MediaProbe::new(item.source.clone());
if probe.format.is_none() {
error!(
"No Metadata from file <b><magenta>{}</></b> at <yellow>{}</>",
sec_to_time(begin),
item.source
);
}
} else {
error!(
"File on position <yellow>{}</> not exists: <b><magenta>{}</></b>",
sec_to_time(begin),
item.source
);
}
begin += item.out - item.seek;
}
if length > begin + 1.0 {
error!(
"Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!",
sec_to_time(length - begin),
);
}
}

235
src/utils/logging.rs Normal file
View File

@ -0,0 +1,235 @@
extern crate log;
extern crate simplelog;
use chrono::prelude::*;
use regex::Regex;
use std::{
path::Path,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
};
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
use lettre::{
message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport,
Transport,
};
use log::{Level, LevelFilter, Log, Metadata, Record};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::GlobalConfig;
fn send_mail(msg: String) {
let config = GlobalConfig::global();
let email = Message::builder()
.from(config.mail.sender_addr.parse().unwrap())
.to(config.mail.recipient.parse().unwrap())
.subject(config.mail.subject.clone())
.header(header::ContentType::TEXT_PLAIN)
.body(clean_string(&msg))
.unwrap();
let credentials = Credentials::new(
config.mail.sender_addr.clone(),
config.mail.sender_pass.clone(),
);
let mut transporter = SmtpTransport::relay(config.mail.smtp_server.clone().as_str());
if config.mail.starttls {
transporter = SmtpTransport::starttls_relay(config.mail.smtp_server.clone().as_str())
}
let mailer = transporter.unwrap().credentials(credentials).build();
// Send the email
match mailer.send(&email) {
Ok(_) => (),
Err(e) => info!("Could not send email: {:?}", e),
}
}
async fn mail_queue(
messages: Arc<Mutex<Vec<String>>>,
is_terminated: Arc<Mutex<bool>>,
interval: i32,
) {
let mut count: i32 = 0;
loop {
if *is_terminated.lock().unwrap() || count == interval {
// check every 30 seconds for messages and send them
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
send_mail(msg);
messages.lock().unwrap().clear();
}
count = 0;
}
if *is_terminated.lock().unwrap() {
break;
}
sleep(Duration::from_secs(1));
count += 1;
}
}
pub struct LogMailer {
level: LevelFilter,
pub config: Config,
messages: Arc<Mutex<Vec<String>>>,
}
impl LogMailer {
pub fn new(
log_level: LevelFilter,
config: Config,
messages: Arc<Mutex<Vec<String>>>,
) -> Box<LogMailer> {
Box::new(LogMailer {
level: log_level,
config,
messages,
})
}
}
impl Log for LogMailer {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
let local: DateTime<Local> = Local::now();
let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]");
let level = record.level().to_string().to_uppercase();
let rec = record.args().to_string();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
self.messages.lock().unwrap().push(full_line);
}
}
fn flush(&self) {}
}
impl SharedLogger for LogMailer {
fn level(&self) -> LevelFilter {
self.level
}
fn config(&self) -> Option<&Config> {
Some(&self.config)
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
Box::new(*self)
}
}
fn clean_string(text: &str) -> String {
let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap();
regex.replace_all(text, "").to_string()
}
pub fn init_logging(
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
) -> Vec<Box<dyn SharedLogger>> {
let config = GlobalConfig::global();
let app_config = config.logging.clone();
let mut time_level = LevelFilter::Off;
let mut app_logger: Vec<Box<dyn SharedLogger>> = vec![];
if app_config.timestamp {
time_level = LevelFilter::Error;
}
let log_config = simplelog::ConfigBuilder::new()
.set_thread_level(LevelFilter::Off)
.set_target_level(LevelFilter::Off)
.set_level_padding(LevelPadding::Left)
.set_time_to_local(app_config.local_time)
.set_time_level(time_level)
.clone();
if app_config.log_to_file {
let file_config = log_config
.clone()
.set_time_format("[%Y-%m-%d %H:%M:%S%.3f]".into())
.build();
let mut log_path = "logs/ffplayout.log".to_string();
if Path::new(&app_config.log_path).is_dir() {
log_path = Path::new(&app_config.log_path)
.join("ffplayout.log")
.display()
.to_string();
} else if Path::new(&app_config.log_path).is_file() {
log_path = app_config.log_path
} else {
println!("Logging path not exists!")
}
let log = || {
FileRotate::new(
log_path,
AppendCount::new(app_config.backup_count),
ContentLimit::Lines(1000),
Compression::None,
)
};
app_logger.push(WriteLogger::new(LevelFilter::Debug, file_config, log()));
} else {
let term_config = log_config
.clone()
.set_level_color(Level::Debug, Some(Color::Ansi256(12)))
.set_level_color(Level::Info, Some(Color::Ansi256(10)))
.set_level_color(Level::Warn, Some(Color::Ansi256(208)))
.set_level_color(Level::Error, Some(Color::Ansi256(9)))
.set_time_format_str("\x1b[30;1m[%Y-%m-%d %H:%M:%S%.3f]\x1b[0m")
.build();
app_logger.push(TermLogger::new(
LevelFilter::Debug,
term_config,
TerminalMode::Mixed,
ColorChoice::Auto,
));
}
if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") {
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let interval = config.mail.interval.clone();
rt_handle.spawn(mail_queue(
messages.clone(),
is_terminated.clone(),
interval,
));
let mail_config = log_config
.clone()
.build();
let filter = match config.mail.mail_level.to_lowercase().as_str() {
"info" => LevelFilter::Info,
"warning" => LevelFilter::Warn,
_ => LevelFilter::Error,
};
app_logger.push(LogMailer::new(filter, mail_config, messages));
}
app_logger
}

487
src/utils/mod.rs Normal file
View File

@ -0,0 +1,487 @@
use chrono::prelude::*;
use chrono::Duration;
use ffprobe::{ffprobe, Format, Stream};
use std::{
fs,
fs::metadata,
io::{BufRead, BufReader, Error},
path::Path,
process::exit,
process::{ChildStderr, Command, Stdio},
time,
time::UNIX_EPOCH,
};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
mod arg_parse;
mod config;
pub mod controller;
mod generator;
pub mod json_serializer;
mod json_validate;
mod logging;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*};
pub use generator::generate_playlist;
pub use json_serializer::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
use crate::filter::filter_chains;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Media {
#[serde(skip_serializing, skip_deserializing)]
pub begin: Option<f64>,
#[serde(skip_serializing, skip_deserializing)]
pub index: Option<usize>,
#[serde(rename = "in")]
pub seek: f64,
pub out: f64,
pub duration: f64,
#[serde(skip_serializing)]
pub category: Option<String>,
pub source: String,
#[serde(skip_serializing, skip_deserializing)]
pub cmd: Option<Vec<String>>,
#[serde(skip_serializing, skip_deserializing)]
pub filter: Option<Vec<String>>,
#[serde(skip_serializing, skip_deserializing)]
pub probe: Option<MediaProbe>,
#[serde(skip_serializing, skip_deserializing)]
pub last_ad: Option<bool>,
#[serde(skip_serializing, skip_deserializing)]
pub next_ad: Option<bool>,
#[serde(skip_serializing, skip_deserializing)]
pub process: Option<bool>,
}
impl Media {
pub fn new(index: usize, src: String, do_probe: bool) -> Self {
let mut duration: f64 = 0.0;
let mut probe = None;
if do_probe && Path::new(&src).is_file() {
probe = Some(MediaProbe::new(src.clone()));
duration = match probe.clone().unwrap().format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
}
Self {
begin: None,
index: Some(index),
seek: 0.0,
out: duration,
duration: duration,
category: None,
source: src.clone(),
cmd: Some(vec!["-i".to_string(), src]),
filter: Some(vec![]),
probe: probe,
last_ad: Some(false),
next_ad: Some(false),
process: Some(true),
}
}
pub fn add_probe(&mut self) {
if self.probe.is_none() {
let probe = MediaProbe::new(self.source.clone());
self.probe = Some(probe.clone());
if self.duration == 0.0 {
let duration = match probe.format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
self.out = duration;
self.duration = duration;
}
}
}
pub fn add_filter(&mut self) {
let mut node = self.clone();
self.filter = Some(filter_chains(&mut node))
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MediaProbe {
pub format: Option<Format>,
pub audio_streams: Option<Vec<Stream>>,
pub video_streams: Option<Vec<Stream>>,
}
impl MediaProbe {
fn new(input: String) -> Self {
let probe = ffprobe(&input);
let mut a_stream: Vec<Stream> = vec![];
let mut v_stream: Vec<Stream> = vec![];
match probe {
Ok(obj) => {
for stream in obj.streams {
let cp_stream = stream.clone();
match cp_stream.codec_type {
Some(codec_type) => {
if codec_type == "audio" {
a_stream.push(stream)
} else if codec_type == "video" {
v_stream.push(stream)
}
}
_ => {
error!("No codec type found for stream: {stream:?}")
}
}
}
MediaProbe {
format: Some(obj.format),
audio_streams: if a_stream.len() > 0 {
Some(a_stream)
} else {
None
},
video_streams: if v_stream.len() > 0 {
Some(v_stream)
} else {
None
},
}
}
Err(e) => {
error!(
"Can't read source '{input}' with ffprobe, source is probably damaged! Error is: {e:?}"
);
MediaProbe {
format: None,
audio_streams: None,
video_streams: None,
}
}
}
}
}
pub fn write_status(date: &str, shift: f64) {
let config = GlobalConfig::global();
let stat_file = config.general.stat_file.clone();
let data = json!({
"time_shift": shift,
"date": date,
});
let status_data: String = serde_json::to_string(&data).expect("Serialize status data failed");
if let Err(e) = fs::write(stat_file, &status_data) {
error!("Unable to write file: {e:?}")
};
}
// pub fn get_timestamp() -> i64 {
// let local: DateTime<Local> = Local::now();
// local.timestamp_millis() as i64
// }
pub fn get_sec() -> f64 {
let local: DateTime<Local> = Local::now();
(local.hour() * 3600 + local.minute() * 60 + local.second()) as f64
+ (local.nanosecond() as f64 / 1000000000.0)
}
pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
let local: DateTime<Local> = Local::now();
if seek && start > get_sec() {
return (local - Duration::days(1)).format("%Y-%m-%d").to_string();
}
if start == 0.0 && next_start >= 86400.0 {
return (local + Duration::days(1)).format("%Y-%m-%d").to_string();
}
local.format("%Y-%m-%d").to_string()
}
pub fn modified_time(path: &str) -> Option<DateTime<Local>> {
let metadata = metadata(path).unwrap();
if let Ok(time) = metadata.modified() {
let date_time: DateTime<Local> = time.into();
return Some(date_time);
}
None
}
pub fn time_to_sec(time_str: &str) -> f64 {
if ["now", "", "none"].contains(&time_str) || !time_str.contains(":") {
return get_sec();
}
let t: Vec<&str> = time_str.split(':').collect();
let h: f64 = t[0].parse().unwrap();
let m: f64 = t[1].parse().unwrap();
let s: f64 = t[2].parse().unwrap();
h * 3600.0 + m * 60.0 + s
}
pub fn sec_to_time(sec: f64) -> String {
let d = UNIX_EPOCH + time::Duration::from_millis((sec * 1000.0) as u64);
// Create DateTime from SystemTime
let date_time = DateTime::<Utc>::from(d);
date_time.format("%H:%M:%S%.3f").to_string()
}
pub fn is_close(a: f64, b: f64, to: f64) -> bool {
if (a - b).abs() < to {
return true;
}
false
}
pub fn get_delta(begin: &f64) -> (f64, f64) {
let config = GlobalConfig::global();
let mut current_time = get_sec();
let start = config.playlist.start_sec.unwrap();
let length = time_to_sec(&config.playlist.length);
let mut target_length = 86400.0;
let total_delta;
if length > 0.0 && length != target_length {
target_length = length
}
if begin == &start && start == 0.0 && 86400.0 - current_time < 4.0 {
current_time -= target_length
} else if start >= current_time && begin != &start {
current_time += target_length
}
let mut current_delta = begin - current_time;
if is_close(current_delta, 86400.0, config.general.stop_threshold) {
current_delta -= 86400.0
}
if current_time < start {
total_delta = start - current_time;
} else {
total_delta = target_length + start - current_time;
}
(current_delta, total_delta)
}
pub fn check_sync(delta: f64) -> bool {
let config = GlobalConfig::global();
if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 {
error!("Clip begin out of sync for <yellow>{}</> seconds", delta);
return false;
}
true
}
pub fn gen_dummy(duration: f64) -> (String, Vec<String>) {
let config = GlobalConfig::global();
let color = "#121212";
let source = format!(
"color=c={color}:s={}x{}:d={duration}",
config.processing.width, config.processing.height
);
let cmd: Vec<String> = vec![
"-f".to_string(),
"lavfi".to_string(),
"-i".to_string(),
format!(
"{source}:r={},format=pix_fmts=yuv420p",
config.processing.fps
),
"-f".to_string(),
"lavfi".to_string(),
"-i".to_string(),
format!("anoisesrc=d={duration}:c=pink:r=48000:a=0.3"),
];
(source, cmd)
}
pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<String> {
let mut source_cmd: Vec<String> = vec![];
if seek > 0.0 {
source_cmd.append(&mut vec!["-ss".to_string(), format!("{seek}")])
}
source_cmd.append(&mut vec!["-i".to_string(), src]);
if duration > out {
source_cmd.append(&mut vec![
"-t".to_string(),
format!("{}", out - seek).to_string(),
]);
}
source_cmd
}
pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(), Error> {
// read ffmpeg stderr decoder, encoder and server instance
// and log the output
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
let buffer = BufReader::new(std_errors);
for line in buffer.lines() {
let line = line?;
if line.contains("[info]") {
info!("<bright black>[{suffix}]</> {}", format_line(line, "info"))
} else if line.contains("[warning]") {
warn!(
"<bright black>[{suffix}]</> {}",
format_line(line, "warning")
)
} else {
if suffix != "server"
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error")
);
}
}
}
Ok(())
}
fn is_in_system(name: &str) {
if let Ok(mut proc) = Command::new(name)
.stderr(Stdio::null())
.stdout(Stdio::null())
.spawn()
{
if let Err(e) = proc.wait() {
error!("{e:?}")
};
} else {
error!("{name} not found on system!");
exit(0x0100);
}
}
fn ffmpeg_libs_and_filter() -> (Vec<String>, Vec<String>) {
let mut libs: Vec<String> = vec![];
let mut filters: Vec<String> = vec![];
let re: Regex = Regex::new(r"^( ?) [TSC.]+").unwrap();
let mut ff_proc = match Command::new("ffmpeg")
.arg("-filters")
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ffmpeg process: {}", e);
exit(0x0100);
}
Ok(proc) => proc,
};
let err_buffer = BufReader::new(ff_proc.stderr.take().unwrap());
let out_buffer = BufReader::new(ff_proc.stdout.take().unwrap());
for line in err_buffer.lines() {
if let Ok(line) = line {
if line.contains("configuration:") {
let configs = line.split_whitespace();
for config in configs {
if config.contains("--enable-lib") {
libs.push(config.replace("--enable-", ""));
}
}
}
}
}
for line in out_buffer.lines() {
if let Ok(line) = line {
if let Some(_) = re.captures(line.as_str()) {
let filter_line = line.split_whitespace();
filters.push(filter_line.collect::<Vec<&str>>()[1].to_string());
}
}
}
if let Err(e) = ff_proc.wait() {
error!("{:?}", e)
};
(libs, filters)
}
pub fn validate_ffmpeg() {
let config = GlobalConfig::global();
is_in_system("ffmpeg");
is_in_system("ffprobe");
if config.out.mode == "desktop" {
is_in_system("ffplay");
}
let (libs, filters) = ffmpeg_libs_and_filter();
if !libs.contains(&"libx264".to_string()) {
error!("ffmpeg contains no libx264!");
exit(0x0100);
}
if !libs.contains(&"libfdk-aac".to_string()) {
warn!("ffmpeg contains no libfdk-aac! Can't use high quality aac encoder...");
}
if !filters.contains(&"tpad".to_string()) {
error!("ffmpeg contains no tpad filter!");
exit(0x0100);
}
if !filters.contains(&"zmq".to_string()) {
warn!("ffmpeg contains no zmq filter! Text messages will not work...");
}
}