From 4bfecda7575b56c690253295abd119bbcba5c203 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 21 Mar 2022 15:10:33 +0100 Subject: [PATCH] cleanup --- examples/custom_iter.rs | 57 ------- examples/delta_check.rs | 33 ---- examples/global_config.rs | 52 ------- examples/logging.rs | 121 -------------- examples/pipe_ffmpeg.rs | 320 -------------------------------------- examples/pipe_ffmpeg2.rs | 262 ------------------------------- examples/watch.rs | 141 ----------------- examples/watch_simple.rs | 79 ---------- 8 files changed, 1065 deletions(-) delete mode 100644 examples/custom_iter.rs delete mode 100644 examples/delta_check.rs delete mode 100644 examples/global_config.rs delete mode 100644 examples/logging.rs delete mode 100644 examples/pipe_ffmpeg.rs delete mode 100644 examples/pipe_ffmpeg2.rs delete mode 100644 examples/watch.rs delete mode 100644 examples/watch_simple.rs diff --git a/examples/custom_iter.rs b/examples/custom_iter.rs deleted file mode 100644 index cc804640..00000000 --- a/examples/custom_iter.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{ - thread::sleep, - time::Duration, -}; - -struct List { - arr: Vec, - msg: String, - i: usize, -} - -impl List { - fn new() -> Self { - Self { - arr: (0..10).collect(), - msg: "fist init".to_string(), - i: 0, - } - } - - fn fill(&mut self, val: String) { - println!("{val}"); - self.msg = "new fill".to_string(); - } -} - -impl Iterator for List { - type Item = u8; - - fn next(&mut self) -> Option { - if self.i == 0 { - println!("{}", self.msg); - } - if self.i < self.arr.len() { - let current = self.arr[self.i]; - self.i += 1; - - Some(current) - } else { - self.i = 1; - let current = self.arr[0]; - self.fill("pass to function".to_string()); - println!("{}", self.msg); - - Some(current) - } - } -} - -fn main() { - let list = List::new(); - - for i in list { - println!("{i}"); - sleep(Duration::from_millis(300)); - } -} diff --git a/examples/delta_check.rs b/examples/delta_check.rs deleted file mode 100644 index c5988c31..00000000 --- a/examples/delta_check.rs +++ /dev/null @@ -1,33 +0,0 @@ -use chrono::prelude::*; -use std::{time, time::UNIX_EPOCH}; - -pub fn get_sec() -> f64 { - let local: DateTime = Local::now(); - - (local.hour() * 3600 + local.minute() * 60 + local.second()) as f64 - + (local.nanosecond() as f64 / 1000000000.0) -} - -pub fn sec_to_time(sec: f64) -> String { - let d = UNIX_EPOCH + time::Duration::from_secs(sec as u64); - // Create DateTime from SystemTime - let date_time = DateTime::::from(d); - - date_time.format("%H:%M:%S").to_string() -} - -fn main() { - let current_time = get_sec(); - let start = 21600.0; - let target_length = 86400.0; - let total_delta; - - if current_time < start { - total_delta = start - current_time; - } else { - total_delta = target_length + start - current_time; - } - - println!("Total Seconds: {total_delta}"); - println!("Total Time: {}", sec_to_time(total_delta)); -} diff --git a/examples/global_config.rs b/examples/global_config.rs deleted file mode 100644 index d6c0724b..00000000 --- a/examples/global_config.rs +++ /dev/null @@ -1,52 +0,0 @@ -use serde::{Deserialize, Serialize}; -use serde_yaml::{self}; -use std::{fs::File}; - -use once_cell::sync::OnceCell; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Config { - pub general: General, - pub mail: Mail, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct General { - pub stop_threshold: f64, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Mail { - pub subject: String, - pub smtp_server: String, - pub starttls: bool, - pub sender_addr: String, - pub sender_pass: String, - pub recipient: String, - pub mail_level: String, -} - -static INSTANCE: OnceCell = OnceCell::new(); - -impl Config { - fn new() -> Self { - let config_path = "/etc/ffplayout/ffplayout.yml".to_string(); - let f = File::open(&config_path).unwrap(); - - let config: Config = serde_yaml::from_reader(f).expect("Could not read config file."); - - config - } - - pub fn init() -> &'static Config { - INSTANCE.get().expect("Config is not initialized") - } -} - -pub fn main() { - let config = Config::new(); - INSTANCE.set(config).unwrap(); - let config = Config::init(); - - println!("{:#?}", config); -} diff --git a/examples/logging.rs b/examples/logging.rs deleted file mode 100644 index a7d817f4..00000000 --- a/examples/logging.rs +++ /dev/null @@ -1,121 +0,0 @@ -extern crate log; -extern crate simplelog; - -use std::{thread::sleep, time::Duration}; - -use simplelog::*; - -use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; -use log::{Level, LevelFilter, Log, Metadata, Record}; - -pub struct LogMailer { - level: LevelFilter, - config: Config, -} - -impl LogMailer { - pub fn new(log_level: LevelFilter, config: Config) -> Box { - Box::new(LogMailer { - level: log_level, - config, - }) - } -} - -impl Log for LogMailer { - fn enabled(&self, metadata: &Metadata<'_>) -> bool { - metadata.level() <= self.level - } - - fn log(&self, record: &Record<'_>) { - if self.enabled(record.metadata()) { - match record.level() { - Level::Error => { - println!("Send Error Mail: {:?}", record.args()) - } - Level::Warn => { - println!("Send Warn Mail: {:?}", record.args()) - } - Level::Info => { - println!("Send Info Mail: {:?}", record.args()) - } - _ => (), - } - } - } - - fn flush(&self) {} -} - -impl SharedLogger for LogMailer { - fn level(&self) -> LevelFilter { - self.level - } - - fn config(&self) -> Option<&Config> { - Some(&self.config) - } - - fn as_log(self: Box) -> Box { - Box::new(*self) - } -} - -fn main() { - let log = || { - FileRotate::new( - "logs/ffplayout.log", - AppendCount::new(7), - ContentLimit::Lines(1000), - Compression::None, - ) - }; - - let def_config = simplelog::ConfigBuilder::new() - .set_target_level(LevelFilter::Off) - .set_thread_level(LevelFilter::Off) - .set_level_padding(LevelPadding::Left) - .set_time_to_local(true) - .clone(); - - let term_config = def_config - .clone() - .set_level_color(Level::Debug, Some(Color::Ansi256(12))) - .set_level_color(Level::Info, Some(Color::Ansi256(10))) - .set_level_color(Level::Warn, Some(Color::Ansi256(208))) - .set_level_color(Level::Error, Some(Color::Ansi256(9))) - .set_time_format_str("\x1b[30;1m[%Y-%m-%d %H:%M:%S%.3f]\x1b[0m") - .build(); - - let file_config = def_config - .clone() - .set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]") - .build(); - - let mail_config = def_config - .clone() - .set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]") - .build(); - - CombinedLogger::init(vec![ - TermLogger::new( - LevelFilter::Debug, - term_config, - TerminalMode::Stderr, - ColorChoice::Auto, - ), - WriteLogger::new(LevelFilter::Debug, file_config, log()), - LogMailer::new(LevelFilter::Info, mail_config), - ]) - .unwrap(); - - debug!("this is a debug message"); - info!("this is a info message"); - warn!("this is a warning message"); - error!("this is a error message"); - - for idx in 1..10 { - info!("{idx}"); - sleep(Duration::from_secs(2)); - } -} diff --git a/examples/pipe_ffmpeg.rs b/examples/pipe_ffmpeg.rs deleted file mode 100644 index 06183501..00000000 --- a/examples/pipe_ffmpeg.rs +++ /dev/null @@ -1,320 +0,0 @@ -use std::{ - io::{prelude::*, BufReader, Error, Read}, - process::{ChildStderr, Command, Stdio}, - sync::{ - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, - thread::sleep, - time::Duration, -}; - -use process_control::{ChildExt, Terminator}; -use tokio::runtime::{Handle, Runtime}; - -async fn ingest_server( - dec_setting: Vec<&str>, - ingest_sender: Sender<[u8; 65424]>, - proc_terminator: Arc>>, - is_terminated: Arc>, - rt_handle: Handle, -) -> Result<(), Error> { - let mut buffer: [u8; 65424] = [0; 65424]; - let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]"; - let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; - let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+error"]; - let mut stream_input = vec![ - "-f", - "live_flv", - "-listen", - "1", - "-i", - "rtmp://localhost:1936/live/stream", - ]; - - server_cmd.append(&mut stream_input); - server_cmd.append(&mut filter_list); - server_cmd.append(&mut dec_setting.clone()); - - loop { - if *is_terminated.lock().unwrap() { - break; - } - - let mut server_proc = match Command::new("ffmpeg") - .args(server_cmd.clone()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - { - Err(e) => { - panic!("couldn't spawn ingest server: {}", e) - } - Ok(proc) => proc, - }; - - let serv_terminator = server_proc.terminator()?; - *proc_terminator.lock().unwrap() = Some(serv_terminator); - - rt_handle.spawn(stderr_reader( - server_proc.stderr.take().unwrap(), - "Server".to_string(), - proc_terminator.clone(), - is_terminated.clone(), - - )); - - let ingest_reader = server_proc.stdout.as_mut().unwrap(); - - loop { - match ingest_reader.read_exact(&mut buffer[..]) { - Ok(length) => length, - Err(_) => break, - }; - - if let Err(e) = ingest_sender.send(buffer) { - println!("Ingest server error: {:?}", e); - break; - } - } - - sleep(Duration::from_secs(1)); - - if let Err(e) = server_proc.wait() { - panic!("Server error: {:?}", e) - }; - } - - Ok(()) -} - -pub async fn stderr_reader( - std_errors: ChildStderr, - suffix: String, - server_term: Arc>>, - is_terminated: Arc>, -) -> Result<(), Error> { - // read ffmpeg stderr decoder and encoder instance - // and log the output - - fn format_line(line: String, level: String) -> String { - line.replace(&format!("[{}] ", level), "") - } - - let buffer = BufReader::new(std_errors); - - for line in buffer.lines() { - let line = line?; - - if line.contains("[info]") { - println!("[{suffix}] {}", format_line(line, "info".to_string())) - } else if line.contains("[warning]") { - println!("[{suffix}] {}", format_line(line, "warning".to_string())) - } else { - if suffix != "server" && !line.contains("Input/output error") { - println!( - "[{suffix}] {}", - format_line(line.clone(), "level+error".to_string()) - ); - } - - if line.contains("Error closing file pipe:: Broken pipe") { - *is_terminated.lock().unwrap() = true; - - match &*server_term.lock().unwrap() { - Some(serv) => unsafe { - if let Ok(_) = serv.terminate() { - println!("Terminate server done"); - } - }, - None => (), - } - } - } - } - - Ok(()) -} - -fn main() { - let decoder_term: Arc>> = Arc::new(Mutex::new(None)); - let player_term: Arc>> = Arc::new(Mutex::new(None)); - let server_term: Arc>> = Arc::new(Mutex::new(None)); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); - - let runtime = Runtime::new().unwrap(); - let rt_handle = runtime.handle(); - - let dec_setting: Vec<&str> = vec![ - "-pix_fmt", - "yuv420p", - "-c:v", - "mpeg2video", - "-g", - "1", - "-b:v", - "50000k", - "-minrate", - "50000k", - "-maxrate", - "50000k", - "-bufsize", - "25000k", - "-c:a", - "s302m", - "-strict", - "-2", - "-ar", - "48000", - "-ac", - "2", - "-f", - "mpegts", - "-", - ]; - - let mut player_proc = match Command::new("ffplay") - .args([ - "-v", - "level+error", - "-hide_banner", - "-nostats", - "-i", - "pipe:0", - ]) - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - { - Err(e) => panic!("couldn't spawn ffplay: {}", e), - Ok(proc) => proc, - }; - - rt_handle.spawn(stderr_reader( - player_proc.stderr.take().unwrap(), - "Player".to_string(), - server_term.clone(), - is_terminated.clone(), - )); - - let player_terminator = match player_proc.terminator() { - Ok(proc) => Some(proc), - Err(_) => None, - }; - *player_term.lock().unwrap() = player_terminator; - - let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel(); - - rt_handle.spawn(ingest_server( - dec_setting.clone(), - ingest_sender, - server_term.clone(), - is_terminated.clone(), - rt_handle.clone(), - )); - - let mut buffer: [u8; 65424] = [0; 65424]; - let mut dec_cmd = vec![ - "-v", - "level+error", - "-hide_banner", - "-nostats", - "-f", - "lavfi", - "-i", - "testsrc=duration=20:size=1024x576:rate=25", - "-f", - "lavfi", - "-i", - "anoisesrc=d=20:c=pink:r=48000:a=0.5", - ]; - - dec_cmd.append(&mut dec_setting.clone()); - - let mut dec_proc = match Command::new("ffmpeg") - .args(dec_cmd) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - { - Err(e) => panic!("couldn't spawn ffmpeg: {}", e), - Ok(proc) => proc, - }; - - rt_handle.spawn(stderr_reader( - dec_proc.stderr.take().unwrap(), - "Decoder".to_string(), - server_term.clone(), - is_terminated.clone(), - )); - - let dec_terminator = match dec_proc.terminator() { - Ok(proc) => Some(proc), - Err(_) => None, - }; - *decoder_term.lock().unwrap() = dec_terminator; - - let mut player_writer = player_proc.stdin.as_ref().unwrap(); - let dec_reader = dec_proc.stdout.as_mut().unwrap(); - - loop { - let bytes_len = match dec_reader.read(&mut buffer[..]) { - Ok(length) => length, - Err(e) => panic!("Reading error from decoder: {:?}", e), - }; - - if let Ok(receive) = ingest_receiver.try_recv() { - if let Err(e) = player_writer.write_all(&receive) { - panic!("Err: {:?}", e) - }; - continue; - } - - if let Err(e) = player_writer.write(&buffer[..bytes_len]) { - panic!("Err: {:?}", e) - }; - - if bytes_len == 0 { - break; - } - } - - *is_terminated.lock().unwrap() = true; - - sleep(Duration::from_secs(1)); - - println!("Terminate decoder..."); - - match &*decoder_term.lock().unwrap() { - Some(dec) => unsafe { - if let Ok(_) = dec.terminate() { - println!("Terminate decoder done"); - } - }, - None => (), - } - - println!("Terminate encoder..."); - - match &*player_term.lock().unwrap() { - Some(enc) => unsafe { - if let Ok(_) = enc.terminate() { - println!("Terminate encoder done"); - } - }, - None => (), - } - - println!("Terminate server..."); - - match &*server_term.lock().unwrap() { - Some(serv) => unsafe { - if let Ok(_) = serv.terminate() { - println!("Terminate server done"); - } - }, - None => (), - } - - println!("Terminate done..."); -} diff --git a/examples/pipe_ffmpeg2.rs b/examples/pipe_ffmpeg2.rs deleted file mode 100644 index 3f6f3c02..00000000 --- a/examples/pipe_ffmpeg2.rs +++ /dev/null @@ -1,262 +0,0 @@ -use std::{ - io::{prelude::*, BufReader, Error, Read}, - process::{Command, Stdio}, - sync::{ - mpsc::{sync_channel, Receiver, SyncSender}, - Arc, Mutex, - }, - thread::sleep, - time::Duration, -}; - -use process_control::{ChildExt, Terminator}; -use tokio::runtime::Runtime; - -async fn ingest_server( - dec_setting: Vec<&str>, - ingest_sender: SyncSender<(usize, [u8; 65088])>, - proc_terminator: Arc>>, - is_terminated: Arc>, - server_is_running: Arc>, -) -> Result<(), Error> { - let mut buffer: [u8; 65088] = [0; 65088]; - let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]"; - let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"]; - let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"]; - - let mut stream_input = vec![ - "-f", - "live_flv", - "-listen", - "1", - "-i", - "rtmp://localhost:1936/live/stream", - ]; - - server_cmd.append(&mut stream_input); - server_cmd.append(&mut filter_list); - server_cmd.append(&mut dec_setting.clone()); - - let mut is_running; - - loop { - if *is_terminated.lock().unwrap() { - break; - } - - let mut server_proc = match Command::new("ffmpeg") - .args(server_cmd.clone()) - .stdout(Stdio::piped()) - .spawn() - { - Err(e) => { - panic!("couldn't spawn ingest server: {}", e) - } - Ok(proc) => proc, - }; - - let serv_terminator = server_proc.terminator()?; - *proc_terminator.lock().unwrap() = Some(serv_terminator); - let ingest_reader = server_proc.stdout.as_mut().unwrap(); - is_running = false; - - loop { - let bytes_len = match ingest_reader.read(&mut buffer[..]) { - Ok(length) => length, - Err(e) => { - println!("Reading error from ingest server: {:?}", e); - - break; - } - }; - - if !is_running { - *server_is_running.lock().unwrap() = true; - is_running = true; - } - - if bytes_len > 0 { - if let Err(e) = ingest_sender.send((bytes_len, buffer)) { - println!("Ingest server write error: {:?}", e); - - *is_terminated.lock().unwrap() = true; - break; - } - } else { - break; - } - } - - *server_is_running.lock().unwrap() = false; - - sleep(Duration::from_secs(1)); - - if let Err(e) = server_proc.kill() { - print!("Ingest server {:?}", e) - }; - - if let Err(e) = server_proc.wait() { - panic!("Decoder error: {:?}", e) - }; - } - - Ok(()) -} -fn main() { - let server_term: Arc>> = Arc::new(Mutex::new(None)); - let is_terminated: Arc> = Arc::new(Mutex::new(false)); - let server_is_running: Arc> = Arc::new(Mutex::new(false)); - - let dec_setting: Vec<&str> = vec![ - "-pix_fmt", - "yuv420p", - "-c:v", - "mpeg2video", - "-g", - "1", - "-b:v", - "50000k", - "-minrate", - "50000k", - "-maxrate", - "50000k", - "-bufsize", - "25000k", - "-c:a", - "s302m", - "-strict", - "-2", - "-ar", - "48000", - "-ac", - "2", - "-f", - "mpegts", - "-", - ]; - - let mut player_proc = match Command::new("ffplay") - .args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"]) - .stdin(Stdio::piped()) - .spawn() - { - Err(e) => panic!("couldn't spawn ffplay: {}", e), - Ok(proc) => proc, - }; - - let (ingest_sender, ingest_receiver): ( - SyncSender<(usize, [u8; 65088])>, - Receiver<(usize, [u8; 65088])>, - ) = sync_channel(1); - let runtime = Runtime::new().unwrap(); - - runtime.spawn(ingest_server( - dec_setting.clone(), - ingest_sender, - server_term.clone(), - is_terminated.clone(), - server_is_running.clone(), - )); - - let mut buffer: [u8; 65088] = [0; 65088]; - - let mut dec_cmd = vec![ - "-v", - "error", - "-hide_banner", - "-nostats", - "-f", - "lavfi", - "-i", - "testsrc=duration=120:size=1024x576:rate=25", - "-f", - "lavfi", - "-i", - "anoisesrc=d=120:c=pink:r=48000:a=0.5", - ]; - - dec_cmd.append(&mut dec_setting.clone()); - - let mut dec_proc = match Command::new("ffmpeg") - .args(dec_cmd) - .stdout(Stdio::piped()) - .spawn() - { - Err(e) => panic!("couldn't spawn ffmpeg: {}", e), - Ok(proc) => proc, - }; - - let mut player_writer = player_proc.stdin.as_ref().unwrap(); - let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - - let mut live_on = false; - - let mut count = 0; - - loop { - count += 1; - - if *server_is_running.lock().unwrap() { - if let Ok(receive) = ingest_receiver.try_recv() { - if let Err(e) = player_writer.write(&receive.1[..receive.0]) { - println!("Ingest receiver error: {:?}", e); - - break; - }; - } - - if !live_on { - println!("Switch from offline source to live"); - - live_on = true; - } - } else { - println!("{count}"); - let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { - Ok(length) => length, - Err(e) => { - println!("Reading error from decoder: {:?}", e); - - break; - } - }; - - if dec_bytes_len > 0 { - if let Err(e) = player_writer.write(&buffer[..dec_bytes_len]) { - println!("Encoder write error: {:?}", e); - - break; - }; - } else { - if live_on { - println!("Switch from live ingest to offline source"); - - live_on = false; - } - - player_writer.flush().unwrap(); - } - } - } - - *is_terminated.lock().unwrap() = true; - - if let Some(server) = &*server_term.lock().unwrap() { - unsafe { - if let Ok(_) = server.terminate() { - println!("Terminate ingest server done"); - } - } - }; - - sleep(Duration::from_secs(1)); - - match player_proc.kill() { - Ok(_) => println!("Playout done..."), - Err(e) => panic!("Encoder error: {:?}", e), - } - - if let Err(e) = player_proc.wait() { - println!("Encoder: {e}") - }; -} diff --git a/examples/watch.rs b/examples/watch.rs deleted file mode 100644 index af7f3837..00000000 --- a/examples/watch.rs +++ /dev/null @@ -1,141 +0,0 @@ -use notify::DebouncedEvent::{Create, Remove, Rename}; -use notify::{watcher, RecursiveMode, Watcher}; -use std::{ - ffi::OsStr, - path::Path, - sync::{ - mpsc::{channel, Receiver}, - {Arc, Mutex}, - }, - thread::sleep, - time::Duration, -}; - -use walkdir::WalkDir; - -use tokio::runtime::Builder; - -#[derive(Debug, Clone)] -pub struct Source { - nodes: Arc>>, - index: usize, -} - -impl Source { - pub fn new(path: String) -> Self { - let mut file_list = vec![]; - - for entry in WalkDir::new(path.clone()) - .into_iter() - .filter_map(|e| e.ok()) - { - if entry.path().is_file() { - let ext = file_extension(entry.path()); - - if ext.is_some() - && ["mp4".to_string(), "mkv".to_string()] - .clone() - .contains(&ext.unwrap().to_lowercase()) - { - file_list.push(entry.path().display().to_string()); - } - } - } - - Self { - nodes: Arc::new(Mutex::new(file_list)), - index: 0, - } - } -} - -impl Iterator for Source { - type Item = String; - - fn next(&mut self) -> Option { - if self.index < self.nodes.lock().unwrap().len() { - let current_file = self.nodes.lock().unwrap()[self.index].clone(); - self.index += 1; - - Some(current_file) - } else { - let current_file = self.nodes.lock().unwrap()[0].clone(); - - self.index = 1; - - Some(current_file) - } - } -} - -async fn watch(receiver: Receiver, sources: Arc>>) { - while let Ok(res) = receiver.recv() { - match res { - Create(new_path) => { - sources.lock().unwrap().push(new_path.display().to_string()); - println!("Create new file: {:?}", new_path); - } - Remove(old_path) => { - sources - .lock() - .unwrap() - .retain(|x| x != &old_path.display().to_string()); - println!("Remove file: {:?}", old_path); - } - Rename(old_path, new_path) => { - let i = sources - .lock() - .unwrap() - .iter() - .position(|x| *x == old_path.display().to_string()) - .unwrap(); - sources.lock().unwrap()[i] = new_path.display().to_string(); - println!("Rename file: {:?} to {:?}", old_path, new_path); - } - _ => (), - } - } -} - -fn file_extension(filename: &Path) -> Option<&str> { - filename.extension().and_then(OsStr::to_str) -} - -fn main() { - let path = "/home/jb/Videos/tv-media/ADtv/01 - Intro".to_string(); - let sources = Source::new(path.clone()); - - let (sender, receiver) = channel(); - let runtime = Builder::new_multi_thread() - .worker_threads(1) - .thread_name("file_watcher") - .enable_all() - .build() - .expect("Creating Tokio runtime"); - - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); - - watcher - .watch(path.clone(), RecursiveMode::Recursive) - .unwrap(); - - runtime.spawn(watch( - receiver, - Arc::clone(&sources.nodes), - )); - - let mut count = 0; - - for node in sources { - println!("task: {:?}", node); - sleep(Duration::from_secs(1)); - - count += 1; - - if count == 5 { - break; - } - } - - println!("after loop"); -} diff --git a/examples/watch_simple.rs b/examples/watch_simple.rs deleted file mode 100644 index 795de7fe..00000000 --- a/examples/watch_simple.rs +++ /dev/null @@ -1,79 +0,0 @@ -use notify::DebouncedEvent::{Create, Remove, Rename}; -use notify::{watcher, RecursiveMode, Watcher}; -use std::{ - sync::{ - mpsc::{channel, Receiver}, - {Arc, Mutex}, - }, - thread::sleep, - time::Duration, -}; - -use tokio::runtime::Builder; - -async fn watch(receiver: Receiver, stop: Arc>) { - loop { - if *stop.lock().unwrap() { - break; - } - - match receiver.recv() { - Ok(event) => match event { - Create(new_path) => { - println!("Create new file: {:?}", new_path); - } - Remove(old_path) => { - println!("Remove file: {:?}", old_path); - } - Rename(old_path, new_path) => { - println!("Rename file: {:?} to {:?}", old_path, new_path); - } - _ => (), - }, - Err(e) => { - println!("{:?}", e); - } - } - - sleep(Duration::from_secs(1)); - } -} - -fn main() { - let path = "/home/jb/Videos/tv-media/ADtv/01 - Intro".to_string(); - let stop = Arc::new(Mutex::new(false)); - - let (sender, receiver) = channel(); - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); - watcher.watch(path.clone(), RecursiveMode::Recursive).unwrap(); - - let runtime = Builder::new_multi_thread() - .worker_threads(1) - .thread_name("file_watcher") - .enable_all() - .build() - .expect("Creating Tokio runtime"); - - if true { - runtime.spawn(watch(receiver, Arc::clone(&stop))); - } - - let mut count = 0; - - loop { - println!("task: {count}"); - sleep(Duration::from_secs(1)); - - count += 1; - - if count == 5 { - break; - } - } - - *stop.lock().unwrap() = true; - - watcher.unwatch(path).unwrap(); - - println!("after loop"); -}