From fcd82f8021434b636e943f38ed50327011648035 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 28 Feb 2022 18:01:45 +0100 Subject: [PATCH] working folder watch --- examples/watch.rs | 201 ++++++++++++++++++++++++++---------------- src/output/desktop.rs | 29 +++++- src/utils/folder.rs | 84 ++++++++---------- 3 files changed, 191 insertions(+), 123 deletions(-) diff --git a/examples/watch.rs b/examples/watch.rs index d8e44438..70e3e764 100644 --- a/examples/watch.rs +++ b/examples/watch.rs @@ -1,79 +1,130 @@ -use notify::EventKind::{Create, Modify, Remove}; -use notify::{RecommendedWatcher, RecursiveMode, Watcher, Event}; +use notify::DebouncedEvent::{Create, Remove, Rename}; +use notify::{watcher, RecursiveMode, Watcher}; use std::{ + ffi::OsStr, path::Path, - sync::mpsc::{channel, Receiver, Sender}, + sync::{ + mpsc::{channel, Receiver}, + {Arc, Mutex}, + }, thread::sleep, time::Duration, }; -use std::sync::{Arc, Mutex}; - -//std::sync::mpsc::Receiver +use walkdir::WalkDir; use tokio::runtime::Builder; -// #[derive(Debug, Copy, Clone)] -// struct WatchMan { -// stop: bool, -// } +#[derive(Debug, Clone)] +pub struct Source { + stop: Arc>, + nodes: Arc>>, + index: usize, +} -// impl WatchMan { -// fn new() -> Self { -// Self { -// stop: false, -// } -// } +impl Source { + pub fn new(path: String) -> Self { + let mut file_list = vec![]; -// async fn start(self, receiver: Receiver) -> Result<(), String> { -// loop { -// if self.stop { -// println!("break out"); -// break -// } + for entry in WalkDir::new(path.clone()) + .into_iter() + .filter_map(|e| e.ok()) + { + if entry.path().is_file() { + let ext = file_extension(entry.path()); -// match receiver.recv() { -// Ok(event) => match event { -// Create(new_path) => { -// println!("Create new file: {:?}", new_path); -// } -// Remove(old_path) => { -// println!("Remove file: {:?}", old_path); -// } -// Rename(old_path, new_path) => { -// println!("Rename file: {:?} to {:?}", old_path, new_path); -// } -// _ => (), -// }, -// Err(e) => { -// println!("watch error: {:?}", e); -// println!("watch error: {:?}", self.stop); -// sleep(Duration::from_secs(1)); -// // return Err(e.to_string()) -// }, -// } -// } + if ext.is_some() + && ["mp4".to_string(), "mkv".to_string()] + .clone() + .contains(&ext.unwrap().to_lowercase()) + { + file_list.push(entry.path().display().to_string()); + } + } + } -// Ok(()) -// } - -// fn stop(&mut self) { -// println!("stop watching"); -// self.stop = true; - -// } -// } - -async fn watch(receiver: Receiver>) { - for res in receiver { - match res { - Ok(event) => println!("changed: {:?}", event), - Err(e) => println!("watch error: {:?}", e), + Self { + stop: Arc::new(Mutex::new(false)), + nodes: Arc::new(Mutex::new(file_list)), + index: 0, } } } -fn main() -> notify::Result<()> { +impl Iterator for Source { + type Item = String; + + fn next(&mut self) -> Option { + if self.index < self.nodes.lock().unwrap().len() { + let current_file = self.nodes.lock().unwrap()[self.index].clone(); + self.index += 1; + + Some(current_file) + } else { + let current_file = self.nodes.lock().unwrap()[0].clone(); + + self.index = 1; + + Some(current_file) + } + } +} + +async fn watch( + receiver: Receiver, + stop: Arc>, + sources: Arc>>, +) { + loop { + if *stop.lock().unwrap() { + println!("in stop: {}", stop.lock().unwrap()); + break; + } + + match receiver.recv() { + Ok(event) => match event { + Create(new_path) => { + sources.lock().unwrap().push(new_path.display().to_string()); + println!("Create new file: {:?}", new_path); + } + Remove(old_path) => { + sources + .lock() + .unwrap() + .retain(|x| x != &old_path.display().to_string()); + println!("Remove file: {:?}", old_path); + } + Rename(old_path, new_path) => { + let i = sources + .lock() + .unwrap() + .iter() + .position(|x| *x == old_path.display().to_string()) + .unwrap(); + sources.lock().unwrap()[i] = new_path.display().to_string(); + println!("Rename file: {:?} to {:?}", old_path, new_path); + } + _ => (), + }, + Err(e) => { + println!("{:?}", e); + } + } + + sleep(Duration::from_secs(1)); + } +} + +fn file_extension(filename: &Path) -> Option<&str> { + filename.extension().and_then(OsStr::to_str) +} + +fn main() { + let path = "/home/jonathan/Videos/tv-media/ADtv/01 - Intro".to_string(); + let sources = Source::new(path.clone()); + let stop = Arc::clone(&sources.stop); + + let (sender, receiver) = channel(); let runtime = Builder::new_multi_thread() .worker_threads(1) .thread_name("file_watcher") @@ -81,35 +132,37 @@ fn main() -> notify::Result<()> { .build() .expect("Creating Tokio runtime"); - // let mut watch = WatchMan::new(); + let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); + watcher + .watch(path.clone(), RecursiveMode::Recursive) + .unwrap(); - let (sender, receiver) = channel(); - - - // let (tx, rx) = channel(); - let mut watcher = RecommendedWatcher::new(sender)?; - watcher.watch(Path::new("/home/jb/Videos/"), RecursiveMode::Recursive).unwrap(); - - runtime.spawn(watch(receiver)); + runtime.spawn(watch( + receiver, + Arc::clone(&stop), + Arc::clone(&sources.nodes), + )); let mut count = 0; - loop { - println!("task: {}", count); + + {for node in sources { + println!("task: {:?}", node); sleep(Duration::from_secs(1)); count += 1; - if count == 15 { + if count == 5 { break; } - } + }} + + *stop.lock().unwrap() = true; + watcher.unwatch(path).unwrap(); println!("after loop"); - watcher.unwatch(Path::new("/home/jb/Videos/")).unwrap(); // watch.stop(); // watch.run = false; //runtime.block_on(watch.stop()); - Ok(()) } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 4291c3bb..aaefabd7 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -1,14 +1,17 @@ +use notify::{watcher, RecursiveMode, Watcher}; use std::{ io::{prelude::*, Read}, path::Path, process, process::{Command, Stdio}, - sync::Arc, + sync::{ + mpsc::channel, + {Arc, Mutex}, + }, thread::sleep, time::Duration, }; -use tokio::sync::Mutex; use tokio::runtime::Builder; use simplelog::*; @@ -16,6 +19,9 @@ use simplelog::*; use crate::utils::{sec_to_time, watch_folder, Config, CurrentProgram, Media, Source}; pub fn play(config: Config) { + let stop = Arc::new(Mutex::new(false)); + let dec_pid: Arc> = Arc::new(Mutex::new(0)); + let get_source = match config.processing.mode.clone().as_str() { "folder" => { let path = config.storage.path.clone(); @@ -31,9 +37,18 @@ pub fn play(config: Config) { .unwrap(); let folder_source = Source::new(config.clone()); - let mut folder_sync = Arc::new(Mutex::new(folder_source.clone())); + let (sender, receiver) = channel(); - runtime.spawn(watch_folder(&path, &mut folder_sync)); + let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); + watcher + .watch(path.clone(), RecursiveMode::Recursive) + .unwrap(); + + runtime.spawn(watch_folder( + receiver, + Arc::clone(&stop), + Arc::clone(&folder_source.nodes), + )); Box::new(folder_source) as Box> } @@ -122,9 +137,13 @@ pub fn play(config: Config) { Ok(proc) => proc, }; + *dec_pid.lock().unwrap() = dec_proc.id(); + let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); let dec_reader = dec_proc.stdout.as_mut().unwrap(); + debug!("Decoder PID: {}", dec_pid.lock().unwrap()); + loop { let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { Ok(length) => length, @@ -145,6 +164,8 @@ pub fn play(config: Config) { }; } + *stop.lock().unwrap() = true; + sleep(Duration::from_secs(1)); match enc_proc.kill() { diff --git a/src/utils/folder.rs b/src/utils/folder.rs index 9d22e009..e0f0fbb2 100644 --- a/src/utils/folder.rs +++ b/src/utils/folder.rs @@ -1,25 +1,25 @@ use notify::DebouncedEvent::{Create, Remove, Rename}; -use notify::{watcher, RecursiveMode, Watcher}; use rand::{seq::SliceRandom, thread_rng}; +use simplelog::*; use std::{ ffi::OsStr, path::Path, - sync::{mpsc::channel, Arc}, + sync::{ + mpsc::Receiver, + {Arc, Mutex}, + }, + thread::sleep, time::Duration, }; -use tokio::sync::Mutex; - use walkdir::WalkDir; -use simplelog::*; - use crate::utils::{Config, Media}; #[derive(Debug, Clone)] pub struct Source { config: Config, - nodes: Vec, + pub nodes: Arc>>, index: usize, } @@ -56,31 +56,18 @@ impl Source { Self { config: config, - nodes: file_list, + nodes: Arc::new(Mutex::new(file_list)), index: 0, } } - fn push(&mut self, file: String) { - self.nodes.push(file) - } - - fn rm(&mut self, file: String) { - self.nodes.retain(|x| x != &file); - } - - fn mv(&mut self, old_file: String, new_file: String) { - let i = self.nodes.iter().position(|x| *x == old_file).unwrap(); - self.nodes[i] = new_file; - } - fn shuffle(&mut self) { let mut rng = thread_rng(); - self.nodes.shuffle(&mut rng); + self.nodes.lock().unwrap().shuffle(&mut rng); } fn sort(&mut self) { - self.nodes.sort(); + self.nodes.lock().unwrap().sort(); } } @@ -88,8 +75,8 @@ impl Iterator for Source { type Item = Media; fn next(&mut self) -> Option { - if self.index < self.nodes.len() { - let current_file = self.nodes[self.index].clone(); + if self.index < self.nodes.lock().unwrap().len() { + let current_file = self.nodes.lock().unwrap()[self.index].clone(); let mut media = Media::new(self.index, current_file); media.add_probe(); media.add_filter(&self.config, false, false); @@ -104,7 +91,7 @@ impl Iterator for Source { self.sort(); } - let current_file = self.nodes[0].clone(); + let current_file = self.nodes.lock().unwrap()[0].clone(); let mut media = Media::new(self.index, current_file); media.add_probe(); media.add_filter(&self.config, false, false); @@ -119,39 +106,46 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watch_folder(path: &String, source: &mut Arc>) { - // let mut source = Source::new(); - let (sender, receiver) = channel(); - - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); - watcher.watch(path, RecursiveMode::Recursive).unwrap(); - - println!("watch path: '{}'", path); - +pub async fn watch_folder( + receiver: Receiver, + stop: Arc>, + sources: Arc>>, +) { loop { + if *stop.lock().unwrap() { + break; + } + match receiver.recv() { Ok(event) => match event { Create(new_path) => { + sources.lock().unwrap().push(new_path.display().to_string()); println!("Create new file: {:?}", new_path); - let mut lock = source.lock().await; - lock.push(new_path.display().to_string()); } Remove(old_path) => { + sources + .lock() + .unwrap() + .retain(|x| x != &old_path.display().to_string()); println!("Remove file: {:?}", old_path); - let mut lock = source.lock().await; - lock.rm(old_path.display().to_string()); } Rename(old_path, new_path) => { + let i = sources + .lock() + .unwrap() + .iter() + .position(|x| *x == old_path.display().to_string()) + .unwrap(); + sources.lock().unwrap()[i] = new_path.display().to_string(); println!("Rename file: {:?} to {:?}", old_path, new_path); - let mut lock = source.lock().await; - lock.mv( - old_path.display().to_string(), - new_path.display().to_string(), - ); } _ => (), }, - Err(e) => println!("watch error: {:?}", e), + Err(e) => { + println!("{:?}", e); + } } + + sleep(Duration::from_secs(2)); } }