fix folder watch

This commit is contained in:
jb-alvarado 2022-04-11 17:47:43 +02:00
parent f0ebfb710c
commit ee96742c2c
6 changed files with 128 additions and 114 deletions

10
Cargo.lock generated
View File

@ -167,7 +167,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.9.1"
version = "0.9.2"
dependencies = [
"chrono",
"clap",
@ -1040,9 +1040,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.17"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58"
checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1"
dependencies = [
"proc-macro2",
]
@ -1375,9 +1375,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.32"
version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"
checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",

View File

@ -2,7 +2,7 @@
name = "ffplayout-rs"
description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
version = "0.9.1"
version = "0.9.2"
edition = "2021"
[dependencies]

View File

@ -1,13 +1,19 @@
use notify::DebouncedEvent::{Create, Remove, Rename};
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use std::{
ffi::OsStr,
path::Path,
sync::{
mpsc::Receiver,
mpsc::channel,
{Arc, Mutex},
},
thread::sleep,
time::Duration,
};
use walkdir::WalkDir;
@ -23,10 +29,7 @@ pub struct Source {
}
impl Source {
pub fn new(
current_list: Arc<Mutex<Vec<Media>>>,
global_index: Arc<Mutex<usize>>,
) -> Self {
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<Mutex<usize>>) -> Self {
let config = GlobalConfig::global();
let mut media_list = vec![];
let mut index: usize = 0;
@ -141,40 +144,61 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
pub async fn file_worker(
receiver: Receiver<notify::DebouncedEvent>,
pub async fn watchman(
sources: Arc<Mutex<Vec<Media>>>,
is_terminated: Arc<Mutex<bool>>,
) {
while let Ok(res) = receiver.recv() {
match res {
Create(new_path) => {
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
let config = GlobalConfig::global();
let (tx, rx) = channel();
sources.lock().unwrap().push(media);
info!("Create new file: {new_path:?}");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: {old_path:?}");
}
Rename(old_path, new_path) => {
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
let path = config.storage.path.clone();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'");
panic!("Folder path not exists: '{path}'");
}
info!("Rename file: {old_path:?} to {new_path:?}");
}
_ => (),
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
loop {
if *is_terminated.lock().unwrap() {
break
}
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: <b><magenta>{old_path:?}</></b>");
}
Rename(old_path, new_path) => {
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: <b><magenta>{old_path:?}</></b> to <b><magenta>{new_path:?}</></b>");
}
_ => (),
}
}
sleep(Duration::from_secs(4));
}
}

View File

@ -1,7 +1,60 @@
use std::{
process,
sync::{Arc, Mutex},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use folder::{watchman, Source};
pub use ingest::ingest_server;
pub use folder::{file_worker, Source};
pub use playlist::CurrentProgram;
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
) -> Box<dyn Iterator<Item = Media>> {
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
let path = config.storage.path.clone();
info!("Playout in folder mode.");
let folder_source = Source::new(current_list, index);
debug!("Monitor folder: <b><magenta>{}</></b>", path);
rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(
rt_handle.clone(),
playout_stat,
is_terminated.clone(),
current_list,
index,
);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
process::exit(0x0100);
}
};
get_source
}

View File

@ -17,12 +17,14 @@ out:
*/
use std::process::{Command, Stdio};
use std::{
process::{Command, Stdio},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::input::source_generator;
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
};

View File

@ -1,13 +1,7 @@
use notify::{watcher, RecursiveMode, Watcher};
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
path::Path,
process,
process::{Command, Stdio},
sync::{
mpsc::{channel, sync_channel, Receiver, SyncSender},
Arc, Mutex,
},
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::sleep,
time::Duration,
};
@ -22,64 +16,11 @@ mod stream;
pub use hls::write_hls;
use crate::input::{file_worker, ingest_server, CurrentProgram, Source};
use crate::input::{ingest_server, source_generator};
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl,
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl,
};
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
playout_stat: PlayoutStatus,
is_terminated: Arc<Mutex<bool>>,
) -> Box<dyn Iterator<Item = Media>> {
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
let path = config.storage.path.clone();
if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'");
process::exit(0x0100);
}
info!("Playout in folder mode.");
let folder_source = Source::new(current_list, index);
let (sender, receiver) = channel();
let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap();
watchman
.watch(path.clone(), RecursiveMode::Recursive)
.unwrap();
debug!("Monitor folder: <b><magenta>{}</></b>", path);
rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(
rt_handle.clone(),
playout_stat,
is_terminated.clone(),
current_list,
index,
);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
error!("Process Mode not exists!");
process::exit(0x0100);
}
};
get_source
}
pub fn player(
rt_handle: &Handle,
play_control: PlayerControl,
@ -110,10 +51,7 @@ pub fn player(
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(),
"Encoder",
));
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
@ -177,10 +115,7 @@ pub fn player(
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder",
));
rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder"));
if let Ok(dec_terminator) = dec_proc.terminator() {
*proc_control.decoder_term.lock().unwrap() = Some(dec_terminator);