From 95c9d18b69a702c7f9ca34b6a206380fdd7f8b12 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 27 Mar 2022 22:00:25 +0200 Subject: [PATCH 1/4] cross link config should be in user settings, not in repo --- .cargo/config | 7 ------- docs/developer.md | 12 ++++++++++++ 2 files changed, 12 insertions(+), 7 deletions(-) delete mode 100644 .cargo/config diff --git a/.cargo/config b/.cargo/config deleted file mode 100644 index 4391721d..00000000 --- a/.cargo/config +++ /dev/null @@ -1,7 +0,0 @@ -[target.x86_64-apple-darwin] -linker = "x86_64-apple-darwin20.4-clang" -ar = "x86_64-apple-darwin20.4-ar" - -[target.aarch64-apple-darwin] -linker = "aarch64-apple-darwin20.4-clang" -ar = "aarch64-apple-darwin20.4-ar" diff --git a/docs/developer.md b/docs/developer.md index a6e6bded..749a3e95 100644 --- a/docs/developer.md +++ b/docs/developer.md @@ -41,6 +41,18 @@ rustup target add aarch64-apple-darwin rustup target add x86_64-apple-darwin ``` +Add linker and ar settings to `~/.cargo/config`: + +```Bash +[target.x86_64-apple-darwin] +linker = "x86_64-apple-darwin20.4-clang" +ar = "x86_64-apple-darwin20.4-ar" + +[target.aarch64-apple-darwin] +linker = "aarch64-apple-darwin20.4-clang" +ar = "aarch64-apple-darwin20.4-ar" +``` + Follow this guide: [rust-cross-compile-linux-to-macos](https://wapl.es/rust/2019/02/17/rust-cross-compile-linux-to-macos.html) Or setup [osxcross](https://github.com/tpoechtrager/osxcross) correctly. From f54d29b977b81f279796c735dc95ef3859ac5778 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 27 Mar 2022 22:00:39 +0200 Subject: [PATCH 2/4] work on timer --- examples/timer.rs | 59 +++++++++++++++++ src/utils/logging.rs | 148 ++++++++++++++++++++++++++++--------------- src/utils/mod.rs | 8 +-- 3 files changed, 160 insertions(+), 55 deletions(-) create mode 100644 examples/timer.rs diff --git a/examples/timer.rs b/examples/timer.rs new file mode 100644 index 00000000..90965752 --- /dev/null +++ b/examples/timer.rs @@ -0,0 +1,59 @@ +use chrono::prelude::*; +use std::{ + thread::sleep}; + +fn get_timestamp() -> i64 { + let local: DateTime = Local::now(); + + local.timestamp_millis() as i64 +} + +struct Timer { + init: bool, + timestamp: i64, + limit: i64, + messages: Vec, +} + +impl Timer { + fn new() -> Self { + Self { + init: true, + timestamp: get_timestamp(), + limit: 10 * 1000, + messages: vec![], + } + } + + fn reset(&mut self) { + self.messages.clear(); + self.timestamp = get_timestamp(); + } + + fn send(&mut self, msg: String) { + let now = get_timestamp(); + self.messages.push(msg); + + if self.init { + self.reset(); + self.init = false; + } + + if now >= self.timestamp + self.limit { + println!("Send messages: {:?}", self.messages); + + self.reset(); + } + } +} + +fn main() { + let mut timer = Timer::new(); + + for i in 0..40 { + println!("{:?}", i); + timer.send(format!("{:?}", i)); + + sleep(std::time::Duration::from_secs(1)); + } +} diff --git a/src/utils/logging.rs b/src/utils/logging.rs index a8df6ccb..7fc779c9 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -2,7 +2,10 @@ extern crate log; extern crate simplelog; use regex::Regex; -use std::path::Path; +use std::{ + path::Path, + sync::{Arc, Mutex}, +}; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport}; @@ -10,68 +13,52 @@ use log::{Level, LevelFilter, Log, Metadata, Record}; use simplelog::*; use tokio::runtime::Handle; -use crate::utils::GlobalConfig; +use crate::utils::{get_timestamp, GlobalConfig}; -pub struct LogMailer { - level: LevelFilter, - config: Config, - handle: Handle, +pub struct Timer { + init: Arc>, + timestamp: Arc>, + limit: i64, + messages: Arc>>, + rt_handle: Handle, } -impl LogMailer { - pub fn new(log_level: LevelFilter, config: Config, handle: Handle) -> Box { - Box::new(LogMailer { - level: log_level, - config, - handle, - }) - } -} - -impl Log for LogMailer { - fn enabled(&self, metadata: &Metadata<'_>) -> bool { - metadata.level() <= self.level - } - - fn log(&self, record: &Record<'_>) { - if self.enabled(record.metadata()) { - match record.level() { - Level::Error => { - self.handle.spawn(send_mail(record.args().to_string())); - }, - Level::Warn => { - self.handle.spawn(send_mail(record.args().to_string())); - }, - _ => (), - } +impl Timer { + fn new(rt_handle: Handle) -> Self { + Self { + init: Arc::new(Mutex::new(true)), + timestamp: Arc::new(Mutex::new(get_timestamp())), + limit: 30 * 1000, + messages: Arc::new(Mutex::new(vec![])), + rt_handle, } } - fn flush(&self) {} -} - -impl SharedLogger for LogMailer { - fn level(&self) -> LevelFilter { - self.level + fn reset(&self) { + self.messages.lock().unwrap().clear(); + *self.timestamp.lock().unwrap() = get_timestamp(); } - fn config(&self) -> Option<&Config> { - Some(&self.config) - } + fn queue(&self, msg: String) { + let now = get_timestamp(); + self.messages.lock().unwrap().push(msg); - fn as_log(self: Box) -> Box { - Box::new(*self) + if *self.init.lock().unwrap() { + self.reset(); + *self.init.lock().unwrap() = false; + } + + if now >= *self.timestamp.lock().unwrap() + self.limit { + self.rt_handle.spawn(send_mail(self.messages.lock().unwrap().clone())); + + self.reset(); + } } } -fn clean_string(text: String) -> String { - let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap(); - - regex.replace_all(text.as_str(), "").to_string() -} - -async fn send_mail(msg: String) { +async fn send_mail(messages: Vec) { let config = GlobalConfig::global(); + let msg = messages.join("\n"); let email = Message::builder() .from(config.mail.sender_addr.parse().unwrap()) @@ -100,6 +87,64 @@ async fn send_mail(msg: String) { } } +pub struct LogMailer { + level: LevelFilter, + config: Config, + timer: Timer, +} + +impl LogMailer { + pub fn new(log_level: LevelFilter, config: Config, timer: Timer) -> Box { + Box::new(LogMailer { + level: log_level, + config, + timer, + }) + } +} + +impl Log for LogMailer { + fn enabled(&self, metadata: &Metadata<'_>) -> bool { + metadata.level() <= self.level + } + + fn log(&self, record: &Record<'_>) { + if self.enabled(record.metadata()) { + match record.level() { + Level::Error => { + self.timer.queue(record.args().to_string()); + } + Level::Warn => { + self.timer.queue(record.args().to_string()); + } + _ => (), + } + } + } + + fn flush(&self) {} +} + +impl SharedLogger for LogMailer { + fn level(&self) -> LevelFilter { + self.level + } + + fn config(&self) -> Option<&Config> { + Some(&self.config) + } + + fn as_log(self: Box) -> Box { + Box::new(*self) + } +} + +fn clean_string(text: String) -> String { + let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap(); + + regex.replace_all(text.as_str(), "").to_string() +} + pub fn init_logging(rt_handle: Handle) -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); @@ -166,6 +211,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { if config.mail.recipient.len() > 3 { let mut filter = LevelFilter::Error; + let timer = Timer::new(rt_handle); let mail_config = log_config .clone() @@ -176,7 +222,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { filter = LevelFilter::Warn } - app_logger.push(LogMailer::new(filter, mail_config, rt_handle)); + app_logger.push(LogMailer::new(filter, mail_config, timer)); } app_logger diff --git a/src/utils/mod.rs b/src/utils/mod.rs index ca93036c..f485558e 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -151,11 +151,11 @@ impl MediaProbe { } } -// pub fn get_timestamp() -> i64 { -// let local: DateTime = Local::now(); +pub fn get_timestamp() -> i64 { + let local: DateTime = Local::now(); -// local.timestamp_millis() as i64 -// } + local.timestamp_millis() as i64 +} pub fn get_sec() -> f64 { let local: DateTime = Local::now(); From 8759198b0a5380606f3b59d3407a08c53311a978 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 28 Mar 2022 15:52:03 +0200 Subject: [PATCH 3/4] add mail queue, to send only every 30 seconds new error mails. get only one time a new playlist. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/input/playlist.rs | 19 ++++++-- src/main.rs | 14 ++++-- src/output/mod.rs | 3 +- src/utils/json_reader.rs | 20 +++++++-- src/utils/logging.rs | 97 +++++++++++++++++++--------------------- src/utils/mod.rs | 8 ++-- 8 files changed, 94 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2d19de5..fb5a48f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,7 +143,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.6.1" +version = "0.7.0" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index f49cb065..77c320b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.6.1" +version = "0.7.0" edition = "2021" [dependencies] diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 8fdf3fee..d0cb37a8 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -7,8 +7,8 @@ use simplelog::*; use tokio::runtime::Handle; use crate::utils::{ - check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, - seek_and_length, GlobalConfig, Media, DUMMY_LEN, + check_sync, gen_dummy, get_date, get_delta, get_sec, is_close, json_reader::read_json, + modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN, }; #[derive(Debug)] @@ -17,6 +17,7 @@ pub struct CurrentProgram { start_sec: f64, json_mod: Option, json_path: Option, + json_date: String, nodes: Vec, current_node: Media, pub init: Arc>, @@ -35,6 +36,7 @@ impl CurrentProgram { start_sec: json.start_sec.unwrap(), json_mod: json.modified, json_path: json.current_file, + json_date: json.date, nodes: json.program, current_node: Media::new(0, "".to_string()), init: Arc::new(Mutex::new(true)), @@ -66,6 +68,11 @@ impl CurrentProgram { .eq(&self.json_mod.clone().unwrap()) { // when playlist has changed, reload it + info!( + "Reload playlist {}", + self.json_path.clone().unwrap() + ); + let json = read_json( self.json_path.clone(), self.rt_handle.clone(), @@ -107,10 +114,12 @@ impl CurrentProgram { } let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta; + let date = get_date(false, start_sec, next_start); - if next_start >= target_length + if (next_start >= target_length || is_close(total_delta, 0.0, 2.0) - || is_close(total_delta, target_length, 2.0) + || is_close(total_delta, target_length, 2.0)) + && date != self.json_date { let json = read_json( None, @@ -122,6 +131,7 @@ impl CurrentProgram { self.json_path = json.current_file.clone(); self.json_mod = json.modified; + self.json_date = json.date; self.nodes = json.program; self.index = 0; @@ -250,6 +260,7 @@ impl Iterator for CurrentProgram { self.check_for_next_playlist(); Some(self.current_node.clone()) } else { + println!("last: {:?}", self.json_path); let last_playlist = self.json_path.clone(); self.check_for_next_playlist(); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); diff --git a/src/main.rs b/src/main.rs index 058c295d..b61c759e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,15 @@ extern crate log; extern crate simplelog; +use std::sync::{Arc, Mutex}; + mod filter; mod input; mod output; mod utils; use simplelog::*; -use tokio::runtime::Runtime; +use tokio::runtime::Builder; use crate::output::play; use crate::utils::{init_config, init_logging, validate_ffmpeg}; @@ -15,13 +17,17 @@ use crate::utils::{init_config, init_logging, validate_ffmpeg}; fn main() { init_config(); - let runtime = Runtime::new().unwrap(); + let runtime = Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); let rt_handle = runtime.handle(); + let is_terminated: Arc> = Arc::new(Mutex::new(false)); - let logging = init_logging(rt_handle.clone()); + let logging = init_logging(rt_handle.clone(), is_terminated.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); - play(rt_handle); + play(rt_handle, is_terminated); } diff --git a/src/output/mod.rs b/src/output/mod.rs index 95b524bc..2fadc3ae 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -77,13 +77,12 @@ impl Drop for ProcessCleanup { } } -pub fn play(rt_handle: &Handle) { +pub fn play(rt_handle: &Handle, is_terminated: Arc>) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let server_term: Arc>> = Arc::new(Mutex::new(None)); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); let server_is_running: Arc> = Arc::new(Mutex::new(false)); let mut init_playlist: Option>> = None; let mut live_on = false; diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index 488a94e3..3f43a597 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -1,5 +1,9 @@ use serde::{Deserialize, Serialize}; -use std::{fs::File, path::Path, sync::{Arc, Mutex}}; +use std::{ + fs::File, + path::Path, + sync::{Arc, Mutex}, +}; use simplelog::*; use tokio::runtime::Handle; @@ -33,7 +37,13 @@ impl Playlist { } } -pub fn read_json(path: Option, rt_handle: Handle, is_terminated: Arc>, seek: bool, next_start: f64) -> Playlist { +pub fn read_json( + path: Option, + rt_handle: Handle, + is_terminated: Arc>, + seek: bool, + next_start: f64, +) -> Playlist { let config = GlobalConfig::global(); let mut playlist_path = Path::new(&config.playlist.path).to_owned(); @@ -86,7 +96,11 @@ pub fn read_json(path: Option, rt_handle: Handle, is_terminated: Arc>, - timestamp: Arc>, - limit: i64, - messages: Arc>>, - rt_handle: Handle, -} - -impl Timer { - fn new(rt_handle: Handle) -> Self { - Self { - init: Arc::new(Mutex::new(true)), - timestamp: Arc::new(Mutex::new(get_timestamp())), - limit: 30 * 1000, - messages: Arc::new(Mutex::new(vec![])), - rt_handle, - } - } - - fn reset(&self) { - self.messages.lock().unwrap().clear(); - *self.timestamp.lock().unwrap() = get_timestamp(); - } - - fn queue(&self, msg: String) { - let now = get_timestamp(); - self.messages.lock().unwrap().push(msg); - - if *self.init.lock().unwrap() { - self.reset(); - *self.init.lock().unwrap() = false; - } - - if now >= *self.timestamp.lock().unwrap() + self.limit { - self.rt_handle.spawn(send_mail(self.messages.lock().unwrap().clone())); - - self.reset(); - } - } -} - -async fn send_mail(messages: Vec) { +fn send_mail(msg: String) { let config = GlobalConfig::global(); - let msg = messages.join("\n"); let email = Message::builder() .from(config.mail.sender_addr.parse().unwrap()) @@ -87,18 +47,43 @@ async fn send_mail(messages: Vec) { } } +async fn mail_queue(messages: Arc>>, is_terminated: Arc>) { + let mut count = 0; + + loop { + if *is_terminated.lock().unwrap() || count == 60 { + // check every 30 seconds for messages and send them + if messages.lock().unwrap().len() > 0 { + let msg = messages.lock().unwrap().join("\n"); + send_mail(msg); + + messages.lock().unwrap().clear(); + } + + count = 0; + } + + if *is_terminated.lock().unwrap() { + break; + } + + sleep(Duration::from_millis(500)); + count += 1; + } +} + pub struct LogMailer { level: LevelFilter, config: Config, - timer: Timer, + messages: Arc>>, } impl LogMailer { - pub fn new(log_level: LevelFilter, config: Config, timer: Timer) -> Box { + pub fn new(log_level: LevelFilter, config: Config, messages: Arc>>) -> Box { Box::new(LogMailer { level: log_level, config, - timer, + messages, }) } } @@ -112,10 +97,10 @@ impl Log for LogMailer { if self.enabled(record.metadata()) { match record.level() { Level::Error => { - self.timer.queue(record.args().to_string()); + self.messages.lock().unwrap().push(record.args().to_string()); } Level::Warn => { - self.timer.queue(record.args().to_string()); + self.messages.lock().unwrap().push(record.args().to_string()); } _ => (), } @@ -145,7 +130,10 @@ fn clean_string(text: String) -> String { regex.replace_all(text.as_str(), "").to_string() } -pub fn init_logging(rt_handle: Handle) -> Vec> { +pub fn init_logging( + rt_handle: Handle, + is_terminated: Arc>, +) -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); let mut time_level = LevelFilter::Off; @@ -211,7 +199,12 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { if config.mail.recipient.len() > 3 { let mut filter = LevelFilter::Error; - let timer = Timer::new(rt_handle); + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + + rt_handle.spawn(mail_queue( + messages.clone(), + is_terminated.clone(), + )); let mail_config = log_config .clone() @@ -222,7 +215,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { filter = LevelFilter::Warn } - app_logger.push(LogMailer::new(filter, mail_config, timer)); + app_logger.push(LogMailer::new(filter, mail_config, messages)); } app_logger diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f485558e..ca93036c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -151,11 +151,11 @@ impl MediaProbe { } } -pub fn get_timestamp() -> i64 { - let local: DateTime = Local::now(); +// pub fn get_timestamp() -> i64 { +// let local: DateTime = Local::now(); - local.timestamp_millis() as i64 -} +// local.timestamp_millis() as i64 +// } pub fn get_sec() -> f64 { let local: DateTime = Local::now(); From c5684f4c141a669838e0bf93970ac4a5b3f2cd2a Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 28 Mar 2022 15:59:58 +0200 Subject: [PATCH 4/4] updates --- Cargo.lock | 20 ++++++++++---------- Cargo.toml | 14 +++++++------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb5a48f6..878a72ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", "libc", @@ -463,9 +463,9 @@ checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" [[package]] name = "log" -version = "0.4.15" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c4dcd960cc540667f619483fc99102f88d6118b87730e24e8fbe8054b7445e4" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" dependencies = [ "cfg-if 1.0.0", ] @@ -555,9 +555,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" +checksum = "09bf6f32a3afefd0b587ee42ed19acd945c6d1f3b5424040f50b2f24ab16be77" dependencies = [ "lazy_static", "libc", @@ -772,9 +772,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" +checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" dependencies = [ "proc-macro2", ] @@ -817,9 +817,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" +checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0" dependencies = [ "bitflags", ] diff --git a/Cargo.toml b/Cargo.toml index 77c320b3..14b79859 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,22 +5,22 @@ edition = "2021" [dependencies] chrono = "0.4" -clap = { version = "3.1.6", features = ["derive"] } +clap = { version = "3.1", features = ["derive"] } ffprobe = "0.3" -file-rotate = "0.6.0" +file-rotate = "0.6" lettre = "0.10.0-rc.4" -log = "0.4.14" -notify = "4.0.17" +log = "0.4" +notify = "4.0" once_cell = "1.10" process_control = "3.3" -rand = "0.8.5" +rand = "0.8" regex = "1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.8" shlex = "1.1" -simplelog = { version = "^0.11.2", features = ["paris"] } -tokio = { version = "1.16.1", features = ["rt-multi-thread"] } +simplelog = { version = "^0.11", features = ["paris"] } +tokio = { version = "1.16", features = ["rt-multi-thread"] } walkdir = "2" [target.x86_64-unknown-linux-musl.dependencies]