Merge branch 'api' into master

This commit is contained in:
jb-alvarado 2022-06-13 14:07:43 +02:00 committed by GitHub
commit 4f1f47f32e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1714 additions and 76 deletions

1
.gitignore vendored
View File

@ -18,5 +18,6 @@
*tar.gz
*.deb
*.rpm
/assets/*.db*
.vscode/

942
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,16 +9,25 @@ edition = "2021"
default-run = "ffplayout"
[dependencies]
actix-web = "4"
actix-web-grants = "3"
actix-web-httpauth = "0.6"
argon2 = "0.4"
chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" }
clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
derive_more = "0.99"
faccess = "0.2"
ffprobe = "0.3"
file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" }
jsonrpc-http-server = "18.0"
jsonwebtoken = "8"
lettre = "0.10.0-rc.7"
log = "0.4"
notify = "4.0"
once_cell = "1.10"
rand = "0.8"
rand_core = { version = "0.6", features = ["std"] }
regex = "1"
reqwest = { version = "0.11", features = ["blocking"] }
serde = { version = "1.0", features = ["derive"] }
@ -26,6 +35,11 @@ serde_json = "1.0"
serde_yaml = "0.8"
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
sqlx = { version = "0.5", features = [
"chrono",
"runtime-actix-native-tls",
"sqlite"
] }
time = { version = "0.3", features = ["formatting", "macros"] }
walkdir = "2"
@ -36,6 +50,10 @@ openssl = { version = "0.10", features = ["vendored"] }
name = "ffplayout"
path = "src/main.rs"
[[bin]]
name = "ffpapi"
path = "src/bin/ffpapi.rs"
[profile.release]
opt-level = 3
strip = true

24
src/api/args_parse.rs Normal file
View File

@ -0,0 +1,24 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[clap(version,
name = "ffpapi",
version = "0.1.0",
about = "ffplayout REST API",
long_about = None)]
pub struct Args {
#[clap(short, long, help = "Listen on IP:PORT, like: 127.0.0.1:8080")]
pub listen: Option<String>,
#[clap(short, long, help = "Initialize Database")]
pub init: bool,
#[clap(short, long, help = "Create admin user")]
pub username: Option<String>,
#[clap(short, long, help = "Admin email")]
pub email: Option<String>,
#[clap(short, long, help = "Admin password")]
pub password: Option<String>,
}

46
src/api/auth.rs Normal file
View File

@ -0,0 +1,46 @@
use actix_web::error::ErrorUnauthorized;
use actix_web::Error;
use chrono::{Duration, Utc};
use jsonwebtoken::{self, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use crate::api::utils::GlobalSettings;
// Token lifetime and Secret key are hardcoded for clarity
const JWT_EXPIRATION_MINUTES: i64 = 60;
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct Claims {
pub id: i64,
pub username: String,
pub permissions: Vec<String>,
exp: i64,
}
impl Claims {
pub fn new(id: i64, username: String, permissions: Vec<String>) -> Self {
Self {
id,
username,
permissions,
exp: (Utc::now() + Duration::minutes(JWT_EXPIRATION_MINUTES)).timestamp(),
}
}
}
/// Create a json web token (JWT)
pub fn create_jwt(claims: Claims) -> Result<String, Error> {
let config = GlobalSettings::global();
let encoding_key = EncodingKey::from_secret(config.secret.as_bytes());
jsonwebtoken::encode(&Header::default(), &claims, &encoding_key)
.map_err(|e| ErrorUnauthorized(e.to_string()))
}
/// Decode a json web token (JWT)
pub async fn decode_jwt(token: &str) -> Result<Claims, Error> {
let config = GlobalSettings::global();
let decoding_key = DecodingKey::from_secret(config.secret.as_bytes());
jsonwebtoken::decode::<Claims>(token, &decoding_key, &Validation::default())
.map(|data| data.claims)
.map_err(|e| ErrorUnauthorized(e.to_string()))
}

27
src/api/errors.rs Normal file
View File

@ -0,0 +1,27 @@
use actix_web::{error::ResponseError, HttpResponse};
use derive_more::Display;
#[derive(Debug, Display)]
pub enum ServiceError {
#[display(fmt = "Internal Server Error")]
InternalServerError,
#[display(fmt = "BadRequest: {}", _0)]
BadRequest(String),
#[display(fmt = "Unauthorized")]
Unauthorized,
}
// impl ResponseError trait allows to convert our errors into http responses with appropriate data
impl ResponseError for ServiceError {
fn error_response(&self) -> HttpResponse {
match self {
ServiceError::InternalServerError => {
HttpResponse::InternalServerError().json("Internal Server Error. Please try later.")
}
ServiceError::BadRequest(ref message) => HttpResponse::BadRequest().json(message),
ServiceError::Unauthorized => HttpResponse::Unauthorized().json("No Permission!"),
}
}
}

203
src/api/handles.rs Normal file
View File

@ -0,0 +1,203 @@
use std::path::Path;
use argon2::{
password_hash::{rand_core::OsRng, SaltString},
Argon2, PasswordHasher,
};
use faccess::PathExt;
use rand::{distributions::Alphanumeric, Rng};
use simplelog::*;
use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite, SqlitePool};
use crate::api::models::{Settings, User};
use crate::api::utils::GlobalSettings;
#[derive(Debug, sqlx::FromRow)]
struct Role {
name: String,
}
pub fn db_path() -> Result<String, Box<dyn std::error::Error>> {
let sys_path = Path::new("/usr/share/ffplayout");
let mut db_path = String::from("./ffplayout.db");
if sys_path.is_dir() && sys_path.writable() {
db_path = String::from("/usr/share/ffplayout/ffplayout.db");
} else if Path::new("./assets").is_dir() {
db_path = String::from("./assets/ffplayout.db");
}
Ok(db_path)
}
async fn create_schema() -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = "PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS global
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
secret TEXT NOT NULL,
UNIQUE(secret)
);
CREATE TABLE IF NOT EXISTS roles
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
UNIQUE(name)
);
CREATE TABLE IF NOT EXISTS settings
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel_name TEXT NOT NULL,
preview_url TEXT NOT NULL,
config_path TEXT NOT NULL,
extra_extensions TEXT NOT NULL,
UNIQUE(channel_name)
);
CREATE TABLE IF NOT EXISTS user
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
salt TEXT NOT NULL,
role_id INTEGER NOT NULL DEFAULT 2,
FOREIGN KEY (role_id) REFERENCES roles (id) ON UPDATE SET NULL ON DELETE SET NULL,
UNIQUE(email, username)
);";
let result = sqlx::query(query).execute(&conn).await;
conn.close().await;
result
}
pub async fn db_init() -> Result<&'static str, Box<dyn std::error::Error>> {
let db_path = db_path()?;
if !Sqlite::database_exists(&db_path).await.unwrap_or(false) {
Sqlite::create_database(&db_path).await.unwrap();
match create_schema().await {
Ok(_) => info!("Database created Successfully"),
Err(e) => panic!("{e}"),
}
}
let secret: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(80)
.map(char::from)
.collect();
let instances = db_connection().await?;
let query = "CREATE TRIGGER global_row_count
BEFORE INSERT ON global
WHEN (SELECT COUNT(*) FROM global) >= 1
BEGIN
SELECT RAISE(FAIL, 'Database is already init!');
END;
INSERT INTO global(secret) VALUES($1);
INSERT INTO roles(name) VALUES('admin'), ('user'), ('guest');
INSERT INTO settings(channel_name, preview_url, config_path, extra_extensions)
VALUES('Channel 1', 'http://localhost/live/preview.m3u8',
'/etc/ffplayout/ffplayout.yml', '.jpg,.jpeg,.png');";
sqlx::query(query).bind(secret).execute(&instances).await?;
instances.close().await;
Ok("Database initialized!")
}
pub async fn db_connection() -> Result<Pool<Sqlite>, sqlx::Error> {
let db_path = db_path().unwrap();
let conn = SqlitePool::connect(&db_path).await?;
Ok(conn)
}
pub async fn db_global() -> Result<GlobalSettings, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT secret FROM global WHERE id = 1";
let result: GlobalSettings = sqlx::query_as(query).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_get_settings(id: &i64) -> Result<Settings, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT * FROM settings WHERE id = $1";
let result: Settings = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
println!("{:#?}", result);
Ok(result)
}
pub async fn db_update_settings(
id: i64,
settings: Settings,
) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = "UPDATE settings SET channel_name = $2, preview_url = $3, config_path = $4, extra_extensions = $5 WHERE id = $1";
let result: SqliteQueryResult = sqlx::query(query)
.bind(id)
.bind(settings.channel_name.clone())
.bind(settings.preview_url.clone())
.bind(settings.config_path.clone())
.bind(settings.extra_extensions.clone())
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}
pub async fn db_role(id: &i64) -> Result<String, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT name FROM roles WHERE id = $1";
let result: Role = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
Ok(result.name)
}
pub async fn db_login(user: &str) -> Result<User, sqlx::Error> {
let conn = db_connection().await?;
let query = "SELECT id, email, username, password, salt, role_id FROM user WHERE username = $1";
let result: User = sqlx::query_as(query).bind(user).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
}
pub async fn db_add_user(user: User) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::default()
.hash_password(user.password.clone().as_bytes(), &salt)
.unwrap();
let query =
"INSERT INTO user (email, username, password, salt, role_id) VALUES($1, $2, $3, $4, $5)";
let result = sqlx::query(query)
.bind(user.email)
.bind(user.username)
.bind(password_hash.to_string())
.bind(salt.to_string())
.bind(user.role_id)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
}
pub async fn db_update_user(id: i64, fields: String) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = db_connection().await?;
let query = format!("UPDATE user SET {fields} WHERE id = $1");
let result: SqliteQueryResult = sqlx::query(&query).bind(id).execute(&conn).await?;
conn.close().await;
Ok(result)
}

7
src/api/mod.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod args_parse;
pub mod auth;
pub mod errors;
pub mod handles;
pub mod models;
pub mod routes;
pub mod utils;

51
src/api/models.rs Normal file
View File

@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct User {
#[sqlx(default)]
#[serde(skip_deserializing)]
pub id: i64,
#[sqlx(default)]
pub email: Option<String>,
pub username: String,
#[sqlx(default)]
#[serde(skip_serializing, default = "empty_string")]
pub password: String,
#[sqlx(default)]
#[serde(skip_serializing)]
pub salt: Option<String>,
#[sqlx(default)]
#[serde(skip_serializing)]
pub role_id: Option<i64>,
#[sqlx(default)]
pub token: Option<String>,
}
fn empty_string() -> String {
"".to_string()
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct LoginUser {
pub id: i64,
pub username: String,
}
impl LoginUser {
pub fn new(id: i64, username: String) -> Self {
Self { id, username }
}
}
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct Settings {
#[serde(skip_deserializing)]
pub id: i64,
pub channel_name: String,
pub preview_url: String,
pub config_path: String,
pub extra_extensions: String,
#[sqlx(default)]
#[serde(skip_serializing, skip_deserializing)]
pub secret: String,
}

167
src/api/routes.rs Normal file
View File

@ -0,0 +1,167 @@
use actix_web::{get, http::StatusCode, patch, post, put, web, Responder};
use actix_web_grants::proc_macro::has_permissions;
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, SaltString},
Argon2, PasswordHasher, PasswordVerifier,
};
use serde::Serialize;
use simplelog::*;
use crate::api::{
auth::{create_jwt, Claims},
errors::ServiceError,
handles::{
db_add_user, db_get_settings, db_login, db_role, db_update_settings, db_update_user,
},
models::{LoginUser, Settings, User},
};
#[derive(Serialize)]
struct ResponseObj<T> {
message: String,
status: i32,
data: Option<T>,
}
/// curl -X GET http://127.0.0.1:8080/api/settings/1 -H "Authorization: Bearer <TOKEN>"
#[get("/settings/{id}")]
#[has_permissions("admin", "user")]
async fn get_settings(id: web::Path<i64>) -> Result<impl Responder, ServiceError> {
if let Ok(settings) = db_get_settings(&id).await {
return Ok(web::Json(ResponseObj {
message: format!("Settings from {}", settings.channel_name),
status: 200,
data: Some(settings),
}));
}
Err(ServiceError::InternalServerError)
}
/// curl -X PATCH http://127.0.0.1:8080/api/settings/1 -H "Content-Type: application/json" \
/// --data '{"id":1,"channel_name":"Channel 1","preview_url":"http://localhost/live/stream.m3u8", \
/// "config_path":"/etc/ffplayout/ffplayout.yml","extra_extensions":".jpg,.jpeg,.png"}' \
/// -H "Authorization: Bearer <TOKEN>"
#[patch("/settings/{id}")]
#[has_permissions("admin")]
async fn patch_settings(
id: web::Path<i64>,
data: web::Json<Settings>,
) -> Result<impl Responder, ServiceError> {
if db_update_settings(*id, data.into_inner()).await.is_ok() {
return Ok("Update Success");
};
Err(ServiceError::InternalServerError)
}
/// curl -X PUT http://localhost:8080/api/user/1 --header 'Content-Type: application/json' \
/// --data '{"email": "<EMAIL>", "password": "<PASS>"}' --header 'Authorization: <TOKEN>'
#[put("/user/{id}")]
#[has_permissions("admin", "user")]
async fn update_user(
id: web::Path<i64>,
user: web::ReqData<LoginUser>,
data: web::Json<User>,
) -> Result<impl Responder, ServiceError> {
if id.into_inner() == user.id {
let mut fields = String::new();
if let Some(email) = data.email.clone() {
fields.push_str(format!("email = '{email}'").as_str());
}
if !data.password.is_empty() {
if !fields.is_empty() {
fields.push_str(", ");
}
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::default()
.hash_password(data.password.clone().as_bytes(), &salt)
.unwrap();
fields.push_str(format!("password = '{}', salt = '{salt}'", password_hash).as_str());
}
if db_update_user(user.id, fields).await.is_ok() {
return Ok("Update Success");
};
return Err(ServiceError::InternalServerError);
}
Err(ServiceError::Unauthorized)
}
/// curl -X POST 'http://localhost:8080/api/user/' --header 'Content-Type: application/json' \
/// -d '{"email": "<EMAIL>", "username": "<USER>", "password": "<PASS>", "role_id": 1}' \
/// --header 'Authorization: Bearer <TOKEN>'
#[post("/user/")]
#[has_permissions("admin")]
async fn add_user(data: web::Json<User>) -> Result<impl Responder, ServiceError> {
match db_add_user(data.into_inner()).await {
Ok(_) => Ok("Add User Success"),
Err(e) => {
error!("{e}");
Err(ServiceError::InternalServerError)
}
}
}
/// curl -X POST http://127.0.0.1:8080/auth/login/ -H "Content-Type: application/json" \
/// -d '{"username": "<USER>", "password": "<PASS>" }'
#[post("/auth/login/")]
pub async fn login(credentials: web::Json<User>) -> impl Responder {
match db_login(&credentials.username).await {
Ok(mut user) => {
let pass = user.password.clone();
let hash = PasswordHash::new(&pass).unwrap();
user.password = "".into();
user.salt = None;
if Argon2::default()
.verify_password(credentials.password.as_bytes(), &hash)
.is_ok()
{
let role = db_role(&user.role_id.unwrap_or_default())
.await
.unwrap_or_else(|_| "guest".to_string());
let claims = Claims::new(user.id, user.username.clone(), vec![role.clone()]);
if let Ok(token) = create_jwt(claims) {
user.token = Some(token);
};
info!("user {} login, with role: {role}", credentials.username);
web::Json(ResponseObj {
message: "login correct!".into(),
status: 200,
data: Some(user),
})
.customize()
.with_status(StatusCode::OK)
} else {
error!("Wrong password for {}!", credentials.username);
web::Json(ResponseObj {
message: "Wrong password!".into(),
status: 403,
data: None,
})
.customize()
.with_status(StatusCode::FORBIDDEN)
}
}
Err(e) => {
error!("Login {} failed! {e}", credentials.username);
return web::Json(ResponseObj {
message: format!("Login {} failed!", credentials.username),
status: 400,
data: None,
})
.customize()
.with_status(StatusCode::BAD_REQUEST);
}
}
}

81
src/api/utils.rs Normal file
View File

@ -0,0 +1,81 @@
use once_cell::sync::OnceCell;
use simplelog::*;
use crate::api::{
args_parse::Args,
handles::{db_add_user, db_global, db_init},
models::User,
};
#[derive(Debug, sqlx::FromRow)]
pub struct GlobalSettings {
pub secret: String,
}
impl GlobalSettings {
async fn new() -> Self {
let global_settings = db_global();
match global_settings.await {
Ok(g) => g,
Err(_) => GlobalSettings {
secret: String::new(),
},
}
}
pub fn global() -> &'static GlobalSettings {
INSTANCE.get().expect("Config is not initialized")
}
}
static INSTANCE: OnceCell<GlobalSettings> = OnceCell::new();
pub async fn init_config() {
let config = GlobalSettings::new().await;
INSTANCE.set(config).unwrap();
}
pub async fn run_args(args: Args) -> Result<(), i32> {
if !args.init && args.listen.is_none() && args.username.is_none() {
error!("Wrong number of arguments! Run ffpapi --help for more information.");
return Err(0);
}
if args.init {
if let Err(e) = db_init().await {
panic!("{e}");
};
return Err(0);
}
if let Some(username) = args.username {
if args.email.is_none() || args.password.is_none() {
error!("Email/password missing!");
return Err(1);
}
let user = User {
id: 0,
email: Some(args.email.unwrap()),
username: username.clone(),
password: args.password.unwrap(),
salt: None,
role_id: Some(1),
token: None,
};
if let Err(e) = db_add_user(user).await {
error!("{e}");
return Err(1);
};
info!("Create admin user \"{username}\" done...");
return Err(0);
}
Ok(())
}

79
src/bin/ffpapi.rs Normal file
View File

@ -0,0 +1,79 @@
use std::process::exit;
use actix_web::{dev::ServiceRequest, middleware, web, App, Error, HttpMessage, HttpServer};
use actix_web_grants::permissions::AttachPermissions;
use actix_web_httpauth::extractors::bearer::BearerAuth;
use actix_web_httpauth::middleware::HttpAuthentication;
use clap::Parser;
use simplelog::*;
use ffplayout_engine::{
api::{
args_parse::Args,
auth,
models::LoginUser,
routes::{add_user, get_settings, login, patch_settings, update_user},
utils::{init_config, run_args},
},
utils::{init_logging, GlobalConfig},
};
async fn validator(req: ServiceRequest, credentials: BearerAuth) -> Result<ServiceRequest, Error> {
// We just get permissions from JWT
let claims = auth::decode_jwt(credentials.token()).await?;
req.attach(claims.permissions);
req.extensions_mut()
.insert(LoginUser::new(claims.id, claims.username));
Ok(req)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = Args::parse();
let mut config = GlobalConfig::new(None);
config.mail.recipient = String::new();
config.logging.log_to_file = false;
config.logging.timestamp = false;
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
if let Err(c) = run_args(args.clone()).await {
exit(c);
}
if let Some(conn) = args.listen {
init_config().await;
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
info!("running ffplayout API, listen on {conn}");
// TODO: add allow origin (or give it to the proxy)
HttpServer::new(move || {
let auth = HttpAuthentication::bearer(validator);
App::new()
.wrap(middleware::Logger::default())
.service(login)
.service(
web::scope("/api")
.wrap(auth)
.service(add_user)
.service(get_settings)
.service(patch_settings)
.service(update_user),
)
})
.bind((addr, port))?
.run()
.await
} else {
error!("Run ffpapi with listen parameter!");
Ok(())
}
}

View File

@ -1,6 +1,7 @@
extern crate log;
extern crate simplelog;
pub mod api;
pub mod filter;
pub mod input;
pub mod macros;

View File

@ -14,8 +14,8 @@ use ffplayout_engine::{
output::{player, write_hls},
rpc::json_rpc_server,
utils::{
generate_playlist, init_logging, send_mail, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
generate_playlist, get_args, init_logging, send_mail, validate_ffmpeg, GlobalConfig,
PlayerControl, PlayoutStatus, ProcessControl,
},
};
@ -56,7 +56,8 @@ fn status_file(stat_file: &str, playout_stat: &PlayoutStatus) {
}
fn main() {
let config = GlobalConfig::new();
let args = get_args();
let config = GlobalConfig::new(Some(args));
let config_clone = config.clone();
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
@ -67,7 +68,7 @@ fn main() {
let proc_ctl2 = proc_control.clone();
let messages = Arc::new(Mutex::new(Vec::new()));
let logging = init_logging(&config, proc_ctl1, messages.clone());
let logging = init_logging(&config, Some(proc_ctl1), Some(messages.clone()));
CombinedLogger::init(logging).unwrap();
validate_ffmpeg(&config);

View File

@ -1,5 +1,4 @@
use std::{
sync::{Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
@ -22,26 +21,24 @@ fn timed_kill(sec: u64, mut proc_ctl: ProcessControl) {
#[test]
#[ignore]
fn playlist_change_at_midnight() {
let mut config = GlobalConfig::new();
let mut config = GlobalConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "00:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, proc_ctl, messages);
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T23:59:45");
thread::spawn(move || timed_kill(30, proc_ctl2));
thread::spawn(move || timed_kill(30, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
}
@ -49,26 +46,24 @@ fn playlist_change_at_midnight() {
#[test]
#[ignore]
fn playlist_change_at_six() {
let mut config = GlobalConfig::new();
let mut config = GlobalConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "06:00:00".into();
config.playlist.length = "24:00:00".into();
config.logging.log_to_file = false;
let messages = Arc::new(Mutex::new(Vec::new()));
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, proc_ctl, messages);
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T05:59:45");
thread::spawn(move || timed_kill(30, proc_ctl2));
thread::spawn(move || timed_kill(30, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
}

View File

@ -39,7 +39,7 @@ fn get_date_tomorrow() {
#[test]
fn test_delta() {
let mut config = GlobalConfig::new();
let mut config = GlobalConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = "playlist".into();
config.playlist.day_start = "00:00:00".into();

View File

@ -1,6 +1,6 @@
use clap::Parser;
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[clap(version,
about = "ffplayout, Rust based 24/7 playout solution.",
override_usage = "Run without any command to use config file only, or with commands to override parameters:\n\n ffplayout [OPTIONS]",

View File

@ -8,7 +8,7 @@ use std::{
use serde::{Deserialize, Serialize};
use shlex::split;
use crate::utils::{get_args, time_to_sec};
use crate::utils::{time_to_sec, Args};
use crate::vec_strings;
/// Global Config
@ -136,11 +136,10 @@ pub struct Out {
impl GlobalConfig {
/// Read config from YAML file, and set some extra config values.
pub fn new() -> Self {
let args = get_args();
pub fn new(args: Option<Args>) -> Self {
let mut config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml");
if let Some(cfg) = args.config {
if let Some(cfg) = args.clone().and_then(|a| a.config) {
config_path = PathBuf::from(cfg);
}
@ -219,55 +218,57 @@ impl GlobalConfig {
// Read command line arguments, and override the config with them.
if let Some(gen) = args.generate {
config.general.generate = Some(gen);
}
if let Some(log_path) = args.log {
if Path::new(&log_path).is_dir() {
config.logging.log_to_file = true;
if let Some(arg) = args {
if let Some(gen) = arg.generate {
config.general.generate = Some(gen);
}
config.logging.log_path = log_path;
}
if let Some(playlist) = args.playlist {
config.playlist.path = playlist;
}
if let Some(mode) = args.play_mode {
config.processing.mode = mode;
}
if let Some(folder) = args.folder {
config.storage.path = folder;
config.processing.mode = "folder".into();
}
if let Some(start) = args.start {
config.playlist.day_start = start.clone();
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(length) = args.length {
config.playlist.length = length.clone();
if length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
if let Some(log_path) = arg.log {
if Path::new(&log_path).is_dir() {
config.logging.log_to_file = true;
}
config.logging.log_path = log_path;
}
}
if args.infinit {
config.playlist.infinit = args.infinit;
}
if let Some(playlist) = arg.playlist {
config.playlist.path = playlist;
}
if let Some(output) = args.output {
config.out.mode = output;
}
if let Some(mode) = arg.play_mode {
config.processing.mode = mode;
}
if let Some(volume) = args.volume {
config.processing.volume = volume;
if let Some(folder) = arg.folder {
config.storage.path = folder;
config.processing.mode = "folder".into();
}
if let Some(start) = arg.start {
config.playlist.day_start = start.clone();
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(length) = arg.length {
config.playlist.length = length.clone();
if length.contains(':') {
config.playlist.length_sec = Some(time_to_sec(&length));
} else {
config.playlist.length_sec = Some(86400.0);
}
}
if arg.infinit {
config.playlist.infinit = arg.infinit;
}
if let Some(output) = arg.output {
config.out.mode = output;
}
if let Some(volume) = arg.volume {
config.processing.volume = volume;
}
}
config
@ -276,7 +277,7 @@ impl GlobalConfig {
impl Default for GlobalConfig {
fn default() -> Self {
Self::new()
Self::new(None)
}
}

View File

@ -167,8 +167,8 @@ fn clean_string(text: &str) -> String {
/// - mail logger
pub fn init_logging(
config: &GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
proc_ctl: Option<ProcessControl>,
messages: Option<Arc<Mutex<Vec<String>>>>,
) -> Vec<Box<dyn SharedLogger>> {
let config_clone = config.clone();
let app_config = config.logging.clone();
@ -182,6 +182,8 @@ pub fn init_logging(
let mut log_config = ConfigBuilder::new()
.set_thread_level(LevelFilter::Off)
.set_target_level(LevelFilter::Off)
.add_filter_ignore_str("sqlx")
.add_filter_ignore_str("reqwest")
.set_level_padding(LevelPadding::Left)
.set_time_level(time_level)
.clone();
@ -247,10 +249,12 @@ pub fn init_logging(
// set mail logger only the recipient is set in config
if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') {
let messages_clone = messages.clone();
let messages_clone = messages.clone().unwrap();
let interval = config.mail.interval;
thread::spawn(move || mail_queue(config_clone, proc_ctl, messages_clone, interval));
thread::spawn(move || {
mail_queue(config_clone, proc_ctl.unwrap(), messages_clone, interval)
});
let mail_config = log_config.build();
@ -260,7 +264,7 @@ pub fn init_logging(
_ => LevelFilter::Error,
};
app_logger.push(LogMailer::new(filter, mail_config, messages));
app_logger.push(LogMailer::new(filter, mail_config, messages.unwrap()));
}
app_logger

View File

@ -23,7 +23,7 @@ pub mod json_serializer;
mod json_validate;
mod logging;
pub use arg_parse::get_args;
pub use arg_parse::{get_args, Args};
pub use config::GlobalConfig;
pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*};
pub use generator::generate_playlist;