use notify::DebouncedEvent::{Create, Remove, Rename}; use notify::{watcher, RecursiveMode, Watcher}; use std::{ ffi::OsStr, path::Path, sync::{ mpsc::{channel, Receiver}, {Arc, Mutex}, }, thread::sleep, time::Duration, }; use walkdir::WalkDir; use tokio::runtime::Builder; #[derive(Debug, Clone)] pub struct Source { stop: Arc>, nodes: Arc>>, index: usize, } impl Source { pub fn new(path: String) -> Self { let mut file_list = vec![]; for entry in WalkDir::new(path.clone()) .into_iter() .filter_map(|e| e.ok()) { if entry.path().is_file() { let ext = file_extension(entry.path()); if ext.is_some() && ["mp4".to_string(), "mkv".to_string()] .clone() .contains(&ext.unwrap().to_lowercase()) { file_list.push(entry.path().display().to_string()); } } } Self { stop: Arc::new(Mutex::new(false)), nodes: Arc::new(Mutex::new(file_list)), index: 0, } } } impl Iterator for Source { type Item = String; fn next(&mut self) -> Option { if self.index < self.nodes.lock().unwrap().len() { let current_file = self.nodes.lock().unwrap()[self.index].clone(); self.index += 1; Some(current_file) } else { let current_file = self.nodes.lock().unwrap()[0].clone(); self.index = 1; Some(current_file) } } } async fn watch( receiver: Receiver, stop: Arc>, sources: Arc>>, ) { loop { if *stop.lock().unwrap() { println!("in stop: {}", stop.lock().unwrap()); break; } match receiver.recv() { Ok(event) => match event { Create(new_path) => { sources.lock().unwrap().push(new_path.display().to_string()); println!("Create new file: {:?}", new_path); } Remove(old_path) => { sources .lock() .unwrap() .retain(|x| x != &old_path.display().to_string()); println!("Remove file: {:?}", old_path); } Rename(old_path, new_path) => { let i = sources .lock() .unwrap() .iter() .position(|x| *x == old_path.display().to_string()) .unwrap(); sources.lock().unwrap()[i] = new_path.display().to_string(); println!("Rename file: {:?} to {:?}", old_path, new_path); } _ => (), }, Err(e) => { println!("{:?}", e); } } sleep(Duration::from_secs(1)); } } fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } fn main() { let path = "/home/jonathan/Videos/tv-media/ADtv/01 - Intro".to_string(); let sources = Source::new(path.clone()); let stop = Arc::clone(&sources.stop); let (sender, receiver) = channel(); let runtime = Builder::new_multi_thread() .worker_threads(1) .thread_name("file_watcher") .enable_all() .build() .expect("Creating Tokio runtime"); let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); watcher .watch(path.clone(), RecursiveMode::Recursive) .unwrap(); runtime.spawn(watch( receiver, Arc::clone(&stop), Arc::clone(&sources.nodes), )); let mut count = 0; {for node in sources { println!("task: {:?}", node); sleep(Duration::from_secs(1)); count += 1; if count == 5 { break; } }} *stop.lock().unwrap() = true; watcher.unwatch(path).unwrap(); println!("after loop"); // watch.stop(); // watch.run = false; //runtime.block_on(watch.stop()); }