work on single app

This commit is contained in:
jb-alvarado 2024-05-28 13:28:13 +02:00
parent c526f09c0c
commit 7ffb44263b
31 changed files with 4168 additions and 19 deletions

View File

@ -19,6 +19,7 @@
},
"cSpell.words": [
"actix",
"ffpengine",
"rsplit",
"starttls",
"tokio",

60
Cargo.lock generated
View File

@ -1294,22 +1294,44 @@ checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
name = "ffplayout"
version = "0.23.0"
dependencies = [
"actix-files",
"actix-multipart",
"actix-web",
"actix-web-grants",
"actix-web-httpauth",
"actix-web-lab",
"actix-web-static-files",
"argon2",
"chrono",
"clap",
"crossbeam-channel",
"derive_more",
"faccess",
"ffplayout-lib",
"futures",
"itertools",
"notify",
"notify-debouncer-full",
"futures-util",
"home",
"jsonwebtoken",
"lazy_static",
"lexical-sort",
"local-ip-address",
"once_cell",
"parking_lot",
"path-clean",
"rand",
"regex",
"relative-path",
"reqwest",
"rpassword",
"sanitize-filename",
"serde",
"serde_json",
"simplelog",
"tiny_http",
"zeromq",
"sqlx",
"static-files",
"sysinfo",
"tokio",
"tokio-stream",
"toml_edit",
"uuid",
]
[[package]]
@ -1356,6 +1378,28 @@ dependencies = [
"uuid",
]
[[package]]
name = "ffplayout-engine"
version = "0.23.0"
dependencies = [
"chrono",
"clap",
"crossbeam-channel",
"ffplayout-lib",
"futures",
"itertools",
"notify",
"notify-debouncer-full",
"rand",
"regex",
"reqwest",
"serde",
"serde_json",
"simplelog",
"tiny_http",
"zeromq",
]
[[package]]
name = "ffplayout-lib"
version = "0.23.0"
@ -3593,7 +3637,7 @@ version = "0.23.0"
dependencies = [
"chrono",
"crossbeam-channel",
"ffplayout",
"ffplayout-engine",
"ffplayout-lib",
"ffprobe",
"file-rotate",

View File

@ -1,6 +1,6 @@
[workspace]
members = ["ffplayout-api", "ffplayout-engine", "lib", "tests"]
default-members = ["ffplayout-api", "ffplayout-engine", "tests"]
members = ["ffplayout", "ffplayout-api", "ffplayout-engine", "lib", "tests"]
default-members = ["ffplayout", "ffplayout-api", "ffplayout-engine", "tests"]
resolver = "2"
[workspace.package]

View File

@ -1,5 +1,5 @@
[package]
name = "ffplayout"
name = "ffplayout-engine"
description = "24/7 playout based on rust and ffmpeg"
readme = "README.md"
version.workspace = true
@ -8,7 +8,7 @@ authors.workspace = true
repository.workspace = true
edition.workspace = true
default-run = "ffplayout"
default-run = "ffplayout_engine"
[dependencies]
ffplayout-lib = { path = "../lib" }
@ -32,7 +32,7 @@ zeromq = { version = "0.3", default-features = false, features = [
] }
[[bin]]
name = "ffplayout"
name = "ffplayout_engine"
path = "src/main.rs"
# DEBIAN DEB PACKAGE

View File

@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
use ffplayout::{
use ffplayout_engine::{
output::{player, write_hls},
rpc::run_server,
utils::{arg_parse::get_args, get_config},
@ -25,7 +25,7 @@ use ffplayout_lib::utils::{
};
#[cfg(debug_assertions)]
use ffplayout::utils::Args;
use ffplayout_engine::utils::Args;
#[cfg(debug_assertions)]
use ffplayout_lib::utils::{mock_time, time_now};

60
ffplayout/Cargo.toml Normal file
View File

@ -0,0 +1,60 @@
[package]
name = "ffplayout"
description = "Rest API for ffplayout"
readme = "README.md"
version.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
edition.workspace = true
[features]
default = ["embed_frontend"]
embed_frontend = []
[dependencies]
ffplayout-lib = { path = "../lib" }
actix-files = "0.6"
actix-multipart = "0.6"
actix-web = "4"
actix-web-grants = "4"
actix-web-httpauth = "0.8"
actix-web-lab = "0.20"
actix-web-static-files = "4.0"
argon2 = "0.5"
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
clap = { version = "4.3", features = ["derive"] }
derive_more = "0.99"
faccess = "0.2"
futures-util = { version = "0.3", default-features = false, features = ["std"] }
home = "0.5"
jsonwebtoken = "9"
lazy_static = "1.4"
lexical-sort = "0.3"
local-ip-address = "0.6"
once_cell = "1.18"
parking_lot = "0.12"
path-clean = "1.0"
rand = "0.8"
regex = "1"
relative-path = "1.8"
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
rpassword = "7.2"
sanitize-filename = "0.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
simplelog = { version = "0.12", features = ["paris"] }
static-files = "0.2"
sysinfo ={ version = "0.30", features = ["linux-netdevs"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
tokio = { version = "1.29", features = ["full"] }
tokio-stream = "0.1"
toml_edit = {version ="0.22", features = ["serde"]}
uuid = "1.8"
[build-dependencies]
static-files = "0.2"
[[bin]]
name = "ffplayout"
path = "src/main.rs"

15
ffplayout/build.rs Normal file
View File

@ -0,0 +1,15 @@
use static_files::NpmBuild;
/// Build script: in release builds with the `embed_frontend` feature enabled,
/// run the frontend's npm build and embed its output directory as static
/// resources (consumed later via `include!(.../generated.rs)`).
fn main() -> std::io::Result<()> {
    if !cfg!(debug_assertions) && cfg!(feature = "embed_frontend") {
        NpmBuild::new("../ffplayout-frontend")
            .install()? // `npm install` in the frontend directory
            .run("generate")? // `npm run generate` to produce the static site
            .target("../ffplayout-frontend/.output/public")
            .change_detection()
            .to_resource_dir()
            .build()
    } else {
        // Debug builds (or builds without the feature) serve the frontend from disk.
        Ok(())
    }
}

46
ffplayout/src/api/auth.rs Normal file
View File

@ -0,0 +1,46 @@
use actix_web::error::ErrorUnauthorized;
use actix_web::Error;
use chrono::{TimeDelta, Utc};
use jsonwebtoken::{self, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use crate::utils::{GlobalSettings, Role};
// Token lifetime
const JWT_EXPIRATION_DAYS: i64 = 7;
/// JWT payload attached to every authenticated request.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Claims {
    pub id: i32,
    pub username: String,
    pub role: Role,
    // Expiration as a unix timestamp (seconds); checked by jsonwebtoken's
    // default `Validation` during decode.
    exp: i64,
}
impl Claims {
    /// Build claims for the given user, expiring `JWT_EXPIRATION_DAYS` from now.
    pub fn new(id: i32, username: String, role: Role) -> Self {
        Self {
            id,
            username,
            role,
            // `try_days` only fails on i64 overflow; 7 is safe, so unwrap cannot panic.
            exp: (Utc::now() + TimeDelta::try_days(JWT_EXPIRATION_DAYS).unwrap()).timestamp(),
        }
    }
}
/// Create a json web token (JWT) signed with the global secret.
///
/// Any encoding failure is surfaced as a 401 `ErrorUnauthorized`.
pub fn create_jwt(claims: Claims) -> Result<String, Error> {
    let key = EncodingKey::from_secret(GlobalSettings::global().secret.as_bytes());
    match jsonwebtoken::encode(&Header::default(), &claims, &key) {
        Ok(token) => Ok(token),
        Err(e) => Err(ErrorUnauthorized(e.to_string())),
    }
}
/// Decode a json web token (JWT) and return its claims.
///
/// Verification uses the global secret and the default validation rules
/// (which include expiry); failures map to 401 `ErrorUnauthorized`.
pub async fn decode_jwt(token: &str) -> Result<Claims, Error> {
    let key = DecodingKey::from_secret(GlobalSettings::global().secret.as_bytes());
    let data = jsonwebtoken::decode::<Claims>(token, &key, &Validation::default())
        .map_err(|e| ErrorUnauthorized(e.to_string()))?;
    Ok(data.claims)
}

2
ffplayout/src/api/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod auth;
pub mod routes;

1230
ffplayout/src/api/routes.rs Normal file

File diff suppressed because it is too large Load Diff

351
ffplayout/src/db/handles.rs Normal file
View File

@ -0,0 +1,351 @@
use std::env;
use argon2::{
password_hash::{rand_core::OsRng, SaltString},
Argon2, PasswordHasher,
};
use rand::{distributions::Alphanumeric, Rng};
use simplelog::*;
use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite};
use tokio::task;
use crate::db::{
db_pool,
models::{Channel, TextPreset, User},
};
use crate::utils::{db_path, local_utc_offset, GlobalSettings, Role};
/// Create all tables (global, roles, channels, presets, user) if they do not
/// exist yet, with foreign keys enabled.
///
/// NOTE(review): the FKs declare `ON DELETE SET NULL` on columns that are
/// `NOT NULL` — deleting a referenced channel/role would then fail the
/// constraint; confirm this is intended.
async fn create_schema(conn: &Pool<Sqlite>) -> Result<SqliteQueryResult, sqlx::Error> {
    let query = r#"PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS global
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
secret TEXT NOT NULL,
UNIQUE(secret)
);
CREATE TABLE IF NOT EXISTS roles
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS channels
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
preview_url TEXT NOT NULL,
config_path TEXT NOT NULL,
extra_extensions TEXT NOT NULL,
active INTEGER NOT NULL DEFAULT 0,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS presets
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
text TEXT NOT NULL,
x TEXT NOT NULL,
y TEXT NOT NULL,
fontsize TEXT NOT NULL,
line_spacing TEXT NOT NULL,
fontcolor TEXT NOT NULL,
box TEXT NOT NULL,
boxcolor TEXT NOT NULL,
boxborderw TEXT NOT NULL,
alpha TEXT NOT NULL,
channel_id INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (channel_id) REFERENCES channels (id) ON UPDATE SET NULL ON DELETE SET NULL,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS user
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
mail TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
role_id INTEGER NOT NULL DEFAULT 2,
channel_id INTEGER NOT NULL DEFAULT 1,
FOREIGN KEY (role_id) REFERENCES roles (id) ON UPDATE SET NULL ON DELETE SET NULL,
FOREIGN KEY (channel_id) REFERENCES channels (id) ON UPDATE SET NULL ON DELETE SET NULL,
UNIQUE(mail, username)
);"#;
    sqlx::query(query).execute(conn).await
}
/// Initialize the database and seed default rows.
///
/// Creates the schema on first run, generates a random 80-character JWT
/// secret, inserts the default channel (preview URL built from `domain`),
/// the three roles and the default text presets. A trigger limits `global`
/// to one row, so calling this twice returns an error instead of re-seeding.
///
/// # Errors
/// Fails when the database file cannot be created, the schema or seed
/// queries fail, or the database was already initialized.
pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std::error::Error>> {
    let db_path = db_path()?;
    if !Sqlite::database_exists(db_path).await.unwrap_or(false) {
        // Propagate creation/schema errors instead of panicking so the
        // caller can report them and exit cleanly.
        Sqlite::create_database(db_path).await?;
        let pool = db_pool().await?;
        create_schema(&pool).await?;
        info!("Database created Successfully");
    }
    // Random signing secret for JWTs, stored once in the `global` table.
    let secret: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(80)
        .map(char::from)
        .collect();
    let url = match domain {
        Some(d) => format!("http://{d}/live/stream.m3u8"),
        None => "http://localhost/live/stream.m3u8".to_string(),
    };
    let config_path = if env::consts::OS == "linux" {
        "/etc/ffplayout/ffplayout.toml"
    } else {
        "./assets/ffplayout.toml"
    };
    let query = "CREATE TRIGGER global_row_count
BEFORE INSERT ON global
WHEN (SELECT COUNT(*) FROM global) >= 1
BEGIN
SELECT RAISE(FAIL, 'Database is already initialized!');
END;
INSERT INTO global(secret) VALUES($1);
INSERT INTO channels(name, preview_url, config_path, extra_extensions, active)
VALUES('Channel 1', $2, $3, 'jpg,jpeg,png', 0);
INSERT INTO roles(name) VALUES('admin'), ('user'), ('guest');
INSERT INTO presets(name, text, x, y, fontsize, line_spacing, fontcolor, box, boxcolor, boxborderw, alpha, channel_id)
VALUES('Default', 'Welcome to ffplayout messenger!', '(w-text_w)/2', '(h-text_h)/2', '24', '4', '#ffffff@0xff', '0', '#000000@0x80', '4', '1.0', '1'),
('Empty Text', '', '0', '0', '24', '4', '#000000', '0', '#000000', '0', '0', '1'),
('Bottom Text fade in', 'The upcoming event will be delayed by a few minutes.', '(w-text_w)/2', '(h-line_h)*0.9', '24', '4', '#ffffff',
'1', '#000000@0x80', '4', 'ifnot(ld(1),st(1,t));if(lt(t,ld(1)+1),0,if(lt(t,ld(1)+2),(t-(ld(1)+1))/1,if(lt(t,ld(1)+8),1,if(lt(t,ld(1)+9),(1-(t-(ld(1)+8)))/1,0))))', '1'),
('Scrolling Text', 'We have a very important announcement to make.', 'ifnot(ld(1),st(1,t));if(lt(t,ld(1)+1),w+4,w-w/12*mod(t-ld(1),12*(w+tw)/w))', '(h-line_h)*0.9',
'24', '4', '#ffffff', '1', '#000000@0x80', '4', '1.0', '1');";
    let pool = db_pool().await?;
    sqlx::query(query)
        .bind(secret)
        .bind(url)
        .bind(config_path)
        .execute(&pool)
        .await?;
    Ok("Database initialized!")
}
/// Fetch the single row of global settings (the JWT secret).
pub async fn select_global(conn: &Pool<Sqlite>) -> Result<GlobalSettings, sqlx::Error> {
    sqlx::query_as("SELECT secret FROM global WHERE id = 1")
        .fetch_one(conn)
        .await
}
/// Fetch one channel by id, filling in the host's current UTC offset.
pub async fn select_channel(conn: &Pool<Sqlite>, id: &i32) -> Result<Channel, sqlx::Error> {
    let mut channel: Channel = sqlx::query_as("SELECT * FROM channels WHERE id = $1")
        .bind(id)
        .fetch_one(conn)
        .await?;
    channel.utc_offset = local_utc_offset();
    Ok(channel)
}
/// Fetch every channel, filling in the host's current UTC offset on each.
pub async fn select_all_channels(conn: &Pool<Sqlite>) -> Result<Vec<Channel>, sqlx::Error> {
    let mut channels: Vec<Channel> = sqlx::query_as("SELECT * FROM channels")
        .fetch_all(conn)
        .await?;
    for channel in &mut channels {
        channel.utc_offset = local_utc_offset();
    }
    Ok(channels)
}
/// Update the editable fields of a channel (`active` is left untouched).
pub async fn update_channel(
    conn: &Pool<Sqlite>,
    id: i32,
    channel: Channel,
) -> Result<SqliteQueryResult, sqlx::Error> {
    // Positional binds: $1 = id, $2..$5 = the new field values.
    sqlx::query(
        "UPDATE channels SET name = $2, preview_url = $3, config_path = $4, extra_extensions = $5 WHERE id = $1",
    )
    .bind(id)
    .bind(channel.name)
    .bind(channel.preview_url)
    .bind(channel.config_path)
    .bind(channel.extra_extensions)
    .execute(conn)
    .await
}
/// Insert a new channel and return the stored row (with its generated id).
pub async fn insert_channel(conn: &Pool<Sqlite>, channel: Channel) -> Result<Channel, sqlx::Error> {
    let inserted = sqlx::query(
        "INSERT INTO channels (name, preview_url, config_path, extra_extensions) VALUES($1, $2, $3, $4)",
    )
    .bind(channel.name)
    .bind(channel.preview_url)
    .bind(channel.config_path)
    .bind(channel.extra_extensions)
    .execute(conn)
    .await?;
    // Read the freshly inserted row back via its rowid.
    sqlx::query_as("SELECT * FROM channels WHERE id = $1")
        .bind(inserted.last_insert_rowid())
        .fetch_one(conn)
        .await
}
/// Delete the channel with the given id.
pub async fn delete_channel(
    conn: &Pool<Sqlite>,
    id: &i32,
) -> Result<SqliteQueryResult, sqlx::Error> {
    sqlx::query("DELETE FROM channels WHERE id = $1")
        .bind(id)
        .execute(conn)
        .await
}
/// Return the highest channel id (the most recently created channel).
pub async fn select_last_channel(conn: &Pool<Sqlite>) -> Result<i32, sqlx::Error> {
    sqlx::query_scalar("SELECT id FROM channels ORDER BY id DESC LIMIT 1;")
        .fetch_one(conn)
        .await
}
/// Look up a role name by its id.
pub async fn select_role(conn: &Pool<Sqlite>, id: &i32) -> Result<Role, sqlx::Error> {
    sqlx::query_as("SELECT name FROM roles WHERE id = $1")
        .bind(id)
        .fetch_one(conn)
        .await
}
/// Fetch a user by name including the password hash (login flow only).
pub async fn select_login(conn: &Pool<Sqlite>, user: &str) -> Result<User, sqlx::Error> {
    sqlx::query_as("SELECT id, mail, username, password, role_id FROM user WHERE username = $1")
        .bind(user)
        .fetch_one(conn)
        .await
}
/// Fetch a user by name without the password hash.
pub async fn select_user(conn: &Pool<Sqlite>, user: &str) -> Result<User, sqlx::Error> {
    sqlx::query_as("SELECT id, mail, username, role_id FROM user WHERE username = $1")
        .bind(user)
        .fetch_one(conn)
        .await
}
/// Fetch a user by id without the password hash.
pub async fn select_user_by_id(conn: &Pool<Sqlite>, id: i32) -> Result<User, sqlx::Error> {
    sqlx::query_as("SELECT id, mail, username, role_id FROM user WHERE id = $1")
        .bind(id)
        .fetch_one(conn)
        .await
}
/// List all users (id and username only).
pub async fn select_users(conn: &Pool<Sqlite>) -> Result<Vec<User>, sqlx::Error> {
    sqlx::query_as("SELECT id, username FROM user")
        .fetch_all(conn)
        .await
}
/// Insert a new user, hashing the password with Argon2 first.
///
/// Hashing is CPU-bound, so it runs on the blocking thread pool to avoid
/// stalling the async executor.
pub async fn insert_user(
    conn: &Pool<Sqlite>,
    user: User,
) -> Result<SqliteQueryResult, sqlx::Error> {
    let password_hash = task::spawn_blocking(move || {
        // Fresh random salt per user; hash encodes algorithm, salt and params.
        let salt = SaltString::generate(&mut OsRng);
        // NOTE(review): unwrap panics if Argon2 rejects the input — confirm
        // passwords are validated before reaching this point.
        let hash = Argon2::default()
            .hash_password(user.password.clone().as_bytes(), &salt)
            .unwrap();
        hash.to_string()
    })
    .await
    .unwrap(); // JoinError only occurs if the blocking task panicked
    let query = "INSERT INTO user (mail, username, password, role_id) VALUES($1, $2, $3, $4)";
    sqlx::query(query)
        .bind(user.mail)
        .bind(user.username)
        .bind(password_hash)
        .bind(user.role_id)
        .execute(conn)
        .await
}
/// Update arbitrary columns of a user row; `fields` is a pre-built
/// `col = value, ...` fragment spliced into the statement.
///
/// NOTE(review): `fields` is interpolated into the SQL string, not bound as a
/// parameter — callers MUST never pass user-controlled text here or this is
/// an SQL injection vector. Verify every call site escapes its values.
pub async fn update_user(
    conn: &Pool<Sqlite>,
    id: i32,
    fields: String,
) -> Result<SqliteQueryResult, sqlx::Error> {
    let query = format!("UPDATE user SET {fields} WHERE id = $1");
    sqlx::query(&query).bind(id).execute(conn).await
}
/// Delete a user by username.
pub async fn delete_user(
    conn: &Pool<Sqlite>,
    name: &str,
) -> Result<SqliteQueryResult, sqlx::Error> {
    sqlx::query("DELETE FROM user WHERE username = $1;")
        .bind(name)
        .execute(conn)
        .await
}
/// List all text presets that belong to one channel.
pub async fn select_presets(conn: &Pool<Sqlite>, id: i32) -> Result<Vec<TextPreset>, sqlx::Error> {
    sqlx::query_as("SELECT * FROM presets WHERE channel_id = $1")
        .bind(id)
        .fetch_all(conn)
        .await
}
/// Update a text preset by id.
///
/// The `.bind()` order is load-bearing: it must follow the placeholder
/// numbering in the SQL ($1 = name ... $11 = boxborderw, $12 = id).
pub async fn update_preset(
    conn: &Pool<Sqlite>,
    id: &i32,
    preset: TextPreset,
) -> Result<SqliteQueryResult, sqlx::Error> {
    let query =
        "UPDATE presets SET name = $1, text = $2, x = $3, y = $4, fontsize = $5, line_spacing = $6,
fontcolor = $7, alpha = $8, box = $9, boxcolor = $10, boxborderw = $11 WHERE id = $12";
    sqlx::query(query)
        .bind(preset.name)
        .bind(preset.text)
        .bind(preset.x)
        .bind(preset.y)
        .bind(preset.fontsize)
        .bind(preset.line_spacing)
        .bind(preset.fontcolor)
        .bind(preset.alpha)
        .bind(preset.r#box) // raw identifier: `box` is a Rust keyword
        .bind(preset.boxcolor)
        .bind(preset.boxborderw)
        .bind(id)
        .execute(conn)
        .await
}
/// Insert a new text preset.
///
/// The `.bind()` order must follow the placeholder numbering in the SQL
/// ($1 = channel_id, $2 = name, ... $12 = boxborderw).
pub async fn insert_preset(
    conn: &Pool<Sqlite>,
    preset: TextPreset,
) -> Result<SqliteQueryResult, sqlx::Error> {
    let query =
        "INSERT INTO presets (channel_id, name, text, x, y, fontsize, line_spacing, fontcolor, alpha, box, boxcolor, boxborderw)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)";
    sqlx::query(query)
        .bind(preset.channel_id)
        .bind(preset.name)
        .bind(preset.text)
        .bind(preset.x)
        .bind(preset.y)
        .bind(preset.fontsize)
        .bind(preset.line_spacing)
        .bind(preset.fontcolor)
        .bind(preset.alpha)
        .bind(preset.r#box) // raw identifier: `box` is a Rust keyword
        .bind(preset.boxcolor)
        .bind(preset.boxborderw)
        .execute(conn)
        .await
}
/// Delete a text preset by id.
pub async fn delete_preset(
    conn: &Pool<Sqlite>,
    id: &i32,
) -> Result<SqliteQueryResult, sqlx::Error> {
    sqlx::query("DELETE FROM presets WHERE id = $1;")
        .bind(id)
        .execute(conn)
        .await
}

13
ffplayout/src/db/mod.rs Normal file
View File

@ -0,0 +1,13 @@
use sqlx::{Pool, Sqlite, SqlitePool};
pub mod handles;
pub mod models;
use crate::utils::db_path;
/// Open a SQLite connection pool for the configured database file.
///
/// NOTE(review): a missing/unresolvable db path panics here (matching the
/// original behavior); callers are expected to have checked `db_path` first.
pub async fn db_pool() -> Result<Pool<Sqlite>, sqlx::Error> {
    SqlitePool::connect(db_path().unwrap()).await
}

123
ffplayout/src/db/models.rs Normal file
View File

@ -0,0 +1,123 @@
use ffplayout_lib::utils::ProcessControl;
use regex::Regex;
use serde::{
de::{self, Visitor},
Deserialize, Serialize,
};
/// Database row / API payload for an account.
///
/// Serde attributes shape the JSON: the id is never accepted from clients,
/// the password is accepted but never echoed back, and role/channel ids stay
/// internal.
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct User {
    #[sqlx(default)]
    #[serde(skip_deserializing)]
    pub id: i32,
    #[sqlx(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mail: Option<String>,
    pub username: String,
    // Write-only: deserialized from requests (defaulting to ""), never serialized.
    #[sqlx(default)]
    #[serde(skip_serializing, default = "empty_string")]
    pub password: String,
    #[sqlx(default)]
    #[serde(skip_serializing)]
    pub role_id: Option<i32>,
    #[sqlx(default)]
    #[serde(skip_serializing)]
    pub channel_id: Option<i32>,
    // Not a DB column; filled in by the login handler.
    #[sqlx(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub token: Option<String>,
}
/// Serde default for `User::password`: an empty owned string.
fn empty_string() -> String {
    String::new()
}
/// Minimal identity stored in the request extensions after JWT validation.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoginUser {
    pub id: i32,
    pub username: String,
}
impl LoginUser {
    /// Bundle an id/username pair.
    pub fn new(id: i32, username: String) -> Self {
        Self { id, username }
    }
}
/// A saved drawtext configuration (position, font, box styling) for the
/// text-message feature; all numeric-looking fields are stored as strings
/// because they may contain ffmpeg expressions.
#[derive(Debug, Deserialize, Serialize, Clone, sqlx::FromRow)]
pub struct TextPreset {
    #[sqlx(default)]
    #[serde(skip_deserializing)]
    pub id: i32,
    pub channel_id: i32,
    pub name: String,
    pub text: String,
    pub x: String,
    pub y: String,
    // Accepts either a JSON number or string; normalized to String.
    #[serde(deserialize_with = "deserialize_number_or_string")]
    pub fontsize: String,
    #[serde(deserialize_with = "deserialize_number_or_string")]
    pub line_spacing: String,
    pub fontcolor: String,
    // `box` is a Rust keyword, hence the raw identifier.
    pub r#box: String,
    pub boxcolor: String,
    #[serde(deserialize_with = "deserialize_number_or_string")]
    pub boxborderw: String,
    #[serde(deserialize_with = "deserialize_number_or_string")]
    pub alpha: String,
}
/// Deserialize number or string
///
/// Accepts a JSON string, integer or float and yields a `String`. String
/// input additionally rewrites a decimal comma after a zero ("0,5" -> "0.5")
/// so locale-formatted fractions survive.
pub fn deserialize_number_or_string<'de, D>(deserializer: D) -> Result<String, D::Error>
where
    D: serde::Deserializer<'de>,
{
    struct StringOrNumberVisitor;
    impl<'de> Visitor<'de> for StringOrNumberVisitor {
        type Value = String;
        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
            formatter.write_str("a string or a number")
        }
        fn visit_str<E: de::Error>(self, value: &str) -> Result<Self::Value, E> {
            // NOTE(review): the regex is recompiled on every string value;
            // fine for config-sized payloads, hoist if this becomes hot.
            let re = Regex::new(r"0,([0-9]+)").unwrap();
            let clean_string = re.replace_all(value, "0.$1").to_string();
            Ok(clean_string)
        }
        fn visit_u64<E: de::Error>(self, value: u64) -> Result<Self::Value, E> {
            Ok(value.to_string())
        }
        fn visit_i64<E: de::Error>(self, value: i64) -> Result<Self::Value, E> {
            Ok(value.to_string())
        }
        fn visit_f64<E: de::Error>(self, value: f64) -> Result<Self::Value, E> {
            Ok(value.to_string())
        }
    }
    deserializer.deserialize_any(StringOrNumberVisitor)
}
/// A playout channel row plus two runtime-only fields (`utc_offset`,
/// `control`) that are populated in code, not loaded from the database.
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct Channel {
    #[serde(skip_deserializing)]
    pub id: i32,
    pub name: String,
    pub preview_url: String,
    pub config_path: String,
    pub extra_extensions: String,
    pub active: bool,
    // Filled by the select_* helpers with the host's current UTC offset.
    #[sqlx(default)]
    #[serde(default)]
    pub utc_offset: i32,
    // In-process handle to the player process; never persisted or serialized.
    #[serde(skip_serializing, skip_deserializing)]
    #[sqlx(skip)]
    pub control: ProcessControl,
}

21
ffplayout/src/lib.rs Normal file
View File

@ -0,0 +1,21 @@
use std::sync::{Arc, Mutex};
use clap::Parser;
use lazy_static::lazy_static;
use sysinfo::{Disks, Networks, System};
pub mod api;
pub mod db;
pub mod sse;
pub mod utils;
use utils::args_parse::Args;
lazy_static! {
    // Parsed CLI arguments; evaluated once on first access.
    pub static ref ARGS: Args = Args::parse();
    // Shared sysinfo probes, refreshed in place by the system-stat handlers.
    pub static ref DISKS: Arc<Mutex<Disks>> =
        Arc::new(Mutex::new(Disks::new_with_refreshed_list()));
    pub static ref NETWORKS: Arc<Mutex<Networks>> =
        Arc::new(Mutex::new(Networks::new_with_refreshed_list()));
    pub static ref SYS: Arc<Mutex<System>> = Arc::new(Mutex::new(System::new_all()));
}

216
ffplayout/src/main.rs Normal file
View File

@ -0,0 +1,216 @@
use std::{
collections::HashSet,
env,
process::exit,
sync::{atomic::Ordering, Arc},
thread,
};
use actix_files::Files;
use actix_web::{
dev::ServiceRequest, middleware::Logger, web, App, Error, HttpMessage, HttpServer,
};
use actix_web_grants::authorities::AttachAuthorities;
use actix_web_httpauth::{extractors::bearer::BearerAuth, middleware::HttpAuthentication};
#[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
use actix_web_static_files::ResourceFiles;
use path_clean::PathClean;
use simplelog::*;
use tokio::sync::Mutex;
use ffplayout::{
api::{auth, routes::*},
db::{db_pool, handles, models::LoginUser},
sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{control::ProcessControl, db_path, init_config, run_args},
ARGS,
};
#[cfg(any(debug_assertions, not(feature = "embed_frontend")))]
use ffplayout::utils::public_path;
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
#[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
/// Bearer-token validator for `HttpAuthentication::bearer`.
///
/// Decodes the JWT, attaches the role as an actix-web-grants authority and
/// stores the logged-in user in the request extensions for later extractors.
/// On failure the error and the request are returned together, as the
/// middleware contract requires.
async fn validator(
    req: ServiceRequest,
    credentials: BearerAuth,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
    // We just get permissions from JWT
    match auth::decode_jwt(credentials.token()).await {
        Ok(claims) => {
            req.attach(vec![claims.role]);
            req.extensions_mut()
                .insert(LoginUser::new(claims.id, claims.username));
            Ok(req)
        }
        Err(e) => Err((e, req)),
    }
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Console-only logging for the API process: no mail, no file, no timestamps.
    let mut config = PlayoutConfig::new(None, None);
    config.mail.recipient = String::new();
    config.logging.log_to_file = false;
    config.logging.timestamp = false;
    let logging = init_logging(&config, None, None);
    CombinedLogger::init(logging).unwrap();
    // Handle one-shot CLI actions (init DB, create user, ...); exits with
    // the returned code on failure.
    if let Err(c) = run_args().await {
        exit(c);
    }
    let pool = match db_pool().await {
        Ok(p) => p,
        Err(e) => {
            error!("{e}");
            exit(1);
        }
    };
    let mut channels = handles::select_all_channels(&pool)
        .await
        .unwrap_or_default();
    // Mark each channel's process-control flag from its persisted active state.
    for channel in channels.iter_mut() {
        // TODO: maybe run here the player
        channel
            .control
            .is_alive
            .store(channel.active, Ordering::SeqCst)
    }
    thread::spawn(move || {
        println!("{channels:?}");
    });
    if let Some(conn) = &ARGS.listen {
        if db_path().is_err() {
            error!("Database is not initialized! Init DB first and add admin user.");
            exit(1);
        }
        init_config(&pool).await;
        // NOTE(review): assumes "IP:PORT" shape — a malformed --listen value
        // panics on the index/parse below; confirm validation upstream.
        let ip_port = conn.split(':').collect::<Vec<&str>>();
        let addr = ip_port[0];
        let port = ip_port[1].parse::<u16>().unwrap();
        let engine_process = web::Data::new(ProcessControl::new());
        let auth_state = web::Data::new(AuthState {
            uuids: Mutex::new(HashSet::new()),
        });
        let broadcast_data = Broadcaster::create();
        info!("running ffplayout API, listen on http://{conn}");
        // no 'allow origin' here, give it to the reverse proxy
        HttpServer::new(move || {
            let auth = HttpAuthentication::bearer(validator);
            let db_pool = web::Data::new(pool.clone());
            // Customize logging format to get IP though proxies.
            let logger = Logger::new("%{r}a \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" %T")
                .exclude_regex(r"/_nuxt/*");
            // /api/* requires a valid bearer token; /data/* uses one-time UUIDs.
            let mut web_app = App::new()
                .app_data(db_pool)
                .app_data(engine_process.clone())
                .app_data(auth_state.clone())
                .app_data(web::Data::from(Arc::clone(&broadcast_data)))
                .wrap(logger)
                .service(login)
                .service(
                    web::scope("/api")
                        .wrap(auth.clone())
                        .service(add_user)
                        .service(get_user)
                        .service(get_by_name)
                        .service(get_users)
                        .service(remove_user)
                        .service(get_playout_config)
                        .service(update_playout_config)
                        .service(add_preset)
                        .service(get_presets)
                        .service(update_preset)
                        .service(delete_preset)
                        .service(get_channel)
                        .service(get_all_channels)
                        .service(patch_channel)
                        .service(add_channel)
                        .service(remove_channel)
                        .service(update_user)
                        .service(send_text_message)
                        .service(control_playout)
                        .service(media_current)
                        .service(media_next)
                        .service(media_last)
                        .service(process_control)
                        .service(get_playlist)
                        .service(save_playlist)
                        .service(gen_playlist)
                        .service(del_playlist)
                        .service(get_log)
                        .service(file_browser)
                        .service(add_dir)
                        .service(move_rename)
                        .service(remove)
                        .service(save_file)
                        .service(import_playlist)
                        .service(get_program)
                        .service(get_system_stat)
                        .service(generate_uuid),
                )
                .service(
                    web::scope("/data")
                        .service(validate_uuid)
                        .service(event_stream),
                )
                .service(get_file);
            if let Some(public) = &ARGS.public {
                // When public path is set as argument use this path for serving extra static files,
                // is useful for HLS stream etc.
                let absolute_path = if public.is_absolute() {
                    public.to_path_buf()
                } else {
                    env::current_dir().unwrap_or_default().join(public)
                }
                .clean();
                web_app = web_app.service(Files::new("/", absolute_path));
            } else {
                // When no public path is given as argument, use predefine keywords in path,
                // like /live; /preview; /public, or HLS extensions to recognize file should get from public folder
                web_app = web_app.service(get_public);
            }
            #[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
            {
                // in release mode embed frontend
                let generated = generate();
                web_app =
                    web_app.service(ResourceFiles::new("/", generated).resolve_not_found_to_root());
            }
            #[cfg(any(debug_assertions, not(feature = "embed_frontend")))]
            {
                // in debug mode get frontend from path
                web_app = web_app.service(Files::new("/", public_path()).index_file("index.html"));
            }
            web_app
        })
        .bind((addr, port))?
        .run()
        .await
    } else {
        error!("Run ffpapi with listen parameter!");
        Ok(())
    }
}

View File

View File

@ -0,0 +1,160 @@
use std::{sync::Arc, time::Duration};
use actix_web::{rt::time::interval, web};
use actix_web_lab::{
sse::{self, Sse},
util::InfallibleStream,
};
use ffplayout_lib::utils::PlayoutConfig;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::utils::{control::media_info, system};
/// One connected SSE subscriber: which channel it watches, the playout
/// config to query with, the endpoint it subscribed to ("playout" or
/// "system") and the channel sender used to push events to it.
#[derive(Debug, Clone)]
struct Client {
    _channel: i32,
    config: PlayoutConfig,
    endpoint: String,
    sender: mpsc::Sender<sse::Event>,
}
impl Client {
    /// Bundle the subscriber's data; no I/O happens here.
    fn new(
        _channel: i32,
        config: PlayoutConfig,
        endpoint: String,
        sender: mpsc::Sender<sse::Event>,
    ) -> Self {
        Self {
            _channel,
            config,
            endpoint,
            sender,
        }
    }
}
/// Fan-out hub for server-sent events; interior mutability via a
/// `parking_lot::Mutex` so handlers can register clients through `&self`.
pub struct Broadcaster {
    inner: Mutex<BroadcasterInner>,
}
/// The mutable part: the current list of connected clients.
#[derive(Debug, Clone, Default)]
struct BroadcasterInner {
    clients: Vec<Client>,
}
impl Broadcaster {
    /// Constructs new broadcaster and spawns ping loop.
    pub fn create() -> Arc<Self> {
        let this = Arc::new(Broadcaster {
            inner: Mutex::new(BroadcasterInner::default()),
        });
        Broadcaster::spawn_ping(Arc::clone(&this));
        this
    }
    /// Pings clients every 10 seconds to see if they are alive and remove them from the broadcast
    /// list if not.
    fn spawn_ping(this: Arc<Self>) {
        actix_web::rt::spawn(async move {
            // Tick once per second; stale-client pruning happens on every
            // 10th tick, while playout/system broadcasts run every tick.
            let mut interval = interval(Duration::from_secs(1));
            let mut counter = 0;
            loop {
                interval.tick().await;
                if counter % 10 == 0 {
                    this.remove_stale_clients().await;
                }
                this.broadcast_playout().await;
                this.broadcast_system().await;
                counter = (counter + 1) % 61;
            }
        });
    }
    /// Removes all non-responsive clients from broadcast list.
    async fn remove_stale_clients(&self) {
        // Clone the list so the lock is not held across the awaits below.
        let clients = self.inner.lock().clients.clone();
        let mut ok_clients = Vec::new();
        for client in clients {
            // A failed send means the receiver (client connection) is gone.
            if client
                .sender
                .send(sse::Event::Comment("ping".into()))
                .await
                .is_ok()
            {
                ok_clients.push(client.clone());
            }
        }
        self.inner.lock().clients = ok_clients;
    }
    /// Registers client with broadcaster, returning an SSE response body.
    pub async fn new_client(
        &self,
        channel: i32,
        config: PlayoutConfig,
        endpoint: String,
    ) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
        let (tx, rx) = mpsc::channel(10);
        // Immediately confirm the connection to the subscriber.
        tx.send(sse::Data::new("connected").into()).await.unwrap();
        self.inner
            .lock()
            .clients
            .push(Client::new(channel, config, endpoint, tx));
        Sse::from_infallible_receiver(rx)
    }
    /// Broadcasts playout status to clients.
    pub async fn broadcast_playout(&self) {
        let clients = self.inner.lock().clients.clone();
        for client in clients.iter().filter(|client| client.endpoint == "playout") {
            match media_info(&client.config, "current".into()).await {
                Ok(res) => {
                    // Send failures are ignored; dead clients get pruned by the ping loop.
                    let _ = client
                        .sender
                        .send(
                            sse::Data::new(res.text().await.unwrap_or_else(|_| "Success".into()))
                                .into(),
                        )
                        .await;
                }
                Err(_) => {
                    let _ = client
                        .sender
                        .send(sse::Data::new("not running").into())
                        .await;
                }
            };
        }
    }
    /// Broadcasts system status to clients.
    pub async fn broadcast_system(&self) {
        let clients = self.inner.lock().clients.clone();
        for client in clients {
            if &client.endpoint == "system" {
                // stat() is blocking work, so it runs on the blocking pool.
                if let Ok(stat) = web::block(move || system::stat(client.config.clone())).await {
                    let stat_string = stat.to_string();
                    let _ = client.sender.send(sse::Data::new(stat_string).into()).await;
                };
            }
        }
    }
}

55
ffplayout/src/sse/mod.rs Normal file
View File

@ -0,0 +1,55 @@
use std::{
collections::HashSet,
time::{Duration, SystemTime},
};
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::utils::errors::ServiceError;
pub mod broadcast;
pub mod routes;
/// A one-time access token for the SSE endpoints: a random UUID plus the
/// instant after which it is no longer accepted.
#[derive(Debug, Eq, Hash, PartialEq, Clone, Copy)]
pub struct UuidData {
    pub uuid: Uuid,
    pub expiration: SystemTime,
}
impl UuidData {
    /// Generate a fresh UUID valid for two hours from now.
    pub fn new() -> Self {
        Self {
            uuid: Uuid::new_v4(),
            expiration: SystemTime::now() + Duration::from_secs(2 * 3600), // 2 hours
        }
    }
}
impl Default for UuidData {
    fn default() -> Self {
        Self::new()
    }
}
/// Shared app state holding the set of UUIDs currently allowed to open SSE
/// streams; guarded by a tokio mutex because handlers lock across awaits.
pub struct AuthState {
    pub uuids: Mutex<HashSet<UuidData>>,
}
/// Remove all UUIDs from HashSet which are older the expiration time.
pub fn prune_uuids(uuids: &mut HashSet<UuidData>) {
    uuids.retain(|entry| SystemTime::now() < entry.expiration);
}
/// Validate a client-supplied UUID string against the allowed set.
///
/// Expired entries are pruned first; an unparsable or unknown UUID yields
/// `ServiceError::Unauthorized`.
pub fn check_uuid(uuids: &mut HashSet<UuidData>, uuid: &str) -> Result<&'static str, ServiceError> {
    let client_uuid = Uuid::parse_str(uuid)?;
    prune_uuids(uuids);
    if uuids.iter().any(|entry| entry.uuid == client_uuid) {
        Ok("UUID is valid")
    } else {
        Err(ServiceError::Unauthorized(
            "Invalid or expired UUID".to_string(),
        ))
    }
}

View File

@ -0,0 +1,82 @@
use actix_web::{get, post, web, Responder};
use actix_web_grants::proc_macro::protect;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use super::{check_uuid, prune_uuids, AuthState, UuidData};
use crate::sse::broadcast::Broadcaster;
use crate::utils::{errors::ServiceError, playout_config, Role};
/// Query/response payload for the SSE endpoints: which endpoint to subscribe
/// to and the one-time UUID. `endpoint` is write-only (never serialized).
#[derive(Deserialize, Serialize)]
struct User {
    #[serde(default, skip_serializing)]
    endpoint: String,
    uuid: String,
}
impl User {
    /// Bundle an endpoint/uuid pair.
    fn new(endpoint: String, uuid: String) -> Self {
        Self { endpoint, uuid }
    }
}
/// **Get generated UUID**
///
/// ```BASH
/// curl -X GET 'http://127.0.0.1:8787/api/generate-uuid' -H 'Authorization: Bearer <TOKEN>'
/// ```
#[post("/generate-uuid")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
async fn generate_uuid(data: web::Data<AuthState>) -> Result<impl Responder, ServiceError> {
    let mut uuids = data.uuids.lock().await;
    let new_uuid = UuidData::new();
    // Response carries only the UUID string; `endpoint` is skipped by serde.
    let user_auth = User::new(String::new(), new_uuid.uuid.to_string());
    // Drop expired entries before adding the new one, so the set stays bounded.
    prune_uuids(&mut uuids);
    uuids.insert(new_uuid);
    Ok(web::Json(user_auth))
}
/// **Validate UUID**
///
/// ```BASH
/// curl -X GET 'http://127.0.0.1:8787/data/validate?uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a'
/// ```
///
/// Returns a confirmation string as JSON when the UUID is known and not
/// expired, otherwise the `ServiceError` from `check_uuid`.
#[get("/validate")]
async fn validate_uuid(
    data: web::Data<AuthState>,
    user: web::Query<User>,
) -> Result<impl Responder, ServiceError> {
    let mut uuids = data.uuids.lock().await;
    // `check_uuid` already returns a Result; just wrap the success value
    // instead of matching only to re-wrap both arms.
    check_uuid(&mut uuids, user.uuid.as_str()).map(web::Json)
}
/// **Connect to event handler**
///
/// ```BASH
/// curl -X GET 'http://127.0.0.1:8787/data/event/1?endpoint=system&uuid=f2f8c29b-712a-48c5-8919-b535d3a05a3a'
/// ```
#[get("/event/{channel}")]
async fn event_stream(
    pool: web::Data<Pool<Sqlite>>,
    broadcaster: web::Data<Broadcaster>,
    data: web::Data<AuthState>,
    id: web::Path<i32>,
    user: web::Query<User>,
) -> Result<impl Responder, ServiceError> {
    // Gate the stream behind a previously issued one-time UUID.
    let mut uuids = data.uuids.lock().await;
    check_uuid(&mut uuids, user.uuid.as_str())?;
    // Load the channel's playout config so the broadcaster can poll it.
    let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
    Ok(broadcaster
        .new_client(*id, config, user.endpoint.clone())
        .await)
}

View File

@ -0,0 +1,36 @@
use std::path::PathBuf;
use clap::Parser;
/// Command line arguments, parsed by clap.
#[derive(Parser, Debug, Clone)]
#[clap(version,
    about = "REST API for ffplayout",
    long_about = None)]
pub struct Args {
    // Interactive prompt for username/password/mail instead of flags.
    #[clap(short, long, help = "ask for user credentials")]
    pub ask: bool,
    #[clap(long, help = "path to database file")]
    pub db: Option<PathBuf>,
    // Overrides the auto-detected static file folder.
    #[clap(long, help = "path to public files")]
    pub public: Option<PathBuf>,
    #[clap(short, long, help = "Listen on IP:PORT, like: 127.0.0.1:8787")]
    pub listen: Option<String>,
    #[clap(short, long, help = "Initialize Database")]
    pub init: bool,
    #[clap(short, long, help = "domain name for initialization")]
    pub domain: Option<String>,
    #[clap(short, long, help = "Create admin user")]
    pub username: Option<String>,
    #[clap(short, long, help = "Admin mail address")]
    pub mail: Option<String>,
    #[clap(short, long, help = "Admin password")]
    pub password: Option<String>,
}

View File

@ -0,0 +1,70 @@
use std::{fs, path::PathBuf};
use rand::prelude::*;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{
control::{control_service, ServiceCmd},
errors::ServiceError,
};
use ffplayout_lib::utils::PlayoutConfig;
use crate::db::{handles, models::Channel};
use crate::utils::playout_config;
/// Create a new channel from the shipped template config.
///
/// Writes a specialized per-channel TOML config, inserts the channel row
/// and enables its service. The config path is restricted to `/etc/ffplayout`.
pub async fn create_channel(
    conn: &Pool<Sqlite>,
    target_channel: Channel,
) -> Result<Channel, ServiceError> {
    if !target_channel.config_path.starts_with("/etc/ffplayout") {
        return Err(ServiceError::BadRequest("Bad config path!".to_string()));
    }
    // Lowercased channel name without spaces, used for per-channel file names.
    let channel_name = target_channel.name.to_lowercase().replace(' ', "");
    // Next channel number, or a random two-digit fallback if the query fails.
    let channel_num = match handles::select_last_channel(conn).await {
        Ok(num) => num + 1,
        Err(_) => rand::thread_rng().gen_range(71..99),
    };
    // Start from the packaged template and specialize paths/ports per channel.
    let mut config = PlayoutConfig::new(
        Some(PathBuf::from("/usr/share/ffplayout/ffplayout.toml.orig")),
        None,
    );
    config.general.stat_file = format!(".ffp_{channel_name}",);
    config.logging.path = config.logging.path.join(&channel_name);
    // NOTE(review): `{:7>2}` pads with the *character* '7' to width 2; a zero
    // pad (`{:0>2}`) was probably intended — harmless while channel_num has
    // two digits, but confirm before relying on it.
    config.rpc_server.address = format!("127.0.0.1:70{:7>2}", channel_num);
    config.playlist.path = config.playlist.path.join(channel_name);
    // Give the HLS output unique playlist/segment names per channel.
    config.out.output_param = config
        .out
        .output_param
        .replace("stream.m3u8", &format!("stream{channel_num}.m3u8"))
        .replace("stream-%d.ts", &format!("stream{channel_num}-%d.ts"));
    let toml_string = toml_edit::ser::to_string(&config)?;
    fs::write(&target_channel.config_path, toml_string)?;
    let new_channel = handles::insert_channel(conn, target_channel).await?;
    control_service(conn, &config, new_channel.id, &ServiceCmd::Enable, None).await?;
    Ok(new_channel)
}
/// Remove a channel: stop and disable its service, delete its config file
/// (best effort) and finally drop its database row.
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
    let channel = handles::select_channel(conn, &id).await?;
    let (config, _) = playout_config(conn, &id).await?;

    for cmd in [ServiceCmd::Stop, ServiceCmd::Disable] {
        control_service(conn, &config, channel.id, &cmd, None).await?;
    }

    // A missing config file is only logged; the DB entry is removed anyway.
    if let Err(e) = fs::remove_file(channel.config_path) {
        error!("{e}");
    }

    handles::delete_channel(conn, &id).await?;
    Ok(())
}

View File

@ -0,0 +1,345 @@
use std::{
collections::HashMap,
env, fmt,
str::FromStr,
sync::atomic::{AtomicBool, Ordering},
};
use actix_web::web;
use reqwest::{header::AUTHORIZATION, Client, Response};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::{
process::{Child, Command},
sync::Mutex,
};
use crate::db::handles::select_channel;
use crate::utils::errors::ServiceError;
use ffplayout_lib::{utils::PlayoutConfig, vec_strings};
// JSON payloads for the engine's RPC server (see post_request below).

/// Parameters for the "text" control command.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct TextParams {
    control: String,
    message: HashMap<String, String>,
}

/// Generic control command (e.g. "stop_all").
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ControlParams {
    pub control: String,
}

/// Media related query forwarded to the engine.
#[derive(Debug, Deserialize, Serialize, Clone)]
struct MediaParams {
    media: String,
}

/// ffplayout engine process
///
/// When not running on Linux, or when the `PIGGYBACK_MODE` environment
/// variable is set (any value), the engine gets started and controlled
/// from ffpapi instead of systemd.
pub struct ProcessControl {
    // Handle of the spawned engine process (piggyback mode only).
    pub engine_child: Mutex<Option<Child>>,
    pub is_running: AtomicBool,
    pub piggyback: AtomicBool,
}
impl ProcessControl {
    /// Build a fresh controller. Piggyback mode is selected on non-Linux
    /// systems or when the `PIGGYBACK_MODE` environment variable is set.
    pub fn new() -> Self {
        let piggyback =
            AtomicBool::new(env::consts::OS != "linux" || env::var("PIGGYBACK_MODE").is_ok());

        Self {
            engine_child: Mutex::new(None),
            is_running: AtomicBool::new(false),
            piggyback,
        }
    }
}
impl ProcessControl {
    /// Spawn the ffplayout engine as a child process (piggyback mode).
    /// `kill_on_drop` ties the child's lifetime to this handle.
    pub async fn start(&self) -> Result<String, ServiceError> {
        #[cfg(not(debug_assertions))]
        let engine_path = "ffplayout";
        #[cfg(debug_assertions)]
        let engine_path = "./target/debug/ffplayout";
        // NOTE(review): an already running child is not checked here; a second
        // start() replaces the stored handle (dropping it kills the old child).
        match Command::new(engine_path).kill_on_drop(true).spawn() {
            Ok(proc) => *self.engine_child.lock().await = Some(proc),
            Err(_) => return Err(ServiceError::InternalServerError),
        };
        self.is_running.store(true, Ordering::SeqCst);
        Ok("Success".to_string())
    }

    /// Kill the engine child, then wait for it to exit.
    pub async fn stop(&self) -> Result<String, ServiceError> {
        if let Some(proc) = self.engine_child.lock().await.as_mut() {
            if proc.kill().await.is_err() {
                return Err(ServiceError::InternalServerError);
            };
        }
        self.wait().await?;
        self.is_running.store(false, Ordering::SeqCst);
        Ok("Success".to_string())
    }

    /// Full stop/start cycle.
    pub async fn restart(&self) -> Result<String, ServiceError> {
        self.stop().await?;
        self.start().await?;
        self.is_running.store(true, Ordering::SeqCst);
        Ok("Success".to_string())
    }

    /// Wait for the process to properly close.
    /// This prevents orphaned/zombie processes in the system.
    pub async fn wait(&self) -> Result<String, ServiceError> {
        if let Some(proc) = self.engine_child.lock().await.as_mut() {
            if proc.wait().await.is_err() {
                return Err(ServiceError::InternalServerError);
            };
        }
        Ok("Success".to_string())
    }

    /// Report this controller's view of the engine state (the flag, not the OS).
    pub fn status(&self) -> Result<String, ServiceError> {
        if self.is_running.load(Ordering::SeqCst) {
            Ok("active".to_string())
        } else {
            Ok("not running".to_string())
        }
    }
}
impl Default for ProcessControl {
    fn default() -> Self {
        Self::new()
    }
}

/// Commands accepted by the service controller; serialized in snake_case
/// so they match the lowercase names used by `FromStr`/`Display`.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServiceCmd {
    Enable,
    Disable,
    Start,
    Stop,
    Restart,
    Status,
}
impl FromStr for ServiceCmd {
    type Err = String;

    /// Parse a (case-insensitive) command name into a `ServiceCmd`.
    fn from_str(input: &str) -> Result<Self, Self::Err> {
        let cmd = match input.to_lowercase().as_str() {
            "enable" => Some(Self::Enable),
            "disable" => Some(Self::Disable),
            "start" => Some(Self::Start),
            "stop" => Some(Self::Stop),
            "restart" => Some(Self::Restart),
            "status" => Some(Self::Status),
            _ => None,
        };

        cmd.ok_or_else(|| format!("Command '{input}' not found!"))
    }
}
impl fmt::Display for ServiceCmd {
    /// Render the command in its lowercase CLI form.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Self::Enable => "enable",
            Self::Disable => "disable",
            Self::Start => "start",
            Self::Stop => "stop",
            Self::Restart => "restart",
            Self::Status => "status",
        };

        write!(f, "{name}")
    }
}
/// Request body wrapping a single service command.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Process {
    pub command: ServiceCmd,
}

/// Thin wrapper around `systemctl`, invoked through `sudo`.
struct SystemD {
    // Unit name appended to the command; currently empty (see TODO in new()).
    service: String,
    cmd: Vec<String>,
}
impl SystemD {
    /// Look up the channel (ensuring it exists) and prepare the base
    /// `systemctl` command line.
    async fn new(conn: &Pool<Sqlite>, id: i32) -> Result<Self, ServiceError> {
        let _channel = select_channel(conn, &id).await?;

        Ok(Self {
            service: String::new(), // TODO: ...
            cmd: vec_strings!["/usr/bin/systemctl"],
        })
    }

    /// Shared runner for all fire-and-forget systemctl actions; the five
    /// public verbs below only differ in the action word.
    fn run(mut self, action: &str) -> Result<String, ServiceError> {
        self.cmd.append(&mut vec![action.to_string(), self.service]);
        Command::new("sudo").args(self.cmd).spawn()?;

        Ok("Success".to_string())
    }

    fn enable(self) -> Result<String, ServiceError> {
        self.run("enable")
    }

    fn disable(self) -> Result<String, ServiceError> {
        self.run("disable")
    }

    fn start(self) -> Result<String, ServiceError> {
        self.run("start")
    }

    fn stop(self) -> Result<String, ServiceError> {
        self.run("stop")
    }

    fn restart(self) -> Result<String, ServiceError> {
        self.run("restart")
    }

    /// `is-active` needs the command output, so wait for completion here.
    async fn status(mut self) -> Result<String, ServiceError> {
        self.cmd
            .append(&mut vec!["is-active".to_string(), self.service]);
        let output = Command::new("sudo").args(self.cmd).output().await?;

        Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
    }
}
/// POST a JSON body to the engine's RPC server, passing the configured
/// authorization header; transport errors become `ServiceUnavailable`.
async fn post_request<T>(config: &PlayoutConfig, obj: T) -> Result<Response, ServiceError>
where
    T: Serialize,
{
    let url = format!("http://{}", config.rpc_server.address);

    Client::new()
        .post(&url)
        .header(AUTHORIZATION, &config.rpc_server.authorization)
        .json(&obj)
        .send()
        .await
        .map_err(|e| ServiceError::ServiceUnavailable(e.to_string()))
}
/// Push a "text" control message (e.g. overlay text fields) to the engine.
pub async fn send_message(
    config: &PlayoutConfig,
    message: HashMap<String, String>,
) -> Result<Response, ServiceError> {
    post_request(
        config,
        TextParams {
            control: "text".into(),
            message,
        },
    )
    .await
}
/// Send a plain control command string (e.g. "stop_all") to the engine.
pub async fn control_state(
    config: &PlayoutConfig,
    command: &str,
) -> Result<Response, ServiceError> {
    post_request(
        config,
        ControlParams {
            control: command.to_owned(),
        },
    )
    .await
}
/// Forward a media related query to the engine's RPC server.
pub async fn media_info(config: &PlayoutConfig, command: String) -> Result<Response, ServiceError> {
    let json_obj = MediaParams { media: command };
    post_request(config, json_obj).await
}
/// Run a service command for the given channel.
///
/// In piggyback mode (engine handle provided and flagged) the engine child
/// process is controlled directly; in every other case the command is
/// delegated to systemd. The two previously duplicated systemd fallback
/// branches are collapsed into the single `_` arm.
pub async fn control_service(
    conn: &Pool<Sqlite>,
    config: &PlayoutConfig,
    id: i32,
    command: &ServiceCmd,
    engine: Option<web::Data<ProcessControl>>,
) -> Result<String, ServiceError> {
    match engine {
        Some(en) if en.piggyback.load(Ordering::SeqCst) => match command {
            ServiceCmd::Start => en.start().await,
            ServiceCmd::Stop => {
                // Ask the engine to wind down its ffmpeg processes first.
                if control_state(config, "stop_all").await.is_ok() {
                    en.stop().await
                } else {
                    Err(ServiceError::NoContent("Nothing to stop".to_string()))
                }
            }
            ServiceCmd::Restart => {
                if control_state(config, "stop_all").await.is_ok() {
                    en.restart().await
                } else {
                    Err(ServiceError::NoContent("Nothing to restart".to_string()))
                }
            }
            ServiceCmd::Status => en.status(),
            // Enable/Disable only make sense for a systemd unit.
            _ => Err(ServiceError::Conflict(
                "Engine runs in piggyback mode, in this mode this command is not allowed."
                    .to_string(),
            )),
        },
        _ => execute_systemd(conn, id, command).await,
    }
}
/// Dispatch a service command to systemd for the given channel.
async fn execute_systemd(
    conn: &Pool<Sqlite>,
    id: i32,
    command: &ServiceCmd,
) -> Result<String, ServiceError> {
    let system_d = SystemD::new(conn, id).await?;
    match command {
        ServiceCmd::Enable => system_d.enable(),
        ServiceCmd::Disable => system_d.disable(),
        ServiceCmd::Start => system_d.start(),
        ServiceCmd::Stop => system_d.stop(),
        ServiceCmd::Restart => system_d.restart(),
        // status needs the command output, hence the await.
        ServiceCmd::Status => system_d.status().await,
    }
}

View File

@ -0,0 +1,105 @@
use actix_web::{error::ResponseError, Error, HttpResponse};
use derive_more::Display;
/// Application level errors; each variant maps to a matching HTTP status
/// code in the `ResponseError` impl, with the inner string as the body.
#[derive(Debug, Display)]
pub enum ServiceError {
    #[display(fmt = "Internal Server Error")]
    InternalServerError,
    #[display(fmt = "BadRequest: {_0}")]
    BadRequest(String),
    #[display(fmt = "Conflict: {_0}")]
    Conflict(String),
    #[display(fmt = "Forbidden: {_0}")]
    Forbidden(String),
    #[display(fmt = "Unauthorized: {_0}")]
    Unauthorized(String),
    #[display(fmt = "NoContent: {_0}")]
    NoContent(String),
    #[display(fmt = "ServiceUnavailable: {_0}")]
    ServiceUnavailable(String),
}
// Implementing ResponseError lets actix turn these errors into HTTP
// responses automatically when a handler returns Err.
impl ResponseError for ServiceError {
    /// Map every variant onto its HTTP status code, serializing the stored
    /// message (or a generic hint for internal errors) as the JSON body.
    fn error_response(&self) -> HttpResponse {
        match self {
            Self::InternalServerError => {
                HttpResponse::InternalServerError().json("Internal Server Error. Please try later.")
            }
            Self::BadRequest(message) => HttpResponse::BadRequest().json(message),
            Self::Conflict(message) => HttpResponse::Conflict().json(message),
            Self::Forbidden(message) => HttpResponse::Forbidden().json(message),
            Self::Unauthorized(message) => HttpResponse::Unauthorized().json(message),
            Self::NoContent(message) => HttpResponse::NoContent().json(message),
            Self::ServiceUnavailable(message) => HttpResponse::ServiceUnavailable().json(message),
        }
    }
}
// Conversions from common error types so `?` works throughout the handlers.
// Almost everything maps to BadRequest; io::Error maps to NoContent (missing
// files are reported as "nothing there" rather than a client error).
impl From<String> for ServiceError {
    fn from(err: String) -> ServiceError {
        ServiceError::BadRequest(err)
    }
}
impl From<Error> for ServiceError {
    fn from(err: Error) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<actix_multipart::MultipartError> for ServiceError {
    fn from(err: actix_multipart::MultipartError) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<std::io::Error> for ServiceError {
    fn from(err: std::io::Error) -> ServiceError {
        ServiceError::NoContent(err.to_string())
    }
}
impl From<std::num::ParseIntError> for ServiceError {
    fn from(err: std::num::ParseIntError) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<actix_web::error::BlockingError> for ServiceError {
    fn from(err: actix_web::error::BlockingError) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<sqlx::Error> for ServiceError {
    fn from(err: sqlx::Error) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<tokio::task::JoinError> for ServiceError {
    fn from(err: tokio::task::JoinError) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<toml_edit::ser::Error> for ServiceError {
    fn from(err: toml_edit::ser::Error) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}
impl From<uuid::Error> for ServiceError {
    fn from(err: uuid::Error) -> ServiceError {
        ServiceError::BadRequest(err.to_string())
    }
}

View File

@ -0,0 +1,456 @@
use std::{
io::Write,
path::{Path, PathBuf},
};
use actix_multipart::Multipart;
use actix_web::{web, HttpResponse};
use futures_util::TryStreamExt as _;
use lazy_static::lazy_static;
use lexical_sort::{natural_lexical_cmp, PathSort};
use rand::{distributions::Alphanumeric, Rng};
use relative_path::RelativePath;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::fs;
use simplelog::*;
use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::utils::{file_extension, MediaProbe};
/// Listing of one storage folder: its sub folders, media files and the
/// sibling folders of its parent.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PathObject {
    pub source: String,
    parent: Option<String>,
    parent_folders: Option<Vec<String>>,
    folders: Option<Vec<String>>,
    files: Option<Vec<VideoFile>>,
    // When true, only folders are listed; files and probing are skipped.
    #[serde(default)]
    pub folders_only: bool,
}
impl PathObject {
    fn new(source: String, parent: Option<String>) -> Self {
        Self {
            source,
            parent,
            parent_folders: Some(vec![]),
            folders: Some(vec![]),
            files: Some(vec![]),
            folders_only: false,
        }
    }
}

/// Source/target pair for rename/move operations.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct MoveObject {
    source: String,
    target: String,
}

/// File entry with the duration reported by ffprobe.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct VideoFile {
    name: String,
    duration: f64,
}

lazy_static! {
    // Current user's home directory; the fallback is a random non-existing
    // folder so the whitelist check below cannot accidentally match.
    pub static ref HOME_DIR: String = home::home_dir()
        .unwrap_or("/home/h1wl3n2og".into()) // any random not existing folder
        .as_os_str()
        .to_string_lossy()
        .to_string();
}

// Only paths under these roots (or under $HOME) may be browsed/modified.
const FOLDER_WHITELIST: &[&str; 6] = &[
    "/media",
    "/mnt",
    "/playlists",
    "/tv-media",
    "/usr/share/ffplayout",
    "/var/lib/ffplayout",
];
/// Normalize absolute path
///
/// This function takes care that it is not possible to break out from root_path.
/// Returns `(absolute_path, root_folder_name, path_relative_to_root)`.
pub fn norm_abs_path(
    root_path: &Path,
    input_path: &str,
) -> Result<(PathBuf, String, String), ServiceError> {
    // Normalized root with any remaining "../" components stripped.
    let path_relative = RelativePath::new(&root_path.to_string_lossy())
        .normalize()
        .to_string()
        .replace("../", "");
    // Normalize the user input the same way, removing traversal attempts.
    let mut source_relative = RelativePath::new(input_path)
        .normalize()
        .to_string()
        .replace("../", "");
    // Last component of the root (e.g. the storage folder name).
    let path_suffix = root_path
        .file_name()
        .unwrap_or_default()
        .to_string_lossy()
        .to_string();
    // Input given as an absolute path inside root: strip the root prefix.
    if input_path.starts_with(&*root_path.to_string_lossy())
        || source_relative.starts_with(&path_relative)
    {
        source_relative = source_relative
            .strip_prefix(&path_relative)
            .and_then(|s| s.strip_prefix('/'))
            .unwrap_or_default()
            .to_string();
    } else {
        // Otherwise drop a leading "<root folder name>/" when present.
        source_relative = source_relative
            .strip_prefix(&path_suffix)
            .and_then(|s| s.strip_prefix('/'))
            .unwrap_or(&source_relative)
            .to_string();
    }
    let path = &root_path.join(&source_relative);
    // Final safety net: the resolved path must be inside a whitelisted root
    // or below the user's home directory.
    if !FOLDER_WHITELIST.iter().any(|f| path.starts_with(f))
        && !path.starts_with(HOME_DIR.to_string())
    {
        return Err(ServiceError::Forbidden(
            "Access forbidden: Folder cannot be opened.".to_string(),
        ));
    }
    Ok((path.to_path_buf(), path_suffix, source_relative))
}
/// File Browser
///
/// Take input path and give file and folder list from it back.
/// Input should be a relative path segment, but when it is an absolute path, the
/// norm_abs_path function will take care that the user can not break out from
/// the given storage path in config.
pub async fn browser(
    conn: &Pool<Sqlite>,
    id: i32,
    path_obj: &PathObject,
) -> Result<PathObject, ServiceError> {
    let (config, channel) = playout_config(conn, &id).await?;
    // Channel specific extra extensions, stored comma separated.
    let mut channel_extensions = channel
        .extra_extensions
        .split(',')
        .map(|e| e.to_string())
        .collect::<Vec<String>>();
    let mut parent_folders = vec![];
    let mut extensions = config.storage.extensions;
    extensions.append(&mut channel_extensions);
    let (path, parent, path_component) = norm_abs_path(&config.storage.path, &path_obj.source)?;
    // Parent of the requested path; the storage root is its own parent.
    let parent_path = if !path_component.is_empty() {
        path.parent().unwrap()
    } else {
        &config.storage.path
    };
    let mut obj = PathObject::new(path_component, Some(parent));
    obj.folders_only = path_obj.folders_only;
    // Collect the sibling folders (the parent's sub folders), unless we are
    // at the root or in folders-only mode.
    if path != parent_path && !path_obj.folders_only {
        let mut parents = fs::read_dir(&parent_path).await?;
        while let Some(child) = parents.next_entry().await? {
            if child.metadata().await?.is_dir() {
                parent_folders.push(
                    child
                        .path()
                        .file_name()
                        .unwrap()
                        .to_string_lossy()
                        .to_string(),
                );
            }
        }
        parent_folders.path_sort(natural_lexical_cmp);
        obj.parent_folders = Some(parent_folders);
    }
    let mut paths_obj = fs::read_dir(path).await?;
    let mut files = vec![];
    let mut folders = vec![];
    while let Some(child) = paths_obj.next_entry().await? {
        let f_meta = child.metadata().await?;
        // ignore hidden files/folders on unix
        if child.path().to_string_lossy().to_string().contains("/.") {
            continue;
        }
        if f_meta.is_dir() {
            folders.push(
                child
                    .path()
                    .file_name()
                    .unwrap()
                    .to_string_lossy()
                    .to_string(),
            );
        } else if f_meta.is_file() && !path_obj.folders_only {
            // Only keep files whose extension is allowed for this channel.
            if let Some(ext) = file_extension(&child.path()) {
                if extensions.contains(&ext.to_string().to_lowercase()) {
                    files.push(child.path())
                }
            }
        }
    }
    folders.path_sort(natural_lexical_cmp);
    files.path_sort(natural_lexical_cmp);
    let mut media_files = vec![];
    // Probe every media file for its duration; probe errors are only logged.
    for file in files {
        match MediaProbe::new(file.to_string_lossy().as_ref()) {
            Ok(probe) => {
                let mut duration = 0.0;
                if let Some(dur) = probe.format.duration {
                    duration = dur.parse().unwrap_or_default()
                }
                let video = VideoFile {
                    name: file.file_name().unwrap().to_string_lossy().to_string(),
                    duration,
                };
                media_files.push(video);
            }
            Err(e) => error!("{e:?}"),
        };
    }
    obj.folders = Some(folders);
    obj.files = Some(media_files);
    Ok(obj)
}
/// Create a directory (including parents) below the channel's storage root.
pub async fn create_directory(
    conn: &Pool<Sqlite>,
    id: i32,
    path_obj: &PathObject,
) -> Result<HttpResponse, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let (path, _, _) = norm_abs_path(&config.storage.path, &path_obj.source)?;

    fs::create_dir_all(&path)
        .await
        .map_err(|e| ServiceError::BadRequest(e.to_string()))?;

    info!(
        "create folder: <b><magenta>{}</></b>",
        path.to_string_lossy()
    );

    Ok(HttpResponse::Ok().into())
}
async fn copy_and_delete(source: &PathBuf, target: &PathBuf) -> Result<MoveObject, ServiceError> {
match fs::copy(&source, &target).await {
Ok(_) => {
if let Err(e) = fs::remove_file(source).await {
error!("{e}");
return Err(ServiceError::BadRequest(
"Removing File not possible!".into(),
));
};
return Ok(MoveObject {
source: source
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
target: target
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
});
}
Err(e) => {
error!("{e}");
Err(ServiceError::BadRequest("Error in file copy!".into()))
}
}
}
/// Try an atomic rename first; on failure (logged) fall back to copy + delete.
async fn rename(source: &PathBuf, target: &PathBuf) -> Result<MoveObject, ServiceError> {
    if let Err(e) = fs::rename(source, target).await {
        error!("{e}");
        return copy_and_delete(source, target).await;
    }

    let file_name =
        |p: &PathBuf| p.file_name().unwrap_or_default().to_string_lossy().to_string();

    Ok(MoveObject {
        source: file_name(source),
        target: file_name(target),
    })
}
/// Rename or move a file/folder inside the storage root.
///
/// Both paths are validated through `norm_abs_path` first; returns the
/// resulting source/target file names.
pub async fn rename_file(
    conn: &Pool<Sqlite>,
    id: i32,
    move_object: &MoveObject,
) -> Result<MoveObject, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let (source_path, _, _) = norm_abs_path(&config.storage.path, &move_object.source)?;
    let (mut target_path, _, _) = norm_abs_path(&config.storage.path, &move_object.target)?;
    if !source_path.exists() {
        return Err(ServiceError::BadRequest("Source file not exist!".into()));
    }
    // Rename in place when the target is the source's own parent folder.
    if (source_path.is_dir() || source_path.is_file()) && source_path.parent() == Some(&target_path)
    {
        return rename(&source_path, &target_path).await;
    }
    // Moving into an existing folder: keep the original file name.
    if target_path.is_dir() {
        target_path = target_path.join(source_path.file_name().unwrap());
    }
    if target_path.is_file() {
        return Err(ServiceError::BadRequest(
            "Target file already exists!".into(),
        ));
    }
    if source_path.is_file() && target_path.parent().is_some() {
        return rename(&source_path, &target_path).await;
    }
    Err(ServiceError::InternalServerError)
}
/// Delete a file or an (empty) folder below the channel's storage root.
pub async fn remove_file_or_folder(
    conn: &Pool<Sqlite>,
    id: i32,
    source_path: &str,
) -> Result<(), ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let (source, _, _) = norm_abs_path(&config.storage.path, source_path)?;

    if !source.exists() {
        return Err(ServiceError::BadRequest("Source does not exists!".into()));
    }

    if source.is_dir() {
        // remove_dir only deletes empty folders — deliberate safety behavior.
        return fs::remove_dir(source).await.map_err(|e| {
            error!("{e}");
            ServiceError::BadRequest("Delete folder failed! (Folder must be empty)".into())
        });
    }

    if source.is_file() {
        return fs::remove_file(source).await.map_err(|e| {
            error!("{e}");
            ServiceError::BadRequest("Delete file failed!".into())
        });
    }

    Err(ServiceError::InternalServerError)
}
/// Resolve `path` below the storage root and ensure it is an existing folder.
async fn valid_path(conn: &Pool<Sqlite>, id: i32, path: &str) -> Result<PathBuf, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let (test_path, _, _) = norm_abs_path(&config.storage.path, path)?;

    if test_path.is_dir() {
        Ok(test_path)
    } else {
        Err(ServiceError::BadRequest("Target folder not exists!".into()))
    }
}
/// Receive a multipart upload and stream it to disk chunk by chunk.
///
/// With `abs_path` the file is written exactly to `path`; otherwise `path`
/// must be an existing folder below the storage root and the (sanitized)
/// client file name is appended.
pub async fn upload(
    conn: &Pool<Sqlite>,
    id: i32,
    _size: u64,
    mut payload: Multipart,
    path: &Path,
    abs_path: bool,
) -> Result<HttpResponse, ServiceError> {
    while let Some(mut field) = payload.try_next().await? {
        let content_disposition = field.content_disposition();
        debug!("{content_disposition}");
        // Random fallback name when the client sent no file name.
        let rand_string: String = rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(20)
            .map(char::from)
            .collect();
        let filename = content_disposition
            .get_filename()
            .map_or_else(|| rand_string.to_string(), sanitize_filename::sanitize);
        let filepath = if abs_path {
            path.to_path_buf()
        } else {
            valid_path(conn, id, &path.to_string_lossy())
                .await?
                .join(filename)
        };
        let filepath_clone = filepath.clone();
        let _file_size = match filepath.metadata() {
            Ok(metadata) => metadata.len(),
            Err(_) => 0,
        };
        // INFO: File exist check should be enough because file size and content length are different.
        // The error catching in the loop should normally prevent unfinished files from existing on disk.
        // If this is not enough, a second check can be implemented: is_close(file_size as i64, size as i64, 1000)
        if filepath.is_file() {
            return Err(ServiceError::Conflict("Target already exists!".into()));
        }
        // File I/O runs on the blocking thread pool to keep the executor free.
        let mut f = web::block(|| std::fs::File::create(filepath_clone)).await??;
        loop {
            match field.try_next().await {
                Ok(Some(chunk)) => {
                    f = web::block(move || f.write_all(&chunk).map(|_| f)).await??;
                }
                Ok(None) => break,
                Err(e) => {
                    // Client aborted mid-transfer: remove the partial file.
                    if e.to_string().contains("stream is incomplete") {
                        info!("Delete non finished file: {filepath:?}");
                        tokio::fs::remove_file(filepath).await?
                    }
                    return Err(e.into());
                }
            }
        }
    }
    Ok(HttpResponse::Ok().into())
}

390
ffplayout/src/utils/mod.rs Normal file
View File

@ -0,0 +1,390 @@
use std::{
env,
error::Error,
fmt,
fs::{self, metadata, File},
io::{stdin, stdout, Read, Write},
path::{Path, PathBuf},
str::FromStr,
};
use chrono::{format::ParseErrorKind, prelude::*};
use faccess::PathExt;
use once_cell::sync::OnceCell;
use path_clean::PathClean;
use rpassword::read_password;
use serde::{de, Deserialize, Deserializer, Serialize};
use simplelog::*;
use sqlx::{sqlite::SqliteRow, FromRow, Pool, Row, Sqlite};
use crate::ARGS;
pub mod args_parse;
pub mod channels;
pub mod control;
pub mod errors;
pub mod files;
pub mod playlist;
pub mod system;
use crate::db::{
db_pool,
handles::{db_init, insert_user, select_channel, select_global},
models::{Channel, User},
};
use crate::utils::errors::ServiceError;
use ffplayout_lib::utils::{time_to_sec, PlayoutConfig};
/// User permission roles; `Guest` is the fallback for any unknown role name.
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
    Admin,
    User,
    Guest,
}
impl Role {
    /// Map a role name to its variant; unknown names become `Guest`.
    /// Delegates to the `FromStr` impl so both code paths stay consistent
    /// instead of duplicating the name→variant mapping.
    pub fn set_role(role: &str) -> Self {
        role.parse().unwrap_or(Role::Guest)
    }
}
impl FromStr for Role {
    type Err = String;

    /// Infallible in practice: unknown names fall back to `Guest`.
    fn from_str(input: &str) -> Result<Self, Self::Err> {
        Ok(match input {
            "admin" => Self::Admin,
            "user" => Self::User,
            _ => Self::Guest,
        })
    }
}
impl fmt::Display for Role {
    /// Render the lowercase role name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Self::Admin => "admin",
            Self::User => "user",
            Self::Guest => "guest",
        };

        write!(f, "{name}")
    }
}
/// Decode a role stored as TEXT straight into the enum.
impl<'r> sqlx::decode::Decode<'r, ::sqlx::Sqlite> for Role
where
    &'r str: sqlx::decode::Decode<'r, sqlx::Sqlite>,
{
    fn decode(
        value: <sqlx::Sqlite as sqlx::database::HasValueRef<'r>>::ValueRef,
    ) -> Result<Role, Box<dyn Error + 'static + Send + Sync>> {
        // Decode as &str first, then reuse the (infallible) FromStr impl.
        let value = <&str as sqlx::decode::Decode<sqlx::Sqlite>>::decode(value)?;
        Ok(value.parse()?)
    }
}
/// Build a role from a row exposing a `name` column; unknown → Guest.
impl FromRow<'_, SqliteRow> for Role {
    fn from_row(row: &SqliteRow) -> sqlx::Result<Self> {
        match row.get("name") {
            "admin" => Ok(Self::Admin),
            "user" => Ok(Self::User),
            _ => Ok(Self::Guest),
        }
    }
}
/// App-wide settings loaded once from the database (currently only the secret).
#[derive(Debug, sqlx::FromRow)]
pub struct GlobalSettings {
    pub secret: String,
}
impl GlobalSettings {
    /// Load the settings row; falls back to an empty secret when the query
    /// fails (e.g. before the database was initialized).
    async fn new(conn: &Pool<Sqlite>) -> Self {
        let global_settings = select_global(conn);
        match global_settings.await {
            Ok(g) => g,
            Err(_) => GlobalSettings {
                secret: String::new(),
            },
        }
    }
    /// Global accessor; panics when `init_config` has not run yet.
    pub fn global() -> &'static GlobalSettings {
        INSTANCE.get().expect("Config is not initialized")
    }
}
// Process-wide singleton, set exactly once in init_config().
static INSTANCE: OnceCell<GlobalSettings> = OnceCell::new();
/// Load the global settings and freeze them for the process lifetime.
/// Must be called once before `GlobalSettings::global()`.
pub async fn init_config(conn: &Pool<Sqlite>) {
    let config = GlobalSettings::new(conn).await;
    INSTANCE.set(config).unwrap();
}
/// Resolve the path of the SQLite database file.
///
/// Precedence: `--db` CLI argument (made absolute, parent must be writable),
/// then `/usr/share/ffplayout/db` when writable, then `./assets/`, finally
/// the current directory. The CLI string is leaked once to get `'static`.
pub fn db_path() -> Result<&'static str, Box<dyn std::error::Error>> {
    if let Some(path) = ARGS.db.clone() {
        let absolute_path = if path.is_absolute() {
            path
        } else {
            env::current_dir()?.join(path)
        }
        .clean();
        if let Some(abs_path) = absolute_path.parent() {
            if abs_path.writable() {
                // Leaked intentionally: one small allocation per process start.
                return Ok(Box::leak(
                    absolute_path.to_string_lossy().to_string().into_boxed_str(),
                ));
            }
            error!("Given database path is not writable!");
        }
    }
    let sys_path = Path::new("/usr/share/ffplayout/db");
    let mut db_path = "./ffplayout.db";
    if sys_path.is_dir() && !sys_path.writable() {
        error!("Path {} is not writable!", sys_path.display());
    }
    if sys_path.is_dir() && sys_path.writable() {
        db_path = "/usr/share/ffplayout/db/ffplayout.db";
    } else if Path::new("./assets").is_dir() {
        db_path = "./assets/ffplayout.db";
    }
    Ok(db_path)
}
/// Resolve the folder that serves the frontend's static files.
///
/// Order: local frontend build (debug builds only), system-wide install
/// folder, then `./public/` as the last resort.
pub fn public_path() -> PathBuf {
    let dev_path = PathBuf::from("./ffplayout-frontend/.output/public/");

    if cfg!(debug_assertions) && dev_path.is_dir() {
        return dev_path;
    }

    let system_path = PathBuf::from("/usr/share/ffplayout/public/");

    if system_path.is_dir() {
        return system_path;
    }

    PathBuf::from("./public/")
}
/// Handle the one-shot CLI modes (init, credential prompt, user creation).
///
/// Returns `Err(code)` when the process should exit instead of serving:
/// `Err(0)` after a completed action or a logged usage error, `Err(1)` on
/// failure. `Ok(())` means: continue and start the web service.
pub async fn run_args() -> Result<(), i32> {
    let mut args = ARGS.clone();
    if !args.init && args.listen.is_none() && !args.ask && args.username.is_none() {
        error!("Wrong number of arguments! Run ffpapi --help for more information.");
        return Err(0);
    }
    if args.init {
        if let Err(e) = db_init(args.domain).await {
            panic!("{e}");
        };
        return Err(0);
    }
    if args.ask {
        // Interactive prompt for username, password (hidden input) and mail.
        let mut user = String::new();
        print!("Username: ");
        stdout().flush().unwrap();
        stdin()
            .read_line(&mut user)
            .expect("Did not enter a correct name?");
        // Strip the trailing newline (and carriage return on Windows).
        if let Some('\n') = user.chars().next_back() {
            user.pop();
        }
        if let Some('\r') = user.chars().next_back() {
            user.pop();
        }
        args.username = Some(user);
        print!("Password: ");
        stdout().flush().unwrap();
        let password = read_password();
        args.password = password.ok();
        let mut mail = String::new();
        print!("Mail: ");
        stdout().flush().unwrap();
        stdin()
            .read_line(&mut mail)
            .expect("Did not enter a correct name?");
        if let Some('\n') = mail.chars().next_back() {
            mail.pop();
        }
        if let Some('\r') = mail.chars().next_back() {
            mail.pop();
        }
        args.mail = Some(mail);
    }
    if let Some(username) = args.username {
        if args.mail.is_none() || args.password.is_none() {
            error!("Mail/password missing!");
            return Err(1);
        }
        // role_id 1 is treated as admin here (see the log message below);
        // channel_id 1 assigns the first channel — confirm against handles.
        let user = User {
            id: 0,
            mail: Some(args.mail.unwrap()),
            username: username.clone(),
            password: args.password.unwrap(),
            role_id: Some(1),
            channel_id: Some(1),
            token: None,
        };
        match db_pool().await {
            Ok(conn) => {
                if let Err(e) = insert_user(&conn, user).await {
                    error!("{e}");
                    return Err(1);
                };
            }
            Err(e) => {
                error!("{e}");
                return Err(1);
            }
        };
        info!("Create admin user \"{username}\" done...");
        return Err(0);
    }
    Ok(())
}
pub fn read_playout_config(path: &str) -> Result<PlayoutConfig, Box<dyn Error>> {
let mut file = File::open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let mut config: PlayoutConfig = toml_edit::de::from_str(&contents)?;
config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start));
config.playlist.length_sec = Some(time_to_sec(&config.playlist.length));
Ok(config)
}
/// Load the channel row and its parsed `PlayoutConfig` in one step.
pub async fn playout_config(
    conn: &Pool<Sqlite>,
    channel_id: &i32,
) -> Result<(PlayoutConfig, Channel), ServiceError> {
    if let Ok(channel) = select_channel(conn, channel_id).await {
        match read_playout_config(&channel.config_path.clone()) {
            Ok(config) => return Ok((config, channel)),
            Err(e) => error!("{e}"),
        }
    }
    // DB and parse failures are collapsed into one generic message; the
    // details are only visible in the log.
    Err(ServiceError::BadRequest(
        "Error in getting config!".to_string(),
    ))
}
/// Read a channel's playout log, optionally a dated one.
///
/// Refuses with a hint (instead of the content) when the file exceeds 5 MB.
pub async fn read_log_file(
    conn: &Pool<Sqlite>,
    channel_id: &i32,
    date: &str,
) -> Result<String, ServiceError> {
    if let Ok(channel) = select_channel(conn, channel_id).await {
        // A non-empty date is appended as ".<date>" — rotated log naming,
        // confirm against the engine's log writer.
        let mut date_str = "".to_string();
        if !date.is_empty() {
            date_str.push('.');
            date_str.push_str(date);
        }
        if let Ok(config) = read_playout_config(&channel.config_path) {
            let mut log_path = Path::new(&config.logging.path)
                .join("ffplayout.log")
                .display()
                .to_string();
            log_path.push_str(&date_str);
            let file_size = metadata(&log_path)?.len() as f64;
            let file_content = if file_size > 5000000.0 {
                error!("Log file to big: {}", sizeof_fmt(file_size));
                format!("The log file is larger ({}) than the hard limit of 5MB, the probability is very high that something is wrong with the playout. Check this on the server with `less {log_path}`.", sizeof_fmt(file_size))
            } else {
                fs::read_to_string(log_path)?
            };
            return Ok(file_content);
        }
    }
    Err(ServiceError::NoContent(
        "Requested log file not exists, or not readable.".to_string(),
    ))
}
/// Get a human readable file size string (binary prefixes, one decimal).
pub fn sizeof_fmt(mut num: f64) -> String {
    const SUFFIX: char = 'B';

    for prefix in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"] {
        if num.abs() < 1024.0 {
            return format!("{num:.1}{prefix}{SUFFIX}");
        }

        num /= 1024.0;
    }

    format!("{num:.1}Yi{SUFFIX}")
}
/// Current local UTC offset in minutes, derived from chrono's `%:z`
/// formatting (e.g. "+02:00" → 120, "-05:30" → -330). Unparsable parts
/// default to 0.
pub fn local_utc_offset() -> i32 {
    let mut offset = Local::now().format("%:z").to_string();
    // First character is the sign.
    let operator = offset.remove(0);

    match offset.split_once(':') {
        Some((h, m)) => {
            let minutes = h.parse::<i32>().unwrap_or(0) * 60 + m.parse::<i32>().unwrap_or(0);

            if operator == '-' && minutes > 0 {
                -minutes
            } else {
                minutes
            }
        }
        None => 0,
    }
}
/// Serde helper: parse a `NaiveDateTime` from `%Y-%m-%dT%H:%M:%S`, with two
/// fallbacks — a bare date (completed to midnight) and a variant carrying a
/// trailing timezone offset (`%#z`).
pub fn naive_date_time_from_str<'de, D>(deserializer: D) -> Result<NaiveDateTime, D::Error>
where
    D: Deserializer<'de>,
{
    let s: String = Deserialize::deserialize(deserializer)?;
    match NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S") {
        Ok(date_time) => Ok(date_time),
        Err(e) => {
            // TooShort: input was only a date, complete it to midnight.
            if e.kind() == ParseErrorKind::TooShort {
                NaiveDateTime::parse_from_str(&format!("{s}T00:00:00"), "%Y-%m-%dT%H:%M:%S")
                    .map_err(de::Error::custom)
            } else {
                // Otherwise retry expecting an offset suffix in the string.
                NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%#z").map_err(de::Error::custom)
            }
        }
    }
}

View File

@ -0,0 +1,152 @@
use std::{fs, path::PathBuf};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{errors::ServiceError, files::norm_abs_path, playout_config};
use ffplayout_lib::utils::{
generate_playlist as playlist_generator, json_reader, json_writer, JsonPlaylist, PlayoutConfig,
};
/// Read the playlist JSON for `date` (YYYY-MM-DD) from the channel's
/// playlist directory, located at `<path>/<year>/<month>/<date>.json`.
pub async fn read_playlist(
    conn: &Pool<Sqlite>,
    id: i32,
    date: String,
) -> Result<JsonPlaylist, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let (path, _, _) = norm_abs_path(&config.playlist.path, "")?;

    // Split "YYYY-MM-DD" into year/month; guard against malformed input
    // instead of panicking on a missing separator.
    let parts: Vec<&str> = date.split('-').collect();

    if parts.len() < 2 {
        return Err(ServiceError::BadRequest(format!(
            "Invalid date format: {date}"
        )));
    }

    let playlist_path = path
        .join(parts[0])
        .join(parts[1])
        .join(&date)
        .with_extension("json");

    match json_reader(&playlist_path) {
        Ok(p) => Ok(p),
        Err(e) => Err(ServiceError::NoContent(e.to_string())),
    }
}
/// Write (or update) the playlist JSON file for the date carried in
/// `json_data`. Refuses to rewrite a file whose content is identical.
pub async fn write_playlist(
    conn: &Pool<Sqlite>,
    id: i32,
    json_data: JsonPlaylist,
) -> Result<String, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let date = json_data.date.clone();
    let mut playlist_path = config.playlist.path;

    // Split "YYYY-MM-DD" into year/month; guard against malformed input
    // instead of panicking on a missing separator.
    let d: Vec<&str> = date.split('-').collect();

    if d.len() < 2 {
        return Err(ServiceError::BadRequest(format!(
            "Invalid playlist date: {date}"
        )));
    }

    // When the configured path is not already a .json file, build the
    // conventional <path>/<year>/<month>/<date>.json location.
    if !playlist_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("json"))
        .unwrap_or(false)
    {
        playlist_path = playlist_path
            .join(d[0])
            .join(d[1])
            .join(date.clone())
            .with_extension("json");
    }

    let mut file_exists = false;

    if let Some(p) = playlist_path.parent() {
        fs::create_dir_all(p)?;
    }

    if playlist_path.is_file() {
        file_exists = true;

        // Unchanged content is reported as a conflict, not rewritten.
        if let Ok(existing_data) = json_reader(&playlist_path) {
            if json_data == existing_data {
                return Err(ServiceError::Conflict(format!(
                    "Playlist from {date}, already exists!"
                )));
            }
        }
    }

    match json_writer(&playlist_path, json_data) {
        Ok(_) => {
            let msg = if file_exists {
                format!("Update playlist from {date} success!")
            } else {
                format!("Write playlist from {date} success!")
            };

            return Ok(msg);
        }
        Err(e) => {
            error!("{e}");
        }
    }

    Err(ServiceError::InternalServerError)
}
/// Generate a playlist for `channel`, normalizing every template source
/// path against the channel storage root before calling the generator.
pub async fn generate_playlist(
    mut config: PlayoutConfig,
    channel: String,
) -> Result<JsonPlaylist, ServiceError> {
    // Rewrite template paths so they stay inside the storage directory.
    if let Some(mut template) = config.general.template.take() {
        for source in template.sources.iter_mut() {
            let mut sanitized = Vec::new();

            for path in &source.paths {
                let (safe_path, _, _) =
                    norm_abs_path(&config.storage.path, &path.to_string_lossy())?;
                sanitized.push(safe_path);
            }

            source.paths = sanitized;
        }

        config.general.template = Some(template);
    }

    match playlist_generator(&config, Some(channel)) {
        Ok(playlists) => playlists.first().cloned().ok_or_else(|| {
            ServiceError::Conflict(
                "The playlist could not be written, maybe it already exists!".into(),
            )
        }),
        Err(e) => {
            error!("{e}");
            Err(ServiceError::InternalServerError)
        }
    }
}
/// Delete the playlist JSON for `date` (YYYY-MM-DD), if it exists.
///
/// A missing file is not an error; a message is returned either way.
pub async fn delete_playlist(
    conn: &Pool<Sqlite>,
    id: i32,
    date: &str,
) -> Result<String, ServiceError> {
    let (config, _) = playout_config(conn, &id).await?;
    let mut playlist_path = PathBuf::from(&config.playlist.path);

    // Split "YYYY-MM-DD" into year/month; guard against malformed input
    // instead of panicking on a missing separator.
    let d: Vec<&str> = date.split('-').collect();

    if d.len() < 2 {
        return Err(ServiceError::BadRequest(format!(
            "Invalid date format: {date}"
        )));
    }

    playlist_path = playlist_path
        .join(d[0])
        .join(d[1])
        .join(date)
        .with_extension("json");

    if playlist_path.is_file() {
        match fs::remove_file(playlist_path) {
            Ok(_) => Ok(format!("Delete playlist from {date} success!")),
            Err(e) => {
                error!("{e}");
                Err(ServiceError::InternalServerError)
            }
        }
    } else {
        Ok(format!("No playlist to delete on: {date}"))
    }
}

View File

@ -0,0 +1,176 @@
use std::fmt;
use local_ip_address::list_afinet_netifas;
use serde::Serialize;
use sysinfo::System;
use crate::{DISKS, NETWORKS, SYS};
use ffplayout_lib::utils::PlayoutConfig;
// Interface-name prefixes skipped when choosing a NIC for network stats
// (container/bridge/VPN/virtual interfaces).
const IGNORE_INTERFACES: [&str; 7] = ["docker", "lxdbr", "tab", "tun", "virbr", "veth", "vnet"];
/// CPU statistics reported by `stat()`.
#[derive(Debug, Serialize)]
pub struct Cpu {
    // Number of logical cores (f32 for ratio math in stat()).
    pub cores: f32,
    // Usage figure as computed in stat() — see the note there.
    pub usage: f32,
}
/// Disk statistics for the disk holding the channel storage path.
#[derive(Debug, Default, Serialize)]
pub struct Storage {
    // Disk (device) name, taken from sysinfo's disk.name().
    pub path: String,
    // Total disk space in bytes.
    pub total: u64,
    // NOTE(review): stat() fills this with available_space(), but the name
    // suggests used bytes — confirm what the consumers expect.
    pub used: u64,
}
/// System load averages over 1, 5 and 15 minutes.
#[derive(Debug, Serialize)]
pub struct Load {
    pub one: f64,
    pub five: f64,
    pub fifteen: f64,
}
/// RAM statistics in bytes; `free` is derived as `total - used` in stat().
#[derive(Debug, Serialize)]
pub struct Memory {
    pub total: u64,
    pub used: u64,
    pub free: u64,
}
/// Traffic counters for the selected network interface.
#[derive(Debug, Default, Serialize)]
pub struct Network {
    // Interface name (e.g. "eth0").
    pub name: String,
    // Bytes received since the last refresh — presumably sysinfo's
    // received() semantics; verify against the sysinfo version in use.
    pub current_in: u64,
    // Total bytes received.
    pub total_in: u64,
    // Bytes transmitted since the last refresh (same caveat as current_in).
    pub current_out: u64,
    // Total bytes transmitted.
    pub total_out: u64,
}
/// Host OS and application version information.
#[derive(Debug, Serialize)]
pub struct MySystem {
    // OS name, from System::name().
    pub name: Option<String>,
    // Kernel version, from System::kernel_version().
    pub kernel: Option<String>,
    // OS version, from System::os_version().
    pub version: Option<String>,
    // This crate's version (CARGO_PKG_VERSION).
    pub ffp_version: Option<String>,
}
/// Swap statistics in bytes.
#[derive(Debug, Serialize)]
pub struct Swap {
    pub total: u64,
    pub used: u64,
    pub free: u64,
}
/// Aggregated snapshot of all system statistics, built by `stat()` and
/// serialized to JSON via its `Display` implementation.
#[derive(Debug, Serialize)]
pub struct SystemStat {
    pub cpu: Cpu,
    pub load: Load,
    pub memory: Memory,
    pub network: Network,
    pub storage: Storage,
    pub swap: Swap,
    pub system: MySystem,
}
impl fmt::Display for SystemStat {
    /// Render the snapshot as a JSON string.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Serializing these plain data fields should not fail; the unwrap
        // would only trip on a serde_json internal error.
        write!(f, "{}", serde_json::to_string(self).unwrap())
    }
}
/// Collect a snapshot of system statistics (CPU, load, memory, network,
/// storage, swap, OS info) using the globally shared sysinfo handles
/// (`DISKS`, `NETWORKS`, `SYS`).
pub fn stat(config: PlayoutConfig) -> SystemStat {
    let mut disks = DISKS.lock().unwrap();
    let mut networks = NETWORKS.lock().unwrap();
    let mut sys = SYS.lock().unwrap();

    let network_interfaces = list_afinet_netifas().unwrap_or_default();
    let mut usage = 0.0;
    let mut interfaces = vec![];

    // Keep non-loopback, non-virtual interfaces; only the first match is
    // used for the network counters below.
    for (name, ip) in network_interfaces.iter() {
        if !ip.is_loopback()
            && !IGNORE_INTERFACES
                .iter()
                .any(|&prefix| name.starts_with(prefix))
        {
            interfaces.push((name, ip))
        }
    }

    // NOTE(review): dedup_by removes only *consecutive* duplicates (e.g.
    // IPv4/IPv6 entries of the same interface) — relies on list order.
    interfaces.dedup_by(|a, b| a.0 == b.0);

    disks.refresh();
    networks.refresh();
    sys.refresh_cpu_usage();
    sys.refresh_memory();

    let cores = sys.cpus().len() as f32;

    // Sum per-core usage percentages.
    for cpu in sys.cpus() {
        usage += cpu.cpu_usage();
    }

    // NOTE(review): summing per-core percentages and then multiplying by
    // the core count looks off — an average would be `usage / cores`.
    // Confirm the consumer's expectation before changing.
    let cpu = Cpu {
        cores,
        usage: usage * cores / 100.0,
    };

    let mut storage = Storage::default();

    // Pick the mounted disk whose mount point is a prefix of the storage
    // path; single-character mount points (i.e. "/") are skipped.
    for disk in &*disks {
        if disk.mount_point().to_string_lossy().len() > 1
            && config.storage.path.starts_with(disk.mount_point())
        {
            storage.path = disk.name().to_string_lossy().to_string();
            storage.total = disk.total_space();
            // NOTE(review): `used` is filled with available_space() —
            // verify whether consumers expect free or used bytes here.
            storage.used = disk.available_space();
        }
    }

    let load_avg = System::load_average();
    let load = Load {
        one: load_avg.one,
        five: load_avg.five,
        fifteen: load_avg.fifteen,
    };

    let memory = Memory {
        total: sys.total_memory(),
        used: sys.used_memory(),
        free: sys.total_memory() - sys.used_memory(),
    };

    let mut network = Network::default();

    // Report counters only for the first usable interface found above.
    for (interface_name, data) in &*networks {
        if !interfaces.is_empty() && interface_name == interfaces[0].0 {
            network.name.clone_from(interface_name);
            network.current_in = data.received();
            network.total_in = data.total_received();
            network.current_out = data.transmitted();
            network.total_out = data.total_transmitted();
        }
    }

    let swap = Swap {
        total: sys.total_swap(),
        used: sys.used_swap(),
        free: sys.free_swap(),
    };

    let system = MySystem {
        name: System::name(),
        kernel: System::kernel_version(),
        version: System::os_version(),
        ffp_version: Some(env!("CARGO_PKG_VERSION").to_string()),
    };

    SystemStat {
        cpu,
        storage,
        load,
        memory,
        network,
        system,
        swap,
    }
}

View File

@ -40,7 +40,7 @@ use ProcessUnit::*;
///
/// We save here some global states, about what is running and which processes are alive.
/// This we need for process termination, skipping clip decoder etc.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ProcessControl {
pub decoder_term: Arc<Mutex<Option<Child>>>,
pub encoder_term: Arc<Mutex<Option<Child>>>,

View File

@ -8,7 +8,7 @@ edition.workspace = true
publish = false
[dev-dependencies]
ffplayout = { path = "../ffplayout-engine" }
ffplayout-engine = { path = "../ffplayout-engine" }
# ffplayout-api = { path = "../ffplayout-api" }
ffplayout-lib = { path = "../lib" }

View File

@ -1,6 +1,6 @@
use std::{fs, path::PathBuf};
use ffplayout::{input::playlist::gen_source, utils::prepare_output_cmd};
use ffplayout_engine::{input::playlist::gen_source, utils::prepare_output_cmd};
use ffplayout_lib::{
utils::{Media, OutputMode::*, PlayerControl, PlayoutConfig, PlayoutStatus, ProcessUnit::*},
vec_strings,

View File

@ -6,7 +6,7 @@ use std::{
use serial_test::serial;
use simplelog::*;
use ffplayout::{input::playlist::gen_source, output::player};
use ffplayout_engine::{input::playlist::gen_source, output::player};
use ffplayout_lib::{utils::*, vec_strings};
fn timed_stop(sec: u64, proc_ctl: ProcessControl) {