use only one db connection pool

This commit is contained in:
jb-alvarado 2022-11-18 14:13:24 +01:00
parent 454b84b985
commit a5f0813d2a
9 changed files with 323 additions and 204 deletions

View File

@ -21,6 +21,7 @@ use chrono::{DateTime, Datelike, Duration, Local, NaiveDateTime, TimeZone, Utc};
use regex::Regex;
use serde::{Deserialize, Serialize};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::auth::{create_jwt, Claims};
use crate::db::{
@ -134,8 +135,9 @@ struct ProgramItem {
/// }
/// ```
#[post("/auth/login/")]
pub async fn login(credentials: web::Json<User>) -> impl Responder {
match handles::select_login(&credentials.username).await {
pub async fn login(pool: web::Data<&Pool<Sqlite>>, credentials: web::Json<User>) -> impl Responder {
let conn = &pool.into_inner();
match handles::select_login(conn, &credentials.username).await {
Ok(mut user) => {
let pass = user.password.clone();
let hash = PasswordHash::new(&pass).unwrap();
@ -146,7 +148,7 @@ pub async fn login(credentials: web::Json<User>) -> impl Responder {
.verify_password(credentials.password.as_bytes(), &hash)
.is_ok()
{
let role = handles::select_role(&user.role_id.unwrap_or_default())
let role = handles::select_role(conn, &user.role_id.unwrap_or_default())
.await
.unwrap_or_else(|_| "guest".to_string());
let claims = Claims::new(user.id, user.username.clone(), role.clone());
@ -196,8 +198,11 @@ pub async fn login(credentials: web::Json<User>) -> impl Responder {
/// ```
#[get("/user")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_user(user: web::ReqData<LoginUser>) -> Result<impl Responder, ServiceError> {
match handles::select_user(&user.username).await {
async fn get_user(
pool: web::Data<&Pool<Sqlite>>,
user: web::ReqData<LoginUser>,
) -> Result<impl Responder, ServiceError> {
match handles::select_user(&pool.into_inner(), &user.username).await {
Ok(user) => Ok(web::Json(user)),
Err(e) => {
error!("{e}");
@ -215,6 +220,7 @@ async fn get_user(user: web::ReqData<LoginUser>) -> Result<impl Responder, Servi
#[put("/user/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn update_user(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
user: web::ReqData<LoginUser>,
data: web::Json<User>,
@ -239,7 +245,10 @@ async fn update_user(
fields.push_str(format!("password = '{}', salt = '{salt}'", password_hash).as_str());
}
if handles::update_user(user.id, fields).await.is_ok() {
if handles::update_user(&pool.into_inner(), user.id, fields)
.await
.is_ok()
{
return Ok("Update Success");
};
@ -258,8 +267,11 @@ async fn update_user(
/// ```
#[post("/user/")]
#[has_any_role("Role::Admin", type = "Role")]
async fn add_user(data: web::Json<User>) -> Result<impl Responder, ServiceError> {
match handles::insert_user(data.into_inner()).await {
async fn add_user(
pool: web::Data<&Pool<Sqlite>>,
data: web::Json<User>,
) -> Result<impl Responder, ServiceError> {
match handles::insert_user(&pool.into_inner(), data.into_inner()).await {
Ok(_) => Ok("Add User Success"),
Err(e) => {
error!("{e}");
@ -291,8 +303,11 @@ async fn add_user(data: web::Json<User>) -> Result<impl Responder, ServiceError>
/// ```
#[get("/channel/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_channel(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&id).await {
async fn get_channel(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
return Ok(web::Json(channel));
}
@ -306,8 +321,8 @@ async fn get_channel(id: web::Path<i32>) -> Result<impl Responder, ServiceError>
/// ```
#[get("/channels")]
#[has_any_role("Role::Admin", type = "Role")]
async fn get_all_channels() -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_all_channels().await {
async fn get_all_channels(pool: web::Data<&Pool<Sqlite>>) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_all_channels(&pool.into_inner()).await {
return Ok(web::Json(channel));
}
@ -325,10 +340,11 @@ async fn get_all_channels() -> Result<impl Responder, ServiceError> {
#[patch("/channel/{id}")]
#[has_any_role("Role::Admin", type = "Role")]
async fn patch_channel(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<Channel>,
) -> Result<impl Responder, ServiceError> {
if handles::update_channel(*id, data.into_inner())
if handles::update_channel(&pool.into_inner(), *id, data.into_inner())
.await
.is_ok()
{
@ -349,8 +365,11 @@ async fn patch_channel(
/// ```
#[post("/channel/")]
#[has_any_role("Role::Admin", type = "Role")]
async fn add_channel(data: web::Json<Channel>) -> Result<impl Responder, ServiceError> {
match create_channel(data.into_inner()).await {
async fn add_channel(
pool: web::Data<&Pool<Sqlite>>,
data: web::Json<Channel>,
) -> Result<impl Responder, ServiceError> {
match create_channel(&pool.into_inner(), data.into_inner()).await {
Ok(c) => Ok(web::Json(c)),
Err(e) => Err(e),
}
@ -363,8 +382,11 @@ async fn add_channel(data: web::Json<Channel>) -> Result<impl Responder, Service
/// ```
#[delete("/channel/{id}")]
#[has_any_role("Role::Admin", type = "Role")]
async fn remove_channel(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
if delete_channel(*id).await.is_ok() {
async fn remove_channel(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
if delete_channel(&pool.into_inner(), *id).await.is_ok() {
return Ok("Delete Channel Success");
}
@ -383,10 +405,11 @@ async fn remove_channel(id: web::Path<i32>) -> Result<impl Responder, ServiceErr
#[get("/playout/config/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_playout_config(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
_details: AuthDetails<Role>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&id).await {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
if let Ok(config) = read_playout_config(&channel.config_path) {
return Ok(web::Json(config));
}
@ -404,10 +427,11 @@ async fn get_playout_config(
#[put("/playout/config/{id}")]
#[has_any_role("Role::Admin", type = "Role")]
async fn update_playout_config(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PlayoutConfig>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&id).await {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
if let Ok(f) = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
@ -436,8 +460,11 @@ async fn update_playout_config(
/// ```
#[get("/presets/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_presets(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
if let Ok(presets) = handles::select_presets(*id).await {
async fn get_presets(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
if let Ok(presets) = handles::select_presets(&pool.into_inner(), *id).await {
return Ok(web::Json(presets));
}
@ -455,10 +482,14 @@ async fn get_presets(id: web::Path<i32>) -> Result<impl Responder, ServiceError>
#[put("/presets/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn update_preset(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<TextPreset>,
) -> Result<impl Responder, ServiceError> {
if handles::update_preset(&id, data.into_inner()).await.is_ok() {
if handles::update_preset(&pool.into_inner(), &id, data.into_inner())
.await
.is_ok()
{
return Ok("Update Success");
}
@ -475,8 +506,14 @@ async fn update_preset(
/// ```
#[post("/presets/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn add_preset(data: web::Json<TextPreset>) -> Result<impl Responder, ServiceError> {
if handles::insert_preset(data.into_inner()).await.is_ok() {
async fn add_preset(
pool: web::Data<&Pool<Sqlite>>,
data: web::Json<TextPreset>,
) -> Result<impl Responder, ServiceError> {
if handles::insert_preset(&pool.into_inner(), data.into_inner())
.await
.is_ok()
{
return Ok("Add preset Success");
}
@ -491,8 +528,14 @@ async fn add_preset(data: web::Json<TextPreset>) -> Result<impl Responder, Servi
/// ```
#[delete("/presets/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn delete_preset(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
if handles::delete_preset(&id).await.is_ok() {
async fn delete_preset(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
if handles::delete_preset(&pool.into_inner(), &id)
.await
.is_ok()
{
return Ok("Delete preset Success");
}
@ -519,10 +562,11 @@ async fn delete_preset(id: web::Path<i32>) -> Result<impl Responder, ServiceErro
#[post("/control/{id}/text/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn send_text_message(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<HashMap<String, String>>,
) -> Result<impl Responder, ServiceError> {
match send_message(*id, data.into_inner()).await {
match send_message(&pool.into_inner(), *id, data.into_inner()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -541,10 +585,11 @@ pub async fn send_text_message(
#[post("/control/{id}/playout/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn control_playout(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
control: web::Json<Process>,
) -> Result<impl Responder, ServiceError> {
match control_state(*id, control.command.clone()).await {
match control_state(&pool.into_inner(), *id, control.command.clone()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -582,8 +627,11 @@ pub async fn control_playout(
/// ```
#[get("/control/{id}/media/current")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_current(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "current".into()).await {
pub async fn media_current(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "current".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -596,8 +644,11 @@ pub async fn media_current(id: web::Path<i32>) -> Result<impl Responder, Service
/// ```
#[get("/control/{id}/media/next")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_next(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "next".into()).await {
pub async fn media_next(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "next".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -611,8 +662,11 @@ pub async fn media_next(id: web::Path<i32>) -> Result<impl Responder, ServiceErr
/// ```
#[get("/control/{id}/media/last")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn media_last(id: web::Path<i32>) -> Result<impl Responder, ServiceError> {
match media_info(*id, "last".into()).await {
pub async fn media_last(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
match media_info(&pool.into_inner(), *id, "last".into()).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -634,10 +688,11 @@ pub async fn media_last(id: web::Path<i32>) -> Result<impl Responder, ServiceErr
#[post("/control/{id}/process/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn process_control(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
proc: web::Json<Process>,
) -> Result<impl Responder, ServiceError> {
control_service(*id, &proc.command).await
control_service(&pool.into_inner(), *id, &proc.command).await
}
/// #### ffplayout Playlist Operations
@ -651,10 +706,11 @@ pub async fn process_control(
#[get("/playlist/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn get_playlist(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
obj: web::Query<DateObj>,
) -> Result<impl Responder, ServiceError> {
match read_playlist(*id, obj.date.clone()).await {
match read_playlist(&pool.into_inner(), *id, obj.date.clone()).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
@ -670,10 +726,11 @@ pub async fn get_playlist(
#[post("/playlist/{id}/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn save_playlist(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<JsonPlaylist>,
) -> Result<impl Responder, ServiceError> {
match write_playlist(*id, data.into_inner()).await {
match write_playlist(&pool.into_inner(), *id, data.into_inner()).await {
Ok(res) => Ok(res),
Err(e) => Err(e),
}
@ -690,9 +747,10 @@ pub async fn save_playlist(
#[get("/playlist/{id}/generate/{date}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn gen_playlist(
pool: web::Data<&Pool<Sqlite>>,
params: web::Path<(i32, String)>,
) -> Result<impl Responder, ServiceError> {
match generate_playlist(params.0, params.1.clone()).await {
match generate_playlist(&pool.into_inner(), params.0, params.1.clone()).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
@ -707,9 +765,10 @@ pub async fn gen_playlist(
#[delete("/playlist/{id}/{date}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn del_playlist(
pool: web::Data<&Pool<Sqlite>>,
params: web::Path<(i32, String)>,
) -> Result<impl Responder, ServiceError> {
match delete_playlist(params.0, &params.1).await {
match delete_playlist(&pool.into_inner(), params.0, &params.1).await {
Ok(_) => Ok(format!("Delete playlist from {} success!", params.1)),
Err(e) => Err(e),
}
@ -726,10 +785,11 @@ pub async fn del_playlist(
#[get("/log/{id}")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn get_log(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
log: web::Query<DateObj>,
) -> Result<impl Responder, ServiceError> {
read_log_file(&id, &log.date).await
read_log_file(&pool.into_inner(), &id, &log.date).await
}
/// ### File Operations
@ -743,10 +803,11 @@ pub async fn get_log(
#[post("/file/{id}/browse/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn file_browser(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
) -> Result<impl Responder, ServiceError> {
match browser(*id, &data.into_inner()).await {
match browser(&pool.into_inner(), *id, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -761,10 +822,11 @@ pub async fn file_browser(
#[post("/file/{id}/create-folder/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn add_dir(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
) -> Result<HttpResponse, ServiceError> {
create_directory(*id, &data.into_inner()).await
create_directory(&pool.into_inner(), *id, &data.into_inner()).await
}
/// **Rename File**
@ -776,10 +838,11 @@ pub async fn add_dir(
#[post("/file/{id}/rename/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn move_rename(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<MoveObject>,
) -> Result<impl Responder, ServiceError> {
match rename_file(*id, &data.into_inner()).await {
match rename_file(&pool.into_inner(), *id, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -794,10 +857,11 @@ pub async fn move_rename(
#[post("/file/{id}/remove/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
pub async fn remove(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
) -> Result<impl Responder, ServiceError> {
match remove_file_or_folder(*id, &data.into_inner().source).await {
match remove_file_or_folder(&pool.into_inner(), *id, &data.into_inner().source).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -812,11 +876,12 @@ pub async fn remove(
#[put("/file/{id}/upload/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn save_file(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
payload: Multipart,
obj: web::Query<FileObj>,
) -> Result<HttpResponse, ServiceError> {
upload(*id, payload, &obj.path, false).await
upload(&pool.into_inner(), *id, payload, &obj.path, false).await
}
/// **Import playlist**
@ -831,16 +896,17 @@ async fn save_file(
#[put("/file/{id}/import/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn import_playlist(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
payload: Multipart,
obj: web::Query<ImportObj>,
) -> Result<HttpResponse, ServiceError> {
let file = Path::new(&obj.file).file_name().unwrap_or_default();
let path = env::temp_dir().join(file).to_string_lossy().to_string();
let (config, _) = playout_config(&id).await?;
let channel = handles::select_channel(&id).await?;
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let channel = handles::select_channel(&pool.clone().into_inner(), &id).await?;
upload(*id, payload, &path, true).await?;
upload(&pool.into_inner(), *id, payload, &path, true).await?;
import_file(&config, &obj.date, Some(channel.name), &path)?;
fs::remove_file(path)?;
@ -873,10 +939,11 @@ async fn import_playlist(
#[get("/program/{id}/")]
#[has_any_role("Role::Admin", "Role::User", type = "Role")]
async fn get_program(
pool: web::Data<&Pool<Sqlite>>,
id: web::Path<i32>,
obj: web::Query<ProgramObj>,
) -> Result<impl Responder, ServiceError> {
let (config, _) = playout_config(&id).await?;
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let start_sec = config.playlist.start_sec.unwrap();
let mut days = 0;
let mut program = vec![];
@ -901,13 +968,14 @@ async fn get_program(
]);
for date in date_range {
let conn = pool.clone().into_inner();
let mut naive = NaiveDateTime::parse_from_str(
&format!("{date} {}", sec_to_time(start_sec)),
"%Y-%m-%d %H:%M:%S%.3f",
)
.unwrap();
let playlist = match read_playlist(*id, date.clone()).await {
let playlist = match read_playlist(&conn, *id, date.clone()).await {
Ok(p) => p,
Err(e) => {
error!("Error in Playlist from {date}: {e}");

View File

@ -5,9 +5,12 @@ use argon2::{
use rand::{distributions::Alphanumeric, Rng};
use simplelog::*;
use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite, SqlitePool};
use sqlx::{migrate::MigrateDatabase, sqlite::SqliteQueryResult, Pool, Sqlite};
use crate::db::models::{Channel, TextPreset, User};
use crate::db::{
db_pool,
models::{Channel, TextPreset, User},
};
use crate::utils::{db_path, local_utc_offset, GlobalSettings};
#[derive(Debug, sqlx::FromRow)]
@ -15,8 +18,7 @@ struct Role {
name: String,
}
async fn create_schema() -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
async fn create_schema(conn: &Pool<Sqlite>) -> Result<SqliteQueryResult, sqlx::Error> {
let query = "PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS global
(
@ -71,18 +73,17 @@ async fn create_schema() -> Result<SqliteQueryResult, sqlx::Error> {
FOREIGN KEY (channel_id) REFERENCES channels (id) ON UPDATE SET NULL ON DELETE SET NULL,
UNIQUE(mail, username)
);";
let result = sqlx::query(query).execute(&conn).await;
conn.close().await;
result
sqlx::query(query).execute(conn).await
}
pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std::error::Error>> {
let conn = db_pool().await?;
let db_path = db_path()?;
if !Sqlite::database_exists(&db_path).await.unwrap_or(false) {
Sqlite::create_database(&db_path).await.unwrap();
match create_schema().await {
match create_schema(&conn).await {
Ok(_) => info!("Database created Successfully"),
Err(e) => panic!("{e}"),
}
@ -93,8 +94,6 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
.map(char::from)
.collect();
let instances = connection().await?;
let url = match domain {
Some(d) => format!("http://{d}/live/stream.m3u8"),
None => "http://localhost/live/stream.m3u8".to_string(),
@ -120,45 +119,31 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
sqlx::query(query)
.bind(secret)
.bind(url)
.execute(&instances)
.execute(&conn)
.await?;
instances.close().await;
Ok("Database initialized!")
}
pub async fn connection() -> Result<Pool<Sqlite>, sqlx::Error> {
let db_path = db_path().unwrap();
let conn = SqlitePool::connect(&db_path).await?;
Ok(conn)
}
pub async fn select_global() -> Result<GlobalSettings, sqlx::Error> {
let conn = connection().await?;
let conn = db_pool().await?;
let query = "SELECT secret FROM global WHERE id = 1";
let result: GlobalSettings = sqlx::query_as(query).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query_as(query).fetch_one(&conn).await
}
pub async fn select_channel(id: &i32) -> Result<Channel, sqlx::Error> {
let conn = connection().await?;
pub async fn select_channel(conn: &Pool<Sqlite>, id: &i32) -> Result<Channel, sqlx::Error> {
let query = "SELECT * FROM channels WHERE id = $1";
let mut result: Channel = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
let mut result: Channel = sqlx::query_as(query).bind(id).fetch_one(conn).await?;
result.utc_offset = local_utc_offset();
Ok(result)
}
pub async fn select_all_channels() -> Result<Vec<Channel>, sqlx::Error> {
let conn = connection().await?;
pub async fn select_all_channels(conn: &Pool<Sqlite>) -> Result<Vec<Channel>, sqlx::Error> {
let query = "SELECT * FROM channels";
let mut results: Vec<Channel> = sqlx::query_as(query).fetch_all(&conn).await?;
conn.close().await;
let mut results: Vec<Channel> = sqlx::query_as(query).fetch_all(conn).await?;
for result in results.iter_mut() {
result.utc_offset = local_utc_offset();
@ -167,26 +152,24 @@ pub async fn select_all_channels() -> Result<Vec<Channel>, sqlx::Error> {
Ok(results)
}
pub async fn update_channel(id: i32, channel: Channel) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn update_channel(
conn: &Pool<Sqlite>,
id: i32,
channel: Channel,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query = "UPDATE channels SET name = $2, preview_url = $3, config_path = $4, extra_extensions = $5 WHERE id = $1";
let result: SqliteQueryResult = sqlx::query(query)
sqlx::query(query)
.bind(id)
.bind(channel.name)
.bind(channel.preview_url)
.bind(channel.config_path)
.bind(channel.extra_extensions)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
.execute(conn)
.await
}
pub async fn insert_channel(channel: Channel) -> Result<Channel, sqlx::Error> {
let conn = connection().await?;
pub async fn insert_channel(conn: &Pool<Sqlite>, channel: Channel) -> Result<Channel, sqlx::Error> {
let query = "INSERT INTO channels (name, preview_url, config_path, extra_extensions, service) VALUES($1, $2, $3, $4, $5)";
let result = sqlx::query(query)
.bind(channel.name)
@ -194,56 +177,47 @@ pub async fn insert_channel(channel: Channel) -> Result<Channel, sqlx::Error> {
.bind(channel.config_path)
.bind(channel.extra_extensions)
.bind(channel.service)
.execute(&conn)
.execute(conn)
.await?;
let new_channel: Channel = sqlx::query_as("SELECT * FROM channels WHERE id = $1")
sqlx::query_as("SELECT * FROM channels WHERE id = $1")
.bind(result.last_insert_rowid())
.fetch_one(&conn)
.await?;
conn.close().await;
Ok(new_channel)
.fetch_one(conn)
.await
}
pub async fn delete_channel(id: &i32) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn delete_channel(
conn: &Pool<Sqlite>,
id: &i32,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query = "DELETE FROM channels WHERE id = $1";
let result: SqliteQueryResult = sqlx::query(query).bind(id).execute(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query(query).bind(id).execute(conn).await
}
pub async fn select_role(id: &i32) -> Result<String, sqlx::Error> {
let conn = connection().await?;
pub async fn select_role(conn: &Pool<Sqlite>, id: &i32) -> Result<String, sqlx::Error> {
let query = "SELECT name FROM roles WHERE id = $1";
let result: Role = sqlx::query_as(query).bind(id).fetch_one(&conn).await?;
conn.close().await;
let result: Role = sqlx::query_as(query).bind(id).fetch_one(conn).await?;
Ok(result.name)
}
pub async fn select_login(user: &str) -> Result<User, sqlx::Error> {
let conn = connection().await?;
pub async fn select_login(conn: &Pool<Sqlite>, user: &str) -> Result<User, sqlx::Error> {
let query = "SELECT id, mail, username, password, salt, role_id FROM user WHERE username = $1";
let result: User = sqlx::query_as(query).bind(user).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query_as(query).bind(user).fetch_one(conn).await
}
pub async fn select_user(user: &str) -> Result<User, sqlx::Error> {
let conn = connection().await?;
pub async fn select_user(conn: &Pool<Sqlite>, user: &str) -> Result<User, sqlx::Error> {
let query = "SELECT id, mail, username, role_id FROM user WHERE username = $1";
let result: User = sqlx::query_as(query).bind(user).fetch_one(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query_as(query).bind(user).fetch_one(conn).await
}
pub async fn insert_user(user: User) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn insert_user(
conn: &Pool<Sqlite>,
user: User,
) -> Result<SqliteQueryResult, sqlx::Error> {
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::default()
.hash_password(user.password.clone().as_bytes(), &salt)
@ -251,43 +225,43 @@ pub async fn insert_user(user: User) -> Result<SqliteQueryResult, sqlx::Error> {
let query =
"INSERT INTO user (mail, username, password, salt, role_id) VALUES($1, $2, $3, $4, $5)";
let result = sqlx::query(query)
sqlx::query(query)
.bind(user.mail)
.bind(user.username)
.bind(password_hash.to_string())
.bind(salt.to_string())
.bind(user.role_id)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
.execute(conn)
.await
}
pub async fn update_user(id: i32, fields: String) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn update_user(
conn: &Pool<Sqlite>,
id: i32,
fields: String,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query = format!("UPDATE user SET {fields} WHERE id = $1");
let result: SqliteQueryResult = sqlx::query(&query).bind(id).execute(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query(&query).bind(id).execute(conn).await
}
pub async fn select_presets(id: i32) -> Result<Vec<TextPreset>, sqlx::Error> {
let conn = connection().await?;
pub async fn select_presets(conn: &Pool<Sqlite>, id: i32) -> Result<Vec<TextPreset>, sqlx::Error> {
let query = "SELECT * FROM presets WHERE channel_id = $1";
let result: Vec<TextPreset> = sqlx::query_as(query).bind(id).fetch_all(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query_as(query).bind(id).fetch_all(conn).await
}
pub async fn update_preset(id: &i32, preset: TextPreset) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn update_preset(
conn: &Pool<Sqlite>,
id: &i32,
preset: TextPreset,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query =
"UPDATE presets SET name = $1, text = $2, x = $3, y = $4, fontsize = $5, line_spacing = $6,
fontcolor = $7, alpha = $8, box = $9, boxcolor = $10, boxborderw = 11 WHERE id = $12";
let result: SqliteQueryResult = sqlx::query(query)
sqlx::query(query)
.bind(preset.name)
.bind(preset.text)
.bind(preset.x)
@ -300,19 +274,19 @@ pub async fn update_preset(id: &i32, preset: TextPreset) -> Result<SqliteQueryRe
.bind(preset.boxcolor)
.bind(preset.boxborderw)
.bind(id)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
.execute(conn)
.await
}
pub async fn insert_preset(preset: TextPreset) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn insert_preset(
conn: &Pool<Sqlite>,
preset: TextPreset,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query =
"INSERT INTO presets (channel_id, name, text, x, y, fontsize, line_spacing, fontcolor, alpha, box, boxcolor, boxborderw)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)";
let result: SqliteQueryResult = sqlx::query(query)
sqlx::query(query)
.bind(preset.channel_id)
.bind(preset.name)
.bind(preset.text)
@ -325,18 +299,15 @@ pub async fn insert_preset(preset: TextPreset) -> Result<SqliteQueryResult, sqlx
.bind(preset.r#box)
.bind(preset.boxcolor)
.bind(preset.boxborderw)
.execute(&conn)
.await?;
conn.close().await;
Ok(result)
.execute(conn)
.await
}
pub async fn delete_preset(id: &i32) -> Result<SqliteQueryResult, sqlx::Error> {
let conn = connection().await?;
pub async fn delete_preset(
conn: &Pool<Sqlite>,
id: &i32,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query = "DELETE FROM presets WHERE id = $1;";
let result: SqliteQueryResult = sqlx::query(query).bind(id).execute(&conn).await?;
conn.close().await;
Ok(result)
sqlx::query(query).bind(id).execute(conn).await
}

View File

@ -1,2 +1,13 @@
use sqlx::{Pool, Sqlite, SqlitePool};
pub mod handles;
pub mod models;
use crate::utils::db_path;
pub async fn db_pool() -> Result<Pool<Sqlite>, sqlx::Error> {
let db_path = db_path().unwrap();
let conn = SqlitePool::connect(&db_path).await?;
Ok(conn)
}

View File

@ -24,7 +24,7 @@ use api::{
update_preset, update_user,
},
};
use db::models::LoginUser;
use db::{db_pool, models::LoginUser};
use utils::{args_parse::Args, db_path, init_config, run_args, Role};
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
@ -64,7 +64,15 @@ async fn main() -> std::io::Result<()> {
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
if let Err(c) = run_args(args.clone()).await {
let pool = match db_pool().await {
Ok(p) => p,
Err(e) => {
error!("{e}");
exit(1);
}
};
if let Err(c) = run_args(&pool, args.clone()).await {
exit(c);
}
@ -85,7 +93,10 @@ async fn main() -> std::io::Result<()> {
// no allow origin here, give it to the reverse proxy
HttpServer::new(move || {
let auth = HttpAuthentication::bearer(validator);
let db_pool = web::Data::new(pool.clone());
App::new()
.app_data(db_pool)
.wrap(middleware::Logger::default())
.service(login)
.service(

View File

@ -1,12 +1,16 @@
use std::fs;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{control::control_service, errors::ServiceError};
use crate::db::{handles, models::Channel};
pub async fn create_channel(target_channel: Channel) -> Result<Channel, ServiceError> {
pub async fn create_channel(
conn: &Pool<Sqlite>,
target_channel: Channel,
) -> Result<Channel, ServiceError> {
if !target_channel.service.starts_with("ffplayout@") {
return Err(ServiceError::BadRequest("Bad service name!".to_string()));
}
@ -20,22 +24,22 @@ pub async fn create_channel(target_channel: Channel) -> Result<Channel, ServiceE
&target_channel.config_path,
)?;
let new_channel = handles::insert_channel(target_channel).await?;
control_service(new_channel.id, "enable").await?;
let new_channel = handles::insert_channel(conn, target_channel).await?;
control_service(conn, new_channel.id, "enable").await?;
Ok(new_channel)
}
pub async fn delete_channel(id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(&id).await?;
control_service(channel.id, "stop").await?;
control_service(channel.id, "disable").await?;
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?;
control_service(conn, channel.id, "stop").await?;
control_service(conn, channel.id, "disable").await?;
if let Err(e) = fs::remove_file(channel.config_path) {
error!("{e}");
};
handles::delete_channel(&id).await?;
handles::delete_channel(conn, &id).await?;
Ok(())
}

View File

@ -5,6 +5,7 @@ use reqwest::{
Client, Response,
};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use crate::db::handles::select_channel;
use crate::utils::{errors::ServiceError, playout_config};
@ -56,8 +57,8 @@ struct SystemD {
}
impl SystemD {
async fn new(id: i32) -> Result<Self, ServiceError> {
let channel = select_channel(&id).await?;
async fn new(conn: &Pool<Sqlite>, id: i32) -> Result<Self, ServiceError> {
let channel = select_channel(conn, &id).await?;
Ok(Self {
service: channel.service,
@ -130,11 +131,15 @@ fn create_header(auth: &str) -> HeaderMap {
headers
}
async fn post_request<T>(id: i32, obj: RpcObj<T>) -> Result<Response, ServiceError>
async fn post_request<T>(
conn: &Pool<Sqlite>,
id: i32,
obj: RpcObj<T>,
) -> Result<Response, ServiceError>
where
T: Serialize,
{
let (config, _) = playout_config(&id).await?;
let (config, _) = playout_config(conn, &id).await?;
let url = format!("http://{}", config.rpc_server.address);
let client = Client::new();
@ -151,6 +156,7 @@ where
}
pub async fn send_message(
conn: &Pool<Sqlite>,
id: i32,
message: HashMap<String, String>,
) -> Result<Response, ServiceError> {
@ -163,23 +169,35 @@ pub async fn send_message(
},
);
post_request(id, json_obj).await
post_request(conn, id, json_obj).await
}
pub async fn control_state(id: i32, command: String) -> Result<Response, ServiceError> {
pub async fn control_state(
conn: &Pool<Sqlite>,
id: i32,
command: String,
) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), ControlParams { control: command });
post_request(id, json_obj).await
post_request(conn, id, json_obj).await
}
pub async fn media_info(id: i32, command: String) -> Result<Response, ServiceError> {
pub async fn media_info(
conn: &Pool<Sqlite>,
id: i32,
command: String,
) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), MediaParams { media: command });
post_request(id, json_obj).await
post_request(conn, id, json_obj).await
}
pub async fn control_service(id: i32, command: &str) -> Result<String, ServiceError> {
let system_d = SystemD::new(id).await?;
pub async fn control_service(
conn: &Pool<Sqlite>,
id: i32,
command: &str,
) -> Result<String, ServiceError> {
let system_d = SystemD::new(conn, id).await?;
match command {
"enable" => system_d.enable(),

View File

@ -6,6 +6,7 @@ use futures_util::TryStreamExt as _;
use rand::{distributions::Alphanumeric, Rng};
use relative_path::RelativePath;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use simplelog::*;
@ -87,8 +88,12 @@ fn norm_abs_path(root_path: &str, input_path: &str) -> (PathBuf, String, String)
/// Take input path and give file and folder list from it back.
/// Input should be a relative path segment, but when it is a absolut path, the norm_abs_path function
/// will take care, that user can not break out from given storage path in config.
pub async fn browser(id: i32, path_obj: &PathObject) -> Result<PathObject, ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn browser(
conn: &Pool<Sqlite>,
id: i32,
path_obj: &PathObject,
) -> Result<PathObject, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let extensions = config.storage.extensions;
let (path, parent, path_component) = norm_abs_path(&config.storage.path, &path_obj.source);
let mut obj = PathObject::new(path_component, Some(parent));
@ -143,10 +148,11 @@ pub async fn browser(id: i32, path_obj: &PathObject) -> Result<PathObject, Servi
}
pub async fn create_directory(
conn: &Pool<Sqlite>,
id: i32,
path_obj: &PathObject,
) -> Result<HttpResponse, ServiceError> {
let (config, _) = playout_config(&id).await?;
let (config, _) = playout_config(conn, &id).await?;
let (path, _, _) = norm_abs_path(&config.storage.path, &path_obj.source);
if let Err(e) = fs::create_dir_all(&path) {
@ -198,8 +204,12 @@ fn rename(source: &PathBuf, target: &PathBuf) -> Result<MoveObject, ServiceError
}
}
pub async fn rename_file(id: i32, move_object: &MoveObject) -> Result<MoveObject, ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn rename_file(
conn: &Pool<Sqlite>,
id: i32,
move_object: &MoveObject,
) -> Result<MoveObject, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (source_path, _, _) = norm_abs_path(&config.storage.path, &move_object.source);
let (mut target_path, _, _) = norm_abs_path(&config.storage.path, &move_object.target);
@ -229,8 +239,12 @@ pub async fn rename_file(id: i32, move_object: &MoveObject) -> Result<MoveObject
Err(ServiceError::InternalServerError)
}
pub async fn remove_file_or_folder(id: i32, source_path: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn remove_file_or_folder(
conn: &Pool<Sqlite>,
id: i32,
source_path: &str,
) -> Result<(), ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (source, _, _) = norm_abs_path(&config.storage.path, source_path);
if !source.exists() {
@ -262,8 +276,8 @@ pub async fn remove_file_or_folder(id: i32, source_path: &str) -> Result<(), Ser
Err(ServiceError::InternalServerError)
}
async fn valid_path(id: i32, path: &str) -> Result<PathBuf, ServiceError> {
let (config, _) = playout_config(&id).await?;
async fn valid_path(conn: &Pool<Sqlite>, id: i32, path: &str) -> Result<PathBuf, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (test_path, _, _) = norm_abs_path(&config.storage.path, path);
if !test_path.is_dir() {
@ -274,6 +288,7 @@ async fn valid_path(id: i32, path: &str) -> Result<PathBuf, ServiceError> {
}
pub async fn upload(
conn: &Pool<Sqlite>,
id: i32,
mut payload: Multipart,
path: &str,
@ -296,7 +311,7 @@ pub async fn upload(
if abs_path {
filepath = PathBuf::from(path);
} else {
let target_path = valid_path(id, path).await?;
let target_path = valid_path(conn, id, path).await?;
filepath = target_path.join(filename);
}

View File

@ -11,6 +11,7 @@ use once_cell::sync::OnceCell;
use rpassword::read_password;
use serde::{de, Deserialize, Deserializer};
use simplelog::*;
use sqlx::{Pool, Sqlite};
pub mod args_parse;
pub mod channels;
@ -89,7 +90,7 @@ pub fn db_path() -> Result<String, Box<dyn std::error::Error>> {
Ok(db_path)
}
pub async fn run_args(mut args: Args) -> Result<(), i32> {
pub async fn run_args(conn: &Pool<Sqlite>, mut args: Args) -> Result<(), i32> {
if !args.init && args.listen.is_none() && !args.ask && args.username.is_none() {
error!("Wrong number of arguments! Run ffpapi --help for more information.");
@ -161,7 +162,7 @@ pub async fn run_args(mut args: Args) -> Result<(), i32> {
token: None,
};
if let Err(e) = insert_user(user).await {
if let Err(e) = insert_user(conn, user).await {
error!("{e}");
return Err(1);
};
@ -183,8 +184,11 @@ pub fn read_playout_config(path: &str) -> Result<PlayoutConfig, Box<dyn Error>>
Ok(config)
}
pub async fn playout_config(channel_id: &i32) -> Result<(PlayoutConfig, Channel), ServiceError> {
if let Ok(channel) = select_channel(channel_id).await {
pub async fn playout_config(
conn: &Pool<Sqlite>,
channel_id: &i32,
) -> Result<(PlayoutConfig, Channel), ServiceError> {
if let Ok(channel) = select_channel(conn, channel_id).await {
if let Ok(config) = read_playout_config(&channel.config_path.clone()) {
return Ok((config, channel));
}
@ -195,8 +199,12 @@ pub async fn playout_config(channel_id: &i32) -> Result<(PlayoutConfig, Channel)
))
}
pub async fn read_log_file(channel_id: &i32, date: &str) -> Result<String, ServiceError> {
if let Ok(channel) = select_channel(channel_id).await {
pub async fn read_log_file(
conn: &Pool<Sqlite>,
channel_id: &i32,
date: &str,
) -> Result<String, ServiceError> {
if let Ok(channel) = select_channel(conn, channel_id).await {
let mut date_str = "".to_string();
if !date.is_empty() {

View File

@ -1,14 +1,19 @@
use std::{fs, path::PathBuf};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::utils::{
generate_playlist as playlist_generator, json_reader, json_writer, JsonPlaylist,
};
pub async fn read_playlist(id: i32, date: String) -> Result<JsonPlaylist, ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn read_playlist(
conn: &Pool<Sqlite>,
id: i32,
date: String,
) -> Result<JsonPlaylist, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
@ -23,8 +28,12 @@ pub async fn read_playlist(id: i32, date: String) -> Result<JsonPlaylist, Servic
}
}
pub async fn write_playlist(id: i32, json_data: JsonPlaylist) -> Result<String, ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn write_playlist(
conn: &Pool<Sqlite>,
id: i32,
json_data: JsonPlaylist,
) -> Result<String, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let date = json_data.date.clone();
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
@ -68,8 +77,12 @@ pub async fn write_playlist(id: i32, json_data: JsonPlaylist) -> Result<String,
Err(ServiceError::InternalServerError)
}
pub async fn generate_playlist(id: i32, date: String) -> Result<JsonPlaylist, ServiceError> {
let (mut config, channel) = playout_config(&id).await?;
pub async fn generate_playlist(
conn: &Pool<Sqlite>,
id: i32,
date: String,
) -> Result<JsonPlaylist, ServiceError> {
let (mut config, channel) = playout_config(conn, &id).await?;
config.general.generate = Some(vec![date.clone()]);
match playlist_generator(&config, Some(channel.name)) {
@ -89,8 +102,8 @@ pub async fn generate_playlist(id: i32, date: String) -> Result<JsonPlaylist, Se
}
}
pub async fn delete_playlist(id: i32, date: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(&id).await?;
pub async fn delete_playlist(conn: &Pool<Sqlite>, id: i32, date: &str) -> Result<(), ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path