From 1daea32eb909087e74be94554a323b07569f97ae Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 12 Jun 2024 17:35:20 +0200 Subject: [PATCH] run channels with argument --- ffplayout/src/api/routes.rs | 12 +++-- ffplayout/src/main.rs | 81 +++++++++++++++++++----------- ffplayout/src/player/controller.rs | 39 ++++++++++++++ ffplayout/src/utils/args_parse.rs | 11 ++++ ffplayout/src/utils/mod.rs | 6 --- 5 files changed, 109 insertions(+), 40 deletions(-) diff --git a/ffplayout/src/api/routes.rs b/ffplayout/src/api/routes.rs index 546f0e5f..375dc2b0 100644 --- a/ffplayout/src/api/routes.rs +++ b/ffplayout/src/api/routes.rs @@ -532,15 +532,17 @@ async fn update_playout_config( controllers: web::Data>, ) -> Result { let manager = controllers.lock().unwrap().get(*id).unwrap(); - let mut config = manager.config.lock().unwrap(); - let id = config.general.id; - let channel_id = config.general.channel_id; + let general_config = manager.config.lock().unwrap().general.clone(); + let id = general_config.id; + let channel_id = general_config.channel_id; let db_config = Configuration::from(id, channel_id, data.into_inner()); if let Err(e) = handles::update_configuration(&pool.into_inner(), db_config.clone()).await { return Err(ServiceError::Conflict(format!("{e}"))); } + let mut config = manager.config.lock().unwrap(); + config.general.stop_threshold = db_config.stop_threshold; config.mail.subject = db_config.subject; config.mail.smtp_server = db_config.smtp_server; @@ -555,7 +557,7 @@ async fn update_playout_config( config.logging.detect_silence = db_config.detect_silence; config.logging.ignore_lines = db_config .ignore_lines - .split(";") + .split(';') .map(|l| l.to_string()) .collect(); config.processing.mode = string_to_processing_mode(db_config.processing_mode); @@ -587,7 +589,7 @@ async fn update_playout_config( config.storage.filler = PathBuf::from(db_config.filler); config.storage.extensions = db_config .extensions - .split(";") + .split(';') .map(|l| l.to_string()) .collect(); config.storage.shuffle = db_config.shuffle; diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index d6ff4715..8efdd889 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -26,7 +26,7 @@ use ffplayout::{ sse::{broadcast::Broadcaster, routes::*, AuthState}, utils::{ config::PlayoutConfig, - db_path, init_globales, + init_globales, logging::{init_logging, MailQueue}, run_args, }, @@ -79,41 +79,35 @@ async fn main() -> std::io::Result<()> { exit(c); } - let channel_controllers = Arc::new(Mutex::new(ChannelController::new())); - let channels = handles::select_all_channels(&pool) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mail_queues = Arc::new(Mutex::new(vec![])); init_globales(&pool).await; init_logging(mail_queues.clone())?; - for channel in channels.iter() { - let config = PlayoutConfig::new(&pool, channel.id).await; - - let channel_manager = - ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone()); - - channel_controllers - .lock() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? - .add(channel_manager.clone()); - let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail))); - - if let Ok(mut mqs) = mail_queues.lock() { - mqs.push(m_queue.clone()); - } - - if channel.active { - channel_manager.async_start().await; - } - } + let channel_controllers = Arc::new(Mutex::new(ChannelController::new())); if let Some(conn) = &ARGS.listen { - if db_path().is_err() { - error!("Database is not initialized! Init DB first and add admin user."); - exit(1); + let channels = handles::select_all_channels(&pool) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for channel in channels.iter() { + let config = PlayoutConfig::new(&pool, channel.id).await; + let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone()); + let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail))); + + channel_controllers + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .add(manager.clone()); + + if let Ok(mut mqs) = mail_queues.lock() { + mqs.push(m_queue.clone()); + } + + if channel.active { + manager.async_start().await; + } } let ip_port = conn.split(':').collect::>(); @@ -229,6 +223,35 @@ async fn main() -> std::io::Result<()> { .workers(thread_count) .run() .await + } else if ARGS.list_channels { + let channels = handles::select_all_channels(&pool) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let ids = channels.iter().map(|c| c.id).collect::>(); + + info!("Available channels: {ids:?}"); + + Ok(()) + } else if let Some(channels) = &ARGS.channels { + for (index, channel_id) in channels.iter().enumerate() { + let channel = handles::select_channel(&pool, &channel_id).await.unwrap(); + let config = PlayoutConfig::new(&pool, *channel_id).await; + let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone()); + let m_queue = Arc::new(Mutex::new(MailQueue::new(*channel_id, config.mail))); + + channel_controllers + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .add(manager.clone()); + + if let Ok(mut mqs) = mail_queues.lock() { + mqs.push(m_queue.clone()); + } + + manager.foreground_start(index).await; + } + + Ok(()) } else { error!("Run ffplayout with listen parameter!"); diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index d351bef1..877b5663 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -24,6 +24,7 @@ use crate::utils::{ config::{OutputMode::*, PlayoutConfig}, errors::ProcessError, }; +use crate::ARGS; /// Defined process units. #[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)] @@ -122,6 +123,44 @@ impl ChannelManager { } } + pub async fn foreground_start(&self, index: usize) { + if !self.is_alive.load(Ordering::SeqCst) { + self.run_count.fetch_add(1, Ordering::SeqCst); + self.is_alive.store(true, Ordering::SeqCst); + self.is_terminated.store(false, Ordering::SeqCst); + + let pool_clone = self.db_pool.clone().unwrap(); + let self_clone = self.clone(); + let channel_id = self.channel.lock().unwrap().id; + + if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await { + error!("Unable write to player status: {e}"); + }; + + if index + 1 == ARGS.channels.clone().unwrap_or_default().len() { + let run_count = self_clone.run_count.clone(); + + tokio::task::spawn_blocking(move || { + if let Err(e) = start_channel(self_clone) { + run_count.fetch_sub(1, Ordering::SeqCst); + error!("{e}"); + } + }) + .await + .unwrap(); + } else { + thread::spawn(move || { + let run_count = self_clone.run_count.clone(); + + if let Err(e) = start_channel(self_clone) { + run_count.fetch_sub(1, Ordering::SeqCst); + error!("{e}"); + }; + }); + } + } + } + pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> { let mut channel = self.channel.lock()?; diff --git a/ffplayout/src/utils/args_parse.rs b/ffplayout/src/utils/args_parse.rs index 1ea10019..8b1e2551 100644 --- a/ffplayout/src/utils/args_parse.rs +++ b/ffplayout/src/utils/args_parse.rs @@ -13,6 +13,17 @@ pub struct Args { #[clap(long, help = "path to database file")] pub db: Option, + #[clap( + short, + long, + help = "Run channels by ids immediately (works without webserver and frontend, no listening parameter is needed)", + num_args = 1.., + )] + pub channels: Option>, + + #[clap(long, help = "List available channel ids")] + pub list_channels: bool, + #[clap(long, help = "path to public files")] pub public: Option, diff --git a/ffplayout/src/utils/mod.rs b/ffplayout/src/utils/mod.rs index 0d76ab27..38029fd3 100644 --- a/ffplayout/src/utils/mod.rs +++ b/ffplayout/src/utils/mod.rs @@ -317,12 +317,6 @@ pub fn public_path() -> PathBuf { pub async fn run_args() -> Result<(), i32> { let mut args = ARGS.clone(); - if args.listen.is_none() && !args.ask && args.username.is_none() { - eprintln!("Wrong number of arguments! Run ffplayout --help for more information."); - - return Err(0); - } - if args.ask { let mut user = String::new(); print!("Username: ");