Merge pull request #4 from jb-alvarado/api

add API
This commit is contained in:
jb-alvarado 2022-06-22 18:04:23 +02:00 committed by GitHub
commit 52dc141bb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 4208 additions and 534 deletions

1
.gitignore vendored
View File

@ -18,5 +18,6 @@
*tar.gz
*.deb
*.rpm
/assets/*.db*
.vscode/

1712
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,77 +1,12 @@
[package]
name = "ffplayout-engine"
description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.8"
edition = "2021"
default-run = "ffplayout"
[workspace]
[dependencies]
chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" }
clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
ffprobe = "0.3"
file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" }
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.7"
log = "0.4"
notify = "4.0"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["blocking"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }
walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
[[bin]]
name = "ffplayout"
path = "src/main.rs"
members = [
"ffplayout-api",
"ffplayout-engine",
"lib",
]
[profile.release]
opt-level = 3
strip = true
lto = true
# DEBIAN DEB PACKAGE
[package.metadata.deb]
name = "ffplayout-engine"
priority = "optional"
section = "net"
license-file = ["LICENSE", "0"]
depends = ""
suggests = "ffmpeg"
copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved."
conf-files = ["/etc/ffplayout/ffplayout.yml"]
assets = [
[
"target/x86_64-unknown-linux-musl/release/ffplayout",
"/usr/bin/ffplayout",
"755"
],
["assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"],
["assets/logo.png", "/usr/share/ffplayout/logo.png", "644"],
["README.md", "/usr/share/doc/ffplayout-engine/README", "644"],
]
systemd-units = { unit-name = "ffplayout-engine", unit-scripts = "assets", enable = false }
# REHL RPM PACKAGE
[package.metadata.generate-rpm]
name = "ffplayout-engine"
license = "GPL-3.0"
assets = [
{ source = "target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" },
{ source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644", config = true },
{ source = "assets/ffplayout-engine.service", dest = "/lib/systemd/system/ffplayout-engine.service", mode = "644" },
{ source = "README.md", dest = "/usr/share/doc/ffplayout-engine/README", mode = "644", doc = true },
{ source = "LICENSE", dest = "/usr/share/doc/ffplayout-engine/LICENSE", mode = "644" },
{ source = "assets/logo.png", dest = "/usr/share/ffplayout/logo.png", mode = "644" },
]

View File

@ -16,7 +16,7 @@ The main purpose of ffplayout is to provide a 24/7 broadcasting solution that pl
- playing clips in [watched](/docs/folder_mode.md) folder mode
- send emails with error message
- overlay a logo
- overlay text, controllable through [messenger](https://github.com/ffplayout/messenger) or [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend) (needs ffmpeg with libzmq)
- overlay text, controllable through [messenger](https://github.com/ffplayout/messenger) or [ffplayout-frontend](https://github.com/ffplayout/ffplayout-frontend) (needs ffmpeg with libzmq and enabled JSON RPC server)
- EBU R128 loudness normalization (single pass)
- loop playlist infinitely
- [remote source](/docs/remote_source.md)
@ -122,20 +122,24 @@ The ffplayout engine can run a JSON RPC server. A request show look like:
```Bash
curl -X POST -H "Content-Type: application/json" -H "Authorization: ---auth-key---" \
-d '{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":1 }' \
-d '{"jsonrpc": "2.0", "id":1, "method": "player", "params":{"control":"next"}}' \
127.0.0.1:7070
```
At the moment this comments are possible:
```Bash
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"current"}, "id":1 }' # get infos about current clip
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"next"}, "id":2 }' # get infos about next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"last"}, "id":3 }' # get infos about last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":4 }' # jump to next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"back"}, "id":5 }' # jump to last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"reset"}, "id":6 }' # reset playlist to old state
'{"jsonrpc": "2.0", "id":1, "method": "player", "params":{"media":"current"}}' # get infos about current clip
'{"jsonrpc": "2.0", "id":2, "method": "player", "params":{"media":"next"}}' # get infos about next clip
'{"jsonrpc": "2.0", "id":3, "method": "player", "params":{"media":"last"}}' # get infos about last clip
'{"jsonrpc": "2.0", "id":4, "method": "player", "params":{"control":"next"}}' # jump to next clip
'{"jsonrpc": "2.0", "id":5, "method": "player", "params":{"control":"back"}}' # jump to last clip
'{"jsonrpc": "2.0", "id":6, "method": "player", "params":{"control":"reset"}}' # reset playlist to old state
'{"jsonrpc": "2.0", "id":7, "method": "player", "params":{"control":"text", \
"message": {"text": "Hello from ffplayout", "x": "(w-text_w)/2", "y": "(h-text_h)/2", \
"fontsize": 24, "line_spacing": 4, "fontcolor": "#ffffff", "box": 1, \
"boxcolor": "#000000", "boxborderw": 4, "alpha": 1.0}}}' # send text to drawtext filter from ffmpeg
```
Output from `{"media":"current"}` show:

View File

@ -105,16 +105,12 @@ text:
help_text: Overlay text in combination with libzmq for remote text manipulation.
On windows fontfile path need to be like this 'C\:/WINDOWS/fonts/DejaVuSans.ttf'.
In a standard environment the filter drawtext node is Parsed_drawtext_2.
'over_pre' if True text will be overlay in pre processing. Continue same text
over multiple files is in that mode not possible. 'text_from_filename' activate the
extraction from text of a filename. With 'style' you can define the drawtext
parameters like position, color, etc. Post Text over API will override this.
With 'regex' you can format file names, to get a title from it.
'text_from_filename' activate the extraction from text of a filename. With 'style'
you can define the drawtext parameters like position, color, etc. Post Text over
API will override this. With 'regex' you can format file names, to get a title from it.
add_text: false
over_pre: false
bind_address: "127.0.0.1:5555"
fontfile: "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
text_from_filename: false
fontfile: "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"
style: "x=(w-tw)/2:y=(h-line_h)*0.9:fontsize=24:fontcolor=#ffffff:box=1:boxcolor=#000000:boxborderw=4"
regex: ^.+[/\\](.*)(.mp4|.mkv)$

100
build_all.sh Executable file
View File

@ -0,0 +1,100 @@
#!/usr/bin/bash
targets=("x86_64-unknown-linux-musl" "x86_64-pc-windows-gnu" "x86_64-apple-darwin" "aarch64-apple-darwin")
IFS="= "
while read -r name value; do
if [[ $name == "version" ]]; then
version=${value//\"/}
fi
done < ffplayout-engine/Cargo.toml
echo "Compile ffplayout-engine version is: \"$version\""
echo ""
for target in "${targets[@]}"; do
echo "compile static for $target"
echo ""
cargo build --release --target=$target --bin ffplayout
if [[ $target == "x86_64-pc-windows-gnu" ]]; then
if [[ -f "ffplayout-engine-v${version}_${target}.zip" ]]; then
rm -f "ffplayout-engine-v${version}_${target}.zip"
fi
cp ./target/${target}/release/ffplayout.exe .
zip -r "ffplayout-engine-v${version}_${target}.zip" assets docs LICENSE README.md ffplayout.exe -x *.db
rm -f ffplayout.exe
elif [[ $target == "x86_64-apple-darwin" ]] || [[ $target == "aarch64-apple-darwin" ]]; then
if [[ -f "ffplayout-engine-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-engine-v${version}_${target}.tar.gz"
fi
cp ./target/${target}/release/ffplayout .
tar -czvf "ffplayout-engine-v${version}_${target}.tar.gz" --exclude='*.db' assets docs LICENSE README.md ffplayout
rm -f ffplayout
else
if [[ -f "ffplayout-engine-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-engine-v${version}_${target}.tar.gz"
fi
cp ./target/${target}/release/ffplayout .
tar -czvf "ffplayout-engine-v${version}_${target}.tar.gz" --exclude='*.db' assets docs LICENSE README.md ffplayout
rm -f ffplayout
fi
echo ""
done
cargo deb --target=x86_64-unknown-linux-musl -p ffplayout-engine
mv ./target/x86_64-unknown-linux-musl/debian/ffplayout-engine_${version}_amd64.deb .
cargo generate-rpm --target=x86_64-unknown-linux-musl -p ffplayout-engine
mv ./target/x86_64-unknown-linux-musl/generate-rpm/ffplayout-engine-${version}-1.x86_64.rpm .
IFS="= "
while read -r name value; do
if [[ $name == "version" ]]; then
version=${value//\"/}
fi
done < ffplayout-api/Cargo.toml
echo "Compile ffplayout-api version is: \"$version\""
echo ""
for target in "${targets[@]}"; do
echo "compile static for $target"
echo ""
if [[ $target == "x86_64-pc-windows-gnu" ]]; then
if [[ -f "ffplayout-api-v${version}_${target}.zip" ]]; then
rm -f "ffplayout-api-v${version}_${target}.zip"
fi
cargo build --release --target=$target --bin ffpapi
cp ./target/${target}/release/ffpapi.exe .
zip -r "ffplayout-api-v${version}_${target}.zip" assets docs LICENSE README.md ffpapi.exe -x *.db
rm -f ffpapi.exe
elif [[ $target == "x86_64-unknown-linux-musl" ]]; then
if [[ -f "ffplayout-api-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-api-v${version}_${target}.tar.gz"
fi
cargo build --release --target=$target --bin ffpapi
cp ./target/${target}/release/ffpapi .
tar -czvf "ffplayout-api-v${version}_${target}.tar.gz" --exclude='*.db' assets docs LICENSE README.md ffpapi
rm -f ffpapi
fi
echo ""
done
cargo deb --target=x86_64-unknown-linux-musl -p ffplayout-api
mv ./target/x86_64-unknown-linux-musl/debian/ffplayout-api_${version}_amd64.deb .
cargo generate-rpm --target=x86_64-unknown-linux-musl -p ffplayout-api
mv ./target/x86_64-unknown-linux-musl/generate-rpm/ffplayout-api-${version}-1.x86_64.rpm .

View File

@ -1,54 +0,0 @@
#!/usr/bin/bash
targets=("x86_64-unknown-linux-musl" "x86_64-pc-windows-gnu" "x86_64-apple-darwin" "aarch64-apple-darwin")
IFS="= "
while read -r name value; do
if [[ $name == "version" ]]; then
version=${value//\"/}
fi
done < Cargo.toml
echo "Compile ffplayout-engine version is: \"$version\""
echo ""
for target in "${targets[@]}"; do
echo "compile static for $target"
echo ""
cargo build --release --target=$target
if [[ $target == "x86_64-pc-windows-gnu" ]]; then
if [[ -f "ffplayout-engine-v${version}_${target}.zip" ]]; then
rm -f "ffplayout-engine-v${version}_${target}.zip"
fi
cp ./target/${target}/release/ffplayout.exe .
zip -r "ffplayout-engine-v${version}_${target}.zip" assets docs LICENSE README.md ffplayout.exe -x *.db
rm -f ffplayout.exe
else
if [[ -f "ffplayout-engine-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-engine-v${version}_${target}.tar.gz"
fi
cp ./target/${target}/release/ffplayout .
tar -czvf "ffplayout-engine-v${version}_${target}.tar.gz" --exclude='*.db' assets docs LICENSE README.md ffplayout
rm -f ffplayout
fi
echo ""
done
echo "Create debian package"
echo ""
cargo deb --target=x86_64-unknown-linux-musl
mv ./target/x86_64-unknown-linux-musl/debian/ffplayout-engine_${version}_amd64.deb .
echo ""
echo "Create rhel package"
echo ""
cargo generate-rpm --target=x86_64-unknown-linux-musl
mv ./target/x86_64-unknown-linux-musl/generate-rpm/ffplayout-engine-${version}-1.x86_64.rpm .

79
ffplayout-api/Cargo.toml Normal file
View File

@ -0,0 +1,79 @@
[package]
name = "ffplayout-api"
description = "Rest API for ffplayout"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.3.0"
edition = "2021"
[dependencies]
ffplayout-lib = { path = "../lib" }
actix-multipart = "0.4"
actix-web = "4"
actix-web-grants = "3"
actix-web-httpauth = "0.6"
argon2 = "0.4"
chrono = "0.4"
clap = { version = "3.2", features = ["derive"] }
derive_more = "0.99"
faccess = "0.2"
ffprobe = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
jsonwebtoken = "8"
log = "0.4"
once_cell = "1.10"
rand = "0.8"
rand_core = { version = "0.6", features = ["std"] }
relative-path = "1.6"
regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] }
sanitize-filename = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
simplelog = { version = "^0.12", features = ["paris"] }
sqlx = { version = "0.5", features = [
"chrono",
"runtime-actix-native-tls",
"sqlite"
] }
[target.x86_64-unknown-linux-musl.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
[[bin]]
name = "ffpapi"
path = "src/main.rs"
# DEBIAN DEB PACKAGE
[package.metadata.deb]
name = "ffplayout-api"
priority = "optional"
section = "net"
license-file = ["../LICENSE", "0"]
depends = ""
suggests = "ffmpeg"
copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved."
conf-files = ["/etc/ffplayout/ffplayout.yml"]
assets = [
[
"../target/x86_64-unknown-linux-musl/release/ffpapi",
"/usr/bin/ffpapi",
"755"
],
["README.md", "/usr/share/doc/ffplayout/README", "644"],
]
maintainer-scripts = "debian/"
systemd-units = { enable = false, unit-scripts = "unit" }
# REHL RPM PACKAGE
[package.metadata.generate-rpm]
name = "ffplayout-api"
license = "GPL-3.0"
assets = [
{ source = "../target/x86_64-unknown-linux-musl/release/ffpapi", dest = "/usr/bin/ffpapi", mode = "755" },
{ source = "unit/ffpapi.service", dest = "/lib/systemd/system/ffpapi.service", mode = "644" },
{ source = "README.md", dest = "/usr/share/doc/ffplayout/README", mode = "644", doc = true },
{ source = "../LICENSE", dest = "/usr/share/doc/ffplayout/LICENSE", mode = "644" },
]

2
ffplayout-api/README.md Normal file
View File

@ -0,0 +1,2 @@
**ffplayout-api**
================

View File

113
ffplayout-api/src/main.rs Normal file
View File

@ -0,0 +1,113 @@
use std::{path::Path, process::exit};
use actix_web::{dev::ServiceRequest, middleware, web, App, Error, HttpMessage, HttpServer};
use actix_web_grants::permissions::AttachPermissions;
use actix_web_httpauth::extractors::bearer::BearerAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use clap::Parser;
use simplelog::*;
pub mod utils;
use utils::{
args_parse::Args,
auth, db_path, init_config,
models::LoginUser,
routes::{
add_preset, add_user, del_playlist, file_browser, gen_playlist, get_playlist,
get_playout_config, get_presets, get_settings, jump_to_last, jump_to_next, login,
media_current, media_last, media_next, move_rename, patch_settings, remove, reset_playout,
save_file, save_playlist, send_text_message, update_playout_config, update_preset,
update_user,
},
run_args, Role,
};
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
async fn validator(req: ServiceRequest, credentials: BearerAuth) -> Result<ServiceRequest, Error> {
// We just get permissions from JWT
let claims = auth::decode_jwt(credentials.token()).await?;
req.attach(vec![Role::set_role(&claims.role)]);
req.extensions_mut()
.insert(LoginUser::new(claims.id, claims.username));
Ok(req)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
let mut config = PlayoutConfig::new(None);
config.mail.recipient = String::new();
config.logging.log_to_file = false;
config.logging.timestamp = false;
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
if let Err(c) = run_args(args.clone()).await {
exit(c);
}
if let Some(conn) = args.listen {
if let Ok(p) = db_path() {
if !Path::new(&p).is_file() {
error!("Database is not initialized! Init DB first and add admin user.");
exit(1);
}
}
init_config().await;
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
info!("running ffplayout API, listen on {conn}");
// TODO: add allow origin (or give it to the proxy)
HttpServer::new(move || {
let auth = HttpAuthentication::bearer(validator);
App::new()
.wrap(middleware::Logger::default())
.service(login)
.service(
web::scope("/api")
.wrap(auth)
.service(add_user)
.service(get_playout_config)
.service(update_playout_config)
.service(add_preset)
.service(get_presets)
.service(update_preset)
.service(get_settings)
.service(patch_settings)
.service(update_user)
.service(send_text_message)
.service(jump_to_next)
.service(jump_to_last)
.service(reset_playout)
.service(media_current)
.service(media_next)
.service(media_last)
.service(get_playlist)
.service(save_playlist)
.service(gen_playlist)
.service(del_playlist)
.service(file_browser)
.service(move_rename)
.service(remove)
.service(save_file),
)
})
.bind((addr, port))?
.run()
.await
} else {
error!("Run ffpapi with listen parameter!");
Ok(())
}
}

View File

@ -0,0 +1,22 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[clap(version,
about = "REST API for ffplayout",
long_about = None)]
pub struct Args {
#[clap(short, long, help = "Listen on IP:PORT, like: 127.0.0.1:8080")]
pub listen: Option<String>,
#[clap(short, long, help = "Initialize Database")]
pub init: bool,
#[clap(short, long, help = "Create admin user")]
pub username: Option<String>,
#[clap(short, long, help = "Admin email")]
pub email: Option<String>,
#[clap(short, long, help = "Admin password")]
pub password: Option<String>,
}

View File

@ -0,0 +1,46 @@
use actix_web::error::ErrorUnauthorized;
use actix_web::Error;
use chrono::{Duration, Utc};
use jsonwebtoken::{self, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use crate::utils::GlobalSettings;
// Token lifetime
const JWT_EXPIRATION_DAYS: i64 = 7;
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct Claims {
pub id: i64,
pub username: String,
pub role: String,
exp: i64,
}
impl Claims {
pub fn new(id: i64, username: String, role: String) -> Self {
Self {
id,
username,
role,
exp: (Utc::now() + Duration::days(JWT_EXPIRATION_DAYS)).timestamp(),
}
}
}
/// Create a json web token (JWT)
pub fn create_jwt(claims: Claims) -> Result<String, Error> {
let config = GlobalSettings::global();
let encoding_key = EncodingKey::from_secret(config.secret.as_bytes());
jsonwebtoken::encode(&Header::default(), &claims, &encoding_key)
.map_err(|e| ErrorUnauthorized(e.to_string()))
}
/// Decode a json web token (JWT)
pub async fn decode_jwt(token: &str) -> Result<Claims, Error> {
let config = GlobalSettings::global();
let decoding_key = DecodingKey::from_secret(config.secret.as_bytes());
jsonwebtoken::decode::<Claims>(token, &decoding_key, &Validation::default())
.map(|data| data.claims)
.map_err(|e| ErrorUnauthorized(e.to_string()))
}

View File

@ -0,0 +1,107 @@
use std::collections::HashMap;
use reqwest::{
header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE},
Client, Response,
};
use serde::{Deserialize, Serialize};
use simplelog::*;
use crate::utils::{errors::ServiceError, playout_config};
#[derive(Debug, Deserialize, Serialize, Clone)]
struct RpcObj<T> {
jsonrpc: String,
id: i64,
method: String,
params: T,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct TextParams {
control: String,
message: HashMap<String, String>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ControlParams {
control: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct MediaParams {
media: String,
}
impl<T> RpcObj<T> {
fn new(id: i64, method: String, params: T) -> Self {
Self {
jsonrpc: "2.0".into(),
id,
method,
params,
}
}
}
fn create_header(auth: &str) -> HeaderMap {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
"Content-Type: application/json".parse().unwrap(),
);
headers.insert(AUTHORIZATION, auth.parse().unwrap());
headers
}
async fn post_request<T>(id: i64, obj: RpcObj<T>) -> Result<Response, ServiceError>
where
T: Serialize,
{
let (config, _) = playout_config(&id).await?;
let url = format!("http://{}", config.rpc_server.address);
let client = Client::new();
match client
.post(&url)
.headers(create_header(&config.rpc_server.authorization))
.json(&obj)
.send()
.await
{
Ok(result) => Ok(result),
Err(e) => {
error!("{e:?}");
Err(ServiceError::BadRequest(e.to_string()))
}
}
}
pub async fn send_message(
id: i64,
message: HashMap<String, String>,
) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(
id,
"player".into(),
TextParams {
control: "text".into(),
message,
},
);
post_request(id, json_obj).await
}
pub async fn control_state(id: i64, command: String) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), ControlParams { control: command });
post_request(id, json_obj).await
}
pub async fn media_info(id: i64, command: String) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), MediaParams { media: command });
post_request(id, json_obj).await
}

View File

@ -0,0 +1,61 @@
use actix_web::{error::ResponseError, Error, HttpResponse};
use derive_more::Display;
#[derive(Debug, Display)]
pub enum ServiceError {
#[display(fmt = "Internal Server Error")]
InternalServerError,
#[display(fmt = "BadRequest: {}", _0)]
BadRequest(String),
#[display(fmt = "Conflict: {}", _0)]
Conflict(String),
#[display(fmt = "Unauthorized")]
Unauthorized,
}
// impl ResponseError trait allows to convert our errors into http responses with appropriate data
impl ResponseError for ServiceError {
fn error_response(&self) -> HttpResponse {
match self {
ServiceError::InternalServerError => {
HttpResponse::InternalServerError().json("Internal Server Error. Please try later.")
}
ServiceError::BadRequest(ref message) => HttpResponse::BadRequest().json(message),
ServiceError::Conflict(ref message) => HttpResponse::Conflict().json(message),
ServiceError::Unauthorized => HttpResponse::Unauthorized().json("No Permission!"),
}
}
}
impl From<String> for ServiceError {
fn from(err: String) -> ServiceError {
ServiceError::BadRequest(err)
}
}
impl From<Error> for ServiceError {
fn from(err: Error) -> ServiceError {
ServiceError::BadRequest(err.to_string())
}
}
impl From<actix_multipart::MultipartError> for ServiceError {
fn from(err: actix_multipart::MultipartError) -> ServiceError {
ServiceError::BadRequest(err.to_string())
}
}
impl From<std::io::Error> for ServiceError {
fn from(err: std::io::Error) -> ServiceError {
ServiceError::BadRequest(err.to_string())
}
}
impl From<actix_web::error::BlockingError> for ServiceError {
fn from(err: actix_web::error::BlockingError) -> ServiceError {
ServiceError::BadRequest(err.to_string())
}
}

View File

@ -0,0 +1,285 @@
use std::{
fs,
io::Write,
path::{Path, PathBuf},
};
use actix_multipart::Multipart;
use actix_web::{web, HttpResponse};
use futures_util::TryStreamExt as _;
use rand::{distributions::Alphanumeric, Rng};
use relative_path::RelativePath;
use serde::{Deserialize, Serialize};
use simplelog::*;
use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::utils::file_extension;
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PathObject {
pub source: String,
folders: Option<Vec<String>>,
files: Option<Vec<String>>,
}
impl PathObject {
fn new(source: String) -> Self {
Self {
source,
folders: Some(vec![]),
files: Some(vec![]),
}
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct MoveObject {
source: String,
target: String,
}
pub async fn browser(id: i64, path_obj: &PathObject) -> Result<PathObject, ServiceError> {
let (config, _) = playout_config(&id).await?;
let path = PathBuf::from(config.storage.path);
let extensions = config.storage.extensions;
let path_component = RelativePath::new(&path_obj.source)
.normalize()
.to_string()
.replace("../", "");
let path = path.join(path_component.clone());
let mut obj = PathObject::new(path_component.clone());
let mut paths: Vec<_> = match fs::read_dir(path) {
Ok(p) => p.filter_map(|r| r.ok()).collect(),
Err(e) => {
error!("{e} in {path_component}");
return Err(ServiceError::InternalServerError);
}
};
paths.sort_by_key(|dir| dir.path());
for path in paths {
let file_path = path.path().to_owned();
let path_str = file_path.display().to_string();
// ignore hidden files/folders on unix
if path_str.contains("/.") {
continue;
}
if file_path.is_dir() {
if let Some(ref mut folders) = obj.folders {
folders.push(path_str);
}
} else if file_path.is_file() {
if let Some(ext) = file_extension(&file_path) {
if extensions.contains(&ext.to_string().to_lowercase()) {
if let Some(ref mut files) = obj.files {
files.push(path_str);
}
}
}
}
}
Ok(obj)
}
// fn copy_and_delete(source: &PathBuf, target: &PathBuf) -> Result<PathObject, ServiceError> {
// match fs::copy(&source, &target) {
// Ok(_) => {
// if let Err(e) = fs::remove_file(source) {
// error!("{e}");
// return Err(ServiceError::BadRequest(
// "Removing File not possible!".into(),
// ));
// };
// return Ok(PathObject::new(target.display().to_string()));
// }
// Err(e) => {
// error!("{e}");
// Err(ServiceError::BadRequest("Error in file copy!".into()))
// }
// }
// }
fn rename(source: &PathBuf, target: &PathBuf) -> Result<MoveObject, ServiceError> {
match fs::rename(&source, &target) {
Ok(_) => Ok(MoveObject {
source: source.display().to_string(),
target: target.display().to_string(),
}),
Err(e) => {
error!("{e}");
Err(ServiceError::BadRequest("Rename failed!".into()))
}
}
}
pub async fn rename_file(id: i64, move_object: &MoveObject) -> Result<MoveObject, ServiceError> {
let (config, _) = playout_config(&id).await?;
let path = PathBuf::from(&config.storage.path);
let source = RelativePath::new(&move_object.source)
.normalize()
.to_string()
.replace("../", "");
let target = RelativePath::new(&move_object.target)
.normalize()
.to_string()
.replace("../", "");
let mut source_path = PathBuf::from(source.clone());
let mut target_path = PathBuf::from(target.clone());
let relativ_path = RelativePath::new(&config.storage.path)
.normalize()
.to_string();
if !source_path.starts_with(&relativ_path) {
source_path = path.join(source);
} else {
source_path = path.join(source_path.strip_prefix(&relativ_path).unwrap());
}
if !target_path.starts_with(&relativ_path) {
target_path = path.join(target);
} else {
target_path = path.join(target_path.strip_prefix(relativ_path).unwrap());
}
if !source_path.exists() {
return Err(ServiceError::BadRequest("Source file not exist!".into()));
}
if (source_path.is_dir() || source_path.is_file()) && source_path.parent() == Some(&target_path)
{
return rename(&source_path, &target_path);
}
if target_path.is_dir() {
target_path = target_path.join(source_path.file_name().unwrap());
}
if target_path.is_file() {
return Err(ServiceError::BadRequest(
"Target file already exists!".into(),
));
}
if source_path.is_file() && target_path.parent().is_some() {
return rename(&source_path, &target_path);
}
Err(ServiceError::InternalServerError)
}
pub async fn remove_file_or_folder(id: i64, source_path: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(&id).await?;
let source = PathBuf::from(source_path);
let test_source = RelativePath::new(&source_path)
.normalize()
.to_string()
.replace("../", "");
let test_path = RelativePath::new(&config.storage.path)
.normalize()
.to_string();
if !test_source.starts_with(&test_path) {
return Err(ServiceError::BadRequest(
"Source file is not in storage!".into(),
));
}
if !source.exists() {
return Err(ServiceError::BadRequest("Source does not exists!".into()));
}
if source.is_dir() {
match fs::remove_dir(source) {
Ok(_) => return Ok(()),
Err(e) => {
error!("{e}");
return Err(ServiceError::BadRequest(
"Delete folder failed! (Folder must be empty)".into(),
));
}
};
}
if source.is_file() {
match fs::remove_file(source) {
Ok(_) => return Ok(()),
Err(e) => {
error!("{e}");
return Err(ServiceError::BadRequest("Delete file failed!".into()));
}
};
}
Err(ServiceError::InternalServerError)
}
async fn valid_path(id: i64, path: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(&id).await?;
let test_target = RelativePath::new(&path)
.normalize()
.to_string()
.replace("../", "");
let test_path = RelativePath::new(&config.storage.path)
.normalize()
.to_string();
if !test_target.starts_with(&test_path) {
return Err(ServiceError::BadRequest(
"Target folder is not in storage!".into(),
));
}
if !Path::new(path).is_dir() {
return Err(ServiceError::BadRequest("Target folder not exists!".into()));
}
Ok(())
}
pub async fn upload(id: i64, mut payload: Multipart) -> Result<HttpResponse, ServiceError> {
while let Some(mut field) = payload.try_next().await? {
let content_disposition = field.content_disposition();
debug!("{content_disposition}");
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(20)
.map(char::from)
.collect();
let path_name = content_disposition.get_name().unwrap_or(&rand_string);
let filename = content_disposition
.get_filename()
.map_or_else(|| rand_string.to_string(), sanitize_filename::sanitize);
if let Err(e) = valid_path(id, path_name).await {
return Err(e);
}
let filepath = PathBuf::from(path_name).join(filename);
if filepath.is_file() {
return Err(ServiceError::BadRequest("Target already exists!".into()));
}
// File::create is blocking operation, use threadpool
let mut f = web::block(|| std::fs::File::create(filepath)).await??;
while let Some(chunk) = field.try_next().await? {
f = web::block(move || f.write_all(&chunk).map(|_| f)).await??;
}
}
Ok(HttpResponse::Ok().into())
}

View File

@ -0,0 +1,273 @@
use argon2::{
password_hash::{rand_core::OsRng, SaltString},
Argon2, PasswordHasher,
};
use rand::{distributions::Alphanumeric, Rng};
use simplelog::*;
use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite, SqlitePool};
use crate::utils::{
db_path,
models::{Settings, TextPreset, User},
GlobalSettings,
};
#[derive(Debug, sqlx::FromRow)]
struct Role {
name: String,
}
async fn create_schema() -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = "PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS global
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
secret TEXT NOT NULL,
UNIQUE(secret)
);
CREATE TABLE IF NOT EXISTS roles
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS presets
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
text TEXT NOT NULL,
x TEXT NOT NULL,
y TEXT NOT NULL,
fontsize TEXT NOT NULL,
line_spacing TEXT NOT NULL,
fontcolor TEXT NOT NULL,
box TEXT NOT NULL,
boxcolor TEXT NOT NULL,
boxborderw TEXT NOT NULL,
alpha TEXT NOT NULL,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS settings
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_name TEXT NOT NULL,
preview_url TEXT NOT NULL,
config_path TEXT NOT NULL,
extra_extensions TEXT NOT NULL,
UNIQUE(channel_name)
);
CREATE TABLE IF NOT EXISTS user
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
salt TEXT NOT NULL,
role_id INTEGER NOT NULL DEFAULT 2,
FOREIGN KEY (role_id) REFERENCES roles (id) ON UPDATE SET NULL ON DELETE SET NULL,
UNIQUE(email, username)
);";
let result = sqlx::query(query).execute(&conn).await;
conn.close().await;
result
}
pub async fn db_init() -> Result<&'static str, Box<dyn std::error::Error>> {
let db_path = db_path()?;
if !Sqlite::database_exists(&db_path).await.unwrap_or(false) {
Sqlite::create_database(&db_path).await.unwrap();
match create_schema().await {
Ok(_) => info!("Database created Successfully"),
Err(e) => panic!("{e}"),
}
}
let secret: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(80)
.map(char::from)
.collect();
let instances = db_connection().await?;
let query = "CREATE TRIGGER global_row_count
BEFORE INSERT ON global
WHEN (SELECT COUNT(*) FROM global) >= 1
BEGIN
SELECT RAISE(FAIL, 'Database is already init!');
END;
INSERT INTO global(secret) VALUES($1);
INSERT INTO presets(name, text, x, y, fontsize, line_spacing, fontcolor, alpha, box, boxcolor, boxborderw)
VALUES('Default', 'Wellcome to ffplayout messenger!', '(w-text_w)/2', '(h-text_h)/2', '24', '4', '#ffffff@0xff', '1.0', '0', '#000000@0x80', '4'),
('Empty Text', '', '0', '0', '24', '4', '#000000', '0', '0', '#000000', '0'),
('Bottom Text fade in', 'The upcoming event will be delayed by a few minutes.', '(w-text_w)/2', '(h-line_h)*0.9', '24', '4', '#ffffff',
'ifnot(ld(1),st(1,t));if(lt(t,ld(1)+1),0,if(lt(t,ld(1)+2),(t-(ld(1)+1))/1,if(lt(t,ld(1)+8),1,if(lt(t,ld(1)+9),(1-(t-(ld(1)+8)))/1,0))))', '1', '#000000@0x80', '4'),
('Scrolling Text', 'We have a very important announcement to make.', 'ifnot(ld(1),st(1,t));if(lt(t,ld(1)+1),w+4,w-w/12*mod(t-ld(1),12*(w+tw)/w))', '(h-line_h)*0.9',
'24', '4', '#ffffff', '1.0', '1', '#000000@0x80', '4');
INSERT INTO roles(name) VALUES('admin'), ('user'), ('guest');
INSERT INTO settings(channel_name, preview_url, config_path, extra_extensions)
VALUES('Channel 1', 'http://localhost/live/preview.m3u8',
'/etc/ffplayout/ffplayout.yml', '.jpg,.jpeg,.png');";
sqlx::query(query).bind(secret).execute(&instances).await?;
instances.close().await;
Ok("Database initialized!")
}
pub async fn db_connection() -> Result<Pool<Sqlite>, sqlx::Error> {
let db_path = db_path().unwrap();
let conn = SqlitePool::connect(&db_path).await?;
Ok(conn)
}
pub async fn db_global() -> Result<GlobalSettings, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT secret FROM global WHERE id = 1";
let result: GlobalSettings = sqlx::query_as(query).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_get_settings(id: &i64) -> Result<Settings, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT * FROM settings WHERE id = $1";
let result: Settings = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_update_settings(
id: i64,
settings: Settings,
) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = "UPDATE settings SET channel_name = $2, preview_url = $3, config_path = $4, extra_extensions = $5 WHERE id = $1";
let result: SqliteQueryResult = sqlx::query(query)
.bind(id)
.bind(settings.channel_name.clone())
.bind(settings.preview_url.clone())
.bind(settings.config_path.clone())
.bind(settings.extra_extensions.clone())
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}
pub async fn db_role(id: &i64) -> Result<String, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT name FROM roles WHERE id = $1";
let result: Role = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
Ok(result.name)
}
pub async fn db_login(user: &str) -> Result<User, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT id, email, username, password, salt, role_id FROM user WHERE username = $1";
let result: User = sqlx::query_as(query).bind(user).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_add_user(user: User) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::default()
.hash_password(user.password.clone().as_bytes(), &salt)
.unwrap();
let query =
"INSERT INTO user (email, username, password, salt, role_id) VALUES($1, $2, $3, $4, $5)";
let result = sqlx::query(query)
.bind(user.email)
.bind(user.username)
.bind(password_hash.to_string())
.bind(salt.to_string())
.bind(user.role_id)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}
pub async fn db_update_user(id: i64, fields: String) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = format!("UPDATE user SET {fields} WHERE id = $1");
let result: SqliteQueryResult = sqlx::query(&query).bind(id).execute(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_get_presets() -> Result<Vec<TextPreset>, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT * FROM presets";
let result: Vec<TextPreset> = sqlx::query_as(query).fetch_all(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_update_preset(
id: &i64,
preset: TextPreset,
) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query =
"UPDATE presets SET name = $1, text = $2, x = $3, y = $4, fontsize = $5, line_spacing = $6,
fontcolor = $7, alpha = $8, box = $9, boxcolor = $10, boxborderw = 11 WHERE id = $12";
let result: SqliteQueryResult = sqlx::query(query)
.bind(preset.name)
.bind(preset.text)
.bind(preset.x)
.bind(preset.y)
.bind(preset.fontsize)
.bind(preset.line_spacing)
.bind(preset.fontcolor)
.bind(preset.alpha)
.bind(preset.r#box)
.bind(preset.boxcolor)
.bind(preset.boxborderw)
.bind(id)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}
pub async fn db_add_preset(preset: TextPreset) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query =
"INSERT INTO presets (name, text, x, y, fontsize, line_spacing, fontcolor, alpha, box, boxcolor, boxborderw)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)";
let result: SqliteQueryResult = sqlx::query(query)
.bind(preset.name)
.bind(preset.text)
.bind(preset.x)
.bind(preset.y)
.bind(preset.fontsize)
.bind(preset.line_spacing)
.bind(preset.fontcolor)
.bind(preset.alpha)
.bind(preset.r#box)
.bind(preset.boxcolor)
.bind(preset.boxborderw)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}

View File

@ -0,0 +1,145 @@
use std::{error::Error, fs::File, path::Path};
use faccess::PathExt;
use once_cell::sync::OnceCell;
use simplelog::*;
pub mod args_parse;
pub mod auth;
pub mod control;
pub mod errors;
pub mod files;
pub mod handles;
pub mod models;
pub mod playlist;
pub mod routes;
use crate::utils::{
args_parse::Args,
errors::ServiceError,
handles::{db_add_user, db_get_settings, db_global, db_init},
models::{Settings, User},
};
use ffplayout_lib::utils::PlayoutConfig;
#[derive(PartialEq, Clone)]
pub enum Role {
Admin,
User,
Guest,
}
impl Role {
pub fn set_role(role: &str) -> Self {
match role {
"admin" => Role::Admin,
"user" => Role::User,
_ => Role::Guest,
}
}
}
#[derive(Debug, sqlx::FromRow)]
pub struct GlobalSettings {
pub secret: String,
}
impl GlobalSettings {
async fn new() -> Self {
let global_settings = db_global();
match global_settings.await {
Ok(g) => g,
Err(_) => GlobalSettings {
secret: String::new(),
},
}
}
pub fn global() -> &'static GlobalSettings {
INSTANCE.get().expect("Config is not initialized")
}
}
static INSTANCE: OnceCell<GlobalSettings> = OnceCell::new();
pub async fn init_config() {
let config = GlobalSettings::new().await;
INSTANCE.set(config).unwrap();
}
pub fn db_path() -> Result<String, Box<dyn std::error::Error>> {
let sys_path = Path::new("/usr/share/ffplayout");
let mut db_path = String::from("./ffplayout.db");
if sys_path.is_dir() && sys_path.writable() {
db_path = String::from("/usr/share/ffplayout/ffplayout.db");
} else if Path::new("./assets").is_dir() {
db_path = String::from("./assets/ffplayout.db");
}
Ok(db_path)
}
pub async fn run_args(args: Args) -> Result<(), i32> {
if !args.init && args.listen.is_none() && args.username.is_none() {
error!("Wrong number of arguments! Run ffpapi --help for more information.");
return Err(0);
}
if args.init {
if let Err(e) = db_init().await {
panic!("{e}");
};
return Err(0);
}
if let Some(username) = args.username {
if args.email.is_none() || args.password.is_none() {
error!("Email/password missing!");
return Err(1);
}
let user = User {
id: 0,
email: Some(args.email.unwrap()),
username: username.clone(),
password: args.password.unwrap(),
salt: None,
role_id: Some(1),
token: None,
};
if let Err(e) = db_add_user(user).await {
error!("{e}");
return Err(1);
};
info!("Create admin user \"{username}\" done...");
return Err(0);
}
Ok(())
}
pub fn read_playout_config(path: &str) -> Result<PlayoutConfig, Box<dyn Error>> {
let file = File::open(path)?;
let config: PlayoutConfig = serde_yaml::from_reader(file)?;
Ok(config)
}
pub async fn playout_config(channel_id: &i64) -> Result<(PlayoutConfig, Settings), ServiceError> {
if let Ok(settings) = db_get_settings(channel_id).await {
if let Ok(config) = read_playout_config(&settings.config_path.clone()) {
return Ok((config, settings));
}
}
Err(ServiceError::BadRequest(
"Error in getting config!".to_string(),
))
}

View File

@ -0,0 +1,69 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct User {
#[sqlx(default)]
#[serde(skip_deserializing)]
pub id: i64,
#[sqlx(default)]
pub email: Option<String>,
pub username: String,
#[sqlx(default)]
#[serde(skip_serializing, default = "empty_string")]
pub password: String,
#[sqlx(default)]
#[serde(skip_serializing)]
pub salt: Option<String>,
#[sqlx(default)]
#[serde(skip_serializing)]
pub role_id: Option<i64>,
#[sqlx(default)]
pub token: Option<String>,
}
fn empty_string() -> String {
"".to_string()
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoginUser {
pub id: i64,
pub username: String,
}
impl LoginUser {
pub fn new(id: i64, username: String) -> Self {
Self { id, username }
}
}
#[derive(Debug, Deserialize, Serialize, Clone, sqlx::FromRow)]
pub struct TextPreset {
#[sqlx(default)]
#[serde(skip_deserializing)]
pub id: i64,
#[serde(skip_deserializing)]
pub name: String,
pub text: String,
pub x: String,
pub y: String,
pub fontsize: String,
pub line_spacing: String,
pub fontcolor: String,
pub r#box: String,
pub boxcolor: String,
pub boxborderw: String,
pub alpha: String,
}
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct Settings {
#[serde(skip_deserializing)]
pub id: i64,
pub channel_name: String,
pub preview_url: String,
pub config_path: String,
pub extra_extensions: String,
#[sqlx(default)]
#[serde(skip_serializing, skip_deserializing)]
pub secret: String,
}

View File

@ -0,0 +1,116 @@
use std::{
fs::{self, File},
io::Error,
path::PathBuf,
};
use simplelog::*;
use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::utils::{generate_playlist as playlist_generator, JsonPlaylist};
fn json_reader(path: &PathBuf) -> Result<JsonPlaylist, Error> {
let f = File::options().read(true).write(false).open(&path)?;
let p = serde_json::from_reader(f)?;
Ok(p)
}
fn json_writer(path: &PathBuf, data: JsonPlaylist) -> Result<(), Error> {
let f = File::options()
.write(true)
.truncate(true)
.create(true)
.open(&path)?;
serde_json::to_writer_pretty(f, &data)?;
Ok(())
}
pub async fn read_playlist(id: i64, date: String) -> Result<JsonPlaylist, ServiceError> {
let (config, _) = playout_config(&id).await?;
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
if let Ok(p) = json_reader(&playlist_path) {
return Ok(p);
};
Err(ServiceError::InternalServerError)
}
pub async fn write_playlist(id: i64, json_data: JsonPlaylist) -> Result<String, ServiceError> {
let (config, _) = playout_config(&id).await?;
let date = json_data.date.clone();
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
if playlist_path.is_file() {
if let Ok(existing_data) = json_reader(&playlist_path) {
if json_data == existing_data {
return Err(ServiceError::Conflict(format!(
"Playlist from {date}, already exists!"
)));
}
}
}
match json_writer(&playlist_path, json_data) {
Ok(_) => return Ok(format!("Write playlist from {date} success!")),
Err(e) => {
error!("{e}");
}
}
Err(ServiceError::InternalServerError)
}
pub async fn generate_playlist(id: i64, date: String) -> Result<JsonPlaylist, ServiceError> {
let (config, settings) = playout_config(&id).await?;
match playlist_generator(&config, vec![date], Some(settings.channel_name)) {
Ok(playlists) => {
if !playlists.is_empty() {
Ok(playlists[0].clone())
} else {
Err(ServiceError::Conflict(
"Playlist could not be written, possible already exists!".into(),
))
}
}
Err(e) => {
error!("{e}");
Err(ServiceError::InternalServerError)
}
}
}
pub async fn delete_playlist(id: i64, date: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(&id).await?;
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date)
.with_extension("json");
if playlist_path.is_file() {
if let Err(e) = fs::remove_file(playlist_path) {
error!("{e}");
return Err(ServiceError::InternalServerError);
};
}
Ok(())
}

View File

@ -0,0 +1,467 @@
use std::collections::HashMap;
use actix_multipart::Multipart;
use actix_web::{delete, get, http::StatusCode, patch, post, put, web, HttpResponse, Responder};
use actix_web_grants::{permissions::AuthDetails, proc_macro::has_any_role};
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, SaltString},
Argon2, PasswordHasher, PasswordVerifier,
};
use serde::Serialize;
use simplelog::*;
use crate::utils::{
auth::{create_jwt, Claims},
control::{control_state, media_info, send_message},
errors::ServiceError,
files::{browser, remove_file_or_folder, rename_file, upload, MoveObject, PathObject},
handles::{
db_add_preset, db_add_user, db_get_presets, db_get_settings, db_login, db_role,
db_update_preset, db_update_settings, db_update_user,
},
models::{LoginUser, Settings, TextPreset, User},
playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist},
read_playout_config, Role,
};
use ffplayout_lib::utils::{JsonPlaylist, PlayoutConfig};
#[derive(Serialize)]
struct ResponseObj<T> {
message: String,
status: i32,
data: Option<T>,
}
/// curl -X GET http://127.0.0.1:8080/api/settings/1 -H "Authorization: Bearer <TOKEN>"
#[get("/settings/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_settings(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
if let Ok(settings) = db_get_settings(&id).await {
return Ok(web::Json(ResponseObj {
message: format!("Settings from {}", settings.channel_name),
status: 200,
data: Some(settings),
}));
}
Err(ServiceError::InternalServerError)
}
/// curl -X PATCH http://127.0.0.1:8080/api/settings/1 -H "Content-Type: application/json" \
/// --data '{"id":1,"channel_name":"Channel 1","preview_url":"http://localhost/live/stream.m3u8", \
/// "config_path":"/etc/ffplayout/ffplayout.yml","extra_extensions":".jpg,.jpeg,.png"}' \
/// -H "Authorization: Bearer <TOKEN>"
#[patch("/settings/{id}")]
#[has_any_role("Role::Admin", type = "Role")]
async fn patch_settings(
id: web::Path<i64>,
data: web::Json<Settings>,
) -> Result<impl Responder, ServiceError> {
if db_update_settings(*id, data.into_inner()).await.is_ok() {
return Ok("Update Success");
};
Err(ServiceError::InternalServerError)
}
/// curl -X GET http://localhost:8080/api/playout/config/1 --header 'Authorization: <TOKEN>'
#[get("/playout/config/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_playout_config(
id: web::Path<i64>,
_details: AuthDetails<Role>,
) -> Result<impl Responder, ServiceError> {
if let Ok(settings) = db_get_settings(&id).await {
if let Ok(config) = read_playout_config(&settings.config_path) {
return Ok(web::Json(config));
}
};
Err(ServiceError::InternalServerError)
}
/// curl -X PUT http://localhost:8080/api/playout/config/1 -H "Content-Type: application/json" \
/// --data { <CONFIG DATA> } --header 'Authorization: <TOKEN>'
#[put("/playout/config/{id}")]
#[has_any_role("Role::Admin", type = "Role")]
async fn update_playout_config(
id: web::Path<i64>,
data: web::Json<PlayoutConfig>,
) -> Result<impl Responder, ServiceError> {
if let Ok(settings) = db_get_settings(&id).await {
if let Ok(f) = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(&settings.config_path)
{
serde_yaml::to_writer(f, &data).unwrap();
return Ok("Update playout config success.");
} else {
return Err(ServiceError::InternalServerError);
};
};
Err(ServiceError::InternalServerError)
}
/// curl -X PUT http://localhost:8080/api/presets/ --header 'Content-Type: application/json' \
/// --data '{"email": "<EMAIL>", "password": "<PASS>"}' --header 'Authorization: <TOKEN>'
#[get("/presets/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_presets() -> Result<impl Responder, ServiceError> {
if let Ok(presets) = db_get_presets().await {
return Ok(web::Json(presets));
}
Err(ServiceError::InternalServerError)
}
/// curl -X PUT http://localhost:8080/api/presets/1 --header 'Content-Type: application/json' \
/// --data '{"name": "<PRESET NAME>", "text": "TEXT>", "x": "<X>", "y": "<Y>", "fontsize": 24, \
/// "line_spacing": 4, "fontcolor": "#ffffff", "box": 1, "boxcolor": "#000000", "boxborderw": 4, "alpha": 1.0}}' \
/// --header 'Authorization: <TOKEN>'
#[put("/presets/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn update_preset(
id: web::Path<i64>,
data: web::Json<TextPreset>,
) -> Result<impl Responder, ServiceError> {
if db_update_preset(&id, data.into_inner()).await.is_ok() {
return Ok("Update Success");
}
Err(ServiceError::InternalServerError)
}
/// curl -X POST http://localhost:8080/api/presets/ --header 'Content-Type: application/json' \
/// --data '{"name": "<PRESET NAME>", "text": "TEXT>", "x": "<X>", "y": "<Y>", "fontsize": 24, \
/// "line_spacing": 4, "fontcolor": "#ffffff", "box": 1, "boxcolor": "#000000", "boxborderw": 4, "alpha": 1.0}}' \
/// --header 'Authorization: <TOKEN>'
#[post("/presets/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn add_preset(data: web::Json<TextPreset>) -> Result<impl Responder, ServiceError> {
if db_add_preset(data.into_inner()).await.is_ok() {
return Ok("Add preset Success");
}
Err(ServiceError::InternalServerError)
}
/// curl -X PUT http://localhost:8080/api/user/1 --header 'Content-Type: application/json' \
/// --data '{"email": "<EMAIL>", "password": "<PASS>"}' --header 'Authorization: <TOKEN>'
#[put("/user/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn update_user(
id: web::Path<i64>,
user: web::ReqData<LoginUser>,
data: web::Json<User>,
) -> Result<impl Responder, ServiceError> {
if id.into_inner() == user.id {
let mut fields = String::new();
if let Some(email) = data.email.clone() {
fields.push_str(format!("email = '{email}'").as_str());
}
if !data.password.is_empty() {
if !fields.is_empty() {
fields.push_str(", ");
}
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::default()
.hash_password(data.password.clone().as_bytes(), &salt)
.unwrap();
fields.push_str(format!("password = '{}', salt = '{salt}'", password_hash).as_str());
}
if db_update_user(user.id, fields).await.is_ok() {
return Ok("Update Success");
};
return Err(ServiceError::InternalServerError);
}
Err(ServiceError::Unauthorized)
}
/// curl -X POST 'http://localhost:8080/api/user/' --header 'Content-Type: application/json' \
/// -d '{"email": "<EMAIL>", "username": "<USER>", "password": "<PASS>", "role_id": 1}' \
/// --header 'Authorization: Bearer <TOKEN>'
#[post("/user/")]
#[has_any_role("Role::Admin", type = "Role")]
async fn add_user(data: web::Json<User>) -> Result<impl Responder, ServiceError> {
match db_add_user(data.into_inner()).await {
Ok(_) => Ok("Add User Success"),
Err(e) => {
error!("{e}");
Err(ServiceError::InternalServerError)
}
}
}
/// curl -X POST http://127.0.0.1:8080/auth/login/ -H "Content-Type: application/json" \
/// -d '{"username": "<USER>", "password": "<PASS>" }'
#[post("/auth/login/")]
pub async fn login(credentials: web::Json<User>) -> impl Responder {
match db_login(&credentials.username).await {
Ok(mut user) => {
let pass = user.password.clone();
let hash = PasswordHash::new(&pass).unwrap();
user.password = "".into();
user.salt = None;
if Argon2::default()
.verify_password(credentials.password.as_bytes(), &hash)
.is_ok()
{
let role = db_role(&user.role_id.unwrap_or_default())
.await
.unwrap_or_else(|_| "guest".to_string());
let claims = Claims::new(user.id, user.username.clone(), role.clone());
if let Ok(token) = create_jwt(claims) {
user.token = Some(token);
};
info!("user {} login, with role: {role}", credentials.username);
web::Json(ResponseObj {
message: "login correct!".into(),
status: 200,
data: Some(user),
})
.customize()
.with_status(StatusCode::OK)
} else {
error!("Wrong password for {}!", credentials.username);
web::Json(ResponseObj {
message: "Wrong password!".into(),
status: 403,
data: None,
})
.customize()
.with_status(StatusCode::FORBIDDEN)
}
}
Err(e) => {
error!("Login {} failed! {e}", credentials.username);
return web::Json(ResponseObj {
message: format!("Login {} failed!", credentials.username),
status: 400,
data: None,
})
.customize()
.with_status(StatusCode::BAD_REQUEST);
}
}
}
/// ----------------------------------------------------------------------------
/// ffplayout process controlling
///
/// here we communicate with the engine for:
/// - jump to last or next clip
/// - reset playlist state
/// - get infos about current, next, last clip
/// - send text the the engine, for overlaying it (as lower third etc.)
/// ----------------------------------------------------------------------------
/// curl -X POST http://localhost:8080/api/control/1/text/ \
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>' \
/// --data '{"text": "Hello from ffplayout", "x": "(w-text_w)/2", "y": "(h-text_h)/2", \
/// "fontsize": "24", "line_spacing": "4", "fontcolor": "#ffffff", "box": "1", \
/// "boxcolor": "#000000", "boxborderw": "4", "alpha": "1.0"}'
#[post("/control/{id}/text/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn send_text_message(
id: web::Path<i64>,
data: web::Json<HashMap<String, String>>,
) -> Result<impl Responder, ServiceError> {
match send_message(*id, data.into_inner()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X POST http://localhost:8080/api/control/1/playout/next/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[post("/control/{id}/playout/next/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn jump_to_next(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match control_state(*id, "next".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X POST http://localhost:8080/api/control/1/playout/back/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[post("/control/{id}/playout/back/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn jump_to_last(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match control_state(*id, "back".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X POST http://localhost:8080/api/control/1/playout/reset/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[post("/control/{id}/playout/reset/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn reset_playout(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match control_state(*id, "reset".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X GET http://localhost:8080/api/control/1/media/current/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[get("/control/{id}/media/current")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_current(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "current".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X GET http://localhost:8080/api/control/1/media/next/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[get("/control/{id}/media/next")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_next(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "next".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// curl -X GET http://localhost:8080/api/control/1/media/last/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[get("/control/{id}/media/last")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_last(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "last".into()).await {
Ok(res) => return Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
}
/// ----------------------------------------------------------------------------
/// ffplayout playlist operations
///
/// ----------------------------------------------------------------------------
/// curl -X GET http://localhost:8080/api/playlist/1/2022-06-20
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[get("/playlist/{id}/{date}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn get_playlist(
params: web::Path<(i64, String)>,
) -> Result<impl Responder, ServiceError> {
match read_playlist(params.0, params.1.clone()).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
}
/// curl -X POST http://localhost:8080/api/playlist/1/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
/// -- data "{<JSON playlist data>}"
#[post("/playlist/{id}/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn save_playlist(
id: web::Path<i64>,
data: web::Json<JsonPlaylist>,
) -> Result<impl Responder, ServiceError> {
match write_playlist(*id, data.into_inner()).await {
Ok(res) => Ok(res),
Err(e) => Err(e),
}
}
/// curl -X GET http://localhost:8080/api/playlist/1/generate/2022-06-20
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[get("/playlist/{id}/generate/{date}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn gen_playlist(
params: web::Path<(i64, String)>,
) -> Result<impl Responder, ServiceError> {
match generate_playlist(params.0, params.1.clone()).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
}
/// curl -X DELETE http://localhost:8080/api/playlist/1/2022-06-20
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[delete("/playlist/{id}/{date}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn del_playlist(
params: web::Path<(i64, String)>,
) -> Result<impl Responder, ServiceError> {
match delete_playlist(params.0, &params.1).await {
Ok(_) => Ok(format!("Delete playlist from {} success!", params.1)),
Err(e) => Err(e),
}
}
/// ----------------------------------------------------------------------------
/// file operations
///
/// ----------------------------------------------------------------------------
/// curl -X GET http://localhost:8080/api/file/1/browse/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
#[post("/file/{id}/browse/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn file_browser(
id: web::Path<i64>,
data: web::Json<PathObject>,
) -> Result<impl Responder, ServiceError> {
match browser(*id, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
}
/// curl -X POST http://localhost:8080/api/file/1/move/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
/// -d '{"source": "<SOURCE>", "target": "<TARGET>"}'
#[post("/file/{id}/move/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn move_rename(
id: web::Path<i64>,
data: web::Json<MoveObject>,
) -> Result<impl Responder, ServiceError> {
match rename_file(*id, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
}
/// curl -X DELETE http://localhost:8080/api/file/1/remove/
/// --header 'Content-Type: application/json' --header 'Authorization: <TOKEN>'
/// -d '{"source": "<SOURCE>", "target": ""}'
#[delete("/file/{id}/remove/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn remove(
id: web::Path<i64>,
data: web::Json<PathObject>,
) -> Result<impl Responder, ServiceError> {
match remove_file_or_folder(*id, &data.into_inner().source).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
}
#[post("/file/{id}/upload/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn save_file(id: web::Path<i64>, payload: Multipart) -> Result<HttpResponse, ServiceError> {
upload(*id, payload).await
}

View File

@ -0,0 +1,14 @@
[Unit]
Description=Rest API for ffplayout
After=network.target remote-fs.target
[Service]
ExecStart= /usr/bin/ffpapi
ExecReload=/bin/kill -1 $MAINPID
Restart=always
RestartSec=1
User=www-data
Group=www-data
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,79 @@
[package]
name = "ffplayout-engine"
description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.9"
edition = "2021"
[dependencies]
ffplayout-lib = { path = "../lib" }
chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" }
clap = { version = "3.2", features = ["derive"] }
crossbeam-channel = "0.5"
faccess = "0.2"
ffprobe = "0.3"
file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" }
futures = "0.3"
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.7"
log = "0.4"
notify = "4.0"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }
walkdir = "2"
zeromq = { git = "https://github.com/zeromq/zmq.rs.git", default-features = false, features = [
"async-std-runtime",
"tcp-transport"
] }
[target.x86_64-unknown-linux-musl.dependencies]
openssl = { version = "0.10", features = ["vendored"] }
[[bin]]
name = "ffplayout"
path = "src/main.rs"
# DEBIAN DEB PACKAGE
[package.metadata.deb]
name = "ffplayout-engine"
priority = "optional"
section = "net"
license-file = ["../LICENSE", "0"]
depends = ""
suggests = "ffmpeg"
copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved."
conf-files = ["/etc/ffplayout/ffplayout.yml"]
assets = [
[
"../target/x86_64-unknown-linux-musl/release/ffplayout",
"/usr/bin/ffplayout",
"755"
],
["../assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"],
["../assets/logo.png", "/usr/share/ffplayout/logo.png", "644"],
["../README.md", "/usr/share/doc/ffplayout/README", "644"],
]
maintainer-scripts = "debian/"
systemd-units = { enable = false, unit-scripts = "unit" }
# REHL RPM PACKAGE
[package.metadata.generate-rpm]
name = "ffplayout-engine"
license = "GPL-3.0"
assets = [
{ source = "../target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" },
{ source = "../assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644", config = true },
{ source = "unit/ffplayout.service", dest = "/lib/systemd/system/ffplayout.service", mode = "644" },
{ source = "../README.md", dest = "/usr/share/doc/ffplayout/README", mode = "644", doc = true },
{ source = "../LICENSE", dest = "/usr/share/doc/ffplayout/LICENSE", mode = "644" },
{ source = "../assets/logo.png", dest = "/usr/share/ffplayout/logo.png", mode = "644" },
]

View File

@ -0,0 +1,2 @@
**ffplayout-engine**
================

View File

@ -0,0 +1,76 @@
use std::{
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
{Arc, Mutex},
},
thread::sleep,
time::Duration,
};
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use simplelog::*;
use ffplayout_lib::utils::{Media, PlayoutConfig};
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(
config: PlayoutConfig,
is_terminated: Arc<AtomicBool>,
sources: Arc<Mutex<Vec<Media>>>,
) {
let (tx, rx) = channel();
let path = config.storage.path;
if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'");
panic!("Folder path not exists: '{path}'");
}
let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
while !is_terminated.load(Ordering::SeqCst) {
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: <b><magenta>{old_path:?}</></b>");
}
Rename(old_path, new_path) => {
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: <b><magenta>{old_path:?}</></b> to <b><magenta>{new_path:?}</></b>");
}
_ => (),
}
}
sleep(Duration::from_secs(5));
}
}

View File

@ -8,9 +8,9 @@ use std::{
use crossbeam_channel::Sender;
use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::utils::{format_log_line, GlobalConfig, Ingest, ProcessControl};
use crate::vec_strings;
use ffplayout_lib::filter::ingest_filter::filter_cmd;
use ffplayout_lib::utils::{format_log_line, Ingest, PlayoutConfig, ProcessControl};
use ffplayout_lib::vec_strings;
pub fn log_line(line: String, level: &str) {
if line.contains("[info]") && level.to_lowercase() == "info" {
@ -55,6 +55,10 @@ fn server_monitor(
);
}
if line.contains("Address already in use") {
proc_ctl.kill_all();
}
log_line(line, level);
}
@ -65,7 +69,7 @@ fn server_monitor(
///
/// Start ffmpeg in listen mode, and wait for input.
pub fn ingest_server(
config: GlobalConfig,
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
mut proc_control: ProcessControl,
) -> Result<(), Error> {

View File

@ -9,19 +9,21 @@ use std::{
use simplelog::*;
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
use ffplayout_lib::utils::{Media, PlayoutConfig, PlayoutStatus};
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use folder::{watchman, FolderSource};
pub use folder::watchman;
pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
use ffplayout_lib::utils::folder::FolderSource;
/// Create a source iterator from playlist, or from folder.
pub fn source_generator(
config: GlobalConfig,
config: PlayoutConfig,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<AtomicUsize>,
playout_stat: PlayoutStatus,

View File

@ -10,9 +10,9 @@ use std::{
use serde_json::json;
use simplelog::*;
use crate::utils::{
use ffplayout_lib::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, is_remote, json_serializer::read_json,
modified_time, seek_and_length, valid_source, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN,
modified_time, seek_and_length, valid_source, Media, PlayoutConfig, PlayoutStatus, DUMMY_LEN,
};
/// Struct for current playlist.
@ -20,7 +20,7 @@ use crate::utils::{
/// Here we prepare the init clip and build a iterator where we pull our clips.
#[derive(Debug)]
pub struct CurrentProgram {
config: GlobalConfig,
config: PlayoutConfig,
start_sec: f64,
json_mod: Option<String>,
json_path: Option<String>,
@ -34,7 +34,7 @@ pub struct CurrentProgram {
impl CurrentProgram {
pub fn new(
config: &GlobalConfig,
config: &PlayoutConfig,
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
current_list: Arc<Mutex<Vec<Media>>>,
@ -56,7 +56,9 @@ impl CurrentProgram {
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(config.general.stat_file.clone(), &json).expect("Unable to write file");
if let Err(e) = fs::write(config.general.stat_file.clone(), &json) {
error!("Unable to write status file: {e}");
};
}
Self {
@ -171,8 +173,10 @@ impl CurrentProgram {
*self.playout_stat.time_shift.lock().unwrap() = 0.0;
let status_data: String =
serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(self.config.general.stat_file.clone(), &status_data)
.expect("Unable to write file");
if let Err(e) = fs::write(self.config.general.stat_file.clone(), &status_data) {
error!("Unable to write status file: {e}");
};
self.json_path = json.current_file.clone();
self.json_mod = json.modified;
@ -191,15 +195,13 @@ impl CurrentProgram {
let index = self.index.load(Ordering::SeqCst);
let current_list = self.nodes.lock().unwrap();
if index + 1 < current_list.len()
&& &current_list[index + 1].category.clone().unwrap_or_default() == "advertisement"
{
if index + 1 < current_list.len() && &current_list[index + 1].category == "advertisement" {
self.current_node.next_ad = Some(true);
}
if index > 0
&& index < current_list.len()
&& &current_list[index - 1].category.clone().unwrap_or_default() == "advertisement"
&& &current_list[index - 1].category == "advertisement"
{
self.current_node.last_ad = Some(true);
}
@ -390,7 +392,7 @@ impl Iterator for CurrentProgram {
/// - return clip only if we are in 24 hours time range
fn timed_source(
node: Media,
config: &GlobalConfig,
config: &PlayoutConfig,
last: bool,
playout_stat: &PlayoutStatus,
) -> Media {
@ -440,7 +442,7 @@ fn timed_source(
}
/// Generate the source CMD, or when clip not exist, get a dummy.
fn gen_source(config: &GlobalConfig, mut node: Media) -> Media {
fn gen_source(config: &PlayoutConfig, mut node: Media) -> Media {
if valid_source(&node.source) {
node.add_probe();
node.cmd = Some(seek_and_length(
@ -470,7 +472,7 @@ fn gen_source(config: &GlobalConfig, mut node: Media) -> Media {
/// Handle init clip, but this clip can be the last one in playlist,
/// this we have to figure out and calculate the right length.
fn handle_list_init(config: &GlobalConfig, mut node: Media) -> Media {
fn handle_list_init(config: &PlayoutConfig, mut node: Media) -> Media {
debug!("Playlist init");
let (_, total_delta) = get_delta(config, &node.begin.unwrap());
let mut out = node.out;

View File

@ -10,13 +10,23 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
use ffplayout_engine::{
pub mod input;
pub mod output;
pub mod rpc;
// #[cfg(test)]
// mod tests;
pub mod utils;
use utils::{arg_parse::get_args, get_config};
use crate::{
output::{player, write_hls},
rpc::json_rpc_server,
utils::{
generate_playlist, init_logging, send_mail, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
},
};
use ffplayout_lib::utils::{
generate_playlist, init_logging, send_mail, validate_ffmpeg, PlayerControl, PlayoutStatus,
ProcessControl,
};
#[derive(Serialize, Deserialize)]
@ -39,7 +49,9 @@ fn status_file(stat_file: &str, playout_stat: &PlayoutStatus) {
});
let json: String = serde_json::to_string(&data).expect("Serialize status data failed");
fs::write(stat_file, &json).expect("Unable to write file");
if let Err(e) = fs::write(stat_file, &json) {
error!("Unable to write status file: {e}");
};
} else {
let stat_file = File::options()
.read(true)
@ -56,7 +68,8 @@ fn status_file(stat_file: &str, playout_stat: &PlayoutStatus) {
}
fn main() {
let config = GlobalConfig::new();
let args = get_args();
let config = get_config(args);
let config_clone = config.clone();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
@ -67,14 +80,17 @@ fn main() {
let proc_ctl2 = proc_control.clone();
let messages = Arc::new(Mutex::new(Vec::new()));
let logging = init_logging(&config, proc_ctl1, messages.clone());
let logging = init_logging(&config, Some(proc_ctl1), Some(messages.clone()));
CombinedLogger::init(logging).unwrap();
validate_ffmpeg(&config);
if let Some(range) = config.general.generate.clone() {
// run a simple playlist generator and save them to disk
generate_playlist(&config, range);
if let Err(e) = generate_playlist(&config, range, None) {
error!("{e}");
exit(1);
};
exit(0);
}

View File

@ -2,29 +2,31 @@ use std::process::{self, Command, Stdio};
use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
use crate::vec_strings;
use ffplayout_lib::filter::v_drawtext;
use ffplayout_lib::utils::{Media, PlayoutConfig};
use ffplayout_lib::vec_strings;
/// Desktop Output
///
/// Instead of streaming, we run a ffplay instance and play on desktop.
pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child {
pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut enc_filter: Vec<String> = vec![];
let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"];
if config.text.add_text && !config.text.over_pre {
info!(
"Using drawtext filter, listening on address: <yellow>{}</>",
config.text.bind_address
);
if config.text.add_text && !config.text.text_from_filename {
if let Some(socket) = config.text.bind_address.clone() {
debug!(
"Using drawtext filter, listening on address: <yellow>{}</>",
socket
);
let mut filter: String = "null,".to_string();
filter.push_str(
v_drawtext::filter_node(config, &mut Media::new(0, String::new(), false)).as_str(),
);
enc_filter = vec!["-vf".to_string(), filter];
let mut filter: String = "null,".to_string();
filter.push_str(
v_drawtext::filter_node(config, &Media::new(0, String::new(), false)).as_str(),
);
enc_filter = vec!["-vf".to_string(), filter];
}
}
enc_cmd.append(&mut enc_filter);

View File

@ -13,7 +13,7 @@ out:
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
-hls_segment_filename /var/www/html/live/stream-%d.ts /var/www/html/live/stream.m3u8
*/
@ -27,17 +27,17 @@ use std::{
use simplelog::*;
use crate::filter::ingest_filter::filter_cmd;
use crate::input::{ingest::log_line, source_generator};
use crate::utils::{
prepare_output_cmd, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl,
use ffplayout_lib::filter::ingest_filter::filter_cmd;
use ffplayout_lib::utils::{
prepare_output_cmd, sec_to_time, stderr_reader, Decoder, Ingest, PlayerControl, PlayoutConfig,
PlayoutStatus, ProcessControl,
};
use crate::vec_strings;
use ffplayout_lib::vec_strings;
/// Ingest Server for HLS
fn ingest_to_hls_server(
config: GlobalConfig,
config: PlayoutConfig,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
) -> Result<(), Error> {
@ -131,7 +131,7 @@ fn ingest_to_hls_server(
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
config: &GlobalConfig,
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,

View File

@ -16,10 +16,11 @@ mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, source_generator};
use crate::utils::{
sec_to_time, stderr_reader, Decoder, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
use ffplayout_lib::utils::{
sec_to_time, stderr_reader, Decoder, PlayerControl, PlayoutConfig, PlayoutStatus,
ProcessControl,
};
use crate::vec_strings;
use ffplayout_lib::vec_strings;
/// Player
///
@ -31,7 +32,7 @@ use crate::vec_strings;
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
config: &GlobalConfig,
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,

View File

@ -2,14 +2,14 @@ use std::process::{self, Command, Stdio};
use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{prepare_output_cmd, GlobalConfig, Media};
use crate::vec_strings;
use ffplayout_lib::filter::v_drawtext;
use ffplayout_lib::utils::{prepare_output_cmd, Media, PlayoutConfig};
use ffplayout_lib::vec_strings;
/// Streaming Output
///
/// Prepare the ffmpeg command for streaming output
pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child {
pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut enc_cmd = vec![];
let mut enc_filter = vec![];
let mut preview_cmd = config.out.preview_cmd.as_ref().unwrap().clone();
@ -25,19 +25,21 @@ pub fn output(config: &GlobalConfig, log_format: &str) -> process::Child {
"pipe:0"
];
if config.text.add_text && !config.text.over_pre {
info!(
"Using drawtext filter, listening on address: <yellow>{}</>",
config.text.bind_address
);
if config.text.add_text && !config.text.text_from_filename {
if let Some(socket) = config.text.bind_address.clone() {
debug!(
"Using drawtext filter, listening on address: <yellow>{}</>",
socket
);
let mut filter = "[0:v]null,".to_string();
let mut filter = "[0:v]null,".to_string();
filter.push_str(
v_drawtext::filter_node(config, &mut Media::new(0, String::new(), false)).as_str(),
);
filter.push_str(
v_drawtext::filter_node(config, &Media::new(0, String::new(), false)).as_str(),
);
enc_filter = vec!["-filter_complex".to_string(), filter];
enc_filter = vec!["-filter_complex".to_string(), filter];
}
}
if config.out.preview {

View File

@ -1,5 +1,8 @@
use futures::executor;
use std::sync::atomic::Ordering;
mod zmq_cmd;
use jsonrpc_http_server::{
hyper,
jsonrpc_core::{IoHandler, Params, Value},
@ -8,11 +11,13 @@ use jsonrpc_http_server::{
use serde_json::{json, Map};
use simplelog::*;
use crate::utils::{
get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl,
PlayoutStatus, ProcessControl,
use ffplayout_lib::utils::{
get_delta, get_filter_from_json, get_sec, sec_to_time, write_status, Media, PlayerControl,
PlayoutConfig, PlayoutStatus, ProcessControl,
};
use zmq_cmd::zmq_send;
/// map media struct to json object
fn get_media_map(media: Media) -> Value {
json!({
@ -25,7 +30,7 @@ fn get_media_map(media: Media) -> Value {
}
/// prepare json object for response
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
fn get_data_map(config: &PlayoutConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
@ -56,7 +61,7 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
/// - get last clip
/// - reset player state to original clip
pub fn json_rpc_server(
config: GlobalConfig,
config: PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
@ -73,6 +78,24 @@ pub fn json_rpc_server(
let mut date = playout_stat.date.lock().unwrap();
let current_list = play_control.current_list.lock().unwrap();
// forward text message to ffmpeg
if map.contains_key("control")
&& &map["control"] == "text"
&& map.contains_key("message")
{
let mut filter = get_filter_from_json(map["message"].to_string());
let socket = config.text.bind_address.clone();
if !filter.is_empty() && config.text.bind_address.is_some() {
filter = format!("Parsed_drawtext_2 reinit {filter}");
if let Ok(reply) = executor::block_on(zmq_send(&filter, &socket.unwrap())) {
return Ok(Value::String(reply));
};
}
return Ok(Value::String("Last clip can not be skipped".to_string()));
}
// get next clip
if map.contains_key("control") && &map["control"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);

View File

@ -0,0 +1,14 @@
use std::error::Error;
use zeromq::Socket;
use zeromq::{SocketRecv, SocketSend, ZmqMessage};
pub async fn zmq_send(msg: &str, socket_addr: &str) -> Result<String, Box<dyn Error>> {
let mut socket = zeromq::ReqSocket::new();
socket.connect(&format!("tcp://{socket_addr}")).await?;
socket.send(msg.into()).await?;
let repl: ZmqMessage = socket.recv().await?;
let response = String::from_utf8(repl.into_vec()[0].to_vec())?;
Ok(response)
}

View File

@ -1,15 +1,12 @@
use std::{
sync::{Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
mod utils;
#[cfg(test)]
use crate::output::player;
#[cfg(test)]
use crate::utils::*;
use ffplayout_lib::utils::*;
#[cfg(test)]
use simplelog::*;
@ -22,26 +19,24 @@ fn timed_kill(sec: u64, mut proc_ctl: ProcessControl) {
#[test]
#[ignore]
fn playlist_change_at_midnight() {
let mut config = GlobalConfig::new();
let mut config = PlayoutConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "00:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, proc_ctl, messages);
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T23:59:45");
thread::spawn(move || timed_kill(30, proc_ctl2));
thread::spawn(move || timed_kill(30, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
}
@ -49,26 +44,24 @@ fn playlist_change_at_midnight() {
#[test]
#[ignore]
fn playlist_change_at_six() {
let mut config = GlobalConfig::new();
let mut config = PlayoutConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "06:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, proc_ctl, messages);
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T05:59:45");
thread::spawn(move || timed_kill(30, proc_ctl2));
thread::spawn(move || timed_kill(30, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
}

View File

@ -1,6 +1,6 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[clap(version,
about = "ffplayout, Rust based 24/7 playout solution.",
override_usage = "Run without any command to use config file only, or with commands to override parameters:\n\n ffplayout [OPTIONS]",

View File

@ -0,0 +1,64 @@
use std::path::Path;
pub mod arg_parse;
pub use arg_parse::Args;
use ffplayout_lib::utils::{time_to_sec, PlayoutConfig};
pub fn get_config(args: Args) -> PlayoutConfig {
let mut config = PlayoutConfig::new(args.config);
if let Some(gen) = args.generate {
config.general.generate = Some(gen);
}
if let Some(log_path) = args.log {
if Path::new(&log_path).is_dir() {
config.logging.log_to_file = true;
}
config.logging.log_path = log_path;
}
if let Some(playlist) = args.playlist {
config.playlist.path = playlist;
}
if let Some(mode) = args.play_mode {
config.processing.mode = mode;
}
if let Some(folder) = args.folder {
config.storage.path = folder;
config.processing.mode = "folder".into();
}
if let Some(start) = args.start {
config.playlist.day_start = start.clone();
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(length) = args.length {
config.playlist.length = length.clone();
if length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
}
}
if args.infinit {
config.playlist.infinit = args.infinit;
}
if let Some(output) = args.output {
config.out.mode = output;
}
if let Some(volume) = args.volume {
config.processing.volume = volume;
}
config
}
// Read command line arguments, and override the config with them.

34
lib/Cargo.toml Normal file
View File

@ -0,0 +1,34 @@
[package]
name = "ffplayout-lib"
description = "Library for ffplayout"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.9"
edition = "2021"
[dependencies]
chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" }
crossbeam-channel = "0.5"
faccess = "0.2"
ffprobe = "0.3"
file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" }
futures = "0.3"
jsonrpc-http-server = "18.0"
lettre = "0.10.0-rc.7"
log = "0.4"
notify = "4.0"
once_cell = "1.10"
rand = "0.8"
regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }
walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies]
openssl = { version = "0.10", features = ["vendored"] }

View File

@ -1,9 +1,9 @@
use crate::utils::GlobalConfig;
use crate::utils::PlayoutConfig;
/// Loudnorm Audio Filter
///
/// Add loudness normalization.
pub fn filter_node(config: &GlobalConfig) -> String {
pub fn filter_node(config: &PlayoutConfig) -> String {
format!(
"loudnorm=I={}:TP={}:LRA={}",
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra

View File

@ -1,10 +1,10 @@
use crate::filter::{a_loudnorm, v_overlay};
use crate::utils::GlobalConfig;
use crate::utils::PlayoutConfig;
/// Audio Filter
///
/// If needed we add audio filters to the server instance.
fn audio_filter(config: &GlobalConfig) -> String {
fn audio_filter(config: &PlayoutConfig) -> String {
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.loudnorm_ingest {
@ -22,7 +22,7 @@ fn audio_filter(config: &GlobalConfig) -> String {
}
/// Create filter nodes for ingest live stream.
pub fn filter_cmd(config: &GlobalConfig) -> Vec<String> {
pub fn filter_cmd(config: &PlayoutConfig) -> Vec<String> {
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,

View File

@ -7,7 +7,7 @@ pub mod ingest_filter;
pub mod v_drawtext;
pub mod v_overlay;
use crate::utils::{get_delta, is_close, GlobalConfig, Media};
use crate::utils::{get_delta, is_close, Media, PlayoutConfig};
#[derive(Debug, Clone)]
struct Filters {
@ -72,7 +72,7 @@ fn deinterlace(field_order: &Option<String>, chain: &mut Filters) {
}
}
fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
fn pad(aspect: f64, chain: &mut Filters, config: &PlayoutConfig) {
if !is_close(aspect, config.processing.aspect, 0.03) {
chain.add_filter(
&format!(
@ -84,13 +84,13 @@ fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
}
}
fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
fn fps(fps: f64, chain: &mut Filters, config: &PlayoutConfig) {
if fps != config.processing.fps {
chain.add_filter(&format!("fps={}", config.processing.fps), "video")
}
}
fn scale(v_stream: &ffprobe::Stream, aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
fn scale(v_stream: &ffprobe::Stream, aspect: f64, chain: &mut Filters, config: &PlayoutConfig) {
// width: i64, height: i64
if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) {
if w != config.processing.width || h != config.processing.height {
@ -137,10 +137,10 @@ fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) {
}
}
fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
if config.processing.add_logo
&& Path::new(&config.processing.logo).is_file()
&& &node.category.clone().unwrap_or_default() != "advertisement"
&& &node.category != "advertisement"
{
let mut logo_chain = v_overlay::filter_node(config, false);
@ -183,8 +183,10 @@ fn extend_video(node: &mut Media, chain: &mut Filters) {
}
/// add drawtext filter for lower thirds messages
fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
if config.text.add_text && config.text.over_pre {
fn add_text(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
if config.text.add_text
&& (config.text.text_from_filename || config.out.mode.to_lowercase() == "hls")
{
let filter = v_drawtext::filter_node(config, node);
chain.add_filter(&filter, "video");
@ -208,7 +210,7 @@ fn add_audio(node: &mut Media, chain: &mut Filters) {
.unwrap_or(&vec![])
.is_empty()
{
warn!("Clip: '{}' has no audio!", node.source);
warn!("Clip <b><magenta>{}</></b> has no audio!", node.source);
let audio = format!(
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
node.out - node.seek
@ -233,7 +235,7 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
}
/// Add single pass loudnorm filter to audio line.
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
if config.processing.add_loudnorm
&& !node
.probe
@ -247,13 +249,13 @@ fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
}
}
fn audio_volume(chain: &mut Filters, config: &GlobalConfig) {
fn audio_volume(chain: &mut Filters, config: &PlayoutConfig) {
if config.processing.volume != 1.0 {
chain.add_filter(&format!("volume={}", config.processing.volume), "audio")
}
}
fn aspect_calc(aspect_string: &Option<String>, config: &GlobalConfig) -> f64 {
fn aspect_calc(aspect_string: &Option<String>, config: &PlayoutConfig) -> f64 {
let mut source_aspect = config.processing.aspect;
if let Some(aspect) = aspect_string {
@ -276,7 +278,12 @@ fn fps_calc(r_frame_rate: &str) -> f64 {
}
/// This realtime filter is important for HLS output to stay in sync.
fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig, codec_type: &str) {
fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &PlayoutConfig,
codec_type: &str,
) {
let mut t = "";
if codec_type == "audio" {
@ -300,7 +307,7 @@ fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig,
}
}
pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec<String> {
pub fn filter_chains(config: &PlayoutConfig, node: &mut Media) -> Vec<String> {
let mut filters = Filters::new();
if let Some(probe) = node.probe.as_ref() {

View File

@ -2,9 +2,9 @@ use std::path::Path;
use regex::Regex;
use crate::utils::{GlobalConfig, Media};
use crate::utils::{Media, PlayoutConfig};
pub fn filter_node(config: &GlobalConfig, node: &mut Media) -> String {
pub fn filter_node(config: &PlayoutConfig, node: &Media) -> String {
let mut filter = String::new();
let mut font = String::new();
@ -13,7 +13,7 @@ pub fn filter_node(config: &GlobalConfig, node: &mut Media) -> String {
font = format!(":fontfile='{}'", config.text.fontfile)
}
if config.text.over_pre && config.text.text_from_filename {
if config.text.text_from_filename {
let source = node.source.clone();
let regex: Regex = Regex::new(&config.text.regex).unwrap();
@ -27,10 +27,10 @@ pub fn filter_node(config: &GlobalConfig, node: &mut Media) -> String {
.replace('%', "\\\\\\%")
.replace(':', "\\:");
filter = format!("drawtext=text='{escape}':{}{font}", config.text.style)
} else {
} else if let Some(socket) = config.text.bind_address.clone() {
filter = format!(
"zmq=b=tcp\\\\://'{}',drawtext=text=''{font}",
config.text.bind_address.replace(':', "\\:")
socket.replace(':', "\\:")
)
}
}

View File

@ -1,11 +1,11 @@
use std::path::Path;
use crate::utils::GlobalConfig;
use crate::utils::PlayoutConfig;
/// Overlay Filter
///
/// When a logo is set, we create here the filter for the server.
pub fn filter_node(config: &GlobalConfig, add_tail: bool) -> String {
pub fn filter_node(config: &PlayoutConfig, add_tail: bool) -> String {
let mut logo_chain = String::new();
if config.processing.add_logo && Path::new(&config.processing.logo).is_file() {

View File

@ -2,10 +2,8 @@ extern crate log;
extern crate simplelog;
pub mod filter;
pub mod input;
pub mod macros;
pub mod output;
pub mod rpc;
pub mod utils;
#[cfg(test)]
mod tests;
pub mod utils;

View File

@ -39,7 +39,7 @@ fn get_date_tomorrow() {
#[test]
fn test_delta() {
let mut config = GlobalConfig::new();
let mut config = PlayoutConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "00:00:00".into();

View File

@ -8,14 +8,14 @@ use std::{
use serde::{Deserialize, Serialize};
use shlex::split;
use crate::utils::{get_args, time_to_sec};
use crate::utils::{free_tcp_socket, time_to_sec};
use crate::vec_strings;
/// Global Config
///
/// This we init ones, when ffplayout is starting and use them globally in the hole program.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GlobalConfig {
pub struct PlayoutConfig {
pub general: General,
pub rpc_server: RpcServer,
pub mail: Mail,
@ -30,7 +30,10 @@ pub struct GlobalConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct General {
pub help_text: String,
pub stop_threshold: f64,
#[serde(skip_serializing, skip_deserializing)]
pub generate: Option<Vec<String>>,
#[serde(skip_serializing, skip_deserializing)]
@ -39,6 +42,7 @@ pub struct General {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RpcServer {
pub help_text: String,
pub enable: bool,
pub address: String,
pub authorization: String,
@ -46,6 +50,7 @@ pub struct RpcServer {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Mail {
pub help_text: String,
pub subject: String,
pub smtp_server: String,
pub starttls: bool,
@ -58,6 +63,7 @@ pub struct Mail {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Logging {
pub help_text: String,
pub log_to_file: bool,
pub backup_count: usize,
pub local_time: bool,
@ -69,6 +75,7 @@ pub struct Logging {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Processing {
pub help_text: String,
pub mode: String,
pub width: i64,
pub height: i64,
@ -85,28 +92,41 @@ pub struct Processing {
pub loud_tp: f32,
pub loud_lra: f32,
pub volume: f64,
#[serde(skip_serializing, skip_deserializing)]
pub settings: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Ingest {
pub help_text: String,
pub enable: bool,
input_param: String,
#[serde(skip_serializing, skip_deserializing)]
pub input_cmd: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist {
pub help_text: String,
pub path: String,
pub day_start: String,
#[serde(skip_serializing, skip_deserializing)]
pub start_sec: Option<f64>,
pub length: String,
#[serde(skip_serializing, skip_deserializing)]
pub length_sec: Option<f64>,
pub infinit: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Storage {
pub help_text: String,
pub path: String,
pub filler_clip: String,
pub extensions: Vec<String>,
@ -115,9 +135,15 @@ pub struct Storage {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Text {
pub help_text: String,
pub add_text: bool,
pub over_pre: bool,
pub bind_address: String,
#[serde(skip_serializing, skip_deserializing)]
pub bind_address: Option<String>,
#[serde(skip_serializing, skip_deserializing)]
pub node_pos: Option<usize>,
pub fontfile: String,
pub text_from_filename: bool,
pub style: String,
@ -126,21 +152,26 @@ pub struct Text {
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Out {
pub help_text: String,
pub mode: String,
pub preview: bool,
preview_param: String,
pub preview_param: String,
#[serde(skip_serializing, skip_deserializing)]
pub preview_cmd: Option<Vec<String>>,
output_param: String,
pub output_param: String,
#[serde(skip_serializing, skip_deserializing)]
pub output_cmd: Option<Vec<String>>,
}
impl GlobalConfig {
impl PlayoutConfig {
/// Read config from YAML file, and set some extra config values.
pub fn new() -> Self {
let args = get_args();
pub fn new(cfg_path: Option<String>) -> Self {
let mut config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml");
if let Some(cfg) = args.config {
if let Some(cfg) = cfg_path {
config_path = PathBuf::from(cfg);
}
@ -162,7 +193,7 @@ impl GlobalConfig {
}
};
let mut config: GlobalConfig =
let mut config: PlayoutConfig =
serde_yaml::from_reader(f).expect("Could not read config file.");
config.general.generate = None;
config.general.stat_file = env::temp_dir()
@ -217,66 +248,24 @@ impl GlobalConfig {
config.out.preview_cmd = split(config.out.preview_param.as_str());
config.out.output_cmd = split(config.out.output_param.as_str());
// Read command line arguments, and override the config with them.
if let Some(gen) = args.generate {
config.general.generate = Some(gen);
}
if let Some(log_path) = args.log {
if Path::new(&log_path).is_dir() {
config.logging.log_to_file = true;
}
config.logging.log_path = log_path;
}
if let Some(playlist) = args.playlist {
config.playlist.path = playlist;
}
if let Some(mode) = args.play_mode {
config.processing.mode = mode;
}
if let Some(folder) = args.folder {
config.storage.path = folder;
config.processing.mode = "folder".into();
}
if let Some(start) = args.start {
config.playlist.day_start = start.clone();
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(length) = args.length {
config.playlist.length = length.clone();
if length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
}
}
if args.infinit {
config.playlist.infinit = args.infinit;
}
if let Some(output) = args.output {
config.out.mode = output;
}
if let Some(volume) = args.volume {
config.processing.volume = volume;
// when text overlay without text_from_filename is on, turn also the RPC server on,
// to get text messages from it
if config.text.add_text && !config.text.text_from_filename {
config.rpc_server.enable = true;
config.text.bind_address = free_tcp_socket();
config.text.node_pos = Some(2);
} else {
config.text.bind_address = None;
config.text.node_pos = None;
}
config
}
}
impl Default for GlobalConfig {
impl Default for PlayoutConfig {
fn default() -> Self {
Self::new()
Self::new(None)
}
}

View File

@ -1,32 +1,24 @@
use std::{
ffi::OsStr,
path::Path,
process::exit,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::channel,
atomic::{AtomicUsize, Ordering},
{Arc, Mutex},
},
thread::sleep,
time::Duration,
};
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::utils::{get_sec, GlobalConfig, Media};
use crate::utils::{file_extension, get_sec, Media, PlayoutConfig};
/// Folder Sources
///
/// Like playlist source, we create here a folder list for iterate over it.
#[derive(Debug, Clone)]
pub struct FolderSource {
config: GlobalConfig,
config: PlayoutConfig,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: Arc<AtomicUsize>,
@ -34,7 +26,7 @@ pub struct FolderSource {
impl FolderSource {
pub fn new(
config: &GlobalConfig,
config: &PlayoutConfig,
current_list: Arc<Mutex<Vec<Media>>>,
global_index: Arc<AtomicUsize>,
) -> Self {
@ -154,65 +146,3 @@ impl Iterator for FolderSource {
}
}
}
fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(
config: GlobalConfig,
is_terminated: Arc<AtomicBool>,
sources: Arc<Mutex<Vec<Media>>>,
) {
let (tx, rx) = channel();
let path = config.storage.path;
if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'");
panic!("Folder path not exists: '{path}'");
}
let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
while !is_terminated.load(Ordering::SeqCst) {
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: <b><magenta>{old_path:?}</></b>");
}
Rename(old_path, new_path) => {
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: <b><magenta>{old_path:?}</></b> to <b><magenta>{new_path:?}</></b>");
}
_ => (),
}
}
sleep(Duration::from_secs(5));
}
}

View File

@ -8,6 +8,7 @@
/// Beside that it is really very basic, without any logic.
use std::{
fs::{create_dir_all, write},
io::Error,
path::Path,
process::exit,
sync::{atomic::AtomicUsize, Arc, Mutex},
@ -16,8 +17,8 @@ use std::{
use chrono::{Duration, NaiveDate};
use simplelog::*;
use crate::input::FolderSource;
use crate::utils::{json_serializer::Playlist, GlobalConfig, Media};
use super::folder::FolderSource;
use crate::utils::{json_serializer::JsonPlaylist, time_to_sec, Media, PlayoutConfig};
/// Generate a vector with dates, from given range.
fn get_date_range(date_range: &[String]) -> Vec<String> {
@ -50,11 +51,30 @@ fn get_date_range(date_range: &[String]) -> Vec<String> {
}
/// Generate playlists
pub fn generate_playlist(config: &GlobalConfig, mut date_range: Vec<String>) {
let total_length = config.playlist.length_sec.unwrap();
pub fn generate_playlist(
config: &PlayoutConfig,
mut date_range: Vec<String>,
channel_name: Option<String>,
) -> Result<Vec<JsonPlaylist>, Error> {
let total_length = match config.playlist.length_sec {
Some(length) => length,
None => {
if config.playlist.length.contains(':') {
time_to_sec(&config.playlist.length)
} else {
86400.0
}
}
};
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
let index = Arc::new(AtomicUsize::new(0));
let playlist_root = Path::new(&config.playlist.path);
let mut playlists = vec![];
let channel = match channel_name {
Some(name) => name,
None => "Channel 1".to_string(),
};
if !playlist_root.is_dir() {
error!(
@ -79,10 +99,7 @@ pub fn generate_playlist(config: &GlobalConfig, mut date_range: Vec<String>) {
let playlist_path = playlist_root.join(year).join(month);
let playlist_file = &playlist_path.join(format!("{date}.json"));
if let Err(e) = create_dir_all(playlist_path) {
error!("Create folder failed: {e:?}");
exit(1);
}
create_dir_all(playlist_path)?;
if playlist_file.is_file() {
warn!(
@ -103,7 +120,8 @@ pub fn generate_playlist(config: &GlobalConfig, mut date_range: Vec<String>) {
let mut length = 0.0;
let mut round = 0;
let mut playlist = Playlist {
let mut playlist = JsonPlaylist {
channel: channel.clone(),
date,
current_file: None,
start_sec: None,
@ -130,17 +148,12 @@ pub fn generate_playlist(config: &GlobalConfig, mut date_range: Vec<String>) {
}
}
let json: String = match serde_json::to_string_pretty(&playlist) {
Ok(j) => j,
Err(e) => {
error!("Unable to serialize data: {e:?}");
exit(0);
}
};
playlists.push(playlist.clone());
if let Err(e) = write(playlist_file, &json) {
error!("Unable to write playlist: {e:?}");
exit(1)
};
let json: String = serde_json::to_string_pretty(&playlist)?;
write(playlist_file, &json)?;
}
Ok(playlists)
}

View File

@ -9,14 +9,15 @@ use std::{
use simplelog::*;
use crate::utils::{
get_date, is_remote, modified_time, time_from_header, validate_playlist, GlobalConfig, Media,
get_date, is_remote, modified_time, time_from_header, validate_playlist, Media, PlayoutConfig,
};
pub const DUMMY_LEN: f64 = 60.0;
/// This is our main playlist object, it holds all necessary information for the current day.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist {
pub struct JsonPlaylist {
pub channel: String,
pub date: String,
#[serde(skip_serializing, skip_deserializing)]
@ -31,13 +32,14 @@ pub struct Playlist {
pub program: Vec<Media>,
}
impl Playlist {
impl JsonPlaylist {
fn new(date: String, start: f64) -> Self {
let mut media = Media::new(0, String::new(), false);
media.begin = Some(start);
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
Self {
channel: "Channel 1".into(),
date,
start_sec: Some(start),
current_file: None,
@ -47,7 +49,19 @@ impl Playlist {
}
}
fn set_defaults(mut playlist: Playlist, current_file: String, mut start_sec: f64) -> Playlist {
impl PartialEq for JsonPlaylist {
fn eq(&self, other: &Self) -> bool {
self.channel == other.channel && self.date == other.date && self.program == other.program
}
}
impl Eq for JsonPlaylist {}
fn set_defaults(
mut playlist: JsonPlaylist,
current_file: String,
mut start_sec: f64,
) -> JsonPlaylist {
playlist.current_file = Some(current_file);
playlist.start_sec = Some(start_sec);
@ -66,15 +80,15 @@ fn set_defaults(mut playlist: Playlist, current_file: String, mut start_sec: f64
playlist
}
/// Read json playlist file, fills Playlist struct and set some extra values,
/// Read json playlist file, fills JsonPlaylist struct and set some extra values,
/// which we need to process.
pub fn read_json(
config: &GlobalConfig,
config: &PlayoutConfig,
path: Option<String>,
is_terminated: Arc<AtomicBool>,
seek: bool,
next_start: f64,
) -> Playlist {
) -> JsonPlaylist {
let config_clone = config.clone();
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
let start_sec = config.playlist.start_sec.unwrap();
@ -104,7 +118,7 @@ pub fn read_json(
let headers = resp.headers().clone();
if let Ok(body) = resp.text() {
let mut playlist: Playlist =
let mut playlist: JsonPlaylist =
serde_json::from_str(&body).expect("Could't read remote json playlist.");
if let Some(time) = time_from_header(&headers) {
@ -127,7 +141,7 @@ pub fn read_json(
.write(false)
.open(&current_file)
.expect("Could not open json playlist file.");
let mut playlist: Playlist =
let mut playlist: JsonPlaylist =
serde_json::from_reader(f).expect("Could't read json playlist file.");
playlist.modified = modified_time(&current_file);
@ -138,7 +152,7 @@ pub fn read_json(
return set_defaults(playlist, current_file, start_sec);
}
error!("Read playlist error, on: <b><magenta>{current_file}</></b>!");
error!("Read playlist error, on: <b><magenta>{current_file}</></b>");
Playlist::new(date, start_sec)
JsonPlaylist::new(date, start_sec)
}

View File

@ -5,7 +5,7 @@ use std::sync::{
use simplelog::*;
use crate::utils::{sec_to_time, valid_source, GlobalConfig, MediaProbe, Playlist};
use crate::utils::{sec_to_time, valid_source, JsonPlaylist, MediaProbe, PlayoutConfig};
/// Validate a given playlist, to check if:
///
@ -14,7 +14,11 @@ use crate::utils::{sec_to_time, valid_source, GlobalConfig, MediaProbe, Playlist
/// - total playtime fits target length from config
///
/// This function we run in a thread, to don't block the main function.
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<AtomicBool>, config: GlobalConfig) {
pub fn validate_playlist(
playlist: JsonPlaylist,
is_terminated: Arc<AtomicBool>,
config: PlayoutConfig,
) {
let date = playlist.date;
let mut length = config.playlist.length_sec.unwrap();
let mut begin = config.playlist.start_sec.unwrap();

View File

@ -22,10 +22,10 @@ use log::{Level, LevelFilter, Log, Metadata, Record};
use regex::Regex;
use simplelog::*;
use crate::utils::{GlobalConfig, ProcessControl};
use crate::utils::{PlayoutConfig, ProcessControl};
/// send log messages to mail recipient
pub fn send_mail(cfg: &GlobalConfig, msg: String) {
pub fn send_mail(cfg: &PlayoutConfig, msg: String) {
let recip = cfg
.mail
.recipient
@ -68,7 +68,7 @@ pub fn send_mail(cfg: &GlobalConfig, msg: String) {
///
/// Check every give seconds for messages and send them.
fn mail_queue(
cfg: GlobalConfig,
cfg: PlayoutConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
interval: u64,
@ -92,7 +92,7 @@ pub struct LogMailer {
level: LevelFilter,
pub config: Config,
messages: Arc<Mutex<Vec<String>>>,
last_message: Arc<Mutex<String>>,
last_messages: Arc<Mutex<Vec<String>>>,
}
impl LogMailer {
@ -105,7 +105,7 @@ impl LogMailer {
level: log_level,
config,
messages,
last_message: Arc::new(Mutex::new(String::new())),
last_messages: Arc::new(Mutex::new(vec![String::new()])),
})
}
}
@ -118,12 +118,15 @@ impl Log for LogMailer {
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
let rec = record.args().to_string();
let mut last_msg = self.last_message.lock().unwrap();
let mut last_msgs = self.last_messages.lock().unwrap();
// put message only to mail queue when it differs from last message
// this we do to prevent spamming the mail box
if *last_msg != rec {
*last_msg = rec.clone();
if !last_msgs.contains(&rec) {
if last_msgs.len() > 2 {
last_msgs.clear()
}
last_msgs.push(rec.clone());
let local: DateTime<Local> = Local::now();
let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]");
let level = record.level().to_string().to_uppercase();
@ -166,9 +169,9 @@ fn clean_string(text: &str) -> String {
/// - file logger
/// - mail logger
pub fn init_logging(
config: &GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
config: &PlayoutConfig,
proc_ctl: Option<ProcessControl>,
messages: Option<Arc<Mutex<Vec<String>>>>,
) -> Vec<Box<dyn SharedLogger>> {
let config_clone = config.clone();
let app_config = config.logging.clone();
@ -182,6 +185,9 @@ pub fn init_logging(
let mut log_config = ConfigBuilder::new()
.set_thread_level(LevelFilter::Off)
.set_target_level(LevelFilter::Off)
.add_filter_ignore_str("hyper")
.add_filter_ignore_str("sqlx")
.add_filter_ignore_str("reqwest")
.set_level_padding(LevelPadding::Left)
.set_time_level(time_level)
.clone();
@ -193,7 +199,7 @@ pub fn init_logging(
};
};
if app_config.log_to_file {
if app_config.log_to_file && &app_config.log_path != "none" {
let file_config = log_config
.clone()
.set_time_format_custom(format_description!(
@ -247,10 +253,12 @@ pub fn init_logging(
// set mail logger only the recipient is set in config
if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') {
let messages_clone = messages.clone();
let messages_clone = messages.clone().unwrap();
let interval = config.mail.interval;
thread::spawn(move || mail_queue(config_clone, proc_ctl, messages_clone, interval));
thread::spawn(move || {
mail_queue(config_clone, proc_ctl.unwrap(), messages_clone, interval)
});
let mail_config = log_config.build();
@ -260,7 +268,7 @@ pub fn init_logging(
_ => LevelFilter::Error,
};
app_logger.push(LogMailer::new(filter, mail_config, messages));
app_logger.push(LogMailer::new(filter, mail_config, messages.unwrap()));
}
app_logger

View File

@ -1,6 +1,8 @@
use std::{
ffi::OsStr,
fs::{self, metadata},
io::{BufRead, BufReader, Error},
net::TcpListener,
path::Path,
process::{exit, ChildStderr, Command, Stdio},
time::{self, UNIX_EPOCH},
@ -9,25 +11,25 @@ use std::{
use chrono::{prelude::*, Duration};
use ffprobe::{ffprobe, Format, Stream};
use jsonrpc_http_server::hyper::HeaderMap;
use rand::prelude::*;
use regex::Regex;
use reqwest::header;
use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
mod arg_parse;
mod config;
pub mod config;
pub mod controller;
pub mod folder;
mod generator;
pub mod json_serializer;
mod json_validate;
mod logging;
pub use arg_parse::get_args;
pub use config::GlobalConfig;
pub use config::{self as playout_config, PlayoutConfig};
pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*};
pub use generator::generate_playlist;
pub use json_serializer::{read_json, Playlist, DUMMY_LEN};
pub use json_serializer::{read_json, JsonPlaylist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::{init_logging, send_mail};
@ -46,8 +48,7 @@ pub struct Media {
pub out: f64,
pub duration: f64,
#[serde(skip_serializing)]
pub category: Option<String>,
pub category: String,
pub source: String,
#[serde(skip_serializing, skip_deserializing)]
@ -92,7 +93,7 @@ impl Media {
seek: 0.0,
out: duration,
duration,
category: None,
category: String::new(),
source: src.clone(),
cmd: Some(vec!["-i".to_string(), src]),
filter: Some(vec![]),
@ -123,14 +124,26 @@ impl Media {
}
}
pub fn add_filter(&mut self, config: &GlobalConfig) {
pub fn add_filter(&mut self, config: &PlayoutConfig) {
let mut node = self.clone();
self.filter = Some(filter_chains(config, &mut node))
}
}
impl PartialEq for Media {
fn eq(&self, other: &Self) -> bool {
self.seek == other.seek
&& self.out == other.out
&& self.duration == other.duration
&& self.source == other.source
&& self.category == other.category
}
}
impl Eq for Media {}
/// We use the ffprobe crate, but we map the metadata to our needs.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct MediaProbe {
pub format: Option<Format>,
pub audio_streams: Option<Vec<Stream>>,
@ -188,10 +201,25 @@ impl MediaProbe {
}
}
/// Covert JSON string to ffmpeg filter command.
pub fn get_filter_from_json(raw_text: String) -> String {
let re1 = Regex::new(r#""|}|\{"#).unwrap();
let re2 = Regex::new(r#"id:[0-9]+,?|name:[^,]?,?"#).unwrap();
let re3 = Regex::new(r#"text:([^,]*)"#).unwrap();
let text = re1.replace_all(&raw_text, "");
let text = re2.replace_all(&text, "").clone();
let filter = re3
.replace_all(&text, "text:'$1'")
.replace(':', "=")
.replace(',', ":");
filter
}
/// Write current status to status file in temp folder.
///
/// The status file is init in main function and mostly modified in RPC server.
pub fn write_status(config: &GlobalConfig, date: &str, shift: f64) {
pub fn write_status(config: &PlayoutConfig, date: &str, shift: f64) {
let data = json!({
"time_shift": shift,
"date": date,
@ -199,7 +227,7 @@ pub fn write_status(config: &GlobalConfig, date: &str, shift: f64) {
let status_data: String = serde_json::to_string(&data).expect("Serialize status data failed");
if let Err(e) = fs::write(&config.general.stat_file, &status_data) {
error!("Unable to write file: {e:?}")
error!("Unable to write status file: {e:?}")
};
}
@ -294,6 +322,11 @@ pub fn sec_to_time(sec: f64) -> String {
date_time.format("%H:%M:%S%.3f").to_string()
}
/// get file extension
pub fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
/// Test if given numbers are close to each other,
/// with a third number for setting the maximum range.
pub fn is_close(a: f64, b: f64, to: f64) -> bool {
@ -308,7 +341,7 @@ pub fn is_close(a: f64, b: f64, to: f64) -> bool {
/// if we still in sync.
///
/// We also get here the global delta between clip start and time when a new playlist should start.
pub fn get_delta(config: &GlobalConfig, begin: &f64) -> (f64, f64) {
pub fn get_delta(config: &PlayoutConfig, begin: &f64) -> (f64, f64) {
let mut current_time = get_sec();
let start = config.playlist.start_sec.unwrap();
let length = time_to_sec(&config.playlist.length);
@ -339,7 +372,7 @@ pub fn get_delta(config: &GlobalConfig, begin: &f64) -> (f64, f64) {
}
/// Check if clip in playlist is in sync with global time.
pub fn check_sync(config: &GlobalConfig, delta: f64) -> bool {
pub fn check_sync(config: &PlayoutConfig, delta: f64) -> bool {
if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 {
error!("Clip begin out of sync for <yellow>{delta:.3}</> seconds. Stop playout!");
return false;
@ -349,7 +382,7 @@ pub fn check_sync(config: &GlobalConfig, delta: f64) -> bool {
}
/// Create a dummy clip as a placeholder for missing video files.
pub fn gen_dummy(config: &GlobalConfig, duration: f64) -> (String, Vec<String>) {
pub fn gen_dummy(config: &PlayoutConfig, duration: f64) -> (String, Vec<String>) {
let color = "#121212";
let source = format!(
"color=c={color}:s={}x{}:d={duration}",
@ -567,7 +600,7 @@ fn ffmpeg_libs_and_filter() -> (Vec<String>, Vec<String>) {
/// Validate ffmpeg/ffprobe/ffplay.
///
/// Check if they are in system and has all filters and codecs we need.
pub fn validate_ffmpeg(config: &GlobalConfig) {
pub fn validate_ffmpeg(config: &PlayoutConfig) {
is_in_system("ffmpeg");
is_in_system("ffprobe");
@ -596,6 +629,19 @@ pub fn validate_ffmpeg(config: &GlobalConfig) {
}
}
/// get a free tcp socket
pub fn free_tcp_socket() -> Option<String> {
for _ in 0..100 {
let port = rand::thread_rng().gen_range(45321..54268);
if TcpListener::bind(("127.0.0.1", port)).is_ok() {
return Some(format!("127.0.0.1:{port}"));
}
}
None
}
/// Get system time, in non test case.
#[cfg(not(test))]
pub fn time_now() -> DateTime<Local> {