From ee96742c2c14573dda7259e94ed6e85e78d21037 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 11 Apr 2022 17:47:43 +0200 Subject: [PATCH] fix folder watch --- Cargo.lock | 10 ++--- Cargo.toml | 2 +- src/input/folder.rs | 94 ++++++++++++++++++++++++++++----------------- src/input/mod.rs | 55 +++++++++++++++++++++++++- src/output/hls.rs | 6 ++- src/output/mod.rs | 75 +++--------------------------------- 6 files changed, 128 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index beeaa855..37152d7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,7 +167,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.9.1" +version = "0.9.2" dependencies = [ "chrono", "clap", @@ -1040,9 +1040,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -1375,9 +1375,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 4ede8c10..c7b88abf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "ffplayout-rs" description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" -version = "0.9.1" +version = "0.9.2" edition = "2021" [dependencies] diff --git a/src/input/folder.rs b/src/input/folder.rs index 8c2773cd..f0ab30cf 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -1,13 +1,19 @@ -use notify::DebouncedEvent::{Create, Remove, Rename}; +use notify::{ + DebouncedEvent::{Create, Remove, Rename}, + {watcher, RecursiveMode, Watcher}, +}; + use rand::{seq::SliceRandom, thread_rng}; use simplelog::*; use std::{ ffi::OsStr, path::Path, sync::{ - mpsc::Receiver, + mpsc::channel, {Arc, Mutex}, }, + thread::sleep, + time::Duration, }; use walkdir::WalkDir; @@ -23,10 +29,7 @@ pub struct Source { } impl Source { - pub fn new( - current_list: Arc>>, - global_index: Arc>, - ) -> Self { + pub fn new(current_list: Arc>>, global_index: Arc>) -> Self { let config = GlobalConfig::global(); let mut media_list = vec![]; let mut index: usize = 0; @@ -141,40 +144,61 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn file_worker( - receiver: Receiver, +pub async fn watchman( sources: Arc>>, + is_terminated: Arc>, ) { - while let Ok(res) = receiver.recv() { - match res { - Create(new_path) => { - let index = sources.lock().unwrap().len(); - let media = Media::new(index, new_path.display().to_string(), false); + let config = GlobalConfig::global(); + let (tx, rx) = channel(); - sources.lock().unwrap().push(media); - info!("Create new file: {new_path:?}"); - } - Remove(old_path) => { - sources - .lock() - .unwrap() - .retain(|x| x.source != old_path.display().to_string()); - info!("Remove file: {old_path:?}"); - } - Rename(old_path, new_path) => { - let index = sources - .lock() - .unwrap() - .iter() - .position(|x| *x.source == old_path.display().to_string()) - .unwrap(); + let path = config.storage.path.clone(); - let media = Media::new(index, new_path.display().to_string(), false); - sources.lock().unwrap()[index] = media; + if !Path::new(&path).exists() { + error!("Folder path not exists: '{path}'"); + panic!("Folder path not exists: '{path}'"); + } - info!("Rename file: {old_path:?} to {new_path:?}"); - } - _ => (), + let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); + watcher.watch(path, RecursiveMode::Recursive).unwrap(); + + loop { + if *is_terminated.lock().unwrap() { + break } + + if let Ok(res) = rx.try_recv() { + match res { + Create(new_path) => { + let index = sources.lock().unwrap().len(); + let media = Media::new(index, new_path.display().to_string(), false); + + sources.lock().unwrap().push(media); + info!("Create new file: {new_path:?}"); + } + Remove(old_path) => { + sources + .lock() + .unwrap() + .retain(|x| x.source != old_path.display().to_string()); + info!("Remove file: {old_path:?}"); + } + Rename(old_path, new_path) => { + let index = sources + .lock() + .unwrap() + .iter() + .position(|x| *x.source == old_path.display().to_string()) + .unwrap(); + + let media = Media::new(index, new_path.display().to_string(), false); + sources.lock().unwrap()[index] = media; + + info!("Rename file: {old_path:?} to {new_path:?}"); + } + _ => (), + } + } + + sleep(Duration::from_secs(4)); } } diff --git a/src/input/mod.rs b/src/input/mod.rs index c5273137..20bcd515 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,7 +1,60 @@ +use std::{ + process, + sync::{Arc, Mutex}, +}; + +use simplelog::*; +use tokio::runtime::Handle; + +use crate::utils::{GlobalConfig, Media, PlayoutStatus}; + pub mod folder; pub mod ingest; pub mod playlist; +pub use folder::{watchman, Source}; pub use ingest::ingest_server; -pub use folder::{file_worker, Source}; pub use playlist::CurrentProgram; + +pub fn source_generator( + rt_handle: &Handle, + config: GlobalConfig, + current_list: Arc>>, + index: Arc>, + playout_stat: PlayoutStatus, + is_terminated: Arc>, +) -> Box> { + let get_source = match config.processing.clone().mode.as_str() { + "folder" => { + let path = config.storage.path.clone(); + + info!("Playout in folder mode."); + + let folder_source = Source::new(current_list, index); + + debug!("Monitor folder: {}", path); + + rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone())); + + Box::new(folder_source) as Box> + } + "playlist" => { + info!("Playout in playlist mode"); + let program = CurrentProgram::new( + rt_handle.clone(), + playout_stat, + is_terminated.clone(), + current_list, + index, + ); + + Box::new(program) as Box> + } + _ => { + error!("Process Mode not exists!"); + process::exit(0x0100); + } + }; + + get_source +} diff --git a/src/output/hls.rs b/src/output/hls.rs index b92fb03d..79079dbe 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -17,12 +17,14 @@ out: */ -use std::process::{Command, Stdio}; +use std::{ + process::{Command, Stdio}, +}; use simplelog::*; use tokio::runtime::Handle; -use crate::output::source_generator; +use crate::input::source_generator; use crate::utils::{ sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; diff --git a/src/output/mod.rs b/src/output/mod.rs index 32e26b3a..454ab10a 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,13 +1,7 @@ -use notify::{watcher, RecursiveMode, Watcher}; use std::{ io::{prelude::*, BufReader, BufWriter, Read}, - path::Path, - process, process::{Command, Stdio}, - sync::{ - mpsc::{channel, sync_channel, Receiver, SyncSender}, - Arc, Mutex, - }, + sync::mpsc::{sync_channel, Receiver, SyncSender}, thread::sleep, time::Duration, }; @@ -22,64 +16,11 @@ mod stream; pub use hls::write_hls; -use crate::input::{file_worker, ingest_server, CurrentProgram, Source}; +use crate::input::{ingest_server, source_generator}; use crate::utils::{ - sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl, + sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; -pub fn source_generator( - rt_handle: &Handle, - config: GlobalConfig, - current_list: Arc>>, - index: Arc>, - playout_stat: PlayoutStatus, - is_terminated: Arc>, -) -> Box> { - let get_source = match config.processing.clone().mode.as_str() { - "folder" => { - let path = config.storage.path.clone(); - if !Path::new(&path).exists() { - error!("Folder path not exists: '{path}'"); - process::exit(0x0100); - } - - info!("Playout in folder mode."); - - let folder_source = Source::new(current_list, index); - - let (sender, receiver) = channel(); - let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap(); - watchman - .watch(path.clone(), RecursiveMode::Recursive) - .unwrap(); - - debug!("Monitor folder: {}", path); - - rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone())); - - Box::new(folder_source) as Box> - } - "playlist" => { - info!("Playout in playlist mode"); - let program = CurrentProgram::new( - rt_handle.clone(), - playout_stat, - is_terminated.clone(), - current_list, - index, - ); - - Box::new(program) as Box> - } - _ => { - error!("Process Mode not exists!"); - process::exit(0x0100); - } - }; - - get_source -} - pub fn player( rt_handle: &Handle, play_control: PlayerControl, @@ -110,10 +51,7 @@ pub fn player( let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); - rt_handle.spawn(stderr_reader( - enc_proc.stderr.take().unwrap(), - "Encoder", - )); + rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); let (ingest_sender, ingest_receiver): ( SyncSender<(usize, [u8; 65088])>, @@ -177,10 +115,7 @@ pub fn player( let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - rt_handle.spawn(stderr_reader( - dec_proc.stderr.take().unwrap(), - "Decoder", - )); + rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder")); if let Ok(dec_terminator) = dec_proc.terminator() { *proc_control.decoder_term.lock().unwrap() = Some(dec_terminator);