diff --git a/ffplayout/src/api/routes.rs b/ffplayout/src/api/routes.rs index becb849c..7000013a 100644 --- a/ffplayout/src/api/routes.rs +++ b/ffplayout/src/api/routes.rs @@ -25,7 +25,6 @@ use actix_web::{ patch, post, put, web, HttpRequest, HttpResponse, Responder, }; use actix_web_grants::{authorities::AuthDetails, proc_macro::protect}; -use shlex::split; use argon2::{ password_hash::{rand_core::OsRng, PasswordHash, SaltString}, @@ -162,13 +161,12 @@ struct ProgramItem { /// ``` #[post("/auth/login/")] pub async fn login(pool: web::Data>, credentials: web::Json) -> impl Responder { - let conn = pool.into_inner(); let username = credentials.username.clone(); let password = credentials.password.clone(); - match handles::select_login(&conn, &username).await { + match handles::select_login(&pool, &username).await { Ok(mut user) => { - let role = handles::select_role(&conn, &user.role_id.unwrap_or_default()) + let role = handles::select_role(&pool, &user.role_id.unwrap_or_default()) .await .unwrap_or(Role::Guest); @@ -244,7 +242,7 @@ async fn get_user( pool: web::Data>, user: web::ReqData, ) -> Result { - match handles::select_user(&pool.into_inner(), user.id).await { + match handles::select_user(&pool, user.id).await { Ok(user) => Ok(web::Json(user)), Err(e) => { error!("{e}"); @@ -265,7 +263,7 @@ async fn get_by_name( pool: web::Data>, id: web::Path, ) -> Result { - match handles::select_user(&pool.into_inner(), *id).await { + match handles::select_user(&pool, *id).await { Ok(user) => Ok(web::Json(user)), Err(e) => { error!("{e}"); @@ -283,7 +281,7 @@ async fn get_by_name( #[get("/users")] #[protect("Role::GlobalAdmin", ty = "Role")] async fn get_users(pool: web::Data>) -> Result { - match handles::select_users(&pool.into_inner()).await { + match handles::select_users(&pool).await { Ok(users) => Ok(web::Json(users)), Err(e) => { error!("{e}"); @@ -356,10 +354,7 @@ async fn update_user( fields.push_str(&format!("password = '{password_hash}'")); } - if handles::update_user(&pool.into_inner(), *id, fields) - .await - .is_ok() - { + if handles::update_user(&pool, *id, fields).await.is_ok() { return Ok("Update Success"); }; @@ -379,7 +374,7 @@ async fn add_user( pool: web::Data>, data: web::Json, ) -> Result { - match handles::insert_user(&pool.into_inner(), data.into_inner()).await { + match handles::insert_user(&pool, data.into_inner()).await { Ok(_) => Ok("Add User Success"), Err(e) => { error!("{e}"); @@ -400,7 +395,7 @@ async fn remove_user( pool: web::Data>, id: web::Path, ) -> Result { - match handles::delete_user(&pool.into_inner(), *id).await { + match handles::delete_user(&pool, *id).await { Ok(_) => return Ok("Delete user success"), Err(e) => { error!("{e}"); @@ -440,7 +435,7 @@ async fn get_channel( role: AuthDetails, user: web::ReqData, ) -> Result { - if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await { + if let Ok(channel) = handles::select_channel(&pool, &id).await { return Ok(web::Json(channel)); } @@ -461,9 +456,7 @@ async fn get_all_channels( pool: web::Data>, user: web::ReqData, ) -> Result { - if let Ok(channel) = - handles::select_related_channels(&pool.into_inner(), Some(user.channels.clone())).await - { + if let Ok(channel) = handles::select_related_channels(&pool, Some(user.id.clone())).await { return Ok(web::Json(channel)); } @@ -491,7 +484,7 @@ async fn patch_channel( role: AuthDetails, user: web::ReqData, ) -> Result { - if handles::update_channel(&pool.into_inner(), *id, data.into_inner()) + if handles::update_channel(&pool, *id, data.into_inner()) .await .is_ok() { @@ -517,7 +510,7 @@ async fn add_channel( queue: web::Data>>>>, ) -> Result { match create_channel( - &pool.into_inner(), + &pool, controllers.into_inner(), queue.into_inner(), data.into_inner(), @@ -542,14 +535,9 @@ async fn remove_channel( controllers: web::Data>, queue: web::Data>>>>, ) -> Result { - if delete_channel( - &pool.into_inner(), - *id, - controllers.into_inner(), - queue.into_inner(), - ) - .await - .is_ok() + if delete_channel(&pool, *id, controllers.into_inner(), queue.into_inner()) + .await + .is_ok() { return Ok("Delete Channel Success"); } @@ -606,100 +594,11 @@ async fn update_advanced_config( user: web::ReqData, ) -> Result { let manager = controllers.lock().unwrap().get(*id).unwrap(); - let id = manager.config.lock().unwrap().general.id; - if let Err(e) = - handles::update_advanced_configuration(&pool.into_inner(), id, data.clone()).await - { - return Err(ServiceError::Conflict(format!("{e}"))); - } + handles::update_advanced_configuration(&pool, *id, data.clone()).await?; + let new_config = PlayoutConfig::new(&pool, *id).await; - let mut cfg = manager.config.lock().unwrap(); - - cfg.advanced - .decoder - .input_param - .clone_from(&data.decoder.input_param); - cfg.advanced - .decoder - .output_param - .clone_from(&data.decoder.output_param); - cfg.advanced.decoder.input_cmd = split(&data.decoder.input_param.clone().unwrap_or_default()); - cfg.advanced.decoder.output_cmd = split(&data.decoder.output_param.clone().unwrap_or_default()); - cfg.advanced - .encoder - .input_param - .clone_from(&data.encoder.input_param); - cfg.advanced.encoder.input_cmd = split(&data.encoder.input_param.clone().unwrap_or_default()); - cfg.advanced - .ingest - .input_param - .clone_from(&data.encoder.input_param); - cfg.advanced.ingest.input_cmd = split(&data.ingest.input_param.clone().unwrap_or_default()); - cfg.advanced - .filter - .deinterlace - .clone_from(&data.filter.deinterlace); - cfg.advanced - .filter - .pad_scale_w - .clone_from(&data.filter.pad_scale_w); - cfg.advanced - .filter - .pad_scale_h - .clone_from(&data.filter.pad_scale_h); - cfg.advanced - .filter - .pad_video - .clone_from(&data.filter.pad_video); - cfg.advanced.filter.fps.clone_from(&data.filter.fps); - cfg.advanced.filter.scale.clone_from(&data.filter.scale); - cfg.advanced.filter.set_dar.clone_from(&data.filter.set_dar); - cfg.advanced.filter.fade_in.clone_from(&data.filter.fade_in); - cfg.advanced - .filter - .fade_out - .clone_from(&data.filter.fade_out); - cfg.advanced - .filter - .overlay_logo_scale - .clone_from(&data.filter.overlay_logo_scale); - cfg.advanced - .filter - .overlay_logo_fade_in - .clone_from(&data.filter.overlay_logo_fade_in); - cfg.advanced - .filter - .overlay_logo_fade_out - .clone_from(&data.filter.overlay_logo_fade_out); - cfg.advanced - .filter - .overlay_logo - .clone_from(&data.filter.overlay_logo); - cfg.advanced.filter.tpad.clone_from(&data.filter.tpad); - cfg.advanced - .filter - .drawtext_from_file - .clone_from(&data.filter.drawtext_from_file); - cfg.advanced - .filter - .drawtext_from_zmq - .clone_from(&data.filter.drawtext_from_zmq); - cfg.advanced - .filter - .aevalsrc - .clone_from(&data.filter.aevalsrc); - cfg.advanced - .filter - .afade_in - .clone_from(&data.filter.afade_in); - cfg.advanced - .filter - .afade_out - .clone_from(&data.filter.afade_out); - cfg.advanced.filter.apad.clone_from(&data.filter.apad); - cfg.advanced.filter.volume.clone_from(&data.filter.volume); - cfg.advanced.filter.split.clone_from(&data.filter.split); + manager.update_config(new_config); Ok(web::Json("Update success")) } @@ -751,100 +650,12 @@ async fn update_playout_config( user: web::ReqData, ) -> Result { let manager = controllers.lock().unwrap().get(*id).unwrap(); - let id = manager.config.lock().unwrap().general.id; + let config_id = manager.config.lock().unwrap().general.id; - if let Err(e) = handles::update_configuration(&pool.into_inner(), id, data.clone()).await { - return Err(ServiceError::Conflict(format!("{e}"))); - } + handles::update_configuration(&pool, config_id, data.clone()).await?; + let new_config = PlayoutConfig::new(&pool, *id).await; - let mut config = manager.config.lock().unwrap(); - let (filler_path, _, _) = norm_abs_path( - &config.global.storage_path, - data.storage.filler.to_string_lossy().as_ref(), - )?; - - config.general.stop_threshold = data.general.stop_threshold; - config.mail.subject.clone_from(&data.mail.subject); - config.mail.smtp_server.clone_from(&data.mail.smtp_server); - config.mail.starttls = data.mail.starttls; - config.mail.sender_addr.clone_from(&data.mail.sender_addr); - config.mail.sender_pass.clone_from(&data.mail.sender_pass); - config.mail.recipient.clone_from(&data.mail.recipient); - config.mail.mail_level = data.mail.mail_level; - config.mail.interval = data.mail.interval; - config - .logging - .ffmpeg_level - .clone_from(&data.logging.ffmpeg_level); - config - .logging - .ingest_level - .clone_from(&data.logging.ingest_level); - config.logging.detect_silence = data.logging.detect_silence; - config - .logging - .ignore_lines - .clone_from(&data.logging.ignore_lines); - config.processing.mode.clone_from(&data.processing.mode); - config.processing.audio_only = data.processing.audio_only; - config.processing.audio_track_index = data.processing.audio_track_index; - config.processing.copy_audio = data.processing.copy_audio; - config.processing.copy_video = data.processing.copy_video; - config.processing.width = data.processing.width; - config.processing.height = data.processing.height; - config.processing.aspect = data.processing.aspect; - config.processing.fps = data.processing.fps; - config.processing.add_logo = data.processing.add_logo; - config.processing.logo.clone_from(&data.processing.logo); - config - .processing - .logo_scale - .clone_from(&data.processing.logo_scale); - config.processing.logo_opacity = data.processing.logo_opacity; - config - .processing - .logo_position - .clone_from(&data.processing.logo_position); - config.processing.audio_tracks = data.processing.audio_tracks; - config.processing.audio_channels = data.processing.audio_channels; - config.processing.volume = data.processing.volume; - config - .processing - .custom_filter - .clone_from(&data.processing.custom_filter); - config.ingest.enable = data.ingest.enable; - config - .ingest - .input_param - .clone_from(&data.ingest.input_param); - config - .ingest - .custom_filter - .clone_from(&data.ingest.custom_filter); - config - .playlist - .day_start - .clone_from(&data.playlist.day_start); - config.playlist.length.clone_from(&data.playlist.length); - config.playlist.infinit = data.playlist.infinit; - config.storage.filler.clone_from(&filler_path); - config - .storage - .extensions - .clone_from(&data.storage.extensions); - config.storage.shuffle = data.storage.shuffle; - config.text.add_text = data.text.add_text; - config.text.fontfile.clone_from(&data.text.fontfile); - config.text.text_from_filename = data.text.text_from_filename; - config.text.style.clone_from(&data.text.style); - config.text.regex.clone_from(&data.text.regex); - config.task.enable = data.task.enable; - config.task.path.clone_from(&data.task.path); - config.output.mode.clone_from(&data.output.mode); - config - .output - .output_param - .clone_from(&data.output.output_param); + manager.update_config(new_config); Ok(web::Json("Update success")) } @@ -871,7 +682,7 @@ async fn get_presets( role: AuthDetails, user: web::ReqData, ) -> Result { - if let Ok(presets) = handles::select_presets(&pool.into_inner(), *id).await { + if let Ok(presets) = handles::select_presets(&pool, *id).await { return Ok(web::Json(presets)); } @@ -898,7 +709,7 @@ async fn update_preset( role: AuthDetails, user: web::ReqData, ) -> Result { - if handles::update_preset(&pool.into_inner(), &id, data.into_inner()) + if handles::update_preset(&pool, &id, data.into_inner()) .await .is_ok() { @@ -928,7 +739,7 @@ async fn add_preset( role: AuthDetails, user: web::ReqData, ) -> Result { - if handles::insert_preset(&pool.into_inner(), data.into_inner()) + if handles::insert_preset(&pool, data.into_inner()) .await .is_ok() { @@ -956,10 +767,7 @@ async fn delete_preset( role: AuthDetails, user: web::ReqData, ) -> Result { - if handles::delete_preset(&pool.into_inner(), &id) - .await - .is_ok() - { + if handles::delete_preset(&pool, &id).await.is_ok() { return Ok("Delete preset Success"); } diff --git a/ffplayout/src/db/handles.rs b/ffplayout/src/db/handles.rs index 2ef90904..9cfa90b0 100644 --- a/ffplayout/src/db/handles.rs +++ b/ffplayout/src/db/handles.rs @@ -5,7 +5,7 @@ use argon2::{ use log::*; use rand::{distributions::Alphanumeric, Rng}; -use sqlx::{sqlite::SqliteQueryResult, Pool, Sqlite}; +use sqlx::{sqlite::SqliteQueryResult, Pool, Row, Sqlite}; use tokio::task; use super::models::{AdvancedConfiguration, Configuration}; @@ -73,18 +73,18 @@ pub async fn select_channel(conn: &Pool, id: &i32) -> Result, - ids: Option>, + user_id: Option, ) -> Result, sqlx::Error> { - let query = match ids { - Some(ids) => format!( - "SELECT * FROM channels WHERE id IN ({}) ORDER BY id ASC;", - ids.iter() - .map(|i| i.to_string()) - .collect::>() - .join(", ") + let query = match user_id { + Some(id) => format!( + "SELECT c.id, c.name, c.preview_url, c.extra_extensions, c.active, c.last_date, c.time_shift FROM channels c + left join user_channels uc on uc.channel_id = c.id + left join user u on u.id = uc.user_id + WHERE u.id = {id} ORDER BY c.id ASC;" ), None => "SELECT * FROM channels ORDER BY id ASC;".to_string(), }; + let mut results: Vec = sqlx::query_as(&query).fetch_all(conn).await?; for result in results.iter_mut() { @@ -318,19 +318,25 @@ pub async fn select_role(conn: &Pool, id: &i32) -> Result, user: &str) -> Result { let query = - "SELECT id, mail, username, password, role_id, channel_ids FROM user WHERE username = $1"; + "SELECT u.id, u.mail, u.username, u.password, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u + left join user_channels uc on uc.user_id = u.id + WHERE u.username = $1"; sqlx::query_as(query).bind(user).fetch_one(conn).await } pub async fn select_user(conn: &Pool, id: i32) -> Result { - let query = "SELECT id, mail, username, role_id, channel_ids FROM user WHERE id = $1"; + let query = "SELECT u.id, u.mail, u.username, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u + left join user_channels uc on uc.user_id = u.id + WHERE u.id = $1"; sqlx::query_as(query).bind(id).fetch_one(conn).await } pub async fn select_global_admins(conn: &Pool) -> Result, sqlx::Error> { - let query = "SELECT id, mail, username, role_id, channel_ids FROM user WHERE role_id = 1"; + let query = "SELECT u.id, u.mail, u.username, u.role_id, group_concat(uc.channel_id, ',') as channel_ids FROM user u + left join user_channels uc on uc.user_id = u.id + WHERE u.role_id = 1"; sqlx::query_as(query).fetch_all(conn).await } @@ -341,10 +347,7 @@ pub async fn select_users(conn: &Pool) -> Result, sqlx::Error> sqlx::query_as(query).fetch_all(conn).await } -pub async fn insert_user( - conn: &Pool, - user: User, -) -> Result { +pub async fn insert_user(conn: &Pool, user: User) -> Result<(), sqlx::Error> { let password_hash = task::spawn_blocking(move || { let salt = SaltString::generate(&mut OsRng); let hash = Argon2::default() @@ -356,23 +359,23 @@ pub async fn insert_user( .await .unwrap(); - let query = "INSERT INTO user (mail, username, password, role_id, channel_ids) VALUES($1, $2, $3, $4, $5)"; + let query = + "INSERT INTO user (mail, username, password, role_id) VALUES($1, $2, $3, $4) RETURNING id"; - sqlx::query(query) + let user_id: i32 = sqlx::query(query) .bind(user.mail) .bind(user.username) .bind(password_hash) .bind(user.role_id) - .bind( - user.channel_ids - .unwrap_or_default() - .iter() - .map(|i| i.to_string()) - .collect::>() - .join(","), - ) - .execute(conn) - .await + .fetch_one(conn) + .await? + .get("id"); + + if let Some(channel_ids) = user.channel_ids { + insert_user_channel(conn, user_id, channel_ids).await?; + } + + Ok(()) } pub async fn update_user( @@ -385,14 +388,22 @@ pub async fn update_user( sqlx::query(&query).bind(id).execute(conn).await } -pub async fn update_user_channel( +pub async fn insert_user_channel( conn: &Pool, - id: i32, - ids: String, -) -> Result { - let query = "UPDATE user SET channel_ids = $2 WHERE id = $1"; + user_id: i32, + channel_ids: Vec, +) -> Result<(), sqlx::Error> { + for channel in &channel_ids { + let query = "INSERT OR IGNORE INTO user_channels (channel_id, user_id) VALUES ($1, $2);"; - sqlx::query(query).bind(id).bind(ids).execute(conn).await + sqlx::query(query) + .bind(channel) + .bind(user_id) + .execute(conn) + .await?; + } + + Ok(()) } pub async fn delete_user(conn: &Pool, id: i32) -> Result { diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index 405ac218..60c9878b 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -115,7 +115,7 @@ async fn main() -> std::io::Result<()> { let ip_port = conn.split(':').collect::>(); let addr = ip_port[0]; let port = ip_port[1].parse::().unwrap(); - let controllers = web::Data::from(channel_controllers); + let controllers = web::Data::from(channel_controllers.clone()); let auth_state = web::Data::new(SseAuthState { uuids: tokio::sync::Mutex::new(HashSet::new()), }); @@ -226,7 +226,7 @@ async fn main() -> std::io::Result<()> { .bind((addr, port))? .workers(thread_count) .run() - .await + .await?; } else if let Some(channels) = &ARGS.channels { for (index, channel_id) in channels.iter().enumerate() { let channel = handles::select_channel(&pool, channel_id).await.unwrap(); @@ -245,11 +245,13 @@ async fn main() -> std::io::Result<()> { manager.foreground_start(index).await; } - - Ok(()) } else { error!("Run ffplayout with parameters! Run ffplayout -h for more information."); - - Ok(()) } + + for channel in &channel_controllers.lock().unwrap().channels { + channel.stop_all(); + } + + Ok(()) } diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index ce3c77e7..e1389de9 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -99,6 +99,11 @@ impl ChannelManager { channel.utc_offset.clone_from(&other.utc_offset); } + pub fn update_config(&self, new_config: PlayoutConfig) { + let mut config = self.config.lock().unwrap(); + *config = new_config; + } + pub async fn async_start(&self) { if !self.is_alive.load(Ordering::SeqCst) { self.run_count.fetch_add(1, Ordering::SeqCst); @@ -228,6 +233,7 @@ impl ChannelManager { pub async fn async_stop(&self) { debug!("Stop all child processes"); self.is_terminated.store(true, Ordering::SeqCst); + self.is_alive.store(false, Ordering::SeqCst); self.ingest_is_running.store(false, Ordering::SeqCst); self.run_count.fetch_sub(1, Ordering::SeqCst); let pool = self.db_pool.clone().unwrap(); @@ -237,21 +243,10 @@ impl ChannelManager { error!("Unable write to player status: {e}"); }; - if self.is_alive.load(Ordering::SeqCst) { - self.is_alive.store(false, Ordering::SeqCst); - - trace!("Playout is alive and processes are terminated"); - - for unit in [Decoder, Encoder, Ingest] { - if let Err(e) = self.stop(unit) { - if !e.to_string().contains("exited process") { - error!("{e}") - } - } - if let Err(e) = self.wait(unit) { - if !e.to_string().contains("exited process") { - error!("{e}") - } + for unit in [Decoder, Encoder, Ingest] { + if let Err(e) = self.stop(unit) { + if !e.to_string().contains("exited process") { + error!("{e}") } } } diff --git a/ffplayout/src/player/input/ingest.rs b/ffplayout/src/player/input/ingest.rs index a1b73be4..a574cab9 100644 --- a/ffplayout/src/player/input/ingest.rs +++ b/ffplayout/src/player/input/ingest.rs @@ -1,6 +1,6 @@ use std::{ io::{BufRead, BufReader, Read}, - process::{exit, ChildStderr, Command, Stdio}, + process::{ChildStderr, Command, Stdio}, sync::atomic::Ordering, thread, }; @@ -92,7 +92,6 @@ pub fn ingest_server( if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if !test_tcp_port(id, url) { channel_mgr.stop_all(); - exit(1); } info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}",); diff --git a/ffplayout/src/player/output/hls.rs b/ffplayout/src/player/output/hls.rs index 2ed1c136..3621b7f7 100644 --- a/ffplayout/src/player/output/hls.rs +++ b/ffplayout/src/player/output/hls.rs @@ -19,10 +19,10 @@ out: use std::{ io::{BufRead, BufReader}, - process::{exit, Command, Stdio}, + process::{Command, Stdio}, sync::atomic::Ordering, thread::{self, sleep}, - time::Duration, + time::{Duration, SystemTime}, }; use log::*; @@ -67,7 +67,6 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if !test_tcp_port(id, url) { manager.stop_all(); - exit(1); } info!(target: Target::file_mail(), channel = id; "Start ingest server, listening on: {url}"); @@ -151,6 +150,7 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { let config = manager.config.lock()?.clone(); let id = config.general.channel_id; let current_media = manager.current_media.clone(); + let is_terminated = manager.is_terminated.clone(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -164,9 +164,16 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { thread::spawn(move || ingest_to_hls_server(channel_mgr_2)); } + let mut error_count = 0; + for node in get_source { *current_media.lock().unwrap() = Some(node.clone()); let ignore = config.logging.ignore_lines.clone(); + let timer = SystemTime::now(); + + if is_terminated.load(Ordering::SeqCst) { + break; + } let mut cmd = match &node.cmd { Some(cmd) => cmd.clone(), @@ -196,10 +203,10 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { } } - let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; + let mut dec_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; - if let Some(encoder_input_cmd) = &config.advanced.encoder.input_cmd { - enc_prefix.append(&mut encoder_input_cmd.clone()); + if let Some(decoder_input_cmd) = &config.advanced.decoder.input_cmd { + dec_prefix.append(&mut decoder_input_cmd.clone()); } let mut read_rate = 1.0; @@ -218,18 +225,18 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { } } - enc_prefix.append(&mut vec_strings!["-readrate", read_rate]); + dec_prefix.append(&mut vec_strings!["-readrate", read_rate]); - enc_prefix.append(&mut cmd); - let enc_cmd = prepare_output_cmd(&config, enc_prefix, &node.filter); + dec_prefix.append(&mut cmd); + let dec_cmd = prepare_output_cmd(&config, dec_prefix, &node.filter); debug!(target: Target::file_mail(), channel = id; "HLS writer CMD: \"ffmpeg {}\"", - enc_cmd.join(" ") + dec_cmd.join(" ") ); let mut dec_proc = match Command::new("ffmpeg") - .args(enc_cmd) + .args(dec_cmd) .stderr(Stdio::piped()) .spawn() { @@ -240,10 +247,10 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { } }; - let enc_err = BufReader::new(dec_proc.stderr.take().unwrap()); + let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); *manager.decoder.lock().unwrap() = Some(dec_proc); - if let Err(e) = stderr_reader(enc_err, ignore, Decoder, manager.clone()) { + if let Err(e) = stderr_reader(dec_err, ignore, Decoder, manager.clone()) { error!(target: Target::file_mail(), channel = id; "{e:?}") }; @@ -254,6 +261,19 @@ pub fn write_hls(manager: ChannelManager) -> Result<(), ProcessError> { while ingest_is_running.load(Ordering::SeqCst) { sleep(Duration::from_secs(1)); } + + if let Ok(elapsed) = timer.elapsed() { + if elapsed.as_millis() < 300 { + error_count += 1; + + if error_count > 10 { + error!(target: Target::file_mail(), channel = id; "Reach fatal error count, terminate channel!"); + break; + } + } else { + error_count = 0; + } + } } sleep(Duration::from_secs(1)); diff --git a/ffplayout/src/player/output/mod.rs b/ffplayout/src/player/output/mod.rs index 64ad38db..368b7e5b 100644 --- a/ffplayout/src/player/output/mod.rs +++ b/ffplayout/src/player/output/mod.rs @@ -3,7 +3,7 @@ use std::{ process::{Command, Stdio}, sync::atomic::Ordering, thread::{self, sleep}, - time::Duration, + time::{Duration, SystemTime}, }; use crossbeam_channel::bounded; @@ -79,11 +79,14 @@ pub fn player(manager: ChannelManager) -> Result<(), ProcessError> { drop(config); + let mut error_count = 0; + 'source_iter: for node in node_sources { let config = manager.config.lock()?.clone(); *manager.current_media.lock().unwrap() = Some(node.clone()); let ignore_dec = config.logging.ignore_lines.clone(); + let timer = SystemTime::now(); if is_terminated.load(Ordering::SeqCst) { debug!(target: Target::file_mail(), channel = id; "Playout is terminated, break out from source loop"); @@ -237,6 +240,19 @@ pub fn player(manager: ChannelManager) -> Result<(), ProcessError> { if let Err(e) = error_decoder_thread.join() { error!(target: Target::file_mail(), channel = id; "{e:?}"); }; + + if let Ok(elapsed) = timer.elapsed() { + if elapsed.as_millis() < 300 { + error_count += 1; + + if error_count > 10 { + error!(target: Target::file_mail(), channel = id; "Reach fatal error count, terminate channel!"); + break; + } + } else { + error_count = 0; + } + } } trace!("Out of source loop"); diff --git a/ffplayout/src/player/utils/mod.rs b/ffplayout/src/player/utils/mod.rs index c4aefd57..89ea3704 100644 --- a/ffplayout/src/player/utils/mod.rs +++ b/ffplayout/src/player/utils/mod.rs @@ -812,7 +812,6 @@ pub fn stderr_reader( && !line.contains("failed to delete old segment")) { manager.stop_all(); - exit(1); } } } diff --git a/ffplayout/src/utils/args_parse.rs b/ffplayout/src/utils/args_parse.rs index b78edd0a..eeaf71cf 100644 --- a/ffplayout/src/utils/args_parse.rs +++ b/ffplayout/src/utils/args_parse.rs @@ -149,7 +149,7 @@ pub async fn run_args(pool: &Pool) -> Result<(), i32> { let mut args = ARGS.clone(); if args.init { - let check_user = handles::select_user(pool, 1).await; + let check_user = handles::select_users(pool).await; let mut storage = String::new(); let mut playlist = String::new(); @@ -166,7 +166,7 @@ pub async fn run_args(pool: &Pool) -> Result<(), i32> { shared_storage: false, }; - if check_user.is_err() { + if check_user.unwrap_or_default().is_empty() { global_user(&mut args); } diff --git a/ffplayout/src/utils/channels.rs b/ffplayout/src/utils/channels.rs index 0403ba99..eadd12b1 100644 --- a/ffplayout/src/utils/channels.rs +++ b/ffplayout/src/utils/channels.rs @@ -16,16 +16,9 @@ async fn map_global_admins(conn: &Pool) -> Result<(), ServiceError> { let admins = handles::select_global_admins(conn).await?; for admin in admins { - if let Err(e) = handles::update_user_channel( - conn, - admin.id, - channels - .iter() - .map(|c| c.id.to_string()) - .collect::>() - .join(","), - ) - .await + if let Err(e) = + handles::insert_user_channel(conn, admin.id, channels.iter().map(|c| c.id).collect()) + .await { error!("Update global admin: {e}"); }; @@ -42,7 +35,7 @@ pub async fn create_channel( ) -> Result { let channel = handles::insert_channel(conn, target_channel).await?; - let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename /usr/share/ffplayout/public/live/stream{0}-%d.ts /usr/share/ffplayout/public/live/stream{0}.m3u8", channel.id); + let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename live/stream{0}-%d.ts live/stream{0}.m3u8", channel.id); handles::insert_advanced_configuration(conn, channel.id).await?; handles::insert_configuration(conn, channel.id, output_param).await?; diff --git a/ffplayout/src/utils/config.rs b/ffplayout/src/utils/config.rs index dd307075..f0533670 100644 --- a/ffplayout/src/utils/config.rs +++ b/ffplayout/src/utils/config.rs @@ -678,6 +678,14 @@ impl PlayoutConfig { cmd.remove(i); } + for item in cmd.iter_mut() { + if item.ends_with(".ts") || item.ends_with(".m3u8") { + if let Ok((hls_path, _, _)) = norm_abs_path(&global.hls_path, &item) { + item.clone_from(&hls_path.to_string_lossy().to_string()); + }; + } + } + output.output_cmd = Some(cmd); } diff --git a/ffplayout/src/utils/generator.rs b/ffplayout/src/utils/generator.rs index b5ffd6eb..edd7dd2d 100644 --- a/ffplayout/src/utils/generator.rs +++ b/ffplayout/src/utils/generator.rs @@ -7,7 +7,6 @@ use std::{ fs::{create_dir_all, write}, io::Error, - process::exit, }; use chrono::Timelike; @@ -227,8 +226,6 @@ pub fn playlist_generator(manager: &ChannelManager) -> Result, "Playlist folder {:?} not exists!", config.global.playlist_path ); - - exit(1); } if let Some(range) = config.general.generate.clone() { diff --git a/ffplayout/src/utils/logging.rs b/ffplayout/src/utils/logging.rs index b0d3d77c..3cb2521b 100644 --- a/ffplayout/src/utils/logging.rs +++ b/ffplayout/src/utils/logging.rs @@ -274,7 +274,7 @@ fn file_formatter( ) -> std::io::Result<()> { write!( w, - "[{}] {} {}", + "[{}] [{:>5}] {}", now.now().format("%Y-%m-%d %H:%M:%S%.6f"), record.level(), record.args() diff --git a/migrations/00001_create_tables.sql b/migrations/00001_create_tables.sql index 99ce7ec9..19d543db 100644 --- a/migrations/00001_create_tables.sql +++ b/migrations/00001_create_tables.sql @@ -57,11 +57,21 @@ CREATE TABLE username TEXT NOT NULL, password TEXT NOT NULL, role_id INTEGER NOT NULL DEFAULT 3, - channel_ids TEXT DEFAULT "1", FOREIGN KEY (role_id) REFERENCES roles (id) ON UPDATE SET NULL ON DELETE SET DEFAULT, UNIQUE (mail, username) ); +CREATE TABLE + user_channels ( + id INTEGER PRIMARY KEY, + channel_id INTEGER NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY (channel_id) REFERENCES channels (id) ON UPDATE CASCADE ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES user (id) ON UPDATE CASCADE ON DELETE CASCADE + ); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_user_channels_unique ON user_channels (channel_id, user_id); + CREATE TABLE configurations ( id INTEGER PRIMARY KEY AUTOINCREMENT,