change mail_queue structure

This commit is contained in:
jb-alvarado 2024-06-06 09:17:30 +02:00
parent 17174e2045
commit 3c2801acab
4 changed files with 77 additions and 35 deletions

View File

@ -20,7 +20,15 @@
"cSpell.words": [
"actix",
"ffpengine",
"flexi",
"lettre",
"libc",
"neli",
"paris",
"reqwest",
"rsplit",
"rustls",
"sqlx",
"starttls",
"tokio",
"uuids"

View File

@ -2,17 +2,11 @@ use std::fmt;
use flexi_logger::{
filter::{LogLineFilter, LogLineWriter},
DeferredNow, FlexiLoggerError, FormatFunction, Logger,
DeferredNow, FlexiLoggerError, Logger,
};
use log::info;
use log::kv::Key;
use flexi_logger::writers::LogWriter;
use std::{
io::{Error, ErrorKind},
sync::{Arc, Mutex},
};
#[derive(Debug)]
enum Target {
Terminal,
@ -56,7 +50,7 @@ fn main() -> Result<(), FlexiLoggerError> {
.filter(Box::new(Console))
.start()?;
info!(target = Target::Terminal.as_str(); "info logging");
info!(target: Target::Terminal.as_str(), "info logging");
Ok(())
}

View File

@ -29,7 +29,7 @@ use ffplayout::{
config::PlayoutConfig,
control::ProcessControl,
db_path, init_globales,
logging::{init_logging, MailQueue},
logging::{init_logging, MailQueue, Target},
run_args,
},
ARGS,
@ -92,21 +92,26 @@ async fn main() -> std::io::Result<()> {
}
};
let queue = MailQueue::new(channel.id, config.mail);
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
if let Ok(mut mq) = mail_queues.lock() {
mq.push(queue);
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
warn!("This logs to console");
if channel.active {
thread::spawn(move || {
info!(target: "{file}", channel = 1; "Start Playout");
info!(target: Target::file(), channel = 1; "Start Playout");
thread::sleep(std::time::Duration::from_secs(1));
error!(target: "{file,mail}", channel = 1; "This logs to File and Mail");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 2; "This logs to File and Mail, channel 2");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 3; "This logs to File and Mail, channel 3");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
});
}
}

View File

@ -23,6 +23,27 @@ use super::ARGS;
use crate::utils::{config::Mail, errors::ProcessError, round_to_nearest_ten};
#[derive(Debug)]
pub struct Target;
impl Target {
pub fn console() -> &'static str {
"{console}"
}
pub fn file() -> &'static str {
"{file}"
}
pub fn mail() -> &'static str {
"{mail}"
}
pub fn file_mail() -> &'static str {
"{file,mail}"
}
}
pub struct LogConsole;
impl LogWriter for LogConsole {
@ -102,11 +123,11 @@ impl LogWriter for MultiFileLogger {
}
pub struct LogMailer {
pub mail_queues: Arc<Mutex<Vec<MailQueue>>>,
pub mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>,
}
impl LogMailer {
pub fn new(mail_queues: Arc<Mutex<Vec<MailQueue>>>) -> Self {
pub fn new(mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>) -> Self {
Self { mail_queues }
}
}
@ -123,18 +144,26 @@ impl LogWriter for LogMailer {
)
.unwrap_or(0);
let msg = format!(
let mut queues = self.mail_queues.lock().unwrap_or_else(|poisoned| {
error!("Queues mutex was poisoned");
poisoned.into_inner()
});
for queue in queues.iter_mut() {
let mut q_lock = queue.lock().unwrap_or_else(|poisoned| {
error!("Queue mutex was poisoned");
poisoned.into_inner()
});
if q_lock.id == id && q_lock.level_eq(record.level()) {
q_lock.push(format!(
"[{}] [{:>5}] {}",
now.now().format("%Y-%m-%d %H:%M:%S"),
record.level(),
record.args()
);
));
if let Ok(mut queues) = self.mail_queues.lock() {
if let Some(queue) = queues.iter_mut().find(|q| q.id == id) {
if queue.level_eq(record.level()) {
queue.push(msg)
}
break;
}
}
@ -282,7 +311,7 @@ pub async fn send_mail(config: &Mail, msg: String) -> Result<(), ProcessError> {
/// Basic Mail Queue
///
/// Check every give seconds for messages and send them.
pub fn mail_queue(mail_queues: Arc<Mutex<Vec<MailQueue>>>) {
pub fn mail_queue(mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>) {
actix_web::rt::spawn(async move {
let sec = 10;
let mut interval = interval(Duration::from_secs(sec));
@ -309,17 +338,23 @@ pub fn mail_queue(mail_queues: Arc<Mutex<Vec<MailQueue>>>) {
// Process mail queues and send emails
for queue in queues.iter_mut() {
let interval = round_to_nearest_ten(counter);
let expire = round_to_nearest_ten(queue.config.interval);
let mut q_lock = queue.lock().unwrap_or_else(|poisoned| {
error!("Queue mutex was poisoned");
if interval % expire == 0 && !queue.is_empty() {
if queue.config.recipient.contains('@') {
if let Err(e) = send_mail(&queue.config, queue.text()).await {
error!(target: "{file}", channel = queue.id; "Failed to send mail: {e}");
poisoned.into_inner()
});
let expire = round_to_nearest_ten(q_lock.config.interval);
if interval % expire == 0 && !q_lock.is_empty() {
if q_lock.config.recipient.contains('@') {
if let Err(e) = send_mail(&q_lock.config, q_lock.text()).await {
error!(target: "{file}", channel = q_lock.id; "Failed to send mail: {e}");
}
}
// Clear the messages after sending the email
queue.clear();
q_lock.clear();
}
}
}
@ -331,7 +366,7 @@ pub fn mail_queue(mail_queues: Arc<Mutex<Vec<MailQueue>>>) {
/// - console logger
/// - file logger
/// - mail logger
pub fn init_logging(mail_queues: Arc<Mutex<Vec<MailQueue>>>) -> io::Result<()> {
pub fn init_logging(mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>) -> io::Result<()> {
let log_level = match ARGS
.log_level
.clone()