diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index 5ee06c91..5f81315f 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -30,7 +30,7 @@ use ffplayout::{ control::ProcessControl, db_path, init_globales, logging::{init_logging, MailQueue}, - round_to_nearest_ten, run_args, + run_args, }, ARGS, }; @@ -75,7 +75,9 @@ async fn main() -> std::io::Result<()> { .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; let mail_queues = Arc::new(Mutex::new(vec![])); - let mail_messages = Arc::new(Mutex::new(vec![])); + + init_globales(&pool).await; + init_logging(mail_queues.clone())?; for channel in channels.iter() { println!("channel: {channel:?}"); @@ -90,33 +92,31 @@ async fn main() -> std::io::Result<()> { } }; - let queue = MailQueue::new( - channel.id, - round_to_nearest_ten(config.mail.interval), - config.mail, - ); + let queue = MailQueue::new(channel.id, config.mail); - if let Ok(mut q) = mail_queues.lock() { - q.push(queue); + if let Ok(mut mq) = mail_queues.lock() { + mq.push(queue); } + warn!("This logs to console"); + if channel.active { thread::spawn(move || { - thread::sleep(std::time::Duration::from_secs(5)); + info!(target: "{file}", channel = 1; "Start Playout"); - error!(target: "{mail}", channel = 1; "This logs to File and Mail"); + thread::sleep(std::time::Duration::from_secs(1)); + + error!(target: "{file,mail}", channel = 1; "This logs to File and Mail"); }); } } - init_logging(mail_queues, mail_messages)?; - if let Some(conn) = &ARGS.listen { if db_path().is_err() { error!("Database is not initialized! Init DB first and add admin user."); exit(1); } - init_globales(&pool).await; + let ip_port = conn.split(':').collect::>(); let addr = ip_port[0]; let port = ip_port[1].parse::().unwrap(); @@ -130,6 +130,8 @@ async fn main() -> std::io::Result<()> { // no 'allow origin' here, give it to the reverse proxy HttpServer::new(move || { + let queues = mail_queues.clone(); + let auth = HttpAuthentication::bearer(validator); let db_pool = web::Data::new(pool.clone()); // Customize logging format to get IP though proxies. @@ -138,6 +140,7 @@ async fn main() -> std::io::Result<()> { let mut web_app = App::new() .app_data(db_pool) + .app_data(web::Data::from(queues)) .app_data(engine_process.clone()) .app_data(auth_state.clone()) .app_data(web::Data::from(Arc::clone(&broadcast_data))) diff --git a/ffplayout/src/utils/logging.rs b/ffplayout/src/utils/logging.rs index 6d6148e2..b6b86917 100644 --- a/ffplayout/src/utils/logging.rs +++ b/ffplayout/src/utils/logging.rs @@ -102,18 +102,12 @@ impl LogWriter for MultiFileLogger { } pub struct LogMailer { - pub messages: Arc>>, + pub mail_queues: Arc>>, } impl LogMailer { - pub fn new(messages: Arc>>) -> Self { - Self { messages } - } - - fn push(&self, msg: MailMessage) { - if let Ok(mut list) = self.messages.lock() { - list.push(msg) - } + pub fn new(mail_queues: Arc>>) -> Self { + Self { mail_queues } } } @@ -129,18 +123,20 @@ impl LogWriter for LogMailer { ) .unwrap_or(0); - let msg = MailMessage::new( - id, + let msg = format!( + "[{}] [{:>5}] {}", + now.now().format("%Y-%m-%d %H:%M:%S"), record.level(), - format!( - "[{}] [{:>5}] {}", - now.now().format("%Y-%m-%d %H:%M:%S"), - record.level(), - record.args() - ), + record.args() ); - self.push(msg.clone()); + if let Ok(mut queues) = self.mail_queues.lock() { + if let Some(queue) = queues.iter_mut().find(|q| q.id == id) { + if queue.level_eq(record.level()) { + queue.push(msg) + } + } + } Ok(()) } @@ -152,23 +148,32 @@ impl LogWriter for LogMailer { #[derive(Clone, Debug)] pub struct MailQueue { pub id: i32, - pub expire: u64, pub config: Mail, pub lines: Vec, } impl MailQueue { - pub fn new(id: i32, expire: u64, config: Mail) -> Self { + pub fn new(id: i32, config: Mail) -> Self { Self { id, - expire, config, lines: vec![], } } - pub fn update(&mut self, expire: u64, config: Mail) { - self.expire = expire; + pub fn level_eq(&self, level: Level) -> bool { + match level { + Level::Error => self.config.mail_level == Level::Error, + Level::Warn => matches!(self.config.mail_level, Level::Warn | Level::Error), + Level::Info => matches!( + self.config.mail_level, + Level::Info | Level::Warn | Level::Error + ), + _ => false, + } + } + + pub fn update(&mut self, config: Mail) { self.config = config; } @@ -176,6 +181,10 @@ impl MailQueue { self.lines.clear(); } + pub fn push(&mut self, line: String) { + self.lines.push(line); + } + fn text(&self) -> String { self.lines.join("\n") } @@ -185,28 +194,6 @@ impl MailQueue { } } -#[derive(Clone, Debug)] -pub struct MailMessage { - pub id: i32, - pub level: Level, - pub line: String, -} - -impl MailMessage { - pub fn new(id: i32, level: Level, line: String) -> Self { - Self { id, level, line } - } - - fn eq(&self, level: Level) -> bool { - match level { - Level::Error => self.level == Level::Error, - Level::Warn => matches!(self.level, Level::Warn | Level::Error), - Level::Info => matches!(self.level, Level::Info | Level::Warn | Level::Error), - _ => false, - } - } -} - fn console_formatter(w: &mut dyn Write, _now: &mut DeferredNow, record: &Record) -> io::Result<()> { let level = match record.level() { Level::Debug => "[DEBUG]", @@ -250,9 +237,7 @@ fn file_logger() -> Box { if ARGS.log_to_console { Box::new(LogConsole) } else { - let logger = MultiFileLogger::new(log_path); - - Box::new(logger) + Box::new(MultiFileLogger::new(log_path)) } } @@ -297,7 +282,7 @@ pub async fn send_mail(config: &Mail, msg: String) -> Result<(), ProcessError> { /// Basic Mail Queue /// /// Check every give seconds for messages and send them. -pub fn mail_queue(mail_queues: Arc>>, messages: Arc>>) { +pub fn mail_queue(mail_queues: Arc>>) { actix_web::rt::spawn(async move { let sec = 10; let mut interval = interval(Duration::from_secs(sec)); @@ -313,7 +298,7 @@ pub fn mail_queue(mail_queues: Arc>>, messages: Arc l, Err(e) => { error!("Failed to lock mail_queues {e}"); @@ -321,30 +306,18 @@ pub fn mail_queue(mail_queues: Arc>>, messages: Arc m, - Err(e) => { - error!("Failed to lock messages {e}"); - continue; - } - }; - - while let Some(msg) = msgs.pop() { - if let Some(queue) = msg_list.iter_mut().find(|q| q.id == msg.id) { - if msg.eq(queue.config.mail_level) { - queue.lines.push(msg.line.clone()); - } - } - } - // Process mail queues and send emails - for queue in msg_list.iter_mut() { + for queue in queues.iter_mut() { let interval = round_to_nearest_ten(counter); + let expire = round_to_nearest_ten(queue.config.interval); - if interval % queue.expire == 0 && !queue.is_empty() { - if let Err(e) = send_mail(&queue.config, queue.text()).await { - error!(target: "{file}", channel = queue.id; "Failed to send mail: {e}"); + if interval % expire == 0 && !queue.is_empty() { + if queue.config.recipient.contains('@') { + if let Err(e) = send_mail(&queue.config, queue.text()).await { + error!(target: "{file}", channel = queue.id; "Failed to send mail: {e}"); + } } + // Clear the messages after sending the email queue.clear(); } @@ -358,10 +331,7 @@ pub fn mail_queue(mail_queues: Arc>>, messages: Arc>>, - messages: Arc>>, -) -> io::Result<()> { +pub fn init_logging(mail_queues: Arc>>) -> io::Result<()> { let log_level = match ARGS .log_level .clone() @@ -378,7 +348,7 @@ pub fn init_logging( _ => LevelFilter::Debug, }; - mail_queue(mail_queues, messages.clone()); + mail_queue(mail_queues.clone()); // Build the initial log specification let mut builder = LogSpecification::builder(); @@ -396,7 +366,7 @@ pub fn init_logging( .format(console_formatter) .log_to_stderr() .add_writer("file", file_logger()) - .add_writer("mail", Box::new(LogMailer::new(messages))) + .add_writer("mail", Box::new(LogMailer::new(mail_queues))) .start() .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;