From 372c3ba7739dd762376328d86871746a42f50145 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 21 Apr 2022 11:18:10 +0200 Subject: [PATCH] replace std sleep with tokio sleep this makes it more easy to shutdown a async task --- src/input/folder.rs | 13 ++++------- src/input/ingest.rs | 12 ++-------- src/input/mod.rs | 2 +- src/main.rs | 2 +- src/utils/config.rs | 2 +- src/utils/logging.rs | 55 ++++++++++++++------------------------------ 6 files changed, 26 insertions(+), 60 deletions(-) diff --git a/src/input/folder.rs b/src/input/folder.rs index 3d9fe8b6..3c83bb1a 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -6,8 +6,6 @@ use std::{ mpsc::channel, {Arc, Mutex}, }, - thread::sleep, - time::Duration, }; use notify::{ @@ -16,6 +14,7 @@ use notify::{ }; use rand::{seq::SliceRandom, thread_rng}; use simplelog::*; +use tokio::time::{sleep, Duration}; use walkdir::WalkDir; use crate::utils::{get_sec, GlobalConfig, Media}; @@ -158,7 +157,7 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watchman(sources: Arc>>, is_terminated: Arc>) { +pub async fn watchman(sources: Arc>>) { let config = GlobalConfig::global(); let (tx, rx) = channel(); @@ -169,14 +168,10 @@ pub async fn watchman(sources: Arc>>, is_terminated: Arc { @@ -210,6 +205,6 @@ pub async fn watchman(sources: Arc>>, is_terminated: Arc{}", &config.storage.path); let folder_source = Source::new(current_list, index); - rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone())); + rt_handle.spawn(watchman(folder_source.nodes.clone())); Box::new(folder_source) as Box> } diff --git a/src/main.rs b/src/main.rs index bdb8e894..07fc434a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,7 +63,7 @@ fn main() { let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let rt_handle = runtime.handle(); - let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone()); + let logging = init_logging(rt_handle.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); diff --git a/src/utils/config.rs b/src/utils/config.rs index 18f65aa6..68aa288c 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -51,7 +51,7 @@ pub struct Mail { pub sender_pass: String, pub recipient: String, pub mail_level: String, - pub interval: i32, + pub interval: u64, } #[derive(Debug, Serialize, Deserialize, Clone)] diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 7c0a1c17..5b00ef01 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -1,13 +1,9 @@ extern crate log; extern crate simplelog; -use chrono::prelude::*; -use regex::Regex; use std::{ path::Path, sync::{Arc, Mutex}, - thread::sleep, - time::Duration, }; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; @@ -15,9 +11,15 @@ use lettre::{ message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport, }; + +use chrono::prelude::*; use log::{Level, LevelFilter, Log, Metadata, Record}; +use regex::Regex; use simplelog::*; -use tokio::runtime::Handle; +use tokio::{ + runtime::Handle, + time::{sleep, Duration}, +}; use crate::utils::GlobalConfig; @@ -52,32 +54,18 @@ fn send_mail(msg: String) { } } -async fn mail_queue( - messages: Arc>>, - is_terminated: Arc>, - interval: i32, -) { - let mut count: i32 = 0; +async fn mail_queue(messages: Arc>>, interval: u64) { + // check every give seconds for messages and send them loop { - if *is_terminated.lock().unwrap() || count == interval { - // check every 30 seconds for messages and send them - if messages.lock().unwrap().len() > 0 { - let msg = messages.lock().unwrap().join("\n"); - send_mail(msg); + if messages.lock().unwrap().len() > 0 { + let msg = messages.lock().unwrap().join("\n"); + send_mail(msg); - messages.lock().unwrap().clear(); - } - - count = 0; + messages.lock().unwrap().clear(); } - if *is_terminated.lock().unwrap() { - break; - } - - sleep(Duration::from_secs(1)); - count += 1; + sleep(Duration::from_secs(interval)).await; } } @@ -141,10 +129,7 @@ fn clean_string(text: &str) -> String { regex.replace_all(text, "").to_string() } -pub fn init_logging( - rt_handle: Handle, - is_terminated: Arc>, -) -> Vec> { +pub fn init_logging(rt_handle: Handle) -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); let mut time_level = LevelFilter::Off; @@ -212,15 +197,9 @@ pub fn init_logging( let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); let interval = config.mail.interval.clone(); - rt_handle.spawn(mail_queue( - messages.clone(), - is_terminated.clone(), - interval, - )); + rt_handle.spawn(mail_queue(messages.clone(), interval)); - let mail_config = log_config - .clone() - .build(); + let mail_config = log_config.clone().build(); let filter = match config.mail.mail_level.to_lowercase().as_str() { "info" => LevelFilter::Info,