diff --git a/Cargo.lock b/Cargo.lock index e2d19de5..fb5a48f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,7 +143,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.6.1" +version = "0.7.0" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index f49cb065..77c320b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.6.1" +version = "0.7.0" edition = "2021" [dependencies] diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 8fdf3fee..d0cb37a8 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -7,8 +7,8 @@ use simplelog::*; use tokio::runtime::Handle; use crate::utils::{ - check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, - seek_and_length, GlobalConfig, Media, DUMMY_LEN, + check_sync, gen_dummy, get_date, get_delta, get_sec, is_close, json_reader::read_json, + modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN, }; #[derive(Debug)] @@ -17,6 +17,7 @@ pub struct CurrentProgram { start_sec: f64, json_mod: Option, json_path: Option, + json_date: String, nodes: Vec, current_node: Media, pub init: Arc>, @@ -35,6 +36,7 @@ impl CurrentProgram { start_sec: json.start_sec.unwrap(), json_mod: json.modified, json_path: json.current_file, + json_date: json.date, nodes: json.program, current_node: Media::new(0, "".to_string()), init: Arc::new(Mutex::new(true)), @@ -66,6 +68,11 @@ impl CurrentProgram { .eq(&self.json_mod.clone().unwrap()) { // when playlist has changed, reload it + info!( + "Reload playlist {}", + self.json_path.clone().unwrap() + ); + let json = read_json( self.json_path.clone(), self.rt_handle.clone(), @@ -107,10 +114,12 @@ impl CurrentProgram { } let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta; + let date = get_date(false, start_sec, next_start); - if next_start >= target_length + if (next_start >= target_length || is_close(total_delta, 0.0, 2.0) - || is_close(total_delta, target_length, 2.0) + || is_close(total_delta, target_length, 2.0)) + && date != self.json_date { let json = read_json( None, @@ -122,6 +131,7 @@ impl CurrentProgram { self.json_path = json.current_file.clone(); self.json_mod = json.modified; + self.json_date = json.date; self.nodes = json.program; self.index = 0; @@ -250,6 +260,7 @@ impl Iterator for CurrentProgram { self.check_for_next_playlist(); Some(self.current_node.clone()) } else { + println!("last: {:?}", self.json_path); let last_playlist = self.json_path.clone(); self.check_for_next_playlist(); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); diff --git a/src/main.rs b/src/main.rs index 058c295d..b61c759e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,15 @@ extern crate log; extern crate simplelog; +use std::sync::{Arc, Mutex}; + mod filter; mod input; mod output; mod utils; use simplelog::*; -use tokio::runtime::Runtime; +use tokio::runtime::Builder; use crate::output::play; use crate::utils::{init_config, init_logging, validate_ffmpeg}; @@ -15,13 +17,17 @@ use crate::utils::{init_config, init_logging, validate_ffmpeg}; fn main() { init_config(); - let runtime = Runtime::new().unwrap(); + let runtime = Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); let rt_handle = runtime.handle(); + let is_terminated: Arc> = Arc::new(Mutex::new(false)); - let logging = init_logging(rt_handle.clone()); + let logging = init_logging(rt_handle.clone(), is_terminated.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); - play(rt_handle); + play(rt_handle, is_terminated); } diff --git a/src/output/mod.rs b/src/output/mod.rs index 95b524bc..2fadc3ae 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -77,13 +77,12 @@ impl Drop for ProcessCleanup { } } -pub fn play(rt_handle: &Handle) { +pub fn play(rt_handle: &Handle, is_terminated: Arc>) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let server_term: Arc>> = Arc::new(Mutex::new(None)); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); let server_is_running: Arc> = Arc::new(Mutex::new(false)); let mut init_playlist: Option>> = None; let mut live_on = false; diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index 488a94e3..3f43a597 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -1,5 +1,9 @@ use serde::{Deserialize, Serialize}; -use std::{fs::File, path::Path, sync::{Arc, Mutex}}; +use std::{ + fs::File, + path::Path, + sync::{Arc, Mutex}, +}; use simplelog::*; use tokio::runtime::Handle; @@ -33,7 +37,13 @@ impl Playlist { } } -pub fn read_json(path: Option, rt_handle: Handle, is_terminated: Arc>, seek: bool, next_start: f64) -> Playlist { +pub fn read_json( + path: Option, + rt_handle: Handle, + is_terminated: Arc>, + seek: bool, + next_start: f64, +) -> Playlist { let config = GlobalConfig::global(); let mut playlist_path = Path::new(&config.playlist.path).to_owned(); @@ -86,7 +96,11 @@ pub fn read_json(path: Option, rt_handle: Handle, is_terminated: Arc>, - timestamp: Arc>, - limit: i64, - messages: Arc>>, - rt_handle: Handle, -} - -impl Timer { - fn new(rt_handle: Handle) -> Self { - Self { - init: Arc::new(Mutex::new(true)), - timestamp: Arc::new(Mutex::new(get_timestamp())), - limit: 30 * 1000, - messages: Arc::new(Mutex::new(vec![])), - rt_handle, - } - } - - fn reset(&self) { - self.messages.lock().unwrap().clear(); - *self.timestamp.lock().unwrap() = get_timestamp(); - } - - fn queue(&self, msg: String) { - let now = get_timestamp(); - self.messages.lock().unwrap().push(msg); - - if *self.init.lock().unwrap() { - self.reset(); - *self.init.lock().unwrap() = false; - } - - if now >= *self.timestamp.lock().unwrap() + self.limit { - self.rt_handle.spawn(send_mail(self.messages.lock().unwrap().clone())); - - self.reset(); - } - } -} - -async fn send_mail(messages: Vec) { +fn send_mail(msg: String) { let config = GlobalConfig::global(); - let msg = messages.join("\n"); let email = Message::builder() .from(config.mail.sender_addr.parse().unwrap()) @@ -87,18 +47,43 @@ async fn send_mail(messages: Vec) { } } +async fn mail_queue(messages: Arc>>, is_terminated: Arc>) { + let mut count = 0; + + loop { + if *is_terminated.lock().unwrap() || count == 60 { + // check every 30 seconds for messages and send them + if messages.lock().unwrap().len() > 0 { + let msg = messages.lock().unwrap().join("\n"); + send_mail(msg); + + messages.lock().unwrap().clear(); + } + + count = 0; + } + + if *is_terminated.lock().unwrap() { + break; + } + + sleep(Duration::from_millis(500)); + count += 1; + } +} + pub struct LogMailer { level: LevelFilter, config: Config, - timer: Timer, + messages: Arc>>, } impl LogMailer { - pub fn new(log_level: LevelFilter, config: Config, timer: Timer) -> Box { + pub fn new(log_level: LevelFilter, config: Config, messages: Arc>>) -> Box { Box::new(LogMailer { level: log_level, config, - timer, + messages, }) } } @@ -112,10 +97,10 @@ impl Log for LogMailer { if self.enabled(record.metadata()) { match record.level() { Level::Error => { - self.timer.queue(record.args().to_string()); + self.messages.lock().unwrap().push(record.args().to_string()); } Level::Warn => { - self.timer.queue(record.args().to_string()); + self.messages.lock().unwrap().push(record.args().to_string()); } _ => (), } @@ -145,7 +130,10 @@ fn clean_string(text: String) -> String { regex.replace_all(text.as_str(), "").to_string() } -pub fn init_logging(rt_handle: Handle) -> Vec> { +pub fn init_logging( + rt_handle: Handle, + is_terminated: Arc>, +) -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); let mut time_level = LevelFilter::Off; @@ -211,7 +199,12 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { if config.mail.recipient.len() > 3 { let mut filter = LevelFilter::Error; - let timer = Timer::new(rt_handle); + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + + rt_handle.spawn(mail_queue( + messages.clone(), + is_terminated.clone(), + )); let mail_config = log_config .clone() @@ -222,7 +215,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { filter = LevelFilter::Warn } - app_logger.push(LogMailer::new(filter, mail_config, timer)); + app_logger.push(LogMailer::new(filter, mail_config, messages)); } app_logger diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f485558e..ca93036c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -151,11 +151,11 @@ impl MediaProbe { } } -pub fn get_timestamp() -> i64 { - let local: DateTime = Local::now(); +// pub fn get_timestamp() -> i64 { +// let local: DateTime = Local::now(); - local.timestamp_millis() as i64 -} +// local.timestamp_millis() as i64 +// } pub fn get_sec() -> f64 { let local: DateTime = Local::now();