From e33f5a1b5a9b9be063b37730e751d66d0c45ed8c Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 6 Apr 2022 15:53:27 +0200 Subject: [PATCH] continue work on json rpc server --- src/input/folder.rs | 83 +++++++++++++++++++++++++++------------- src/input/mod.rs | 2 +- src/input/playlist.rs | 8 ++-- src/main.rs | 2 +- src/output/desktop.rs | 2 +- src/output/hls.rs | 30 +++++++-------- src/output/mod.rs | 28 +++++++++----- src/output/stream.rs | 2 +- src/utils/json_reader.rs | 2 +- src/utils/mod.rs | 21 ++++++++-- src/utils/rpc_server.rs | 82 +++++++++++++++++++++++++++++++++------ 11 files changed, 189 insertions(+), 73 deletions(-) diff --git a/src/input/folder.rs b/src/input/folder.rs index 43ca504e..c0a3bfc8 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -12,20 +12,21 @@ use std::{ use walkdir::WalkDir; -use crate::utils::{GlobalConfig, Media}; +use crate::utils::{get_sec, GlobalConfig, Media}; #[derive(Debug, Clone)] pub struct Source { config: GlobalConfig, - pub nodes: Arc>>, + pub nodes: Arc>>, current_node: Media, - index: usize, + index: Arc>, } impl Source { - pub fn new() -> Self { + pub fn new(current_list: Arc>>, global_index: Arc>) -> Self { let config = GlobalConfig::global(); - let mut file_list = vec![]; + let mut media_list = vec![]; + let mut index: usize = 0; for entry in WalkDir::new(config.storage.path.clone()) .into_iter() @@ -41,7 +42,8 @@ impl Source { .clone() .contains(&ext.unwrap().to_lowercase()) { - file_list.push(entry.path().display().to_string()); + let media = Media::new(0, entry.path().display().to_string(), false); + media_list.push(media); } } } @@ -49,26 +51,48 @@ impl Source { if config.storage.shuffle { info!("Shuffle files"); let mut rng = thread_rng(); - file_list.shuffle(&mut rng); + media_list.shuffle(&mut rng); } else { - file_list.sort(); + media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source)); } + for item in media_list.iter_mut() { + item.index = Some(index); + + index += 1; + } + + *current_list.lock().unwrap() = media_list; + Self { config: config.clone(), - nodes: Arc::new(Mutex::new(file_list)), - current_node: Media::new(0, "".to_string()), - index: 0, + nodes: current_list, + current_node: Media::new(0, "".to_string(), false), + index: global_index, } } fn shuffle(&mut self) { let mut rng = thread_rng(); self.nodes.lock().unwrap().shuffle(&mut rng); + let mut index: usize = 0; + + for item in self.nodes.lock().unwrap().iter_mut() { + item.index = Some(index); + + index += 1; + } } fn sort(&mut self) { - self.nodes.lock().unwrap().sort(); + self.nodes.lock().unwrap().sort_by(|d1, d2| d1.source.cmp(&d2.source)); + let mut index: usize = 0; + + for item in self.nodes.lock().unwrap().iter_mut() { + item.index = Some(index); + + index += 1; + } } } @@ -76,13 +100,14 @@ impl Iterator for Source { type Item = Media; fn next(&mut self) -> Option { - if self.index < self.nodes.lock().unwrap().len() { - let current_file = self.nodes.lock().unwrap()[self.index].clone(); - self.current_node = Media::new(self.index, current_file); + if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { + let i = *self.index.lock().unwrap(); + self.current_node = self.nodes.lock().unwrap()[i].clone(); self.current_node.add_probe(); self.current_node.add_filter(); + self.current_node.begin = Some(get_sec()); - self.index += 1; + *self.index.lock().unwrap() += 1; Some(self.current_node.clone()) } else { @@ -94,12 +119,12 @@ impl Iterator for Source { self.sort(); } - let current_file = self.nodes.lock().unwrap()[0].clone(); - self.current_node = Media::new(self.index, current_file); + self.current_node = self.nodes.lock().unwrap()[0].clone(); self.current_node.add_probe(); self.current_node.add_filter(); + self.current_node.begin = Some(get_sec()); - self.index = 1; + *self.index.lock().unwrap() = 1; Some(self.current_node.clone()) } @@ -110,31 +135,37 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watch_folder( +pub async fn file_worker( receiver: Receiver, - sources: Arc>>, + sources: Arc>>, ) { while let Ok(res) = receiver.recv() { match res { Create(new_path) => { - sources.lock().unwrap().push(new_path.display().to_string()); + let index = sources.lock().unwrap().len(); + let media = Media::new(index, new_path.display().to_string(), false); + + sources.lock().unwrap().push(media); info!("Create new file: {:?}", new_path); } Remove(old_path) => { sources .lock() .unwrap() - .retain(|x| x != &old_path.display().to_string()); + .retain(|x| x.source != old_path.display().to_string()); info!("Remove file: {:?}", old_path); } Rename(old_path, new_path) => { - let i = sources + let index = sources .lock() .unwrap() .iter() - .position(|x| *x == old_path.display().to_string()) + .position(|x| *x.source == old_path.display().to_string()) .unwrap(); - sources.lock().unwrap()[i] = new_path.display().to_string(); + + let media = Media::new(index, new_path.display().to_string(), false); + sources.lock().unwrap()[index] = media; + info!("Rename file: {:?} to {:?}", old_path, new_path); } _ => (), diff --git a/src/input/mod.rs b/src/input/mod.rs index ccdf5271..c5273137 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -3,5 +3,5 @@ pub mod ingest; pub mod playlist; pub use ingest::ingest_server; -pub use folder::{watch_folder, Source}; +pub use folder::{file_worker, Source}; pub use playlist::CurrentProgram; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 5d3d87f0..d04f8446 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -38,7 +38,7 @@ impl CurrentProgram { json_path: json.current_file, json_date: json.date, nodes: json.program, - current_node: Media::new(0, "".to_string()), + current_node: Media::new(0, "".to_string(), false), init: Arc::new(Mutex::new(true)), index: 0, rt_handle, @@ -92,7 +92,7 @@ impl CurrentProgram { "Playlist {} not exists!", self.json_path.clone().unwrap() ); - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, "".to_string(), false); media.begin = Some(get_sec()); media.duration = DUMMY_LEN; media.out = DUMMY_LEN; @@ -236,7 +236,7 @@ impl Iterator for CurrentProgram { if self.config.playlist.start_sec.unwrap() > current_time { current_time += self.config.playlist.length_sec.unwrap() + 1.0; } - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, "".to_string(), false); media.begin = Some(current_time); media.duration = duration; media.out = duration; @@ -279,7 +279,7 @@ impl Iterator for CurrentProgram { { // Test if playlist is to early finish, // and if we have to fill it with a placeholder. - self.current_node = Media::new(self.index, "".to_string()); + self.current_node = Media::new(self.index, "".to_string(), false); self.current_node.begin = Some(get_sec()); let mut duration = total_delta.abs(); diff --git a/src/main.rs b/src/main.rs index ee4a65ed..77a1d76a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,7 +30,7 @@ fn main() { } if config.out.mode.to_lowercase() == "hls".to_string() { - write_hls(rt_handle, proc_control.is_terminated.clone()); + write_hls(rt_handle, proc_control); } else { player(rt_handle, proc_control); } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 59c214bb..052b429e 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -29,7 +29,7 @@ pub fn output(log_format: String) -> process::Child { ); let mut filter: String = "null,".to_string(); - filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str()); + filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string(), false)).as_str()); enc_filter = vec!["-vf".to_string(), filter]; } diff --git a/src/output/hls.rs b/src/output/hls.rs index 8186322a..95926c1b 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -1,16 +1,3 @@ -use std::{ - process::{Command, Stdio}, - sync::{ - Arc, Mutex, - }, -}; - -use simplelog::*; -use tokio::runtime::Handle; - -use crate::output::source_generator; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig}; - /* This module write the files compression directly to a hls (m3u8) playlist, without pre- and post-processing. @@ -30,7 +17,15 @@ out: */ -pub fn write_hls(rt_handle: &Handle, is_terminated: Arc>) { +use std::process::{Command, Stdio}; + +use simplelog::*; +use tokio::runtime::Handle; + +use crate::output::source_generator; +use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, ProcessControl}; + +pub fn write_hls(rt_handle: &Handle, proc_control: ProcessControl) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -38,10 +33,15 @@ pub fn write_hls(rt_handle: &Handle, is_terminated: Arc>) { let (get_source, _) = source_generator( rt_handle, config.clone(), - is_terminated.clone(), + proc_control.is_terminated.clone(), + proc_control.current_list.clone(), + proc_control.index.clone(), ); for node in get_source { + *proc_control.current_media.lock().unwrap() = Some(node.clone()); + *proc_control.index.lock().unwrap() = node.index.clone().unwrap(); + let cmd = match node.cmd { Some(cmd) => cmd, None => break, diff --git a/src/output/mod.rs b/src/output/mod.rs index 2982c9d3..83337e6a 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -22,14 +22,19 @@ mod stream; pub use hls::write_hls; -use crate::input::{ingest_server, watch_folder, CurrentProgram, Source}; +use crate::input::{file_worker, ingest_server, CurrentProgram, Source}; use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media, ProcessControl}; pub fn source_generator( rt_handle: &Handle, config: GlobalConfig, is_terminated: Arc>, -) -> (Box>, Arc>) { + current_list: Arc>>, + index: Arc>, +) -> ( + Box>, + Arc>, +) { let mut init_playlist: Arc> = Arc::new(Mutex::new(false)); let get_source = match config.processing.clone().mode.as_str() { @@ -42,17 +47,17 @@ pub fn source_generator( info!("Playout in folder mode."); - let folder_source = Source::new(); - let (sender, receiver) = channel(); - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); + let folder_source = Source::new(current_list, index); - watcher + let (sender, receiver) = channel(); + let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap(); + watchman .watch(path.clone(), RecursiveMode::Recursive) .unwrap(); debug!("Monitor folder: {}", path); - rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); + rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone())); Box::new(folder_source) as Box> } @@ -81,8 +86,13 @@ pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { let mut buffer: [u8; 65088] = [0; 65088]; let mut live_on = false; - let (get_source, init_playlist) = - source_generator(rt_handle, config.clone(), proc_control.is_terminated.clone()); + let (get_source, init_playlist) = source_generator( + rt_handle, + config.clone(), + proc_control.is_terminated.clone(), + proc_control.current_list.clone(), + proc_control.index.clone(), + ); let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(ff_log_format.clone()), diff --git a/src/output/stream.rs b/src/output/stream.rs index 39328788..307678c7 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -32,7 +32,7 @@ pub fn output(log_format: String) -> process::Child { ); let mut filter: String = "[0:v]null,".to_string(); - filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str()); + filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string(), false)).as_str()); if config.out.preview { filter.push_str(",split=2[v_out1][v_out2]"); diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index a08a83fc..07fbc3f1 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -23,7 +23,7 @@ pub struct Playlist { impl Playlist { fn new(date: String, start: f64) -> Self { - let mut media = Media::new(0, "".to_string()); + let mut media = Media::new(0, "".to_string(), false); media.begin = Some(start); media.duration = DUMMY_LEN; media.out = DUMMY_LEN; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index cf29c038..a9f097b7 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -44,6 +44,8 @@ pub struct ProcessControl { pub is_terminated: Arc>, pub is_alive: Arc>, pub current_media: Arc>>, + pub current_list: Arc>>, + pub index: Arc>, } impl ProcessControl { @@ -57,6 +59,8 @@ impl ProcessControl { is_terminated: Arc::new(Mutex::new(false)), is_alive: Arc::new(RwLock::new(true)), current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec!(Media::new(0, "".to_string(), false)))), + index: Arc::new(Mutex::new(0)), } } } @@ -124,11 +128,11 @@ pub struct Media { } impl Media { - pub fn new(index: usize, src: String) -> Self { + pub fn new(index: usize, src: String, do_probe: bool) -> Self { let mut duration: f64 = 0.0; let mut probe = None; - if Path::new(&src).is_file() { + if do_probe && Path::new(&src).is_file() { probe = Some(MediaProbe::new(src.clone())); duration = match probe.clone().unwrap().format.unwrap().duration { @@ -155,7 +159,18 @@ impl Media { } pub fn add_probe(&mut self) { - self.probe = Some(MediaProbe::new(self.source.clone())) + let probe = MediaProbe::new(self.source.clone()); + self.probe = Some(probe.clone()); + + if self.duration == 0.0 { + let duration = match probe.format.unwrap().duration { + Some(dur) => dur.parse().unwrap(), + None => 0.0, + }; + + self.out = duration; + self.duration = duration; + } } pub fn add_filter(&mut self) { diff --git a/src/utils/rpc_server.rs b/src/utils/rpc_server.rs index 8d5ec0ea..a704ce95 100644 --- a/src/utils/rpc_server.rs +++ b/src/utils/rpc_server.rs @@ -1,4 +1,4 @@ -use serde_json::{Map, Number}; +use serde_json::{Map, json}; use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; use jsonrpc_http_server::{ @@ -6,7 +6,39 @@ use jsonrpc_http_server::{ }; use simplelog::*; -use crate::utils::{GlobalConfig, ProcessControl}; +use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, ProcessControl}; + +fn get_media_map(media: Media) -> Value { + json!({ + "seek": media.seek, + "out": media.out, + "duration": media.duration, + "category": media.category, + "source": media.source, + }) +} + +fn get_data_map(config: &GlobalConfig, media: Media) -> Map { + let mut data_map = Map::new(); + let begin = media.begin.unwrap_or(0.0); + + data_map.insert("play_mode".to_string(), json!(config.processing.mode)); + data_map.insert("index".to_string(), json!(media.index)); + data_map.insert("start_sec".to_string(), json!(begin)); + + if begin > 0.0 { + let played_time = get_sec() - begin; + let remaining_time = media.out - played_time; + + data_map.insert("start_time".to_string(), json!(sec_to_time(begin))); + data_map.insert("played_sec".to_string(), json!(played_time)); + data_map.insert("remaining_sec".to_string(), json!(remaining_time)); + } + + data_map.insert("current_media".to_string(), get_media_map(media)); + + data_map +} pub async fn run_rpc(proc_control: ProcessControl) { let config = GlobalConfig::global(); @@ -20,23 +52,51 @@ pub async fn run_rpc(proc_control: ProcessControl) { if let Some(decoder) = &*proc.decoder_term.lock().unwrap() { unsafe { if let Ok(_) = decoder.terminate() { - info!("Skip current clip"); - return Ok(Value::String(format!("Skip current clip"))); + info!("Move to next clip"); + + if let Some(media) = proc.current_media.lock().unwrap().clone() { + let mut data_map = Map::new(); + data_map.insert("operation".to_string(), json!("Move to next clip")); + data_map.insert("media".to_string(), get_media_map(media)); + + return Ok(Value::Object(data_map)); + }; + + return Ok(Value::String(format!("Move failed"))); } } } } + if map.contains_key("control") && map["control"] == "back".to_string() { + if let Some(decoder) = &*proc.decoder_term.lock().unwrap() { + let index = *proc.index.lock().unwrap(); + + if index > 1 && proc.current_list.lock().unwrap().len() > 1 { + info!("Move to last clip"); + let mut data_map = Map::new(); + let mut media = proc.current_list.lock().unwrap()[index - 2].clone(); + *proc.index.lock().unwrap() = index - 2; + media.add_probe(); + data_map.insert("operation".to_string(), json!("Move to last clip")); + data_map.insert("media".to_string(), get_media_map(media)); + + unsafe { + if let Ok(_) = decoder.terminate() { + return Ok(Value::Object(data_map)); + } + } + } + + return Ok(Value::String(format!("Move failed"))); + } + } + if map.contains_key("media") && map["media"] == "current".to_string() { if let Some(media) = proc.current_media.lock().unwrap().clone() { - let mut media_map = Map::new(); - media_map.insert( - "begin".to_string(), - Value::Number(Number::from_f64(media.begin.unwrap_or(0.0)).unwrap()), - ); - media_map.insert("source".to_string(), Value::String(media.source)); + let data_map = get_data_map(config, media); - return Ok(Value::Object(media_map)); + return Ok(Value::Object(data_map)); }; } }