Merge pull request #6 from jb-alvarado/main

send error mails only every 30 seconds
This commit is contained in:
jb-alvarado 2022-03-28 16:02:38 +02:00 committed by GitHub
commit a9f380efc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 234 additions and 101 deletions

View File

@ -1,7 +0,0 @@
[target.x86_64-apple-darwin]
linker = "x86_64-apple-darwin20.4-clang"
ar = "x86_64-apple-darwin20.4-ar"
[target.aarch64-apple-darwin]
linker = "aarch64-apple-darwin20.4-clang"
ar = "aarch64-apple-darwin20.4-ar"

22
Cargo.lock generated
View File

@ -143,7 +143,7 @@ dependencies = [
[[package]] [[package]]
name = "ffplayout-rs" name = "ffplayout-rs"
version = "0.6.1" version = "0.7.0"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
@ -295,9 +295,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.5" version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
@ -463,9 +463,9 @@ checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.15" version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c4dcd960cc540667f619483fc99102f88d6118b87730e24e8fbe8054b7445e4" checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
@ -555,9 +555,9 @@ dependencies = [
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.8" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" checksum = "09bf6f32a3afefd0b587ee42ed19acd945c6d1f3b5424040f50b2f24ab16be77"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"libc", "libc",
@ -772,9 +772,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.16" version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -817,9 +817,9 @@ dependencies = [
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.11" version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0"
dependencies = [ dependencies = [
"bitflags", "bitflags",
] ]

View File

@ -1,26 +1,26 @@
[package] [package]
name = "ffplayout-rs" name = "ffplayout-rs"
version = "0.6.1" version = "0.7.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
chrono = "0.4" chrono = "0.4"
clap = { version = "3.1.6", features = ["derive"] } clap = { version = "3.1", features = ["derive"] }
ffprobe = "0.3" ffprobe = "0.3"
file-rotate = "0.6.0" file-rotate = "0.6"
lettre = "0.10.0-rc.4" lettre = "0.10.0-rc.4"
log = "0.4.14" log = "0.4"
notify = "4.0.17" notify = "4.0"
once_cell = "1.10" once_cell = "1.10"
process_control = "3.3" process_control = "3.3"
rand = "0.8.5" rand = "0.8"
regex = "1" regex = "1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
serde_yaml = "0.8" serde_yaml = "0.8"
shlex = "1.1" shlex = "1.1"
simplelog = { version = "^0.11.2", features = ["paris"] } simplelog = { version = "^0.11", features = ["paris"] }
tokio = { version = "1.16.1", features = ["rt-multi-thread"] } tokio = { version = "1.16", features = ["rt-multi-thread"] }
walkdir = "2" walkdir = "2"
[target.x86_64-unknown-linux-musl.dependencies] [target.x86_64-unknown-linux-musl.dependencies]

View File

@ -41,6 +41,18 @@ rustup target add aarch64-apple-darwin
rustup target add x86_64-apple-darwin rustup target add x86_64-apple-darwin
``` ```
Add linker and ar settings to `~/.cargo/config`:
```Bash
[target.x86_64-apple-darwin]
linker = "x86_64-apple-darwin20.4-clang"
ar = "x86_64-apple-darwin20.4-ar"
[target.aarch64-apple-darwin]
linker = "aarch64-apple-darwin20.4-clang"
ar = "aarch64-apple-darwin20.4-ar"
```
Follow this guide: [rust-cross-compile-linux-to-macos](https://wapl.es/rust/2019/02/17/rust-cross-compile-linux-to-macos.html) Follow this guide: [rust-cross-compile-linux-to-macos](https://wapl.es/rust/2019/02/17/rust-cross-compile-linux-to-macos.html)
Or setup [osxcross](https://github.com/tpoechtrager/osxcross) correctly. Or setup [osxcross](https://github.com/tpoechtrager/osxcross) correctly.

59
examples/timer.rs Normal file
View File

@ -0,0 +1,59 @@
use chrono::prelude::*;
use std::{
thread::sleep};
fn get_timestamp() -> i64 {
let local: DateTime<Local> = Local::now();
local.timestamp_millis() as i64
}
struct Timer {
init: bool,
timestamp: i64,
limit: i64,
messages: Vec<String>,
}
impl Timer {
fn new() -> Self {
Self {
init: true,
timestamp: get_timestamp(),
limit: 10 * 1000,
messages: vec![],
}
}
fn reset(&mut self) {
self.messages.clear();
self.timestamp = get_timestamp();
}
fn send(&mut self, msg: String) {
let now = get_timestamp();
self.messages.push(msg);
if self.init {
self.reset();
self.init = false;
}
if now >= self.timestamp + self.limit {
println!("Send messages: {:?}", self.messages);
self.reset();
}
}
}
fn main() {
let mut timer = Timer::new();
for i in 0..40 {
println!("{:?}", i);
timer.send(format!("{:?}", i));
sleep(std::time::Duration::from_secs(1));
}
}

View File

@ -7,8 +7,8 @@ use simplelog::*;
use tokio::runtime::Handle; use tokio::runtime::Handle;
use crate::utils::{ use crate::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, check_sync, gen_dummy, get_date, get_delta, get_sec, is_close, json_reader::read_json,
seek_and_length, GlobalConfig, Media, DUMMY_LEN, modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN,
}; };
#[derive(Debug)] #[derive(Debug)]
@ -17,6 +17,7 @@ pub struct CurrentProgram {
start_sec: f64, start_sec: f64,
json_mod: Option<String>, json_mod: Option<String>,
json_path: Option<String>, json_path: Option<String>,
json_date: String,
nodes: Vec<Media>, nodes: Vec<Media>,
current_node: Media, current_node: Media,
pub init: Arc<Mutex<bool>>, pub init: Arc<Mutex<bool>>,
@ -35,6 +36,7 @@ impl CurrentProgram {
start_sec: json.start_sec.unwrap(), start_sec: json.start_sec.unwrap(),
json_mod: json.modified, json_mod: json.modified,
json_path: json.current_file, json_path: json.current_file,
json_date: json.date,
nodes: json.program, nodes: json.program,
current_node: Media::new(0, "".to_string()), current_node: Media::new(0, "".to_string()),
init: Arc::new(Mutex::new(true)), init: Arc::new(Mutex::new(true)),
@ -66,6 +68,11 @@ impl CurrentProgram {
.eq(&self.json_mod.clone().unwrap()) .eq(&self.json_mod.clone().unwrap())
{ {
// when playlist has changed, reload it // when playlist has changed, reload it
info!(
"Reload playlist <b><magenta>{}</></b>",
self.json_path.clone().unwrap()
);
let json = read_json( let json = read_json(
self.json_path.clone(), self.json_path.clone(),
self.rt_handle.clone(), self.rt_handle.clone(),
@ -107,10 +114,12 @@ impl CurrentProgram {
} }
let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta; let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta;
let date = get_date(false, start_sec, next_start);
if next_start >= target_length if (next_start >= target_length
|| is_close(total_delta, 0.0, 2.0) || is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0) || is_close(total_delta, target_length, 2.0))
&& date != self.json_date
{ {
let json = read_json( let json = read_json(
None, None,
@ -122,6 +131,7 @@ impl CurrentProgram {
self.json_path = json.current_file.clone(); self.json_path = json.current_file.clone();
self.json_mod = json.modified; self.json_mod = json.modified;
self.json_date = json.date;
self.nodes = json.program; self.nodes = json.program;
self.index = 0; self.index = 0;
@ -250,6 +260,7 @@ impl Iterator for CurrentProgram {
self.check_for_next_playlist(); self.check_for_next_playlist();
Some(self.current_node.clone()) Some(self.current_node.clone())
} else { } else {
println!("last: {:?}", self.json_path);
let last_playlist = self.json_path.clone(); let last_playlist = self.json_path.clone();
self.check_for_next_playlist(); self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());

View File

@ -1,13 +1,15 @@
extern crate log; extern crate log;
extern crate simplelog; extern crate simplelog;
use std::sync::{Arc, Mutex};
mod filter; mod filter;
mod input; mod input;
mod output; mod output;
mod utils; mod utils;
use simplelog::*; use simplelog::*;
use tokio::runtime::Runtime; use tokio::runtime::Builder;
use crate::output::play; use crate::output::play;
use crate::utils::{init_config, init_logging, validate_ffmpeg}; use crate::utils::{init_config, init_logging, validate_ffmpeg};
@ -15,13 +17,17 @@ use crate::utils::{init_config, init_logging, validate_ffmpeg};
fn main() { fn main() {
init_config(); init_config();
let runtime = Runtime::new().unwrap(); let runtime = Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let rt_handle = runtime.handle(); let rt_handle = runtime.handle();
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let logging = init_logging(rt_handle.clone()); let logging = init_logging(rt_handle.clone(), is_terminated.clone());
CombinedLogger::init(logging).unwrap(); CombinedLogger::init(logging).unwrap();
validate_ffmpeg(); validate_ffmpeg();
play(rt_handle); play(rt_handle, is_terminated);
} }

View File

@ -77,13 +77,12 @@ impl Drop for ProcessCleanup {
} }
} }
pub fn play(rt_handle: &Handle) { pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap(); let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None)); let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false)); let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut init_playlist: Option<Arc<Mutex<bool>>> = None; let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false; let mut live_on = false;

View File

@ -1,5 +1,9 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fs::File, path::Path, sync::{Arc, Mutex}}; use std::{
fs::File,
path::Path,
sync::{Arc, Mutex},
};
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -33,7 +37,13 @@ impl Playlist {
} }
} }
pub fn read_json(path: Option<String>, rt_handle: Handle, is_terminated: Arc<Mutex<bool>>, seek: bool, next_start: f64) -> Playlist { pub fn read_json(
path: Option<String>,
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
seek: bool,
next_start: f64,
) -> Playlist {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let mut playlist_path = Path::new(&config.playlist.path).to_owned(); let mut playlist_path = Path::new(&config.playlist.path).to_owned();
@ -86,7 +96,11 @@ pub fn read_json(path: Option<String>, rt_handle: Handle, is_terminated: Arc<Mut
start_sec += item.out - item.seek; start_sec += item.out - item.seek;
} }
rt_handle.spawn(validate_playlist(playlist.clone(), is_terminated, config.clone())); rt_handle.spawn(validate_playlist(
playlist.clone(),
is_terminated,
config.clone(),
));
playlist playlist
} }

View File

@ -2,7 +2,12 @@ extern crate log;
extern crate simplelog; extern crate simplelog;
use regex::Regex; use regex::Regex;
use std::path::Path; use std::{
path::Path,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
};
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport}; use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport};
@ -12,65 +17,7 @@ use tokio::runtime::Handle;
use crate::utils::GlobalConfig; use crate::utils::GlobalConfig;
pub struct LogMailer { fn send_mail(msg: String) {
level: LevelFilter,
config: Config,
handle: Handle,
}
impl LogMailer {
pub fn new(log_level: LevelFilter, config: Config, handle: Handle) -> Box<LogMailer> {
Box::new(LogMailer {
level: log_level,
config,
handle,
})
}
}
impl Log for LogMailer {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
match record.level() {
Level::Error => {
self.handle.spawn(send_mail(record.args().to_string()));
},
Level::Warn => {
self.handle.spawn(send_mail(record.args().to_string()));
},
_ => (),
}
}
}
fn flush(&self) {}
}
impl SharedLogger for LogMailer {
fn level(&self) -> LevelFilter {
self.level
}
fn config(&self) -> Option<&Config> {
Some(&self.config)
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
Box::new(*self)
}
}
fn clean_string(text: String) -> String {
let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap();
regex.replace_all(text.as_str(), "").to_string()
}
async fn send_mail(msg: String) {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let email = Message::builder() let email = Message::builder()
@ -100,7 +47,93 @@ async fn send_mail(msg: String) {
} }
} }
pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> { async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, is_terminated: Arc<Mutex<bool>>) {
let mut count = 0;
loop {
if *is_terminated.lock().unwrap() || count == 60 {
// check every 30 seconds for messages and send them
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
send_mail(msg);
messages.lock().unwrap().clear();
}
count = 0;
}
if *is_terminated.lock().unwrap() {
break;
}
sleep(Duration::from_millis(500));
count += 1;
}
}
pub struct LogMailer {
level: LevelFilter,
config: Config,
messages: Arc<Mutex<Vec<String>>>,
}
impl LogMailer {
pub fn new(log_level: LevelFilter, config: Config, messages: Arc<Mutex<Vec<String>>>) -> Box<LogMailer> {
Box::new(LogMailer {
level: log_level,
config,
messages,
})
}
}
impl Log for LogMailer {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
match record.level() {
Level::Error => {
self.messages.lock().unwrap().push(record.args().to_string());
}
Level::Warn => {
self.messages.lock().unwrap().push(record.args().to_string());
}
_ => (),
}
}
}
fn flush(&self) {}
}
impl SharedLogger for LogMailer {
fn level(&self) -> LevelFilter {
self.level
}
fn config(&self) -> Option<&Config> {
Some(&self.config)
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
Box::new(*self)
}
}
fn clean_string(text: String) -> String {
let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap();
regex.replace_all(text.as_str(), "").to_string()
}
pub fn init_logging(
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
) -> Vec<Box<dyn SharedLogger>> {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let app_config = config.logging.clone(); let app_config = config.logging.clone();
let mut time_level = LevelFilter::Off; let mut time_level = LevelFilter::Off;
@ -166,6 +199,12 @@ pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
if config.mail.recipient.len() > 3 { if config.mail.recipient.len() > 3 {
let mut filter = LevelFilter::Error; let mut filter = LevelFilter::Error;
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
rt_handle.spawn(mail_queue(
messages.clone(),
is_terminated.clone(),
));
let mail_config = log_config let mail_config = log_config
.clone() .clone()
@ -176,7 +215,7 @@ pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
filter = LevelFilter::Warn filter = LevelFilter::Warn
} }
app_logger.push(LogMailer::new(filter, mail_config, rt_handle)); app_logger.push(LogMailer::new(filter, mail_config, messages));
} }
app_logger app_logger