add mail queue, to send only every 30 seconds new error mails. get only one time a new playlist.
This commit is contained in:
parent
aad719656e
commit
8759198b0a
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -143,7 +143,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ffplayout-rs"
|
||||
version = "0.6.1"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ffplayout-rs"
|
||||
version = "0.6.1"
|
||||
version = "0.7.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
@ -7,8 +7,8 @@ use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{
|
||||
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
|
||||
seek_and_length, GlobalConfig, Media, DUMMY_LEN,
|
||||
check_sync, gen_dummy, get_date, get_delta, get_sec, is_close, json_reader::read_json,
|
||||
modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -17,6 +17,7 @@ pub struct CurrentProgram {
|
||||
start_sec: f64,
|
||||
json_mod: Option<String>,
|
||||
json_path: Option<String>,
|
||||
json_date: String,
|
||||
nodes: Vec<Media>,
|
||||
current_node: Media,
|
||||
pub init: Arc<Mutex<bool>>,
|
||||
@ -35,6 +36,7 @@ impl CurrentProgram {
|
||||
start_sec: json.start_sec.unwrap(),
|
||||
json_mod: json.modified,
|
||||
json_path: json.current_file,
|
||||
json_date: json.date,
|
||||
nodes: json.program,
|
||||
current_node: Media::new(0, "".to_string()),
|
||||
init: Arc::new(Mutex::new(true)),
|
||||
@ -66,6 +68,11 @@ impl CurrentProgram {
|
||||
.eq(&self.json_mod.clone().unwrap())
|
||||
{
|
||||
// when playlist has changed, reload it
|
||||
info!(
|
||||
"Reload playlist <b><magenta>{}</></b>",
|
||||
self.json_path.clone().unwrap()
|
||||
);
|
||||
|
||||
let json = read_json(
|
||||
self.json_path.clone(),
|
||||
self.rt_handle.clone(),
|
||||
@ -107,10 +114,12 @@ impl CurrentProgram {
|
||||
}
|
||||
|
||||
let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta;
|
||||
let date = get_date(false, start_sec, next_start);
|
||||
|
||||
if next_start >= target_length
|
||||
if (next_start >= target_length
|
||||
|| is_close(total_delta, 0.0, 2.0)
|
||||
|| is_close(total_delta, target_length, 2.0)
|
||||
|| is_close(total_delta, target_length, 2.0))
|
||||
&& date != self.json_date
|
||||
{
|
||||
let json = read_json(
|
||||
None,
|
||||
@ -122,6 +131,7 @@ impl CurrentProgram {
|
||||
|
||||
self.json_path = json.current_file.clone();
|
||||
self.json_mod = json.modified;
|
||||
self.json_date = json.date;
|
||||
self.nodes = json.program;
|
||||
self.index = 0;
|
||||
|
||||
@ -250,6 +260,7 @@ impl Iterator for CurrentProgram {
|
||||
self.check_for_next_playlist();
|
||||
Some(self.current_node.clone())
|
||||
} else {
|
||||
println!("last: {:?}", self.json_path);
|
||||
let last_playlist = self.json_path.clone();
|
||||
self.check_for_next_playlist();
|
||||
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
|
||||
|
14
src/main.rs
14
src/main.rs
@ -1,13 +1,15 @@
|
||||
extern crate log;
|
||||
extern crate simplelog;
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
mod filter;
|
||||
mod input;
|
||||
mod output;
|
||||
mod utils;
|
||||
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
use crate::output::play;
|
||||
use crate::utils::{init_config, init_logging, validate_ffmpeg};
|
||||
@ -15,13 +17,17 @@ use crate::utils::{init_config, init_logging, validate_ffmpeg};
|
||||
fn main() {
|
||||
init_config();
|
||||
|
||||
let runtime = Runtime::new().unwrap();
|
||||
let runtime = Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let rt_handle = runtime.handle();
|
||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
|
||||
let logging = init_logging(rt_handle.clone());
|
||||
let logging = init_logging(rt_handle.clone(), is_terminated.clone());
|
||||
CombinedLogger::init(logging).unwrap();
|
||||
|
||||
validate_ffmpeg();
|
||||
|
||||
play(rt_handle);
|
||||
play(rt_handle, is_terminated);
|
||||
}
|
||||
|
@ -77,13 +77,12 @@ impl Drop for ProcessCleanup {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn play(rt_handle: &Handle) {
|
||||
pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
|
||||
let config = GlobalConfig::global();
|
||||
let dec_settings = config.processing.clone().settings.unwrap();
|
||||
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
||||
|
||||
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
|
||||
let mut live_on = false;
|
||||
|
@ -1,5 +1,9 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fs::File, path::Path, sync::{Arc, Mutex}};
|
||||
use std::{
|
||||
fs::File,
|
||||
path::Path,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
@ -33,7 +37,13 @@ impl Playlist {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_json(path: Option<String>, rt_handle: Handle, is_terminated: Arc<Mutex<bool>>, seek: bool, next_start: f64) -> Playlist {
|
||||
pub fn read_json(
|
||||
path: Option<String>,
|
||||
rt_handle: Handle,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
seek: bool,
|
||||
next_start: f64,
|
||||
) -> Playlist {
|
||||
let config = GlobalConfig::global();
|
||||
|
||||
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
|
||||
@ -86,7 +96,11 @@ pub fn read_json(path: Option<String>, rt_handle: Handle, is_terminated: Arc<Mut
|
||||
start_sec += item.out - item.seek;
|
||||
}
|
||||
|
||||
rt_handle.spawn(validate_playlist(playlist.clone(), is_terminated, config.clone()));
|
||||
rt_handle.spawn(validate_playlist(
|
||||
playlist.clone(),
|
||||
is_terminated,
|
||||
config.clone(),
|
||||
));
|
||||
|
||||
playlist
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ use regex::Regex;
|
||||
use std::{
|
||||
path::Path,
|
||||
sync::{Arc, Mutex},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
|
||||
@ -13,52 +15,10 @@ use log::{Level, LevelFilter, Log, Metadata, Record};
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{get_timestamp, GlobalConfig};
|
||||
use crate::utils::GlobalConfig;
|
||||
|
||||
pub struct Timer {
|
||||
init: Arc<Mutex<bool>>,
|
||||
timestamp: Arc<Mutex<i64>>,
|
||||
limit: i64,
|
||||
messages: Arc<Mutex<Vec<String>>>,
|
||||
rt_handle: Handle,
|
||||
}
|
||||
|
||||
impl Timer {
|
||||
fn new(rt_handle: Handle) -> Self {
|
||||
Self {
|
||||
init: Arc::new(Mutex::new(true)),
|
||||
timestamp: Arc::new(Mutex::new(get_timestamp())),
|
||||
limit: 30 * 1000,
|
||||
messages: Arc::new(Mutex::new(vec![])),
|
||||
rt_handle,
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&self) {
|
||||
self.messages.lock().unwrap().clear();
|
||||
*self.timestamp.lock().unwrap() = get_timestamp();
|
||||
}
|
||||
|
||||
fn queue(&self, msg: String) {
|
||||
let now = get_timestamp();
|
||||
self.messages.lock().unwrap().push(msg);
|
||||
|
||||
if *self.init.lock().unwrap() {
|
||||
self.reset();
|
||||
*self.init.lock().unwrap() = false;
|
||||
}
|
||||
|
||||
if now >= *self.timestamp.lock().unwrap() + self.limit {
|
||||
self.rt_handle.spawn(send_mail(self.messages.lock().unwrap().clone()));
|
||||
|
||||
self.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_mail(messages: Vec<String>) {
|
||||
fn send_mail(msg: String) {
|
||||
let config = GlobalConfig::global();
|
||||
let msg = messages.join("\n");
|
||||
|
||||
let email = Message::builder()
|
||||
.from(config.mail.sender_addr.parse().unwrap())
|
||||
@ -87,18 +47,43 @@ async fn send_mail(messages: Vec<String>) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<bool>>) {
|
||||
let mut count = 0;
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() || count == 60 {
|
||||
// check every 30 seconds for messages and send them
|
||||
if messages.lock().unwrap().len() > 0 {
|
||||
let msg = messages.lock().unwrap().join("\n");
|
||||
send_mail(msg);
|
||||
|
||||
messages.lock().unwrap().clear();
|
||||
}
|
||||
|
||||
count = 0;
|
||||
}
|
||||
|
||||
if *is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(500));
|
||||
count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LogMailer {
|
||||
level: LevelFilter,
|
||||
config: Config,
|
||||
timer: Timer,
|
||||
messages: Arc<Mutex<Vec<String>>>,
|
||||
}
|
||||
|
||||
impl LogMailer {
|
||||
pub fn new(log_level: LevelFilter, config: Config, timer: Timer) -> Box<LogMailer> {
|
||||
pub fn new(log_level: LevelFilter, config: Config, messages: Arc<Mutex<Vec<String>>>) -> Box<LogMailer> {
|
||||
Box::new(LogMailer {
|
||||
level: log_level,
|
||||
config,
|
||||
timer,
|
||||
messages,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -112,10 +97,10 @@ impl Log for LogMailer {
|
||||
if self.enabled(record.metadata()) {
|
||||
match record.level() {
|
||||
Level::Error => {
|
||||
self.timer.queue(record.args().to_string());
|
||||
self.messages.lock().unwrap().push(record.args().to_string());
|
||||
}
|
||||
Level::Warn => {
|
||||
self.timer.queue(record.args().to_string());
|
||||
self.messages.lock().unwrap().push(record.args().to_string());
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
@ -145,7 +130,10 @@ fn clean_string(text: String) -> String {
|
||||
regex.replace_all(text.as_str(), "").to_string()
|
||||
}
|
||||
|
||||
pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
|
||||
pub fn init_logging(
|
||||
rt_handle: Handle,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
) -> Vec<Box<dyn SharedLogger>> {
|
||||
let config = GlobalConfig::global();
|
||||
let app_config = config.logging.clone();
|
||||
let mut time_level = LevelFilter::Off;
|
||||
@ -211,7 +199,12 @@ pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
|
||||
|
||||
if config.mail.recipient.len() > 3 {
|
||||
let mut filter = LevelFilter::Error;
|
||||
let timer = Timer::new(rt_handle);
|
||||
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
rt_handle.spawn(mail_queue(
|
||||
messages.clone(),
|
||||
is_terminated.clone(),
|
||||
));
|
||||
|
||||
let mail_config = log_config
|
||||
.clone()
|
||||
@ -222,7 +215,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
|
||||
filter = LevelFilter::Warn
|
||||
}
|
||||
|
||||
app_logger.push(LogMailer::new(filter, mail_config, timer));
|
||||
app_logger.push(LogMailer::new(filter, mail_config, messages));
|
||||
}
|
||||
|
||||
app_logger
|
||||
|
@ -151,11 +151,11 @@ impl MediaProbe {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_timestamp() -> i64 {
|
||||
let local: DateTime<Local> = Local::now();
|
||||
// pub fn get_timestamp() -> i64 {
|
||||
// let local: DateTime<Local> = Local::now();
|
||||
|
||||
local.timestamp_millis() as i64
|
||||
}
|
||||
// local.timestamp_millis() as i64
|
||||
// }
|
||||
|
||||
pub fn get_sec() -> f64 {
|
||||
let local: DateTime<Local> = Local::now();
|
||||
|
Loading…
x
Reference in New Issue
Block a user