replace std sleep with tokio sleep

this makes it more easy to shutdown a async task
This commit is contained in:
jb-alvarado 2022-04-21 11:18:10 +02:00
parent 2d561400ab
commit 372c3ba773
6 changed files with 26 additions and 60 deletions

View File

@ -6,8 +6,6 @@ use std::{
mpsc::channel, mpsc::channel,
{Arc, Mutex}, {Arc, Mutex},
}, },
thread::sleep,
time::Duration,
}; };
use notify::{ use notify::{
@ -16,6 +14,7 @@ use notify::{
}; };
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use simplelog::*; use simplelog::*;
use tokio::time::{sleep, Duration};
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::utils::{get_sec, GlobalConfig, Media}; use crate::utils::{get_sec, GlobalConfig, Media};
@ -158,7 +157,7 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str) filename.extension().and_then(OsStr::to_str)
} }
pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<bool>>) { pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -169,14 +168,10 @@ pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<
panic!("Folder path not exists: '{path}'"); panic!("Folder path not exists: '{path}'");
} }
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap(); watcher.watch(path, RecursiveMode::Recursive).unwrap();
loop { loop {
if *is_terminated.lock().unwrap() {
break;
}
if let Ok(res) = rx.try_recv() { if let Ok(res) = rx.try_recv() {
match res { match res {
Create(new_path) => { Create(new_path) => {
@ -210,6 +205,6 @@ pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<
} }
} }
sleep(Duration::from_secs(4)); sleep(Duration::from_secs(5)).await;
} }
} }

View File

@ -2,8 +2,6 @@ use std::{
io::{BufReader, Error, Read}, io::{BufReader, Error, Read},
path::Path, path::Path,
process::{Command, Stdio}, process::{Command, Stdio},
thread::sleep,
time::Duration,
}; };
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
@ -100,10 +98,7 @@ pub async fn ingest_server(
server_cmd.join(" ") server_cmd.join(" ")
); );
loop { 'ingest_iter: loop {
if *proc_control.is_terminated.lock().unwrap() {
break;
}
let mut server_proc = match Command::new("ffmpeg") let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone()) .args(server_cmd.clone())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -143,7 +138,7 @@ pub async fn ingest_server(
error!("Ingest server write error: {e:?}"); error!("Ingest server write error: {e:?}");
*proc_control.is_terminated.lock().unwrap() = true; *proc_control.is_terminated.lock().unwrap() = true;
break; break 'ingest_iter;
} }
} else { } else {
break; break;
@ -151,11 +146,8 @@ pub async fn ingest_server(
} }
drop(ingest_reader); drop(ingest_reader);
*proc_control.server_is_running.lock().unwrap() = false; *proc_control.server_is_running.lock().unwrap() = false;
sleep(Duration::from_secs(1));
if let Err(e) = proc_control.wait(Ingest) { if let Err(e) = proc_control.wait(Ingest) {
error!("{e}") error!("{e}")
} }

View File

@ -30,7 +30,7 @@ pub fn source_generator(
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path); debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
let folder_source = Source::new(current_list, index); let folder_source = Source::new(current_list, index);
rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone())); rt_handle.spawn(watchman(folder_source.nodes.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>> Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
} }

View File

@ -63,7 +63,7 @@ fn main() {
let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle(); let rt_handle = runtime.handle();
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone()); let logging = init_logging(rt_handle.clone());
CombinedLogger::init(logging).unwrap(); CombinedLogger::init(logging).unwrap();
validate_ffmpeg(); validate_ffmpeg();

View File

@ -51,7 +51,7 @@ pub struct Mail {
pub sender_pass: String, pub sender_pass: String,
pub recipient: String, pub recipient: String,
pub mail_level: String, pub mail_level: String,
pub interval: i32, pub interval: u64,
} }
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]

View File

@ -1,13 +1,9 @@
extern crate log; extern crate log;
extern crate simplelog; extern crate simplelog;
use chrono::prelude::*;
use regex::Regex;
use std::{ use std::{
path::Path, path::Path,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread::sleep,
time::Duration,
}; };
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
@ -15,9 +11,15 @@ use lettre::{
message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport, message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport,
Transport, Transport,
}; };
use chrono::prelude::*;
use log::{Level, LevelFilter, Log, Metadata, Record}; use log::{Level, LevelFilter, Log, Metadata, Record};
use regex::Regex;
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle; use tokio::{
runtime::Handle,
time::{sleep, Duration},
};
use crate::utils::GlobalConfig; use crate::utils::GlobalConfig;
@ -52,32 +54,18 @@ fn send_mail(msg: String) {
} }
} }
async fn mail_queue( async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
messages: Arc<Mutex<Vec<String>>>, // check every give seconds for messages and send them
is_terminated: Arc<Mutex<bool>>,
interval: i32,
) {
let mut count: i32 = 0;
loop { loop {
if *is_terminated.lock().unwrap() || count == interval { if messages.lock().unwrap().len() > 0 {
// check every 30 seconds for messages and send them let msg = messages.lock().unwrap().join("\n");
if messages.lock().unwrap().len() > 0 { send_mail(msg);
let msg = messages.lock().unwrap().join("\n");
send_mail(msg);
messages.lock().unwrap().clear(); messages.lock().unwrap().clear();
}
count = 0;
} }
if *is_terminated.lock().unwrap() { sleep(Duration::from_secs(interval)).await;
break;
}
sleep(Duration::from_secs(1));
count += 1;
} }
} }
@ -141,10 +129,7 @@ fn clean_string(text: &str) -> String {
regex.replace_all(text, "").to_string() regex.replace_all(text, "").to_string()
} }
pub fn init_logging( pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
rt_handle: Handle,
is_terminated: Arc<Mutex<bool>>,
) -> Vec<Box<dyn SharedLogger>> {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let app_config = config.logging.clone(); let app_config = config.logging.clone();
let mut time_level = LevelFilter::Off; let mut time_level = LevelFilter::Off;
@ -212,15 +197,9 @@ pub fn init_logging(
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new())); let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let interval = config.mail.interval.clone(); let interval = config.mail.interval.clone();
rt_handle.spawn(mail_queue( rt_handle.spawn(mail_queue(messages.clone(), interval));
messages.clone(),
is_terminated.clone(),
interval,
));
let mail_config = log_config let mail_config = log_config.clone().build();
.clone()
.build();
let filter = match config.mail.mail_level.to_lowercase().as_str() { let filter = match config.mail.mail_level.to_lowercase().as_str() {
"info" => LevelFilter::Info, "info" => LevelFilter::Info,