implement mail queue and logger

This commit is contained in:
jb-alvarado 2024-06-05 16:33:05 +02:00
parent 4d49f0918a
commit 44e27fa15f
10 changed files with 406 additions and 165 deletions

16
Cargo.lock generated
View File

@ -1915,7 +1915,7 @@ dependencies = [
"rustls 0.22.4", "rustls 0.22.4",
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.25.0",
"tower-service", "tower-service",
] ]
@ -2173,6 +2173,7 @@ dependencies = [
"rustls-pemfile", "rustls-pemfile",
"socket2 0.5.7", "socket2 0.5.7",
"tokio", "tokio",
"tokio-rustls 0.26.0",
"url", "url",
"webpki-roots", "webpki-roots",
] ]
@ -2916,7 +2917,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper", "sync_wrapper",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls 0.25.0",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
@ -3922,6 +3923,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4"
dependencies = [
"rustls 0.23.5",
"rustls-pki-types",
"tokio",
]
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.15" version = "0.1.15"

View File

@ -20,7 +20,7 @@ authorization = "av2Kx8g67lF9qj5wEH3ym1bI4cCs"
help_text = """Send error messages to email address, like missing playlist; invalid \ help_text = """Send error messages to email address, like missing playlist; invalid \
json format; missing clip path. Leave recipient blank, if you don't need this. json format; missing clip path. Leave recipient blank, if you don't need this.
'mail_level' can be INFO, WARNING or ERROR. 'mail_level' can be INFO, WARNING or ERROR.
'interval' means seconds until a new mail will be sended.""" 'interval' means seconds until a new mail will be sended, value must be in increments of 10."""
subject = "Playout Error" subject = "Playout Error"
smtp_server = "mail.example.org" smtp_server = "mail.example.org"
starttls = true starttls = true
@ -28,7 +28,7 @@ sender_addr = "ffplayout@example.org"
sender_pass = "abc123" sender_pass = "abc123"
recipient = "" recipient = ""
mail_level = "ERROR" mail_level = "ERROR"
interval = 30 interval = 120
[logging] [logging]
help_text = """If 'log_to_file' is true, log to file, when is false log to console. help_text = """If 'log_to_file' is true, log to file, when is false log to console.

View File

@ -31,7 +31,7 @@ futures-util = { version = "0.3", default-features = false, features = ["std"] }
home = "0.5" home = "0.5"
jsonwebtoken = "9" jsonwebtoken = "9"
lazy_static = "1.4" lazy_static = "1.4"
lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1"], default-features = false } lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1", "tokio1-rustls-tls"], default-features = false }
lexical-sort = "0.3" lexical-sort = "0.3"
local-ip-address = "0.6" local-ip-address = "0.6"
log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] } log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] }

View File

@ -38,6 +38,8 @@ async fn create_schema(conn: &Pool<Sqlite>) -> Result<SqliteQueryResult, sqlx::E
config_path TEXT NOT NULL, config_path TEXT NOT NULL,
extra_extensions TEXT NOT NULL, extra_extensions TEXT NOT NULL,
active INTEGER NOT NULL DEFAULT 0, active INTEGER NOT NULL DEFAULT 0,
modified TEXT,
time_shift REAL NOT NULL DEFAULT 0,
UNIQUE(name) UNIQUE(name)
); );
CREATE TABLE IF NOT EXISTS presets CREATE TABLE IF NOT EXISTS presets

View File

@ -1,4 +1,11 @@
use std::{collections::HashSet, env, process::exit, sync::Arc, thread}; use std::{
collections::HashSet,
env, io,
path::PathBuf,
process::exit,
sync::{Arc, Mutex},
thread,
};
use actix_files::Files; use actix_files::Files;
use actix_web::{ use actix_web::{
@ -10,24 +17,27 @@ use actix_web_httpauth::{extractors::bearer::BearerAuth, middleware::HttpAuthent
#[cfg(all(not(debug_assertions), feature = "embed_frontend"))] #[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
use actix_web_static_files::ResourceFiles; use actix_web_static_files::ResourceFiles;
use log::*;
use path_clean::PathClean; use path_clean::PathClean;
use simplelog::*;
use tokio::sync::Mutex;
use ffplayout::{ use ffplayout::{
api::{auth, routes::*}, api::{auth, routes::*},
db::{db_pool, handles, models::LoginUser}, db::{db_pool, handles, models::LoginUser},
player::controller::ChannelController, player::controller::ChannelController,
sse::{broadcast::Broadcaster, routes::*, AuthState}, sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{control::ProcessControl, db_path, init_config, run_args}, utils::{
config::PlayoutConfig,
control::ProcessControl,
db_path, init_globales,
logging::{init_logging, MailQueue},
round_to_nearest_ten, run_args,
},
ARGS, ARGS,
}; };
#[cfg(any(debug_assertions, not(feature = "embed_frontend")))] #[cfg(any(debug_assertions, not(feature = "embed_frontend")))]
use ffplayout::utils::public_path; use ffplayout::utils::public_path;
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
#[cfg(all(not(debug_assertions), feature = "embed_frontend"))] #[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
include!(concat!(env!("OUT_DIR"), "/generated.rs")); include!(concat!(env!("OUT_DIR"), "/generated.rs"));
@ -51,52 +61,68 @@ async fn validator(
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let mut config = PlayoutConfig::new(None, None);
config.mail.recipient = String::new();
config.logging.log_to_file = false;
config.logging.timestamp = false;
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
if let Err(c) = run_args().await { if let Err(c) = run_args().await {
exit(c); exit(c);
} }
let pool = match db_pool().await { let pool = db_pool()
Ok(p) => p, .await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let _channel_controller = ChannelController::new();
let channels = handles::select_all_channels(&pool)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mail_queues = Arc::new(Mutex::new(vec![]));
let mail_messages = Arc::new(Mutex::new(vec![]));
for channel in channels.iter() {
println!("channel: {channel:?}");
let _channel_clone = channel.clone();
let config_path = PathBuf::from(&channel.config_path);
let config = match web::block(move || PlayoutConfig::new(Some(config_path), None)).await {
Ok(config) => config,
Err(e) => { Err(e) => {
error!("{e}"); error!("Failed to load configuration: {}", e);
exit(1); continue;
} }
}; };
let channel_controller = ChannelController::new(); let queue = MailQueue::new(
let mut channels = handles::select_all_channels(&pool) channel.id,
.await round_to_nearest_ten(config.mail.interval),
.unwrap_or_default(); config.mail,
);
if let Ok(mut q) = mail_queues.lock() {
q.push(queue);
}
for channel in channels.iter_mut() {
let channel_clone = channel.clone();
if channel.active { if channel.active {
thread::spawn(move || { thread::spawn(move || {
println!("{channel_clone:?}"); thread::sleep(std::time::Duration::from_secs(5));
error!(target: "{mail}", channel = 1; "This logs to File and Mail");
}); });
} }
} }
init_logging(mail_queues, mail_messages)?;
if let Some(conn) = &ARGS.listen { if let Some(conn) = &ARGS.listen {
if db_path().is_err() { if db_path().is_err() {
error!("Database is not initialized! Init DB first and add admin user."); error!("Database is not initialized! Init DB first and add admin user.");
exit(1); exit(1);
} }
init_config(&pool).await; init_globales(&pool).await;
let ip_port = conn.split(':').collect::<Vec<&str>>(); let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0]; let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap(); let port = ip_port[1].parse::<u16>().unwrap();
let engine_process = web::Data::new(ProcessControl::new()); let engine_process = web::Data::new(ProcessControl::new());
let auth_state = web::Data::new(AuthState { let auth_state = web::Data::new(AuthState {
uuids: Mutex::new(HashSet::new()), uuids: tokio::sync::Mutex::new(HashSet::new()),
}); });
let broadcast_data = Broadcaster::create(); let broadcast_data = Broadcaster::create();

View File

@ -19,6 +19,18 @@ pub struct Args {
#[clap(short, long, help = "Listen on IP:PORT, like: 127.0.0.1:8787")] #[clap(short, long, help = "Listen on IP:PORT, like: 127.0.0.1:8787")]
pub listen: Option<String>, pub listen: Option<String>,
#[clap(long, help = "Keep log file for given days")]
pub log_backup_count: Option<usize>,
#[clap(long, help = "Override logging level: trace, debug, info, warn, error")]
pub log_level: Option<String>,
#[clap(long, help = "Logging path")]
pub log_path: Option<PathBuf>,
#[clap(long, help = "Log to console")]
pub log_to_console: bool,
#[clap(short, long, help = "Initialize Database")] #[clap(short, long, help = "Initialize Database")]
pub init: bool, pub init: bool,

View File

@ -8,7 +8,7 @@ use std::{
}; };
use chrono::NaiveTime; use chrono::NaiveTime;
use log::LevelFilter; use flexi_logger::Level;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use shlex::split; use shlex::split;
@ -97,37 +97,6 @@ impl FromStr for ProcessMode {
} }
} }
pub fn string_to_log_level<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
match s.to_lowercase().as_str() {
"debug" => Ok(LevelFilter::Debug),
"error" => Ok(LevelFilter::Error),
"info" => Ok(LevelFilter::Info),
"trace" => Ok(LevelFilter::Trace),
"warn" => Ok(LevelFilter::Warn),
"off" => Ok(LevelFilter::Off),
_ => Err(de::Error::custom("Error level not exists!")),
}
}
fn log_level_to_string<S>(l: &LevelFilter, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match l {
LevelFilter::Debug => s.serialize_str("DEBUG"),
LevelFilter::Error => s.serialize_str("ERROR"),
LevelFilter::Info => s.serialize_str("INFO"),
LevelFilter::Trace => s.serialize_str("TRACE"),
LevelFilter::Warn => s.serialize_str("WARNING"),
LevelFilter::Off => s.serialize_str("OFF"),
}
}
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Template { pub struct Template {
pub sources: Vec<Source>, pub sources: Vec<Source>,
@ -209,25 +178,17 @@ pub struct Mail {
pub sender_addr: String, pub sender_addr: String,
pub sender_pass: String, pub sender_pass: String,
pub recipient: String, pub recipient: String,
pub mail_level: String, #[serde(
serialize_with = "log_level_to_string",
deserialize_with = "string_to_log_level"
)]
pub mail_level: Level,
pub interval: u64, pub interval: u64,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Logging { pub struct Logging {
pub help_text: String, pub help_text: String,
pub log_to_file: bool,
pub backup_count: usize,
pub local_time: bool,
pub timestamp: bool,
#[serde(alias = "log_path")]
pub path: PathBuf,
#[serde(
alias = "log_level",
serialize_with = "log_level_to_string",
deserialize_with = "string_to_log_level"
)]
pub level: LevelFilter,
pub ffmpeg_level: String, pub ffmpeg_level: String,
pub ingest_level: Option<String>, pub ingest_level: Option<String>,
#[serde(default)] #[serde(default)]
@ -351,6 +312,35 @@ pub struct Out {
pub output_cmd: Option<Vec<String>>, pub output_cmd: Option<Vec<String>>,
} }
pub fn string_to_log_level<'de, D>(deserializer: D) -> Result<Level, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
match s.to_lowercase().as_str() {
"debug" => Ok(Level::Debug),
"error" => Ok(Level::Error),
"info" => Ok(Level::Info),
"trace" => Ok(Level::Trace),
"warning" => Ok(Level::Warn),
_ => Err(de::Error::custom("Error level not exists!")),
}
}
fn log_level_to_string<S>(l: &Level, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match l {
Level::Debug => s.serialize_str("DEBUG"),
Level::Error => s.serialize_str("ERROR"),
Level::Info => s.serialize_str("INFO"),
Level::Trace => s.serialize_str("TRACE"),
Level::Warn => s.serialize_str("WARNING"),
}
}
fn default_track_index() -> i32 { fn default_track_index() -> i32 {
-1 -1
} }

View File

@ -121,3 +121,21 @@ impl From<std::io::Error> for ProcessError {
ProcessError::IO(err) ProcessError::IO(err)
} }
} }
impl From<lettre::address::AddressError> for ProcessError {
fn from(err: lettre::address::AddressError) -> ProcessError {
ProcessError::Custom(err.to_string())
}
}
impl From<lettre::transport::smtp::Error> for ProcessError {
fn from(err: lettre::transport::smtp::Error) -> ProcessError {
ProcessError::Custom(err.to_string())
}
}
impl From<lettre::error::Error> for ProcessError {
fn from(err: lettre::error::Error) -> ProcessError {
ProcessError::Custom(err.to_string())
}
}

View File

@ -1,16 +1,17 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
env,
io::{self, ErrorKind, Write}, io::{self, ErrorKind, Write},
path::PathBuf, path::PathBuf,
sync::{atomic::Ordering, Arc, Mutex}, sync::{Arc, Mutex},
thread::{self, sleep},
time::Duration, time::Duration,
}; };
use actix_web::{rt::time::interval, web}; use actix_web::rt::time::interval;
use chrono::prelude::*;
use flexi_logger::writers::{FileLogWriter, LogWriter}; use flexi_logger::writers::{FileLogWriter, LogWriter};
use flexi_logger::{Age, Cleanup, Criterion, DeferredNow, FileSpec, Logger, Naming}; use flexi_logger::{
Age, Cleanup, Criterion, DeferredNow, FileSpec, Level, LogSpecification, Logger, Naming,
};
use lettre::{ use lettre::{
message::header, transport::smtp::authentication::Credentials, AsyncSmtpTransport, message::header, transport::smtp::authentication::Credentials, AsyncSmtpTransport,
AsyncTransport, Message, Tokio1Executor, AsyncTransport, Message, Tokio1Executor,
@ -18,10 +19,9 @@ use lettre::{
use log::{kv::Value, *}; use log::{kv::Value, *};
use paris::formatter::colorize_string; use paris::formatter::colorize_string;
use crate::utils::{ use super::ARGS;
config::{Logging, PlayoutConfig},
control::ProcessControl, use crate::utils::{config::Mail, errors::ProcessError, round_to_nearest_ten};
};
pub struct LogConsole; pub struct LogConsole;
@ -38,51 +38,55 @@ impl LogWriter for LogConsole {
} }
struct MultiFileLogger { struct MultiFileLogger {
config: Logging, log_path: PathBuf,
writers: Arc<Mutex<HashMap<String, Arc<Mutex<FileLogWriter>>>>>, writers: Arc<Mutex<HashMap<i32, Arc<Mutex<FileLogWriter>>>>>,
} }
impl MultiFileLogger { impl MultiFileLogger {
pub fn new(config: &Logging) -> Self { pub fn new(log_path: PathBuf) -> Self {
MultiFileLogger { MultiFileLogger {
config: config.clone(), log_path,
writers: Arc::new(Mutex::new(HashMap::new())), writers: Arc::new(Mutex::new(HashMap::new())),
} }
} }
fn get_writer(&self, channel: &str) -> io::Result<Arc<Mutex<FileLogWriter>>> { fn get_writer(&self, channel: i32) -> io::Result<Arc<Mutex<FileLogWriter>>> {
let mut writers = self.writers.lock().unwrap(); let mut writers = self.writers.lock().unwrap();
if !writers.contains_key(channel) { if !writers.contains_key(&channel) {
let writer = FileLogWriter::builder( let writer = FileLogWriter::builder(
FileSpec::default() FileSpec::default()
.suppress_timestamp() .suppress_timestamp()
.directory(self.config.path.clone()) .directory(&self.log_path)
.basename("ffplayout") .basename("ffplayout")
.discriminant(channel), .discriminant(channel.to_string()),
) )
.format(file_formatter) .format(file_formatter)
.append() .append()
.rotate( .rotate(
Criterion::Age(Age::Day), Criterion::Age(Age::Day),
Naming::TimestampsDirect, Naming::Timestamps,
Cleanup::KeepLogFiles(self.config.backup_count), Cleanup::KeepLogFiles(ARGS.log_backup_count.unwrap_or(14)),
) )
.try_build() .try_build()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
writers.insert(channel.to_string(), Arc::new(Mutex::new(writer))); writers.insert(channel, Arc::new(Mutex::new(writer)));
} }
Ok(writers.get(channel).unwrap().clone()) Ok(writers.get(&channel).unwrap().clone())
} }
} }
impl LogWriter for MultiFileLogger { impl LogWriter for MultiFileLogger {
fn write(&self, now: &mut DeferredNow, record: &Record) -> io::Result<()> { fn write(&self, now: &mut DeferredNow, record: &Record) -> io::Result<()> {
let channel = record let channel = i32::try_from(
record
.key_values() .key_values()
.get("channel".into()) .get("channel".into())
.unwrap_or(Value::null()) .unwrap_or(Value::null())
.to_string(); .to_i64()
let writer = self.get_writer(&channel); .unwrap_or(0),
)
.unwrap_or(0);
let writer = self.get_writer(channel);
let w = writer?.lock().unwrap().write(now, record); let w = writer?.lock().unwrap().write(now, record);
w w
@ -97,20 +101,125 @@ impl LogWriter for MultiFileLogger {
} }
} }
pub struct LogMailer {
pub messages: Arc<Mutex<Vec<MailMessage>>>,
}
impl LogMailer {
pub fn new(messages: Arc<Mutex<Vec<MailMessage>>>) -> Self {
Self { messages }
}
fn push(&self, msg: MailMessage) {
if let Ok(mut list) = self.messages.lock() {
list.push(msg)
}
}
}
impl LogWriter for LogMailer {
fn write(&self, now: &mut DeferredNow, record: &Record<'_>) -> std::io::Result<()> {
let id = i32::try_from(
record
.key_values()
.get("channel".into())
.unwrap_or(Value::null())
.to_i64()
.unwrap_or(0),
)
.unwrap_or(0);
let msg = MailMessage::new(
id,
record.level(),
format!(
"[{}] [{:>5}] {}",
now.now().format("%Y-%m-%d %H:%M:%S"),
record.level(),
record.args()
),
);
self.push(msg.clone());
Ok(())
}
fn flush(&self) -> std::io::Result<()> {
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct MailQueue {
pub id: i32,
pub expire: u64,
pub config: Mail,
pub lines: Vec<String>,
}
impl MailQueue {
pub fn new(id: i32, expire: u64, config: Mail) -> Self {
Self {
id,
expire,
config,
lines: vec![],
}
}
pub fn update(&mut self, expire: u64, config: Mail) {
self.expire = expire;
self.config = config;
}
pub fn clear(&mut self) {
self.lines.clear();
}
fn text(&self) -> String {
self.lines.join("\n")
}
fn is_empty(&self) -> bool {
self.lines.is_empty()
}
}
#[derive(Clone, Debug)]
pub struct MailMessage {
pub id: i32,
pub level: Level,
pub line: String,
}
impl MailMessage {
pub fn new(id: i32, level: Level, line: String) -> Self {
Self { id, level, line }
}
fn eq(&self, level: Level) -> bool {
match level {
Level::Error => self.level == Level::Error,
Level::Warn => matches!(self.level, Level::Warn | Level::Error),
Level::Info => matches!(self.level, Level::Info | Level::Warn | Level::Error),
_ => false,
}
}
}
fn console_formatter(w: &mut dyn Write, _now: &mut DeferredNow, record: &Record) -> io::Result<()> { fn console_formatter(w: &mut dyn Write, _now: &mut DeferredNow, record: &Record) -> io::Result<()> {
let level = match record.level() { let level = match record.level() {
Level::Debug => colorize_string("<bright magenta>[DEBUG]</>"), Level::Debug => "<bright-blue>[DEBUG]</>",
Level::Error => colorize_string("<bright red>[ERROR]</>"), Level::Error => "<bright-red>[ERROR]</>",
Level::Info => colorize_string("<bright green>[ INFO]</>"), Level::Info => "<bright-green>[ INFO]</>",
Level::Trace => colorize_string("<bright yellow>[TRACE]</>"), Level::Trace => "<bright-yellow>[TRACE]</>",
Level::Warn => colorize_string("<yellow>[ WARN]</>"), Level::Warn => "<yellow>[ WARN]</>",
}; };
write!( write!(
w, w,
"{} {}", "{}",
level, colorize_string(format!("{level} {}", record.args()))
colorize_string(record.args().to_string()),
) )
} }
@ -128,20 +237,28 @@ fn file_formatter(
) )
} }
fn file_logger(config: &Logging) -> Box<dyn LogWriter> { fn file_logger() -> Box<dyn LogWriter> {
if config.log_to_file { let mut log_path = ARGS
let logger = MultiFileLogger::new(config); .log_path
.clone()
.unwrap_or(PathBuf::from("/var/log/ffplayout"));
if !log_path.is_dir() {
log_path = env::current_dir().unwrap();
}
if ARGS.log_to_console {
Box::new(LogConsole)
} else {
let logger = MultiFileLogger::new(log_path);
Box::new(logger) Box::new(logger)
} else {
Box::new(LogConsole)
} }
} }
/// send log messages to mail recipient /// send log messages to mail recipient
pub async fn send_mail(config: &PlayoutConfig, msg: String) { pub async fn send_mail(config: &Mail, msg: String) -> Result<(), ProcessError> {
let recipient = config let recipient = config
.mail
.recipient .recipient
.split_terminator([',', ';', ' ']) .split_terminator([',', ';', ' '])
.filter(|s| s.contains('@')) .filter(|s| s.contains('@'))
@ -149,61 +266,89 @@ pub async fn send_mail(config: &PlayoutConfig, msg: String) {
.collect::<Vec<&str>>(); .collect::<Vec<&str>>();
let mut message = Message::builder() let mut message = Message::builder()
.from(config.mail.sender_addr.parse().unwrap()) .from(config.sender_addr.parse()?)
.subject(&config.mail.subject) .subject(&config.subject)
.header(header::ContentType::TEXT_PLAIN); .header(header::ContentType::TEXT_PLAIN);
for r in recipient { for r in recipient {
message = message.to(r.parse().unwrap()); message = message.to(r.parse()?);
} }
if let Ok(mail) = message.body(msg) { let mail = message.body(msg)?;
let credentials = Credentials::new( let credentials = Credentials::new(config.sender_addr.clone(), config.sender_pass.clone());
config.mail.sender_addr.clone(),
config.mail.sender_pass.clone(),
);
let mut transporter = let mut transporter =
AsyncSmtpTransport::<Tokio1Executor>::relay(config.mail.smtp_server.clone().as_str()); AsyncSmtpTransport::<Tokio1Executor>::relay(config.smtp_server.clone().as_str());
if config.mail.starttls { if config.starttls {
transporter = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay( transporter = AsyncSmtpTransport::<Tokio1Executor>::starttls_relay(
config.mail.smtp_server.clone().as_str(), config.smtp_server.clone().as_str(),
); );
} }
let mailer = transporter.unwrap().credentials(credentials).build(); let mailer = transporter?.credentials(credentials).build();
// Send the mail // Send the mail
if let Err(e) = mailer.send(&mail).await? { mailer.send(mail).await?;
error!(target: "{file}", channel = 1; "Could not send mail: {e}");
} Ok(())
} else {
error!(target: "{file}", channel = 1; "Mail Message failed!");
}
} }
/// Basic Mail Queue /// Basic Mail Queue
/// ///
/// Check every give seconds for messages and send them. /// Check every give seconds for messages and send them.
fn mail_queue(config: PlayoutConfig, messages: Arc<Mutex<Vec<String>>>) { pub fn mail_queue(mail_queues: Arc<Mutex<Vec<MailQueue>>>, messages: Arc<Mutex<Vec<MailMessage>>>) {
let sec = config.mail.interval;
actix_web::rt::spawn(async move { actix_web::rt::spawn(async move {
let sec = 10;
let mut interval = interval(Duration::from_secs(sec)); let mut interval = interval(Duration::from_secs(sec));
let mut counter = 0;
loop { loop {
let mut msg = messages.lock().unwrap(); interval.tick().await;
if msg.len() > 0 { // Reset the counter after one day
send_mail(&config, msg.join("\n")).await; if counter >= 86400 {
counter = 0;
msg.clear(); } else {
counter += sec;
} }
drop(msg); let mut msg_list = match mail_queues.lock() {
Ok(l) => l,
Err(e) => {
error!("Failed to lock mail_queues {e}");
continue;
}
};
interval.tick().await; let mut msgs = match messages.lock() {
Ok(m) => m,
Err(e) => {
error!("Failed to lock messages {e}");
continue;
}
};
while let Some(msg) = msgs.pop() {
if let Some(queue) = msg_list.iter_mut().find(|q| q.id == msg.id) {
if msg.eq(queue.config.mail_level) {
queue.lines.push(msg.line.clone());
}
}
}
// Process mail queues and send emails
for queue in msg_list.iter_mut() {
let interval = round_to_nearest_ten(counter);
if interval % queue.expire == 0 && !queue.is_empty() {
if let Err(e) = send_mail(&queue.config, queue.text()).await {
error!(target: "{file}", channel = queue.id; "Failed to send mail: {e}");
}
// Clear the messages after sending the email
queue.clear();
}
}
} }
}); });
} }
@ -214,16 +359,44 @@ fn mail_queue(config: PlayoutConfig, messages: Arc<Mutex<Vec<String>>>) {
/// - file logger /// - file logger
/// - mail logger /// - mail logger
pub fn init_logging( pub fn init_logging(
config: &PlayoutConfig, mail_queues: Arc<Mutex<Vec<MailQueue>>>,
proc_ctl: Option<ProcessControl>, messages: Arc<Mutex<Vec<MailMessage>>>,
messages: Option<Arc<Mutex<Vec<String>>>>,
) -> io::Result<()> { ) -> io::Result<()> {
Logger::try_with_str(config.logging.level.as_str()) let log_level = match ARGS
.map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))? .log_level
.clone()
.unwrap_or("debug".to_string())
.to_lowercase()
.as_str()
{
"debug" => LevelFilter::Debug,
"error" => LevelFilter::Error,
"info" => LevelFilter::Info,
"trace" => LevelFilter::Trace,
"warn" => LevelFilter::Warn,
"off" => LevelFilter::Off,
_ => LevelFilter::Debug,
};
mail_queue(mail_queues, messages.clone());
// Build the initial log specification
let mut builder = LogSpecification::builder();
builder
.default(log_level)
.module("hyper", LevelFilter::Error)
.module("libc", LevelFilter::Error)
.module("neli", LevelFilter::Error)
.module("reqwest", LevelFilter::Error)
.module("rustls", LevelFilter::Error)
.module("serial_test", LevelFilter::Error)
.module("sqlx", LevelFilter::Error);
Logger::with(builder.build())
.format(console_formatter) .format(console_formatter)
.log_to_stderr() .log_to_stderr()
.add_writer("file", file_logger(&config.logging)) .add_writer("file", file_logger())
// .add_writer("Mail", Box::new(LogMailer)) .add_writer("mail", Box::new(LogMailer::new(messages)))
.start() .start()
.map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?;

View File

@ -126,7 +126,7 @@ impl GlobalSettings {
static INSTANCE: OnceCell<GlobalSettings> = OnceCell::new(); static INSTANCE: OnceCell<GlobalSettings> = OnceCell::new();
pub async fn init_config(conn: &Pool<Sqlite>) { pub async fn init_globales(conn: &Pool<Sqlite>) {
let config = GlobalSettings::new(conn).await; let config = GlobalSettings::new(conn).await;
INSTANCE.set(config).unwrap(); INSTANCE.set(config).unwrap();
} }
@ -407,3 +407,11 @@ pub fn free_tcp_socket(exclude_socket: String) -> Option<String> {
None None
} }
pub fn round_to_nearest_ten(num: u64) -> u64 {
if num % 10 >= 5 {
((num / 10) + 1) * 10
} else {
(num / 10) * 10
}
}