diff --git a/ffplayout/src/api/routes.rs b/ffplayout/src/api/routes.rs index e9f67085..719b9a6e 100644 --- a/ffplayout/src/api/routes.rs +++ b/ffplayout/src/api/routes.rs @@ -493,8 +493,10 @@ async fn get_playout_config( id: web::Path, _details: AuthDetails, ) -> Result { - if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await { + if let Ok(_channel) = handles::select_channel(&pool.into_inner(), &id).await { // TODO: get config + + return Ok("Update playout config success."); }; Err(ServiceError::InternalServerError) @@ -511,9 +513,9 @@ async fn get_playout_config( async fn update_playout_config( pool: web::Data>, id: web::Path, - data: web::Json, + _data: web::Json, ) -> Result { - if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await { + if let Ok(_channel) = handles::select_channel(&pool.into_inner(), &id).await { // TODO: update config return Ok("Update playout config success."); @@ -722,13 +724,10 @@ pub async fn media_current( #[post("/control/{id}/process/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn process_control( - pool: web::Data>, - id: web::Path, + _id: web::Path, _proc: web::Json, _engine_process: web::Data, ) -> Result { - let (_config, _) = playout_config(&pool.clone().into_inner(), &id).await?; - Ok(web::Json("no implemented")) } @@ -743,11 +742,14 @@ pub async fn process_control( #[get("/playlist/{id}")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn get_playlist( - pool: web::Data>, id: web::Path, obj: web::Query, + controllers: web::Data>, ) -> Result { - match read_playlist(&pool.into_inner(), *id, obj.date.clone()).await { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + match read_playlist(&config, obj.date.clone()).await { Ok(playlist) => Ok(web::Json(playlist)), Err(e) => Err(e), } @@ -763,11 +765,14 @@ pub async fn get_playlist( #[post("/playlist/{id}/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn save_playlist( - pool: web::Data>, id: web::Path, data: web::Json, + controllers: web::Data>, ) -> Result { - match write_playlist(&pool.into_inner(), *id, data.into_inner()).await { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + match write_playlist(&config, data.into_inner()).await { Ok(res) => Ok(web::Json(res)), Err(e) => Err(e), } @@ -794,11 +799,13 @@ pub async fn save_playlist( #[post("/playlist/{id}/generate/{date}")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn gen_playlist( - pool: web::Data>, params: web::Path<(i32, String)>, data: Option>, + controllers: web::Data>, ) -> Result { - let (mut config, channel) = playout_config(&pool.into_inner(), ¶ms.0).await?; + let manager = controllers.lock().unwrap().get(params.0).unwrap(); + let channel_name = manager.channel.lock().unwrap().name.clone(); + let mut config = manager.config.lock().unwrap(); config.general.generate = Some(vec![params.1.clone()]); if let Some(obj) = data { @@ -817,7 +824,7 @@ pub async fn gen_playlist( config.general.template.clone_from(&obj.template); } - match generate_playlist(config.to_owned(), channel.name).await { + match generate_playlist(config.clone(), channel_name).await { Ok(playlist) => Ok(web::Json(playlist)), Err(e) => Err(e), } @@ -832,10 +839,13 @@ pub async fn gen_playlist( #[delete("/playlist/{id}/{date}")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn del_playlist( - pool: web::Data>, params: web::Path<(i32, String)>, + controllers: web::Data>, ) -> Result { - match delete_playlist(&pool.into_inner(), params.0, ¶ms.1).await { + let manager = controllers.lock().unwrap().get(params.0).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + match delete_playlist(&config, ¶ms.1).await { Ok(m) => Ok(web::Json(m)), Err(e) => Err(e), } @@ -869,11 +879,15 @@ pub async fn get_log( #[post("/file/{id}/browse/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn file_browser( - pool: web::Data>, id: web::Path, data: web::Json, + controllers: web::Data>, ) -> Result { - match browser(&pool.into_inner(), *id, &data.into_inner()).await { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let channel = manager.channel.lock().unwrap().clone(); + let config = manager.config.lock().unwrap().clone(); + + match browser(&config, &channel, &data.into_inner()).await { Ok(obj) => Ok(web::Json(obj)), Err(e) => Err(e), } @@ -888,11 +902,14 @@ pub async fn file_browser( #[post("/file/{id}/create-folder/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn add_dir( - pool: web::Data>, id: web::Path, data: web::Json, + controllers: web::Data>, ) -> Result { - create_directory(&pool.into_inner(), *id, &data.into_inner()).await + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + create_directory(&config, &data.into_inner()).await } /// **Rename File** @@ -904,11 +921,14 @@ pub async fn add_dir( #[post("/file/{id}/rename/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn move_rename( - pool: web::Data>, id: web::Path, data: web::Json, + controllers: web::Data>, ) -> Result { - match rename_file(&pool.into_inner(), *id, &data.into_inner()).await { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + match rename_file(&config, &data.into_inner()).await { Ok(obj) => Ok(web::Json(obj)), Err(e) => Err(e), } @@ -923,11 +943,14 @@ pub async fn move_rename( #[post("/file/{id}/remove/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn remove( - pool: web::Data>, id: web::Path, data: web::Json, + controllers: web::Data>, ) -> Result { - match remove_file_or_folder(&pool.into_inner(), *id, &data.into_inner().source).await { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + + match remove_file_or_folder(&config, &data.into_inner().source).await { Ok(obj) => Ok(web::Json(obj)), Err(e) => Err(e), } @@ -942,12 +965,15 @@ pub async fn remove( #[put("/file/{id}/upload/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] async fn save_file( - pool: web::Data>, id: web::Path, req: HttpRequest, payload: Multipart, obj: web::Query, + controllers: web::Data>, ) -> Result { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); + let size: u64 = req .headers() .get("content-length") @@ -955,7 +981,7 @@ async fn save_file( .and_then(|cls| cls.parse().ok()) .unwrap_or(0); - upload(&pool.into_inner(), *id, size, payload, &obj.path, false).await + upload(&config, size, payload, &obj.path, false).await } /// **Get File** @@ -967,12 +993,13 @@ async fn save_file( /// ``` #[get("/file/{id}/{filename:.*}")] async fn get_file( - pool: web::Data>, req: HttpRequest, + controllers: web::Data>, ) -> Result { let id: i32 = req.match_info().query("id").parse()?; - let (config, _) = playout_config(&pool.into_inner(), &id).await?; - let storage_path = config.storage.path; + let manager = controllers.lock().unwrap().get(id).unwrap(); + let config = manager.config.lock().unwrap(); + let storage_path = config.storage.path.clone(); let file_path = req.match_info().query("filename"); let (path, _, _) = norm_abs_path(&storage_path, file_path)?; let file = actix_files::NamedFile::open(path)?; @@ -1026,17 +1053,18 @@ async fn get_public(public: web::Path) -> Result>, id: web::Path, req: HttpRequest, payload: Multipart, obj: web::Query, + controllers: web::Data>, ) -> Result { + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let channel_name = manager.channel.lock().unwrap().name.clone(); + let config = manager.config.lock().unwrap().clone(); let file = obj.file.file_name().unwrap_or_default(); let path = env::temp_dir().join(file); let path_clone = path.clone(); - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; - let channel = handles::select_channel(&pool.clone().into_inner(), &id).await?; let size: u64 = req .headers() .get("content-length") @@ -1044,10 +1072,10 @@ async fn import_playlist( .and_then(|cls| cls.parse().ok()) .unwrap_or(0); - upload(&pool.into_inner(), *id, size, payload, &path, true).await?; + upload(&config, size, payload, &path, true).await?; let response = task::spawn_blocking(move || { - import_file(&config, &obj.date, Some(channel.name), &path_clone) + import_file(&config, &obj.date, Some(channel_name), &path_clone) }) .await??; @@ -1081,11 +1109,12 @@ async fn import_playlist( #[get("/program/{id}/")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] async fn get_program( - pool: web::Data>, id: web::Path, obj: web::Query, + controllers: web::Data>, ) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); let start_sec = config.playlist.start_sec.unwrap(); let mut days = 0; let mut program = vec![]; @@ -1110,14 +1139,13 @@ async fn get_program( ]); for date in date_range { - let conn = pool.clone().into_inner(); let mut naive = NaiveDateTime::parse_from_str( &format!("{date} {}", sec_to_time(start_sec)), "%Y-%m-%d %H:%M:%S%.3f", ) .unwrap(); - let playlist = match read_playlist(&conn, *id, date.clone()).await { + let playlist = match read_playlist(&config, date.clone()).await { Ok(p) => p, Err(e) => { error!("Error in Playlist from {date}: {e}"); @@ -1169,10 +1197,11 @@ async fn get_program( #[get("/system/{id}")] #[protect(any("Role::Admin", "Role::User"), ty = "Role")] pub async fn get_system_stat( - pool: web::Data>, id: web::Path, + controllers: web::Data>, ) -> Result { - let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?; + let manager = controllers.lock().unwrap().get(*id).unwrap(); + let config = manager.config.lock().unwrap().clone(); let stat = web::block(move || system::stat(config)).await?; diff --git a/ffplayout/src/db/models.rs b/ffplayout/src/db/models.rs index 3b6f124f..6eb31c2b 100644 --- a/ffplayout/src/db/models.rs +++ b/ffplayout/src/db/models.rs @@ -108,7 +108,6 @@ pub struct Channel { pub id: i32, pub name: String, pub preview_url: String, - pub config_path: String, pub extra_extensions: String, pub active: bool, pub current_date: Option, diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index 8895ab43..747d5964 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -72,6 +72,11 @@ impl ChannelManager { is_alive: Arc::new(AtomicBool::new(channel.active)), channel: Arc::new(Mutex::new(channel)), config: Arc::new(Mutex::new(config)), + current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec![Media::new(0, "", false)])), + filler_list: Arc::new(Mutex::new(vec![])), + current_index: Arc::new(AtomicUsize::new(0)), + filler_index: Arc::new(AtomicUsize::new(0)), ..Default::default() } } @@ -178,62 +183,6 @@ impl ChannelManager { } } -/// Global player control, to get infos about current clip etc. -#[derive(Clone, Debug)] -pub struct PlayerControl { - pub current_media: Arc>>, - pub current_list: Arc>>, - pub filler_list: Arc>>, - pub current_index: Arc, - pub filler_index: Arc, -} - -impl PlayerControl { - pub fn new() -> Self { - Self { - current_media: Arc::new(Mutex::new(None)), - current_list: Arc::new(Mutex::new(vec![Media::new(0, "", false)])), - filler_list: Arc::new(Mutex::new(vec![])), - current_index: Arc::new(AtomicUsize::new(0)), - filler_index: Arc::new(AtomicUsize::new(0)), - } - } -} - -impl Default for PlayerControl { - fn default() -> Self { - Self::new() - } -} - -/// Global playout control, for move forward/backward clip, or resetting playlist/state. -#[derive(Clone, Debug)] -pub struct PlayoutStatus { - pub chain: Option>>>, - pub current_date: Arc>, - pub date: Arc>, - pub list_init: Arc, - pub time_shift: Arc>, -} - -impl PlayoutStatus { - pub fn new() -> Self { - Self { - chain: None, - current_date: Arc::new(Mutex::new(String::new())), - date: Arc::new(Mutex::new(String::new())), - list_init: Arc::new(AtomicBool::new(true)), - time_shift: Arc::new(Mutex::new(0.0)), - } - } -} - -impl Default for PlayoutStatus { - fn default() -> Self { - Self::new() - } -} - #[derive(Clone, Debug, Default)] pub struct ChannelController { pub channels: Vec, @@ -273,22 +222,20 @@ impl ChannelController { } } -pub fn start(db_pool: Pool, channel: ChannelManager) -> Result<(), ProcessError> { - let config = channel.config.lock()?.clone(); +pub fn start(db_pool: Pool, manager: ChannelManager) -> Result<(), ProcessError> { + let config = manager.config.lock()?.clone(); let mode = config.output.mode.clone(); - let play_control = PlayerControl::new(); - let play_control_clone = play_control.clone(); - let play_status = PlayoutStatus::new(); + let filler_list = manager.filler_list.clone(); // Fill filler list, can also be a single file. thread::spawn(move || { - fill_filler_list(&config, Some(play_control_clone)); + fill_filler_list(&config, Some(filler_list)); }); match mode { // write files/playlist to HLS m3u8 playlist - HLS => write_hls(channel, db_pool, play_control, play_status), + HLS => write_hls(manager, db_pool), // play on desktop or stream to a remote target - _ => player(channel, db_pool, &play_control, play_status), + _ => player(manager, db_pool), } } diff --git a/ffplayout/src/player/input/mod.rs b/ffplayout/src/player/input/mod.rs index 30c99833..b8d3ea3b 100644 --- a/ffplayout/src/player/input/mod.rs +++ b/ffplayout/src/player/input/mod.rs @@ -1,7 +1,4 @@ -use std::{ - sync::{atomic::AtomicBool, Arc}, - thread, -}; +use std::thread; use simplelog::*; use sqlx::{Pool, Sqlite}; @@ -15,19 +12,22 @@ pub use ingest::ingest_server; pub use playlist::CurrentProgram; use crate::player::{ - controller::{PlayerControl, PlayoutStatus}, + controller::ChannelManager, utils::{folder::FolderSource, Media}, }; -use crate::utils::config::{PlayoutConfig, ProcessMode::*}; +use crate::utils::config::ProcessMode::*; /// Create a source iterator from playlist, or from folder. pub fn source_generator( - config: PlayoutConfig, + manager: ChannelManager, db_pool: Pool, - player_control: &PlayerControl, - playout_stat: PlayoutStatus, - is_terminated: Arc, ) -> Box> { + let config = manager.config.lock().unwrap().clone(); + let is_terminated = manager.is_terminated.clone(); + let chain = manager.chain.clone(); + let current_list = manager.current_list.clone(); + let current_index = manager.current_index.clone(); + match config.processing.mode { Folder => { info!("Playout in folder mode"); @@ -37,23 +37,18 @@ pub fn source_generator( ); let config_clone = config.clone(); - let folder_source = FolderSource::new(&config, playout_stat.chain, player_control); - let node_clone = folder_source.player_control.current_list.clone(); + let folder_source = + FolderSource::new(&config, chain, current_list.clone(), current_index); + let list_clone = current_list.clone(); // Spawn a thread to monitor folder for file changes. - thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone)); + thread::spawn(move || watchman(config_clone, is_terminated.clone(), list_clone)); Box::new(folder_source) as Box> } Playlist => { info!("Playout in playlist mode"); - let program = CurrentProgram::new( - &config, - db_pool, - playout_stat, - is_terminated, - player_control, - ); + let program = CurrentProgram::new(manager, db_pool); Box::new(program) as Box> } diff --git a/ffplayout/src/player/input/playlist.rs b/ffplayout/src/player/input/playlist.rs index 9dd05276..9f8ddd0a 100644 --- a/ffplayout/src/player/input/playlist.rs +++ b/ffplayout/src/player/input/playlist.rs @@ -12,7 +12,7 @@ use sqlx::{Pool, Sqlite}; use crate::db::handles; use crate::player::{ - controller::{PlayerControl, PlayoutStatus}, + controller::ChannelManager, utils::{ gen_dummy, get_delta, is_close, is_remote, json_serializer::{read_json, set_defaults}, @@ -27,29 +27,26 @@ use crate::utils::config::{PlayoutConfig, IMAGE_FORMAT}; /// Here we prepare the init clip and build a iterator where we pull our clips. #[derive(Debug)] pub struct CurrentProgram { + manager: ChannelManager, config: PlayoutConfig, db_pool: Pool, start_sec: f64, end_sec: f64, json_playlist: JsonPlaylist, - player_control: PlayerControl, current_node: Media, is_terminated: Arc, - playout_stat: PlayoutStatus, last_json_path: Option, last_node_ad: bool, } /// Prepare a playlist iterator. impl CurrentProgram { - pub fn new( - config: &PlayoutConfig, - db_pool: Pool, - playout_stat: PlayoutStatus, - is_terminated: Arc, - player_control: &PlayerControl, - ) -> Self { + pub fn new(manager: ChannelManager, db_pool: Pool) -> Self { + let config = manager.config.lock().unwrap().clone(); + let is_terminated = manager.is_terminated.clone(); + Self { + manager, config: config.clone(), db_pool, start_sec: config.playlist.start_sec.unwrap(), @@ -58,10 +55,8 @@ impl CurrentProgram { "1970-01-01".to_string(), config.playlist.start_sec.unwrap(), ), - player_control: player_control.clone(), current_node: Media::new(0, "", false), is_terminated, - playout_stat, last_json_path: None, last_node_ad: false, } @@ -78,7 +73,7 @@ impl CurrentProgram { && self.json_playlist.modified != modified_time(&path) { info!("Reload playlist {path}"); - self.playout_stat.list_init.store(true, Ordering::SeqCst); + self.manager.list_init.store(true, Ordering::SeqCst); get_current = true; reload = true; } @@ -89,7 +84,7 @@ impl CurrentProgram { if get_current { self.json_playlist = read_json( &mut self.config, - &self.player_control, + self.manager.current_list.clone(), self.json_playlist.path.clone(), self.is_terminated.clone(), seek, @@ -101,18 +96,27 @@ impl CurrentProgram { info!("Read playlist: {file}"); } - if *self.playout_stat.date.lock().unwrap() != self.json_playlist.date { + if *self + .manager + .channel + .lock() + .unwrap() + .current_date + .clone() + .unwrap_or_default() + != self.json_playlist.date + { self.set_status(self.json_playlist.date.clone()); } - self.playout_stat + self.manager .current_date .lock() .unwrap() .clone_from(&self.json_playlist.date); } - self.player_control + self.manager .current_list .lock() .unwrap() @@ -122,8 +126,8 @@ impl CurrentProgram { trace!("missing playlist"); self.current_node = Media::new(0, "", false); - self.playout_stat.list_init.store(true, Ordering::SeqCst); - self.player_control.current_index.store(0, Ordering::SeqCst); + self.manager.list_init.store(true, Ordering::SeqCst); + self.manager.current_index.store(0, Ordering::SeqCst); } } } @@ -145,9 +149,7 @@ impl CurrentProgram { let mut next_start = self.current_node.begin.unwrap_or_default() - self.start_sec + duration + delta; - if node_index > 0 - && node_index == self.player_control.current_list.lock().unwrap().len() - 1 - { + if node_index > 0 && node_index == self.manager.current_list.lock().unwrap().len() - 1 { next_start += self.config.general.stop_threshold; } @@ -168,7 +170,7 @@ impl CurrentProgram { self.json_playlist = read_json( &mut self.config, - &self.player_control, + self.manager.current_list.clone(), None, self.is_terminated.clone(), false, @@ -179,15 +181,15 @@ impl CurrentProgram { info!("Read next playlist: {file}"); } - self.playout_stat.list_init.store(false, Ordering::SeqCst); + self.manager.list_init.store(false, Ordering::SeqCst); self.set_status(self.json_playlist.date.clone()); - self.player_control + self.manager .current_list .lock() .unwrap() .clone_from(&self.json_playlist.program); - self.player_control.current_index.store(0, Ordering::SeqCst); + self.manager.current_index.store(0, Ordering::SeqCst); } else { self.load_or_update_playlist(seek) } @@ -196,18 +198,20 @@ impl CurrentProgram { } fn set_status(&mut self, date: String) { - if *self.playout_stat.date.lock().unwrap() != date - && *self.playout_stat.time_shift.lock().unwrap() != 0.0 + if self.manager.channel.lock().unwrap().current_date != Some(date.clone()) + && self.manager.channel.lock().unwrap().time_shift != 0.0 { info!("Reset playout status"); } - self.playout_stat - .current_date + self.manager.current_date.lock().unwrap().clone_from(&date); + self.manager + .channel .lock() .unwrap() - .clone_from(&date); - *self.playout_stat.time_shift.lock().unwrap() = 0.0; + .current_date + .clone_from(&Some(date.clone())); + self.manager.channel.lock().unwrap().time_shift = 0.0; if let Err(e) = executor::block_on(handles::update_stat( &self.db_pool, @@ -221,8 +225,8 @@ impl CurrentProgram { // Check if last and/or next clip is a advertisement. fn last_next_ad(&mut self, node: &mut Media) { - let index = self.player_control.current_index.load(Ordering::SeqCst); - let current_list = self.player_control.current_list.lock().unwrap(); + let index = self.manager.current_index.load(Ordering::SeqCst); + let current_list = self.manager.current_list.lock().unwrap(); if index + 1 < current_list.len() && ¤t_list[index + 1].category == "advertisement" { node.next_ad = true; @@ -251,7 +255,7 @@ impl CurrentProgram { // On init or reload we need to seek for the current clip. fn get_current_clip(&mut self) { let mut time_sec = self.get_current_time(); - let shift = *self.playout_stat.time_shift.lock().unwrap(); + let shift = self.manager.channel.lock().unwrap().time_shift.clone(); if shift != 0.0 { info!("Shift playlist start for {shift:.3} seconds"); @@ -265,17 +269,10 @@ impl CurrentProgram { self.recalculate_begin(true) } - for (i, item) in self - .player_control - .current_list - .lock() - .unwrap() - .iter() - .enumerate() - { + for (i, item) in self.manager.current_list.lock().unwrap().iter().enumerate() { if item.begin.unwrap() + item.out - item.seek > time_sec { - self.playout_stat.list_init.store(false, Ordering::SeqCst); - self.player_control.current_index.store(i, Ordering::SeqCst); + self.manager.list_init.store(false, Ordering::SeqCst); + self.manager.current_index.store(i, Ordering::SeqCst); break; } @@ -288,10 +285,10 @@ impl CurrentProgram { self.get_current_clip(); let mut is_filler = false; - if !self.playout_stat.list_init.load(Ordering::SeqCst) { + if !self.manager.list_init.load(Ordering::SeqCst) { let time_sec = self.get_current_time(); - let index = self.player_control.current_index.load(Ordering::SeqCst); - let nodes = self.player_control.current_list.lock().unwrap(); + let index = self.manager.current_index.load(Ordering::SeqCst); + let nodes = self.manager.current_list.lock().unwrap(); let last_index = nodes.len() - 1; // de-instance node to preserve original values in list @@ -303,13 +300,11 @@ impl CurrentProgram { trace!("Clip from init: {}", node_clone.source); node_clone.seek += time_sec - - (node_clone.begin.unwrap() - *self.playout_stat.time_shift.lock().unwrap()); + - (node_clone.begin.unwrap() - self.manager.channel.lock().unwrap().time_shift); self.last_next_ad(&mut node_clone); - self.player_control - .current_index - .fetch_add(1, Ordering::SeqCst); + self.manager.current_index.fetch_add(1, Ordering::SeqCst); self.current_node = handle_list_init( &self.config, @@ -334,7 +329,7 @@ impl CurrentProgram { fn fill_end(&mut self, total_delta: f64) { // Fill end from playlist - let index = self.player_control.current_index.load(Ordering::SeqCst); + let index = self.manager.current_index.load(Ordering::SeqCst); let mut media = Media::new(index, "", false); media.begin = Some(time_in_seconds()); media.duration = total_delta; diff --git a/ffplayout/src/player/output/hls.rs b/ffplayout/src/player/output/hls.rs index 8f7523f8..1d109cbd 100644 --- a/ffplayout/src/player/output/hls.rs +++ b/ffplayout/src/player/output/hls.rs @@ -28,11 +28,11 @@ use std::{ use log::*; use sqlx::{Pool, Sqlite}; -use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner}; +use crate::utils::{logging::log_line, task_runner}; use crate::vec_strings; use crate::{ player::{ - controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*}, + controller::{ChannelManager, ProcessUnit::*}, input::source_generator, utils::{ get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream, @@ -43,20 +43,18 @@ use crate::{ }; /// Ingest Server for HLS -fn ingest_to_hls_server( - config: PlayoutConfig, - playout_stat: PlayoutStatus, - channel_mgr: ChannelManager, -) -> Result<(), ProcessError> { - let playlist_init = playout_stat.list_init; +fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { + let config = manager.config.lock().unwrap(); + let playlist_init = manager.list_init.clone(); + let chain = manager.chain.clone(); let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let stream_input = config.ingest.input_cmd.clone().unwrap(); let mut dummy_media = Media::new(0, "Live Stream", false); dummy_media.unit = Ingest; - let is_terminated = channel_mgr.is_terminated.clone(); - let ingest_is_running = channel_mgr.ingest_is_running.clone(); + let is_terminated = manager.is_terminated.clone(); + let ingest_is_running = manager.ingest_is_running.clone(); if let Some(ingest_input_cmd) = &config.advanced.ingest.input_cmd { server_prefix.append(&mut ingest_input_cmd.clone()); @@ -68,15 +66,18 @@ fn ingest_to_hls_server( if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if !test_tcp_port(url) { - channel_mgr.stop_all(); + manager.stop_all(); exit(1); } info!("Start ingest server, listening on: {url}"); }; + drop(config); + loop { - dummy_media.add_filter(&config, &playout_stat.chain); + let config = manager.config.lock().unwrap().clone(); + dummy_media.add_filter(&config, &chain); let server_cmd = prepare_output_cmd(&config, server_prefix.clone(), &dummy_media.filter); debug!( @@ -84,7 +85,7 @@ fn ingest_to_hls_server( server_cmd.join(" ") ); - let proc_ctl = channel_mgr.clone(); + let proc_ctl = manager.clone(); let mut server_proc = match Command::new("ffmpeg") .args(server_cmd.clone()) .stderr(Stdio::piped()) @@ -98,7 +99,7 @@ fn ingest_to_hls_server( }; let server_err = BufReader::new(server_proc.stderr.take().unwrap()); - *channel_mgr.ingest.lock().unwrap() = Some(server_proc); + *manager.ingest.lock().unwrap() = Some(server_proc); is_running = false; for line in server_err.lines() { @@ -117,7 +118,7 @@ fn ingest_to_hls_server( info!("Switch from {} to live ingest", config.processing.mode); - if let Err(e) = channel_mgr.stop(Decoder) { + if let Err(e) = manager.stop(Decoder) { error!("{e}"); } } @@ -131,7 +132,7 @@ fn ingest_to_hls_server( ingest_is_running.store(false, Ordering::SeqCst); - if let Err(e) = channel_mgr.wait(Ingest) { + if let Err(e) = manager.wait(Ingest) { error!("{e}") } @@ -146,35 +147,24 @@ fn ingest_to_hls_server( /// HLS Writer /// /// Write with single ffmpeg instance directly to a HLS playlist. -pub fn write_hls( - channel_mgr: ChannelManager, - db_pool: Pool, - player_control: PlayerControl, - playout_stat: PlayoutStatus, -) -> Result<(), ProcessError> { - let config = channel_mgr.config.lock()?.clone(); - let config_clone = config.clone(); - let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let play_stat = playout_stat.clone(); - let channel_mgr_2 = channel_mgr.clone(); - let is_terminated = channel_mgr.is_terminated.clone(); - let ingest_is_running = channel_mgr.ingest_is_running.clone(); +pub fn write_hls(manager: ChannelManager, db_pool: Pool) -> Result<(), ProcessError> { + let config = manager.config.lock()?.clone(); + let current_media = manager.current_media.clone(); - let get_source = source_generator( - config.clone(), - db_pool, - &player_control, - playout_stat, - is_terminated.clone(), - ); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); + + let channel_mgr_2 = manager.clone(); + let ingest_is_running = manager.ingest_is_running.clone(); + + let get_source = source_generator(manager.clone(), db_pool); // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { - thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, channel_mgr_2)); + thread::spawn(move || ingest_to_hls_server(channel_mgr_2)); } for node in get_source { - *player_control.current_media.lock().unwrap() = Some(node.clone()); + *current_media.lock().unwrap() = Some(node.clone()); let ignore = config.logging.ignore_lines.clone(); let mut cmd = match &node.cmd { @@ -194,7 +184,7 @@ pub fn write_hls( if config.task.enable { if config.task.path.is_file() { - let channel_mgr_3 = channel_mgr.clone(); + let channel_mgr_3 = manager.clone(); thread::spawn(move || task_runner::run(channel_mgr_3)); } else { @@ -250,13 +240,13 @@ pub fn write_hls( }; let enc_err = BufReader::new(dec_proc.stderr.take().unwrap()); - *channel_mgr.decoder.lock().unwrap() = Some(dec_proc); + *manager.decoder.lock().unwrap() = Some(dec_proc); - if let Err(e) = stderr_reader(enc_err, ignore, Decoder, channel_mgr.clone()) { + if let Err(e) = stderr_reader(enc_err, ignore, Decoder, manager.clone()) { error!("{e:?}") }; - if let Err(e) = channel_mgr.wait(Decoder) { + if let Err(e) = manager.wait(Decoder) { error!("{e}"); } @@ -267,7 +257,7 @@ pub fn write_hls( sleep(Duration::from_secs(1)); - channel_mgr.stop_all(); + manager.stop_all(); Ok(()) } diff --git a/ffplayout/src/player/output/mod.rs b/ffplayout/src/player/output/mod.rs index a35c11a6..d765daa7 100644 --- a/ffplayout/src/player/output/mod.rs +++ b/ffplayout/src/player/output/mod.rs @@ -18,7 +18,7 @@ mod stream; pub use hls::write_hls; use crate::player::{ - controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*}, + controller::{ChannelManager, ProcessUnit::*}, input::{ingest_server, source_generator}, utils::{sec_to_time, stderr_reader}, }; @@ -34,31 +34,20 @@ use crate::vec_strings; /// for getting live feeds. /// When a live ingest arrive, it stops the current playing and switch to the live source. /// When ingest stops, it switch back to playlist/folder mode. -pub fn player( - channel_mgr: ChannelManager, - db_pool: Pool, - play_control: &PlayerControl, - playout_stat: PlayoutStatus, -) -> Result<(), ProcessError> { - let config = channel_mgr.config.lock()?.clone(); +pub fn player(manager: ChannelManager, db_pool: Pool) -> Result<(), ProcessError> { + let config = manager.config.lock()?.clone(); let config_clone = config.clone(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let ignore_enc = config.logging.ignore_lines.clone(); let mut buffer = [0; 65088]; let mut live_on = false; - let playlist_init = playout_stat.list_init.clone(); + let playlist_init = manager.list_init.clone(); - let is_terminated = channel_mgr.is_terminated.clone(); - let ingest_is_running = channel_mgr.ingest_is_running.clone(); + let is_terminated = manager.is_terminated.clone(); + let ingest_is_running = manager.ingest_is_running.clone(); // get source iterator - let node_sources = source_generator( - config.clone(), - db_pool, - play_control, - playout_stat, - is_terminated.clone(), - ); + let node_sources = source_generator(manager.clone(), db_pool); // get ffmpeg output instance let mut enc_proc = match config.output.mode { @@ -71,14 +60,14 @@ pub fn player( let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); let enc_err = BufReader::new(enc_proc.stderr.take().unwrap()); - *channel_mgr.encoder.lock().unwrap() = Some(enc_proc); - let enc_p_ctl = channel_mgr.clone(); + *manager.encoder.lock().unwrap() = Some(enc_proc); + let enc_p_ctl = manager.clone(); // spawn a thread to log ffmpeg output error messages let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl)); - let channel_mgr_2 = channel_mgr.clone(); + let channel_mgr_2 = manager.clone(); let mut ingest_receiver = None; // spawn a thread for ffmpeg ingest server and create a channel for package sending @@ -88,8 +77,12 @@ pub fn player( thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_2)); } + drop(config); + 'source_iter: for node in node_sources { - *play_control.current_media.lock().unwrap() = Some(node.clone()); + let config = manager.config.lock()?.clone(); + + *manager.current_media.lock().unwrap() = Some(node.clone()); let ignore_dec = config.logging.ignore_lines.clone(); if is_terminated.load(Ordering::SeqCst) { @@ -115,7 +108,7 @@ pub fn player( format!( " ({}/{})", node.index.unwrap() + 1, - play_control.current_list.lock().unwrap().len() + manager.current_list.lock().unwrap().len() ) } else { String::new() @@ -130,7 +123,7 @@ pub fn player( if config.task.enable { if config.task.path.is_file() { - let channel_mgr_3 = channel_mgr.clone(); + let channel_mgr_3 = manager.clone(); thread::spawn(move || task_runner::run(channel_mgr_3)); } else { @@ -180,8 +173,8 @@ pub fn player( let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); - *channel_mgr.clone().decoder.lock().unwrap() = Some(dec_proc); - let channel_mgr_c = channel_mgr.clone(); + *manager.clone().decoder.lock().unwrap() = Some(dec_proc); + let channel_mgr_c = manager.clone(); let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, channel_mgr_c)); @@ -192,7 +185,7 @@ pub fn player( if !live_on { info!("Switch from {} to live ingest", config.processing.mode); - if let Err(e) = channel_mgr.stop(Decoder) { + if let Err(e) = manager.stop(Decoder) { error!("{e}") } @@ -237,7 +230,7 @@ pub fn player( } } - if let Err(e) = channel_mgr.wait(Decoder) { + if let Err(e) = manager.wait(Decoder) { error!("{e}") } @@ -250,7 +243,7 @@ pub fn player( sleep(Duration::from_secs(1)); - channel_mgr.stop_all(); + manager.stop_all(); if let Err(e) = error_encoder_thread.join() { error!("{e:?}"); diff --git a/ffplayout/src/player/utils/folder.rs b/ffplayout/src/player/utils/folder.rs index 5c015f35..f093c87a 100644 --- a/ffplayout/src/player/utils/folder.rs +++ b/ffplayout/src/player/utils/folder.rs @@ -1,5 +1,5 @@ use std::sync::{ - atomic::Ordering, + atomic::{AtomicUsize, Ordering}, {Arc, Mutex}, }; @@ -8,7 +8,6 @@ use rand::{seq::SliceRandom, thread_rng}; use simplelog::*; use walkdir::WalkDir; -use crate::player::controller::PlayerControl; use crate::player::utils::{include_file_extension, time_in_seconds, Media, PlayoutConfig}; /// Folder Sources @@ -18,7 +17,8 @@ use crate::player::utils::{include_file_extension, time_in_seconds, Media, Playo pub struct FolderSource { config: PlayoutConfig, filter_chain: Option>>>, - pub player_control: PlayerControl, + pub current_list: Arc>>, + pub current_index: Arc, current_node: Media, } @@ -26,7 +26,8 @@ impl FolderSource { pub fn new( config: &PlayoutConfig, filter_chain: Option>>>, - player_control: &PlayerControl, + current_list: Arc>>, + current_index: Arc, ) -> Self { let mut path_list = vec![]; let mut media_list = vec![]; @@ -77,12 +78,13 @@ impl FolderSource { index += 1; } - *player_control.current_list.lock().unwrap() = media_list; + *current_list.lock().unwrap() = media_list; Self { config: config.clone(), filter_chain, - player_control: player_control.clone(), + current_list, + current_index, current_node: Media::new(0, "", false), } } @@ -90,22 +92,24 @@ impl FolderSource { pub fn from_list( config: &PlayoutConfig, filter_chain: Option>>>, - player_control: &PlayerControl, + current_list: Arc>>, + current_index: Arc, list: Vec, ) -> Self { - *player_control.current_list.lock().unwrap() = list; + *current_list.lock().unwrap() = list; Self { config: config.clone(), filter_chain, - player_control: player_control.clone(), + current_list, + current_index, current_node: Media::new(0, "", false), } } fn shuffle(&mut self) { let mut rng = thread_rng(); - let mut nodes = self.player_control.current_list.lock().unwrap(); + let mut nodes = self.current_list.lock().unwrap(); nodes.shuffle(&mut rng); @@ -115,7 +119,7 @@ impl FolderSource { } fn sort(&mut self) { - let mut nodes = self.player_control.current_list.lock().unwrap(); + let mut nodes = self.current_list.lock().unwrap(); nodes.sort_by(|d1, d2| d1.source.cmp(&d2.source)); @@ -130,19 +134,15 @@ impl Iterator for FolderSource { type Item = Media; fn next(&mut self) -> Option { - if self.player_control.current_index.load(Ordering::SeqCst) - < self.player_control.current_list.lock().unwrap().len() - { - let i = self.player_control.current_index.load(Ordering::SeqCst); - self.current_node = self.player_control.current_list.lock().unwrap()[i].clone(); + if self.current_index.load(Ordering::SeqCst) < self.current_list.lock().unwrap().len() { + let i = self.current_index.load(Ordering::SeqCst); + self.current_node = self.current_list.lock().unwrap()[i].clone(); let _ = self.current_node.add_probe(false).ok(); self.current_node .add_filter(&self.config, &self.filter_chain); self.current_node.begin = Some(time_in_seconds()); - self.player_control - .current_index - .fetch_add(1, Ordering::SeqCst); + self.current_index.fetch_add(1, Ordering::SeqCst); Some(self.current_node.clone()) } else { @@ -160,13 +160,13 @@ impl Iterator for FolderSource { self.sort(); } - self.current_node = self.player_control.current_list.lock().unwrap()[0].clone(); + self.current_node = self.current_list.lock().unwrap()[0].clone(); let _ = self.current_node.add_probe(false).ok(); self.current_node .add_filter(&self.config, &self.filter_chain); self.current_node.begin = Some(time_in_seconds()); - self.player_control.current_index.store(1, Ordering::SeqCst); + self.current_index.store(1, Ordering::SeqCst); Some(self.current_node.clone()) } @@ -175,7 +175,7 @@ impl Iterator for FolderSource { pub fn fill_filler_list( config: &PlayoutConfig, - player_control: Option, + fillers: Option>>>, ) -> Vec { let mut filler_list = vec![]; let filler_path = &config.storage.filler; @@ -190,7 +190,7 @@ pub fn fill_filler_list( { let mut media = Media::new(index, &entry.path().to_string_lossy(), false); - if player_control.is_none() { + if fillers.is_none() { if let Err(e) = media.add_probe(false) { error!("{e:?}"); }; @@ -211,13 +211,13 @@ pub fn fill_filler_list( item.index = Some(index); } - if let Some(control) = player_control.as_ref() { - control.filler_list.lock().unwrap().clone_from(&filler_list); + if let Some(f) = fillers.as_ref() { + f.lock().unwrap().clone_from(&filler_list); } } else if filler_path.is_file() { let mut media = Media::new(0, &config.storage.filler.to_string_lossy(), false); - if player_control.is_none() { + if fillers.is_none() { if let Err(e) = media.add_probe(false) { error!("{e:?}"); }; @@ -225,8 +225,8 @@ pub fn fill_filler_list( filler_list.push(media); - if let Some(control) = player_control.as_ref() { - control.filler_list.lock().unwrap().clone_from(&filler_list); + if let Some(f) = fillers.as_ref() { + f.lock().unwrap().clone_from(&filler_list); } } diff --git a/ffplayout/src/player/utils/json_serializer.rs b/ffplayout/src/player/utils/json_serializer.rs index 4c4fd0cb..c7c7affd 100644 --- a/ffplayout/src/player/utils/json_serializer.rs +++ b/ffplayout/src/player/utils/json_serializer.rs @@ -2,18 +2,15 @@ use serde::{Deserialize, Serialize}; use std::{ fs::File, path::Path, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, Arc, Mutex}, thread, }; use simplelog::*; -use crate::player::{ - controller::PlayerControl, - utils::{ - get_date, is_remote, json_validate::validate_playlist, modified_time, time_from_header, - Media, PlayoutConfig, - }, +use crate::player::utils::{ + get_date, is_remote, json_validate::validate_playlist, modified_time, time_from_header, Media, + PlayoutConfig, }; use crate::utils::config::DUMMY_LEN; @@ -95,14 +92,13 @@ pub fn set_defaults(playlist: &mut JsonPlaylist) { /// which we need to process. pub fn read_json( config: &mut PlayoutConfig, - player_control: &PlayerControl, + current_list: Arc>>, path: Option, is_terminated: Arc, seek: bool, get_next: bool, ) -> JsonPlaylist { let config_clone = config.clone(); - let control_clone = player_control.clone(); let mut playlist_path = config.playlist.path.clone(); let start_sec = config.playlist.start_sec.unwrap(); let date = get_date(seek, start_sec, get_next); @@ -150,12 +146,7 @@ pub fn read_json( if !config.general.skip_validation { thread::spawn(move || { - validate_playlist( - config_clone, - control_clone, - list_clone, - is_terminated, - ) + validate_playlist(config_clone, current_list, list_clone, is_terminated) }); } @@ -194,7 +185,7 @@ pub fn read_json( if !config.general.skip_validation { thread::spawn(move || { - validate_playlist(config_clone, control_clone, list_clone, is_terminated) + validate_playlist(config_clone, current_list, list_clone, is_terminated) }); } diff --git a/ffplayout/src/player/utils/json_validate.rs b/ffplayout/src/player/utils/json_validate.rs index 7c3ddaf2..4d78399d 100644 --- a/ffplayout/src/player/utils/json_validate.rs +++ b/ffplayout/src/player/utils/json_validate.rs @@ -3,7 +3,7 @@ use std::{ process::{Command, Stdio}, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, time::Instant, }; @@ -12,9 +12,8 @@ use log::*; use regex::Regex; use crate::player::filter::FilterType::Audio; -use crate::player::{ - controller::PlayerControl, - utils::{is_close, is_remote, loop_image, sec_to_time, seek_and_length, JsonPlaylist, Media}, +use crate::player::utils::{ + is_close, is_remote, loop_image, sec_to_time, seek_and_length, JsonPlaylist, Media, }; use crate::utils::{ config::{OutputMode::Null, PlayoutConfig, FFMPEG_IGNORE_ERRORS, IMAGE_FORMAT}, @@ -155,7 +154,7 @@ fn check_media( /// This function we run in a thread, to don't block the main function. pub fn validate_playlist( mut config: PlayoutConfig, - player_control: PlayerControl, + current_list: Arc>>, mut playlist: JsonPlaylist, is_terminated: Arc, ) { @@ -206,7 +205,7 @@ pub fn validate_playlist( sec_to_time(begin), item.source ) - } else if let Ok(mut list) = player_control.current_list.try_lock() { + } else if let Ok(mut list) = current_list.try_lock() { // Filter out same item in current playlist, then add the probe to it. // Check also if duration differs with playlist value, log error if so and adjust that value. list.iter_mut().filter(|list_item| list_item.source == item.source).for_each(|o| { diff --git a/ffplayout/src/utils/channels.rs b/ffplayout/src/utils/channels.rs index ca61353f..7e097d7a 100644 --- a/ffplayout/src/utils/channels.rs +++ b/ffplayout/src/utils/channels.rs @@ -1,11 +1,8 @@ -use std::fs; - use rand::prelude::*; -use simplelog::*; use sqlx::{Pool, Sqlite}; use crate::db::{handles, models::Channel}; -use crate::utils::{config::PlayoutConfig, errors::ServiceError, playout_config}; +use crate::utils::{config::PlayoutConfig, errors::ServiceError}; pub async fn create_channel( conn: &Pool, @@ -34,12 +31,10 @@ pub async fn create_channel( } pub async fn delete_channel(conn: &Pool, id: i32) -> Result<(), ServiceError> { - let _channel = handles::select_channel(conn, &id).await?; - let (_config, _) = playout_config(conn, &id).await?; - + let channel = handles::select_channel(conn, &id).await?; // TODO: Remove Channel controller - handles::delete_channel(conn, &id).await?; + handles::delete_channel(conn, &channel.id).await?; Ok(()) } diff --git a/ffplayout/src/utils/control.rs b/ffplayout/src/utils/control.rs index 462511e6..fca1c9b6 100644 --- a/ffplayout/src/utils/control.rs +++ b/ffplayout/src/utils/control.rs @@ -189,7 +189,8 @@ pub async fn control_state( let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0)); manager.channel.lock().unwrap().time_shift = delta; date.clone_from(¤t_date); - handles::update_stat(conn, config.general.channel_id, current_date, delta); + handles::update_stat(conn, config.general.channel_id, current_date, delta) + .await?; data_map.insert("operation".to_string(), json!("move_to_last")); data_map.insert("shifted_seconds".to_string(), json!(delta)); @@ -225,7 +226,8 @@ pub async fn control_state( let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0)); manager.channel.lock().unwrap().time_shift = delta; date.clone_from(¤t_date); - handles::update_stat(conn, config.general.channel_id, current_date, delta); + handles::update_stat(conn, config.general.channel_id, current_date, delta) + .await?; data_map.insert("operation".to_string(), json!("move_to_next")); data_map.insert("shifted_seconds".to_string(), json!(delta)); @@ -254,7 +256,7 @@ pub async fn control_state( date.clone_from(¤t_date); manager.list_init.store(true, Ordering::SeqCst); - handles::update_stat(conn, config.general.channel_id, current_date, 0.0); + handles::update_stat(conn, config.general.channel_id, current_date, 0.0).await?; data_map.insert("operation".to_string(), json!("reset_playout_state")); diff --git a/ffplayout/src/utils/files.rs b/ffplayout/src/utils/files.rs index 4dca58e3..50278f1e 100644 --- a/ffplayout/src/utils/files.rs +++ b/ffplayout/src/utils/files.rs @@ -11,13 +11,13 @@ use lexical_sort::{natural_lexical_cmp, PathSort}; use rand::{distributions::Alphanumeric, Rng}; use relative_path::RelativePath; use serde::{Deserialize, Serialize}; -use sqlx::{Pool, Sqlite}; use tokio::fs; -use simplelog::*; +use log::*; -use crate::utils::{errors::ServiceError, playout_config}; -use ffplayout_lib::utils::{file_extension, MediaProbe}; +use crate::db::models::Channel; +use crate::player::utils::{file_extension, MediaProbe}; +use crate::utils::{config::PlayoutConfig, errors::ServiceError}; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct PathObject { @@ -128,18 +128,17 @@ pub fn norm_abs_path( /// Input should be a relative path segment, but when it is a absolut path, the norm_abs_path function /// will take care, that user can not break out from given storage path in config. pub async fn browser( - conn: &Pool, - id: i32, + config: &PlayoutConfig, + channel: &Channel, path_obj: &PathObject, ) -> Result { - let (config, channel) = playout_config(conn, &id).await?; let mut channel_extensions = channel .extra_extensions .split(',') .map(|e| e.to_string()) .collect::>(); let mut parent_folders = vec![]; - let mut extensions = config.storage.extensions; + let mut extensions = config.storage.extensions.clone(); extensions.append(&mut channel_extensions); let (path, parent, path_component) = norm_abs_path(&config.storage.path, &path_obj.source)?; @@ -235,11 +234,9 @@ pub async fn browser( } pub async fn create_directory( - conn: &Pool, - id: i32, + config: &PlayoutConfig, path_obj: &PathObject, ) -> Result { - let (config, _) = playout_config(conn, &id).await?; let (path, _, _) = norm_abs_path(&config.storage.path, &path_obj.source)?; if let Err(e) = fs::create_dir_all(&path).await { @@ -306,11 +303,9 @@ async fn rename(source: &PathBuf, target: &PathBuf) -> Result, - id: i32, + config: &PlayoutConfig, move_object: &MoveObject, ) -> Result { - let (config, _) = playout_config(conn, &id).await?; let (source_path, _, _) = norm_abs_path(&config.storage.path, &move_object.source)?; let (mut target_path, _, _) = norm_abs_path(&config.storage.path, &move_object.target)?; @@ -341,11 +336,9 @@ pub async fn rename_file( } pub async fn remove_file_or_folder( - conn: &Pool, - id: i32, + config: &PlayoutConfig, source_path: &str, ) -> Result<(), ServiceError> { - let (config, _) = playout_config(conn, &id).await?; let (source, _, _) = norm_abs_path(&config.storage.path, source_path)?; if !source.exists() { @@ -377,8 +370,7 @@ pub async fn remove_file_or_folder( Err(ServiceError::InternalServerError) } -async fn valid_path(conn: &Pool, id: i32, path: &str) -> Result { - let (config, _) = playout_config(conn, &id).await?; +async fn valid_path(config: &PlayoutConfig, path: &str) -> Result { let (test_path, _, _) = norm_abs_path(&config.storage.path, path)?; if !test_path.is_dir() { @@ -389,8 +381,7 @@ async fn valid_path(conn: &Pool, id: i32, path: &str) -> Result, - id: i32, + config: &PlayoutConfig, _size: u64, mut payload: Multipart, path: &Path, @@ -411,7 +402,7 @@ pub async fn upload( let filepath = if abs_path { path.to_path_buf() } else { - valid_path(conn, id, &path.to_string_lossy()) + valid_path(&config, &path.to_string_lossy()) .await? .join(filename) }; diff --git a/ffplayout/src/utils/generator.rs b/ffplayout/src/utils/generator.rs index 1216ff4c..724a11c2 100644 --- a/ffplayout/src/utils/generator.rs +++ b/ffplayout/src/utils/generator.rs @@ -16,14 +16,11 @@ use rand::{seq::SliceRandom, thread_rng, Rng}; use simplelog::*; use walkdir::WalkDir; -use crate::player::{ - controller::PlayerControl, - utils::{ - folder::{fill_filler_list, FolderSource}, - gen_dummy, get_date_range, include_file_extension, - json_serializer::JsonPlaylist, - sum_durations, Media, - }, +use crate::player::utils::{ + folder::{fill_filler_list, FolderSource}, + gen_dummy, get_date_range, include_file_extension, + json_serializer::JsonPlaylist, + sum_durations, Media, }; use crate::utils::{ config::{PlayoutConfig, Template}, diff --git a/ffplayout/src/utils/mod.rs b/ffplayout/src/utils/mod.rs index 3529fc8a..26171804 100644 --- a/ffplayout/src/utils/mod.rs +++ b/ffplayout/src/utils/mod.rs @@ -2,8 +2,8 @@ use std::{ env, error::Error, fmt, - fs::{self, metadata, File}, - io::{stdin, stdout, Read, Write}, + fs::{self, metadata}, + io::{stdin, stdout, Write}, net::TcpListener, path::{Path, PathBuf}, str::FromStr, @@ -40,11 +40,11 @@ pub mod task_runner; use crate::db::{ db_pool, - handles::{insert_user, select_channel, select_global}, - models::{Channel, User}, + handles::{insert_user, select_global}, + models::User, }; use crate::player::utils::time_to_sec; -use crate::utils::{config::PlayoutConfig, errors::ServiceError, logging::log_file_path}; +use crate::utils::{errors::ServiceError, logging::log_file_path}; #[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] pub enum Role { @@ -396,35 +396,6 @@ pub async fn run_args() -> Result<(), i32> { Ok(()) } -pub fn read_playout_config(path: &str) -> Result> { - let mut file = File::open(path)?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - - let mut config: PlayoutConfig = toml_edit::de::from_str(&contents)?; - - config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start)); - config.playlist.length_sec = Some(time_to_sec(&config.playlist.length)); - - Ok(config) -} - -pub async fn playout_config( - conn: &Pool, - channel_id: &i32, -) -> Result<(PlayoutConfig, Channel), ServiceError> { - if let Ok(channel) = select_channel(conn, channel_id).await { - match read_playout_config(&channel.config_path.clone()) { - Ok(config) => return Ok((config, channel)), - Err(e) => error!("{e}"), - } - } - - Err(ServiceError::BadRequest( - "Error in getting config!".to_string(), - )) -} - pub async fn read_log_file(channel_id: &i32, date: &str) -> Result { let mut date_str = "".to_string(); diff --git a/ffplayout/src/utils/playlist.rs b/ffplayout/src/utils/playlist.rs index 3e2f0690..a40dff0e 100644 --- a/ffplayout/src/utils/playlist.rs +++ b/ffplayout/src/utils/playlist.rs @@ -1,20 +1,17 @@ use std::{fs, path::PathBuf}; -use simplelog::*; -use sqlx::{Pool, Sqlite}; +use log::*; use crate::player::utils::{json_reader, json_writer, JsonPlaylist}; use crate::utils::{ config::PlayoutConfig, errors::ServiceError, files::norm_abs_path, - generator::generate_playlist as playlist_generator, playout_config, + generator::generate_playlist as playlist_generator, }; pub async fn read_playlist( - conn: &Pool, - id: i32, + config: &PlayoutConfig, date: String, ) -> Result { - let (config, _) = playout_config(conn, &id).await?; let (path, _, _) = norm_abs_path(&config.playlist.path, "")?; let mut playlist_path = path; let d: Vec<&str> = date.split('-').collect(); @@ -31,13 +28,11 @@ pub async fn read_playlist( } pub async fn write_playlist( - conn: &Pool, - id: i32, + config: &PlayoutConfig, json_data: JsonPlaylist, ) -> Result { - let (config, _) = playout_config(conn, &id).await?; let date = json_data.date.clone(); - let mut playlist_path = config.playlist.path; + let mut playlist_path = config.playlist.path.clone(); let d: Vec<&str> = date.split('-').collect(); if !playlist_path @@ -125,12 +120,7 @@ pub async fn generate_playlist( } } -pub async fn delete_playlist( - conn: &Pool, - id: i32, - date: &str, -) -> Result { - let (config, _) = playout_config(conn, &id).await?; +pub async fn delete_playlist(config: &PlayoutConfig, date: &str) -> Result { let mut playlist_path = PathBuf::from(&config.playlist.path); let d: Vec<&str> = date.split('-').collect(); playlist_path = playlist_path diff --git a/migrations/00001_create_tables.sql b/migrations/00001_create_tables.sql index 5846615b..9dbcbc54 100644 --- a/migrations/00001_create_tables.sql +++ b/migrations/00001_create_tables.sql @@ -63,7 +63,7 @@ CREATE TABLE configurations ( starttls INTEGER NOT NULL DEFAULT 0, mail_level TEXT NOT NULL DEFAULT "ERROR", interval INTEGER NOT NULL DEFAULT 120, - log_help TEXT NOT NULL DEFAULT "If 'log_to_file' is true, log to file, when is false log to console. \n'backup_count' says how long log files will be saved in days.\n'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.\n'level' can be DEBUG, INFO, WARNING, ERROR.\n'ffmpeg_level/ingest_level' can be INFO, WARNING, ERROR.\n'detect_silence' logs an error message if the audio line is silent for 15 seconds during the validation process.\n'ignore_lines' makes logging to ignore strings that contains matched lines, in frontend is a semicolon separated list.", + logging_help TEXT NOT NULL DEFAULT "If 'log_to_file' is true, log to file, when is false log to console. \n'backup_count' says how long log files will be saved in days.\n'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.\n'level' can be DEBUG, INFO, WARNING, ERROR.\n'ffmpeg_level/ingest_level' can be INFO, WARNING, ERROR.\n'detect_silence' logs an error message if the audio line is silent for 15 seconds during the validation process.\n'ignore_lines' makes logging to ignore strings that contains matched lines, in frontend is a semicolon separated list.", log_to_file INTEGER NOT NULL DEFAULT 1, backup_count INTEGER NOT NULL DEFAULT 7, local_time INTEGER NOT NULL DEFAULT 1,