clean up and simplify code, add user_channels table for the many-to-many user/channel relation, select related channels based on user_channels

jb-alvarado 2024-06-20 23:31:35 +02:00
parent 7c2cbc5a09
commit 92363b1921
14 changed files with 166 additions and 308 deletions
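
The core of this change is the new user_channels join table, which replaces the comma-separated channel_ids column on user. Below is a minimal sketch of the resulting lookup, modeled on the select_related_channels query further down in this diff; the helper name channels_for_user and the tuple result are illustrative simplifications, and the id is bound as a parameter here whereas the patch itself formats it into the query string.

use sqlx::{Pool, Sqlite};

// Sketch: resolve the channels a user can access via the user_channels
// join table (mirrors the select_related_channels query in this commit).
async fn channels_for_user(
    conn: &Pool<Sqlite>,
    user_id: i32,
) -> Result<Vec<(i32, String)>, sqlx::Error> {
    let query = "SELECT c.id, c.name FROM channels c
        LEFT JOIN user_channels uc ON uc.channel_id = c.id
        LEFT JOIN user u ON u.id = uc.user_id
        WHERE u.id = $1
        ORDER BY c.id ASC;";

    // The patch interpolates the id into the string; binding it is equivalent here.
    sqlx::query_as(query).bind(user_id).fetch_all(conn).await
}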

View File

@ -25,7 +25,6 @@ use actix_web::{
patch, post, put, web, HttpRequest, HttpResponse, Responder,
};
use actix_web_grants::{authorities::AuthDetails, proc_macro::protect};
use shlex::split;
use argon2::{
password_hash::{rand_core::OsRng, PasswordHash, SaltString},
@ -162,13 +161,12 @@ struct ProgramItem {
/// ```
#[post("/auth/login/")]
pub async fn login(pool: web::Data<Pool<Sqlite>>, credentials: web::Json<User>) -> impl Responder {
let conn = pool.into_inner();
let username = credentials.username.clone();
let password = credentials.password.clone();
match handles::select_login(&conn, &username).await {
match handles::select_login(&pool, &username).await {
Ok(mut user) => {
let role = handles::select_role(&conn, &user.role_id.unwrap_or_default())
let role = handles::select_role(&pool, &user.role_id.unwrap_or_default())
.await
.unwrap_or(Role::Guest);
@ -244,7 +242,7 @@ async fn get_user(
pool: web::Data<Pool<Sqlite>>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
match handles::select_user(&pool.into_inner(), user.id).await {
match handles::select_user(&pool, user.id).await {
Ok(user) => Ok(web::Json(user)),
Err(e) => {
error!("{e}");
@ -265,7 +263,7 @@ async fn get_by_name(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
match handles::select_user(&pool.into_inner(), *id).await {
match handles::select_user(&pool, *id).await {
Ok(user) => Ok(web::Json(user)),
Err(e) => {
error!("{e}");
@ -283,7 +281,7 @@ async fn get_by_name(
#[get("/users")]
#[protect("Role::GlobalAdmin", ty = "Role")]
async fn get_users(pool: web::Data<Pool<Sqlite>>) -> Result<impl Responder, ServiceError> {
match handles::select_users(&pool.into_inner()).await {
match handles::select_users(&pool).await {
Ok(users) => Ok(web::Json(users)),
Err(e) => {
error!("{e}");
@ -356,10 +354,7 @@ async fn update_user(
fields.push_str(&format!("password = '{password_hash}'"));
}
if handles::update_user(&pool.into_inner(), *id, fields)
.await
.is_ok()
{
if handles::update_user(&pool, *id, fields).await.is_ok() {
return Ok("Update Success");
};
@ -379,7 +374,7 @@ async fn add_user(
pool: web::Data<Pool<Sqlite>>,
data: web::Json<User>,
) -> Result<impl Responder, ServiceError> {
match handles::insert_user(&pool.into_inner(), data.into_inner()).await {
match handles::insert_user(&pool, data.into_inner()).await {
Ok(_) => Ok("Add User Success"),
Err(e) => {
error!("{e}");
@ -400,7 +395,7 @@ async fn remove_user(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
) -> Result<impl Responder, ServiceError> {
match handles::delete_user(&pool.into_inner(), *id).await {
match handles::delete_user(&pool, *id).await {
Ok(_) => return Ok("Delete user success"),
Err(e) => {
error!("{e}");
@ -440,7 +435,7 @@ async fn get_channel(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
if let Ok(channel) = handles::select_channel(&pool, &id).await {
return Ok(web::Json(channel));
}
@ -461,9 +456,7 @@ async fn get_all_channels(
pool: web::Data<Pool<Sqlite>>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) =
handles::select_related_channels(&pool.into_inner(), Some(user.channels.clone())).await
{
if let Ok(channel) = handles::select_related_channels(&pool, Some(user.id.clone())).await {
return Ok(web::Json(channel));
}
@ -491,7 +484,7 @@ async fn patch_channel(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if handles::update_channel(&pool.into_inner(), *id, data.into_inner())
if handles::update_channel(&pool, *id, data.into_inner())
.await
.is_ok()
{
@ -517,7 +510,7 @@ async fn add_channel(
queue: web::Data<Mutex<Vec<Arc<Mutex<MailQueue>>>>>,
) -> Result<impl Responder, ServiceError> {
match create_channel(
&pool.into_inner(),
&pool,
controllers.into_inner(),
queue.into_inner(),
data.into_inner(),
@ -542,14 +535,9 @@ async fn remove_channel(
controllers: web::Data<Mutex<ChannelController>>,
queue: web::Data<Mutex<Vec<Arc<Mutex<MailQueue>>>>>,
) -> Result<impl Responder, ServiceError> {
if delete_channel(
&pool.into_inner(),
*id,
controllers.into_inner(),
queue.into_inner(),
)
.await
.is_ok()
if delete_channel(&pool, *id, controllers.into_inner(), queue.into_inner())
.await
.is_ok()
{
return Ok("Delete Channel Success");
}
@ -606,100 +594,11 @@ async fn update_advanced_config(
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let id = manager.config.lock().unwrap().general.id;
if let Err(e) =
handles::update_advanced_configuration(&pool.into_inner(), id, data.clone()).await
{
return Err(ServiceError::Conflict(format!("{e}")));
}
handles::update_advanced_configuration(&pool, *id, data.clone()).await?;
let new_config = PlayoutConfig::new(&pool, *id).await;
let mut cfg = manager.config.lock().unwrap();
cfg.advanced
.decoder
.input_param
.clone_from(&data.decoder.input_param);
cfg.advanced
.decoder
.output_param
.clone_from(&data.decoder.output_param);
cfg.advanced.decoder.input_cmd = split(&data.decoder.input_param.clone().unwrap_or_default());
cfg.advanced.decoder.output_cmd = split(&data.decoder.output_param.clone().unwrap_or_default());
cfg.advanced
.encoder
.input_param
.clone_from(&data.encoder.input_param);
cfg.advanced.encoder.input_cmd = split(&data.encoder.input_param.clone().unwrap_or_default());
cfg.advanced
.ingest
.input_param
.clone_from(&data.encoder.input_param);
cfg.advanced.ingest.input_cmd = split(&data.ingest.input_param.clone().unwrap_or_default());
cfg.advanced
.filter
.deinterlace
.clone_from(&data.filter.deinterlace);
cfg.advanced
.filter
.pad_scale_w
.clone_from(&data.filter.pad_scale_w);
cfg.advanced
.filter
.pad_scale_h
.clone_from(&data.filter.pad_scale_h);
cfg.advanced
.filter
.pad_video
.clone_from(&data.filter.pad_video);
cfg.advanced.filter.fps.clone_from(&data.filter.fps);
cfg.advanced.filter.scale.clone_from(&data.filter.scale);
cfg.advanced.filter.set_dar.clone_from(&data.filter.set_dar);
cfg.advanced.filter.fade_in.clone_from(&data.filter.fade_in);
cfg.advanced
.filter
.fade_out
.clone_from(&data.filter.fade_out);
cfg.advanced
.filter
.overlay_logo_scale
.clone_from(&data.filter.overlay_logo_scale);
cfg.advanced
.filter
.overlay_logo_fade_in
.clone_from(&data.filter.overlay_logo_fade_in);
cfg.advanced
.filter
.overlay_logo_fade_out
.clone_from(&data.filter.overlay_logo_fade_out);
cfg.advanced
.filter
.overlay_logo
.clone_from(&data.filter.overlay_logo);
cfg.advanced.filter.tpad.clone_from(&data.filter.tpad);
cfg.advanced
.filter
.drawtext_from_file
.clone_from(&data.filter.drawtext_from_file);
cfg.advanced
.filter
.drawtext_from_zmq
.clone_from(&data.filter.drawtext_from_zmq);
cfg.advanced
.filter
.aevalsrc
.clone_from(&data.filter.aevalsrc);
cfg.advanced
.filter
.afade_in
.clone_from(&data.filter.afade_in);
cfg.advanced
.filter
.afade_out
.clone_from(&data.filter.afade_out);
cfg.advanced.filter.apad.clone_from(&data.filter.apad);
cfg.advanced.filter.volume.clone_from(&data.filter.volume);
cfg.advanced.filter.split.clone_from(&data.filter.split);
manager.update_config(new_config);
Ok(web::Json("Update success"))
}
@ -751,100 +650,12 @@ async fn update_playout_config(
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let id = manager.config.lock().unwrap().general.id;
let config_id = manager.config.lock().unwrap().general.id;
if let Err(e) = handles::update_configuration(&pool.into_inner(), id, data.clone()).await {
return Err(ServiceError::Conflict(format!("{e}")));
}
handles::update_configuration(&pool, config_id, data.clone()).await?;
let new_config = PlayoutConfig::new(&pool, *id).await;
let mut config = manager.config.lock().unwrap();
let (filler_path, _, _) = norm_abs_path(
&config.global.storage_path,
data.storage.filler.to_string_lossy().as_ref(),
)?;
config.general.stop_threshold = data.general.stop_threshold;
config.mail.subject.clone_from(&data.mail.subject);
config.mail.smtp_server.clone_from(&data.mail.smtp_server);
config.mail.starttls = data.mail.starttls;
config.mail.sender_addr.clone_from(&data.mail.sender_addr);
config.mail.sender_pass.clone_from(&data.mail.sender_pass);
config.mail.recipient.clone_from(&data.mail.recipient);
config.mail.mail_level = data.mail.mail_level;
config.mail.interval = data.mail.interval;
config
.logging
.ffmpeg_level
.clone_from(&data.logging.ffmpeg_level);
config
.logging
.ingest_level
.clone_from(&data.logging.ingest_level);
config.logging.detect_silence = data.logging.detect_silence;
config
.logging
.ignore_lines
.clone_from(&data.logging.ignore_lines);
config.processing.mode.clone_from(&data.processing.mode);
config.processing.audio_only = data.processing.audio_only;
config.processing.audio_track_index = data.processing.audio_track_index;
config.processing.copy_audio = data.processing.copy_audio;
config.processing.copy_video = data.processing.copy_video;
config.processing.width = data.processing.width;
config.processing.height = data.processing.height;
config.processing.aspect = data.processing.aspect;
config.processing.fps = data.processing.fps;
config.processing.add_logo = data.processing.add_logo;
config.processing.logo.clone_from(&data.processing.logo);
config
.processing
.logo_scale
.clone_from(&data.processing.logo_scale);
config.processing.logo_opacity = data.processing.logo_opacity;
config
.processing
.logo_position
.clone_from(&data.processing.logo_position);
config.processing.audio_tracks = data.processing.audio_tracks;
config.processing.audio_channels = data.processing.audio_channels;
config.processing.volume = data.processing.volume;
config
.processing
.custom_filter
.clone_from(&data.processing.custom_filter);
config.ingest.enable = data.ingest.enable;
config
.ingest
.input_param
.clone_from(&data.ingest.input_param);
config
.ingest
.custom_filter
.clone_from(&data.ingest.custom_filter);
config
.playlist
.day_start
.clone_from(&data.playlist.day_start);
config.playlist.length.clone_from(&data.playlist.length);
config.playlist.infinit = data.playlist.infinit;
config.storage.filler.clone_from(&filler_path);
config
.storage
.extensions
.clone_from(&data.storage.extensions);
config.storage.shuffle = data.storage.shuffle;
config.text.add_text = data.text.add_text;
config.text.fontfile.clone_from(&data.text.fontfile);
config.text.text_from_filename = data.text.text_from_filename;
config.text.style.clone_from(&data.text.style);
config.text.regex.clone_from(&data.text.regex);
config.task.enable = data.task.enable;
config.task.path.clone_from(&data.task.path);
config.output.mode.clone_from(&data.output.mode);
config
.output
.output_param
.clone_from(&data.output.output_param);
manager.update_config(new_config);
Ok(web::Json("Update success"))
}
@ -871,7 +682,7 @@ async fn get_presets(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if let Ok(presets) = handles::select_presets(&pool.into_inner(), *id).await {
if let Ok(presets) = handles::select_presets(&pool, *id).await {
return Ok(web::Json(presets));
}
@ -898,7 +709,7 @@ async fn update_preset(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if handles::update_preset(&pool.into_inner(), &id, data.into_inner())
if handles::update_preset(&pool, &id, data.into_inner())
.await
.is_ok()
{
@ -928,7 +739,7 @@ async fn add_preset(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if handles::insert_preset(&pool.into_inner(), data.into_inner())
if handles::insert_preset(&pool, data.into_inner())
.await
.is_ok()
{
@ -956,10 +767,7 @@ async fn delete_preset(
role: AuthDetails<Role>,
user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> {
if handles::delete_preset(&pool.into_inner(), &id)
.await
.is_ok()
{
if handles::delete_preset(&pool, &id).await.is_ok() {
return Ok("Delete preset Success");
}

View File

@ -5,7 +5,7 @@ use argon2::{
use log::*;
use rand::{distributions::Alphanumeric, Rng};
use sqlx::{sqlite::SqliteQueryResult, Pool, Sqlite};
use sqlx::{sqlite::SqliteQueryResult, Pool, Row, Sqlite};
use tokio::task;
use super::models::{AdvancedConfiguration, Configuration};
@ -73,18 +73,18 @@ pub async fn select_channel(conn: &Pool<Sqlite>, id: &i32) -> Result<Channel, sq
pub async fn select_related_channels(
conn: &Pool<Sqlite>,
ids: Option<Vec<i32>>,
user_id: Option<i32>,
) -> Result<Vec<Channel>, sqlx::Error> {
let query = match ids {
Some(ids) => format!(
"SELECT * FROM channels WHERE id IN ({}) ORDER BY id ASC;",
ids.iter()
.map(|i| i.to_string())
.collect::<Vec<String>>()
.join(", ")
let query = match user_id {
Some(id) => format!(
"SELECT c.id, c.name, c.preview_url, c.extra_extensions, c.active, c.last_date, c.time_shift FROM channels c
left join user_channels uc on uc.channel_id = c.id
left join user u on u.id = uc.user_id
WHERE u.id = {id} ORDER BY c.id ASC;"
),
None => "SELECT * FROM channels ORDER BY id ASC;".to_string(),
};
let mut results: Vec<Channel> = sqlx::query_as(&query).fetch_all(conn).await?;
for result in results.iter_mut() {
@ -318,19 +318,25 @@ pub async fn select_role(conn: &Pool<Sqlite>, id: &i32) -> Result<Role, sqlx::Er
pub async fn select_login(conn: &Pool<Sqlite>, user: &str) -> Result<User, sqlx::Error> {
let query =
"SELECT id, mail, username, password, role_id, channel_ids FROM user WHERE username = $1";
"SELECT u.id, u.mail, u.username, u.password, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u
left join user_channels uc on uc.user_id = u.id
WHERE u.username = $1";
sqlx::query_as(query).bind(user).fetch_one(conn).await
}
pub async fn select_user(conn: &Pool<Sqlite>, id: i32) -> Result<User, sqlx::Error> {
let query = "SELECT id, mail, username, role_id, channel_ids FROM user WHERE id = $1";
let query = "SELECT u.id, u.mail, u.username, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u
left join user_channels uc on uc.user_id = u.id
WHERE u.id = $1";
sqlx::query_as(query).bind(id).fetch_one(conn).await
}
pub async fn select_global_admins(conn: &Pool<Sqlite>) -> Result<Vec<User>, sqlx::Error> {
let query = "SELECT id, mail, username, role_id, channel_ids FROM user WHERE role_id = 1";
let query = "SELECT u.id, u.mail, u.username, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u
left join user_channels uc on uc.user_id = u.id
WHERE u.role_id = 1";
sqlx::query_as(query).fetch_all(conn).await
}
@ -341,10 +347,7 @@ pub async fn select_users(conn: &Pool<Sqlite>) -> Result<Vec<User>, sqlx::Error>
sqlx::query_as(query).fetch_all(conn).await
}
pub async fn insert_user(
conn: &Pool<Sqlite>,
user: User,
) -> Result<SqliteQueryResult, sqlx::Error> {
pub async fn insert_user(conn: &Pool<Sqlite>, user: User) -> Result<(), sqlx::Error> {
let password_hash = task::spawn_blocking(move || {
let salt = SaltString::generate(&mut OsRng);
let hash = Argon2::default()
@ -356,23 +359,23 @@ pub async fn insert_user(
.await
.unwrap();
let query = "INSERT INTO user (mail, username, password, role_id, channel_ids) VALUES($1, $2, $3, $4, $5)";
let query =
"INSERT INTO user (mail, username, password, role_id) VALUES($1, $2, $3, $4) RETURNING id";
sqlx::query(query)
let user_id: i32 = sqlx::query(query)
.bind(user.mail)
.bind(user.username)
.bind(password_hash)
.bind(user.role_id)
.bind(
user.channel_ids
.unwrap_or_default()
.iter()
.map(|i| i.to_string())
.collect::<Vec<String>>()
.join(","),
)
.execute(conn)
.await
.fetch_one(conn)
.await?
.get("id");
if let Some(channel_ids) = user.channel_ids {
insert_user_channel(conn, user_id, channel_ids).await?;
}
Ok(())
}
pub async fn update_user(
@ -385,14 +388,22 @@ pub async fn update_user(
sqlx::query(&query).bind(id).execute(conn).await
}
pub async fn update_user_channel(
pub async fn insert_user_channel(
conn: &Pool<Sqlite>,
id: i32,
ids: String,
) -> Result<SqliteQueryResult, sqlx::Error> {
let query = "UPDATE user SET channel_ids = $2 WHERE id = $1";
user_id: i32,
channel_ids: Vec<i32>,
) -> Result<(), sqlx::Error> {
for channel in &channel_ids {
let query = "INSERT OR IGNORE INTO user_channels (channel_id, user_id) VALUES ($1, $2);";
sqlx::query(query).bind(id).bind(ids).execute(conn).await
sqlx::query(query)
.bind(channel)
.bind(user_id)
.execute(conn)
.await?;
}
Ok(())
}
pub async fn delete_user(conn: &Pool<Sqlite>, id: i32) -> Result<SqliteQueryResult, sqlx::Error> {
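
For orientation, a hedged usage sketch of the reworked helpers above: insert_user now assigns channels through insert_user_channel instead of writing a channel_ids string. The module path and the literal ids below are assumptions for illustration only.

use sqlx::{Pool, Sqlite};

use crate::db::handles; // module path assumed for this sketch

// Sketch: give an existing user access to two channels. Duplicate pairs are
// ignored thanks to INSERT OR IGNORE and the unique index on
// (channel_id, user_id).
async fn grant_channels(conn: &Pool<Sqlite>) -> Result<(), sqlx::Error> {
    let user_id = 2; // illustrative ids
    let channel_ids = vec![1, 3];

    handles::insert_user_channel(conn, user_id, channel_ids).await
}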

View File

@ -115,7 +115,7 @@ async fn main() -> std::io::Result<()> {
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
let controllers = web::Data::from(channel_controllers);
let controllers = web::Data::from(channel_controllers.clone());
let auth_state = web::Data::new(SseAuthState {
uuids: tokio::sync::Mutex::new(HashSet::new()),
});
@ -226,7 +226,7 @@ async fn main() -> std::io::Result<()> {
.bind((addr, port))?
.workers(thread_count)
.run()
.await
.await?;
} else if let Some(channels) = &ARGS.channels {
for (index, channel_id) in channels.iter().enumerate() {
let channel = handles::select_channel(&pool, channel_id).await.unwrap();
@ -245,11 +245,13 @@ async fn main() -> std::io::Result<()> {
manager.foreground_start(index).await;
}
Ok(())
} else {
error!("Run ffplayout with parameters! Run ffplayout -h for more information.");
Ok(())
}
for channel in &channel_controllers.lock().unwrap().channels {
channel.stop_all();
}
Ok(())
}

View File

@ -99,6 +99,11 @@ impl ChannelManager {
channel.utc_offset.clone_from(&other.utc_offset);
}
pub fn update_config(&self, new_config: PlayoutConfig) {
let mut config = self.config.lock().unwrap();
*config = new_config;
}
pub async fn async_start(&self) {
if !self.is_alive.load(Ordering::SeqCst) {
self.run_count.fetch_add(1, Ordering::SeqCst);
@ -228,6 +233,7 @@ impl ChannelManager {
pub async fn async_stop(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst);
self.is_alive.store(false, Ordering::SeqCst);
self.ingest_is_running.store(false, Ordering::SeqCst);
self.run_count.fetch_sub(1, Ordering::SeqCst);
let pool = self.db_pool.clone().unwrap();
@ -237,21 +243,10 @@ impl ChannelManager {
error!("Unable write to player status: {e}");
};
if self.is_alive.load(Ordering::SeqCst) {
self.is_alive.store(false, Ordering::SeqCst);
trace!("Playout is alive and processes are terminated");
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.stop(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
}
if let Err(e) = self.wait(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.stop(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
}
}

View File

@ -1,6 +1,6 @@
use std::{
io::{BufRead, BufReader, Read},
process::{exit, ChildStderr, Command, Stdio},
process::{ChildStderr, Command, Stdio},
sync::atomic::Ordering,
thread,
};
@ -92,7 +92,6 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(id, url) {
channel_mgr.stop_all();
exit(1);
}
info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: <b><magenta>{url}</></b>",);

View File

@ -19,10 +19,10 @@ out:
use std::{
io::{BufRead, BufReader},
process::{exit, Command, Stdio},
process::{Command, Stdio},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
time::{Duration, SystemTime},
};
use log::*;
@ -67,7 +67,6 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> {
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(id, url) {
manager.stop_all();
exit(1);
}
info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: <b><magenta>{url}</></b>");
@ -151,6 +150,7 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
let config = manager.config.lock()?.clone();
let id = config.general.channel_id;
let current_media = manager.current_media.clone();
let is_terminated = manager.is_terminated.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -164,9 +164,16 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
thread::spawn(move || ingest_to_hls_server(channel_mgr_2));
}
let mut error_count = 0;
for node in get_source {
*current_media.lock().unwrap() = Some(node.clone());
let ignore = config.logging.ignore_lines.clone();
let timer = SystemTime::now();
if is_terminated.load(Ordering::SeqCst) {
break;
}
let mut cmd = match &node.cmd {
Some(cmd) => cmd.clone(),
@ -196,10 +203,10 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
}
}
let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
let mut dec_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
if let Some(encoder_input_cmd) = &config.advanced.encoder.input_cmd {
enc_prefix.append(&mut encoder_input_cmd.clone());
if let Some(decoder_input_cmd) = &config.advanced.decoder.input_cmd {
dec_prefix.append(&mut decoder_input_cmd.clone());
}
let mut read_rate = 1.0;
@ -218,18 +225,18 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
}
}
enc_prefix.append(&mut vec_strings!["-readrate", read_rate]);
dec_prefix.append(&mut vec_strings!["-readrate", read_rate]);
enc_prefix.append(&mut cmd);
let enc_cmd = prepare_output_cmd(&config, enc_prefix, &node.filter);
dec_prefix.append(&mut cmd);
let dec_cmd = prepare_output_cmd(&config, dec_prefix, &node.filter);
debug!(target: Target::file_mail(), channel = id;
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
enc_cmd.join(" ")
dec_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.args(dec_cmd)
.stderr(Stdio::piped())
.spawn()
{
@ -240,10 +247,10 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
}
};
let enc_err = BufReader::new(dec_proc.stderr.take().unwrap());
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*manager.decoder.lock().unwrap() = Some(dec_proc);
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, manager.clone()) {
if let Err(e) = stderr_reader(dec_err, ignore, Decoder, manager.clone()) {
error!(target: Target::file_mail(), channel = id; "{e:?}")
};
@ -254,6 +261,19 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> {
while ingest_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
}
if let Ok(elapsed) = timer.elapsed() {
if elapsed.as_millis() < 300 {
error_count += 1;
if error_count > 10 {
error!(target: Target::file_mail(), channel = id; "Reach fatal error count, terminate channel!");
break;
}
} else {
error_count = 0;
}
}
}
sleep(Duration::from_secs(1));

View File

@ -3,7 +3,7 @@ use std::{
process::{Command, Stdio},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
time::{Duration, SystemTime},
};
use crossbeam_channel::bounded;
@ -79,11 +79,14 @@ pub fn player(manager: ChannelManager) -> Result<(), ProcessError> {
drop(config);
let mut error_count = 0;
'source_iter: for node in node_sources {
let config = manager.config.lock()?.clone();
*manager.current_media.lock().unwrap() = Some(node.clone());
let ignore_dec = config.logging.ignore_lines.clone();
let timer = SystemTime::now();
if is_terminated.load(Ordering::SeqCst) {
debug!(target: Target::file_mail(), channel = id; "Playout is terminated, break out from source loop");
@ -237,6 +240,19 @@ pub fn player(manager: ChannelManager) -> Result<(), ProcessError> {
if let Err(e) = error_decoder_thread.join() {
error!(target: Target::file_mail(), channel = id; "{e:?}");
};
if let Ok(elapsed) = timer.elapsed() {
if elapsed.as_millis() < 300 {
error_count += 1;
if error_count > 10 {
error!(target: Target::file_mail(), channel = id; "Reach fatal error count, terminate channel!");
break;
}
} else {
error_count = 0;
}
}
}
trace!("Out of source loop");

View File

@ -812,7 +812,6 @@ pub fn stderr_reader(
&& !line.contains("failed to delete old segment"))
{
manager.stop_all();
exit(1);
}
}
}

View File

@ -149,7 +149,7 @@ pub async fn run_args(pool: &Pool<Sqlite>) -> Result<(), i32> {
let mut args = ARGS.clone();
if args.init {
let check_user = handles::select_user(pool, 1).await;
let check_user = handles::select_users(pool).await;
let mut storage = String::new();
let mut playlist = String::new();
@ -166,7 +166,7 @@ pub async fn run_args(pool: &Pool<Sqlite>) -> Result<(), i32> {
shared_storage: false,
};
if check_user.is_err() {
if check_user.unwrap_or_default().is_empty() {
global_user(&mut args);
}

View File

@ -16,16 +16,9 @@ async fn map_global_admins(conn: &Pool<Sqlite>) -> Result<(), ServiceError> {
let admins = handles::select_global_admins(conn).await?;
for admin in admins {
if let Err(e) = handles::update_user_channel(
conn,
admin.id,
channels
.iter()
.map(|c| c.id.to_string())
.collect::<Vec<String>>()
.join(","),
)
.await
if let Err(e) =
handles::insert_user_channel(conn, admin.id, channels.iter().map(|c| c.id).collect())
.await
{
error!("Update global admin: {e}");
};
@ -42,7 +35,7 @@ pub async fn create_channel(
) -> Result<Channel, ServiceError> {
let channel = handles::insert_channel(conn, target_channel).await?;
let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename /usr/share/ffplayout/public/live/stream{0}-%d.ts /usr/share/ffplayout/public/live/stream{0}.m3u8", channel.id);
let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename live/stream{0}-%d.ts live/stream{0}.m3u8", channel.id);
handles::insert_advanced_configuration(conn, channel.id).await?;
handles::insert_configuration(conn, channel.id, output_param).await?;

View File

@ -678,6 +678,14 @@ impl PlayoutConfig {
cmd.remove(i);
}
for item in cmd.iter_mut() {
if item.ends_with(".ts") || item.ends_with(".m3u8") {
if let Ok((hls_path, _, _)) = norm_abs_path(&global.hls_path, &item) {
item.clone_from(&hls_path.to_string_lossy().to_string());
};
}
}
output.output_cmd = Some(cmd);
}

View File

@ -7,7 +7,6 @@
use std::{
fs::{create_dir_all, write},
io::Error,
process::exit,
};
use chrono::Timelike;
@ -227,8 +226,6 @@ pub fn playlist_generator(manager: &ChannelManager) -> Result<Vec<JsonPlaylist>,
"Playlist folder <b><magenta>{:?}</></b> not exists!",
config.global.playlist_path
);
exit(1);
}
if let Some(range) = config.general.generate.clone() {

View File

@ -274,7 +274,7 @@ fn file_formatter(
) -> std::io::Result<()> {
write!(
w,
"[{}] {} {}",
"[{}] [{:>5}] {}",
now.now().format("%Y-%m-%d %H:%M:%S%.6f"),
record.level(),
record.args()

View File

@ -57,11 +57,21 @@ CREATE TABLE
username TEXT NOT NULL,
password TEXT NOT NULL,
role_id INTEGER NOT NULL DEFAULT 3,
channel_ids TEXT DEFAULT "1",
FOREIGN KEY (role_id) REFERENCES roles (id) ON UPDATE SET NULL ON DELETE SET DEFAULT,
UNIQUE (mail, username)
);
CREATE TABLE
user_channels (
id INTEGER PRIMARY KEY,
channel_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (channel_id) REFERENCES channels (id) ON UPDATE CASCADE ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES user (id) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_user_channels_unique ON user_channels (channel_id, user_id);
CREATE TABLE
configurations (
id INTEGER PRIMARY KEY AUTOINCREMENT,