working folder watch

This commit is contained in:
jb-alvarado 2022-02-28 18:01:45 +01:00
parent 5fd028231c
commit fcd82f8021
3 changed files with 191 additions and 123 deletions

View File

@ -1,79 +1,130 @@
use notify::EventKind::{Create, Modify, Remove}; use notify::DebouncedEvent::{Create, Remove, Rename};
use notify::{RecommendedWatcher, RecursiveMode, Watcher, Event}; use notify::{watcher, RecursiveMode, Watcher};
use std::{ use std::{
ffi::OsStr,
path::Path, path::Path,
sync::mpsc::{channel, Receiver, Sender}, sync::{
mpsc::{channel, Receiver},
{Arc, Mutex},
},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
use std::sync::{Arc, Mutex}; use walkdir::WalkDir;
//std::sync::mpsc::Receiver<notify::DebouncedEvent>
use tokio::runtime::Builder; use tokio::runtime::Builder;
// #[derive(Debug, Copy, Clone)] #[derive(Debug, Clone)]
// struct WatchMan { pub struct Source {
// stop: bool, stop: Arc<Mutex<bool>>,
// } nodes: Arc<Mutex<Vec<String>>>,
index: usize,
}
// impl WatchMan { impl Source {
// fn new() -> Self { pub fn new(path: String) -> Self {
// Self { let mut file_list = vec![];
// stop: false,
// }
// }
// async fn start(self, receiver: Receiver<notify::DebouncedEvent>) -> Result<(), String> { for entry in WalkDir::new(path.clone())
// loop { .into_iter()
// if self.stop { .filter_map(|e| e.ok())
// println!("break out"); {
// break if entry.path().is_file() {
// } let ext = file_extension(entry.path());
// match receiver.recv() { if ext.is_some()
// Ok(event) => match event { && ["mp4".to_string(), "mkv".to_string()]
// Create(new_path) => { .clone()
// println!("Create new file: {:?}", new_path); .contains(&ext.unwrap().to_lowercase())
// } {
// Remove(old_path) => { file_list.push(entry.path().display().to_string());
// println!("Remove file: {:?}", old_path);
// }
// Rename(old_path, new_path) => {
// println!("Rename file: {:?} to {:?}", old_path, new_path);
// }
// _ => (),
// },
// Err(e) => {
// println!("watch error: {:?}", e);
// println!("watch error: {:?}", self.stop);
// sleep(Duration::from_secs(1));
// // return Err(e.to_string())
// },
// }
// }
// Ok(())
// }
// fn stop(&mut self) {
// println!("stop watching");
// self.stop = true;
// }
// }
async fn watch(receiver: Receiver<Result<Event, notify::Error>>) {
for res in receiver {
match res {
Ok(event) => println!("changed: {:?}", event),
Err(e) => println!("watch error: {:?}", e),
} }
} }
} }
fn main() -> notify::Result<()> { Self {
stop: Arc::new(Mutex::new(false)),
nodes: Arc::new(Mutex::new(file_list)),
index: 0,
}
}
}
impl Iterator for Source {
type Item = String;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.nodes.lock().unwrap().len() {
let current_file = self.nodes.lock().unwrap()[self.index].clone();
self.index += 1;
Some(current_file)
} else {
let current_file = self.nodes.lock().unwrap()[0].clone();
self.index = 1;
Some(current_file)
}
}
}
async fn watch(
receiver: Receiver<notify::DebouncedEvent>,
stop: Arc<Mutex<bool>>,
sources: Arc<Mutex<Vec<String>>>,
) {
loop {
if *stop.lock().unwrap() {
println!("in stop: {}", stop.lock().unwrap());
break;
}
match receiver.recv() {
Ok(event) => match event {
Create(new_path) => {
sources.lock().unwrap().push(new_path.display().to_string());
println!("Create new file: {:?}", new_path);
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x != &old_path.display().to_string());
println!("Remove file: {:?}", old_path);
}
Rename(old_path, new_path) => {
let i = sources
.lock()
.unwrap()
.iter()
.position(|x| *x == old_path.display().to_string())
.unwrap();
sources.lock().unwrap()[i] = new_path.display().to_string();
println!("Rename file: {:?} to {:?}", old_path, new_path);
}
_ => (),
},
Err(e) => {
println!("{:?}", e);
}
}
sleep(Duration::from_secs(1));
}
}
fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
fn main() {
let path = "/home/jonathan/Videos/tv-media/ADtv/01 - Intro".to_string();
let sources = Source::new(path.clone());
let stop = Arc::clone(&sources.stop);
let (sender, receiver) = channel();
let runtime = Builder::new_multi_thread() let runtime = Builder::new_multi_thread()
.worker_threads(1) .worker_threads(1)
.thread_name("file_watcher") .thread_name("file_watcher")
@ -81,35 +132,37 @@ fn main() -> notify::Result<()> {
.build() .build()
.expect("Creating Tokio runtime"); .expect("Creating Tokio runtime");
// let mut watch = WatchMan::new(); let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
watcher
.watch(path.clone(), RecursiveMode::Recursive)
.unwrap();
let (sender, receiver) = channel(); runtime.spawn(watch(
receiver,
Arc::clone(&stop),
// let (tx, rx) = channel(); Arc::clone(&sources.nodes),
let mut watcher = RecommendedWatcher::new(sender)?; ));
watcher.watch(Path::new("/home/jb/Videos/"), RecursiveMode::Recursive).unwrap();
runtime.spawn(watch(receiver));
let mut count = 0; let mut count = 0;
loop {
println!("task: {}", count); {for node in sources {
println!("task: {:?}", node);
sleep(Duration::from_secs(1)); sleep(Duration::from_secs(1));
count += 1; count += 1;
if count == 15 { if count == 5 {
break; break;
} }
} }}
*stop.lock().unwrap() = true;
watcher.unwatch(path).unwrap();
println!("after loop"); println!("after loop");
watcher.unwatch(Path::new("/home/jb/Videos/")).unwrap();
// watch.stop(); // watch.stop();
// watch.run = false; // watch.run = false;
//runtime.block_on(watch.stop()); //runtime.block_on(watch.stop());
Ok(())
} }

View File

@ -1,14 +1,17 @@
use notify::{watcher, RecursiveMode, Watcher};
use std::{ use std::{
io::{prelude::*, Read}, io::{prelude::*, Read},
path::Path, path::Path,
process, process,
process::{Command, Stdio}, process::{Command, Stdio},
sync::Arc, sync::{
mpsc::channel,
{Arc, Mutex},
},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
use tokio::sync::Mutex;
use tokio::runtime::Builder; use tokio::runtime::Builder;
use simplelog::*; use simplelog::*;
@ -16,6 +19,9 @@ use simplelog::*;
use crate::utils::{sec_to_time, watch_folder, Config, CurrentProgram, Media, Source}; use crate::utils::{sec_to_time, watch_folder, Config, CurrentProgram, Media, Source};
pub fn play(config: Config) { pub fn play(config: Config) {
let stop = Arc::new(Mutex::new(false));
let dec_pid: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let get_source = match config.processing.mode.clone().as_str() { let get_source = match config.processing.mode.clone().as_str() {
"folder" => { "folder" => {
let path = config.storage.path.clone(); let path = config.storage.path.clone();
@ -31,9 +37,18 @@ pub fn play(config: Config) {
.unwrap(); .unwrap();
let folder_source = Source::new(config.clone()); let folder_source = Source::new(config.clone());
let mut folder_sync = Arc::new(Mutex::new(folder_source.clone())); let (sender, receiver) = channel();
runtime.spawn(watch_folder(&path, &mut folder_sync)); let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
watcher
.watch(path.clone(), RecursiveMode::Recursive)
.unwrap();
runtime.spawn(watch_folder(
receiver,
Arc::clone(&stop),
Arc::clone(&folder_source.nodes),
));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>> Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
} }
@ -122,9 +137,13 @@ pub fn play(config: Config) {
Ok(proc) => proc, Ok(proc) => proc,
}; };
*dec_pid.lock().unwrap() = dec_proc.id();
let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
let dec_reader = dec_proc.stdout.as_mut().unwrap(); let dec_reader = dec_proc.stdout.as_mut().unwrap();
debug!("Decoder PID: <yellow>{}</>", dec_pid.lock().unwrap());
loop { loop {
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length, Ok(length) => length,
@ -145,6 +164,8 @@ pub fn play(config: Config) {
}; };
} }
*stop.lock().unwrap() = true;
sleep(Duration::from_secs(1)); sleep(Duration::from_secs(1));
match enc_proc.kill() { match enc_proc.kill() {

View File

@ -1,25 +1,25 @@
use notify::DebouncedEvent::{Create, Remove, Rename}; use notify::DebouncedEvent::{Create, Remove, Rename};
use notify::{watcher, RecursiveMode, Watcher};
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use std::{ use std::{
ffi::OsStr, ffi::OsStr,
path::Path, path::Path,
sync::{mpsc::channel, Arc}, sync::{
mpsc::Receiver,
{Arc, Mutex},
},
thread::sleep,
time::Duration, time::Duration,
}; };
use tokio::sync::Mutex;
use walkdir::WalkDir; use walkdir::WalkDir;
use simplelog::*;
use crate::utils::{Config, Media}; use crate::utils::{Config, Media};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Source { pub struct Source {
config: Config, config: Config,
nodes: Vec<String>, pub nodes: Arc<Mutex<Vec<String>>>,
index: usize, index: usize,
} }
@ -56,31 +56,18 @@ impl Source {
Self { Self {
config: config, config: config,
nodes: file_list, nodes: Arc::new(Mutex::new(file_list)),
index: 0, index: 0,
} }
} }
fn push(&mut self, file: String) {
self.nodes.push(file)
}
fn rm(&mut self, file: String) {
self.nodes.retain(|x| x != &file);
}
fn mv(&mut self, old_file: String, new_file: String) {
let i = self.nodes.iter().position(|x| *x == old_file).unwrap();
self.nodes[i] = new_file;
}
fn shuffle(&mut self) { fn shuffle(&mut self) {
let mut rng = thread_rng(); let mut rng = thread_rng();
self.nodes.shuffle(&mut rng); self.nodes.lock().unwrap().shuffle(&mut rng);
} }
fn sort(&mut self) { fn sort(&mut self) {
self.nodes.sort(); self.nodes.lock().unwrap().sort();
} }
} }
@ -88,8 +75,8 @@ impl Iterator for Source {
type Item = Media; type Item = Media;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.index < self.nodes.len() { if self.index < self.nodes.lock().unwrap().len() {
let current_file = self.nodes[self.index].clone(); let current_file = self.nodes.lock().unwrap()[self.index].clone();
let mut media = Media::new(self.index, current_file); let mut media = Media::new(self.index, current_file);
media.add_probe(); media.add_probe();
media.add_filter(&self.config, false, false); media.add_filter(&self.config, false, false);
@ -104,7 +91,7 @@ impl Iterator for Source {
self.sort(); self.sort();
} }
let current_file = self.nodes[0].clone(); let current_file = self.nodes.lock().unwrap()[0].clone();
let mut media = Media::new(self.index, current_file); let mut media = Media::new(self.index, current_file);
media.add_probe(); media.add_probe();
media.add_filter(&self.config, false, false); media.add_filter(&self.config, false, false);
@ -119,39 +106,46 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str) filename.extension().and_then(OsStr::to_str)
} }
pub async fn watch_folder(path: &String, source: &mut Arc<Mutex<Source>>) { pub async fn watch_folder(
// let mut source = Source::new(); receiver: Receiver<notify::DebouncedEvent>,
let (sender, receiver) = channel(); stop: Arc<Mutex<bool>>,
sources: Arc<Mutex<Vec<String>>>,
let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); ) {
watcher.watch(path, RecursiveMode::Recursive).unwrap();
println!("watch path: '{}'", path);
loop { loop {
if *stop.lock().unwrap() {
break;
}
match receiver.recv() { match receiver.recv() {
Ok(event) => match event { Ok(event) => match event {
Create(new_path) => { Create(new_path) => {
sources.lock().unwrap().push(new_path.display().to_string());
println!("Create new file: {:?}", new_path); println!("Create new file: {:?}", new_path);
let mut lock = source.lock().await;
lock.push(new_path.display().to_string());
} }
Remove(old_path) => { Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x != &old_path.display().to_string());
println!("Remove file: {:?}", old_path); println!("Remove file: {:?}", old_path);
let mut lock = source.lock().await;
lock.rm(old_path.display().to_string());
} }
Rename(old_path, new_path) => { Rename(old_path, new_path) => {
let i = sources
.lock()
.unwrap()
.iter()
.position(|x| *x == old_path.display().to_string())
.unwrap();
sources.lock().unwrap()[i] = new_path.display().to_string();
println!("Rename file: {:?} to {:?}", old_path, new_path); println!("Rename file: {:?} to {:?}", old_path, new_path);
let mut lock = source.lock().await;
lock.mv(
old_path.display().to_string(),
new_path.display().to_string(),
);
} }
_ => (), _ => (),
}, },
Err(e) => println!("watch error: {:?}", e), Err(e) => {
println!("{:?}", e);
} }
} }
sleep(Duration::from_secs(2));
}
} }