continue work on json rpc server

This commit is contained in:
jb-alvarado 2022-04-06 15:53:27 +02:00
parent 8a68c9bd86
commit e33f5a1b5a
11 changed files with 189 additions and 73 deletions

View File

@ -12,20 +12,21 @@ use std::{
use walkdir::WalkDir;
use crate::utils::{GlobalConfig, Media};
use crate::utils::{get_sec, GlobalConfig, Media};
#[derive(Debug, Clone)]
pub struct Source {
config: GlobalConfig,
pub nodes: Arc<Mutex<Vec<String>>>,
pub nodes: Arc<Mutex<Vec<Media>>>,
current_node: Media,
index: usize,
index: Arc<Mutex<usize>>,
}
impl Source {
pub fn new() -> Self {
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<Mutex<usize>>) -> Self {
let config = GlobalConfig::global();
let mut file_list = vec![];
let mut media_list = vec![];
let mut index: usize = 0;
for entry in WalkDir::new(config.storage.path.clone())
.into_iter()
@ -41,7 +42,8 @@ impl Source {
.clone()
.contains(&ext.unwrap().to_lowercase())
{
file_list.push(entry.path().display().to_string());
let media = Media::new(0, entry.path().display().to_string(), false);
media_list.push(media);
}
}
}
@ -49,26 +51,48 @@ impl Source {
if config.storage.shuffle {
info!("Shuffle files");
let mut rng = thread_rng();
file_list.shuffle(&mut rng);
media_list.shuffle(&mut rng);
} else {
file_list.sort();
media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source));
}
for item in media_list.iter_mut() {
item.index = Some(index);
index += 1;
}
*current_list.lock().unwrap() = media_list;
Self {
config: config.clone(),
nodes: Arc::new(Mutex::new(file_list)),
current_node: Media::new(0, "".to_string()),
index: 0,
nodes: current_list,
current_node: Media::new(0, "".to_string(), false),
index: global_index,
}
}
fn shuffle(&mut self) {
let mut rng = thread_rng();
self.nodes.lock().unwrap().shuffle(&mut rng);
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
fn sort(&mut self) {
self.nodes.lock().unwrap().sort();
self.nodes.lock().unwrap().sort_by(|d1, d2| d1.source.cmp(&d2.source));
let mut index: usize = 0;
for item in self.nodes.lock().unwrap().iter_mut() {
item.index = Some(index);
index += 1;
}
}
}
@ -76,13 +100,14 @@ impl Iterator for Source {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.nodes.lock().unwrap().len() {
let current_file = self.nodes.lock().unwrap()[self.index].clone();
self.current_node = Media::new(self.index, current_file);
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
let i = *self.index.lock().unwrap();
self.current_node = self.nodes.lock().unwrap()[i].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
self.index += 1;
*self.index.lock().unwrap() += 1;
Some(self.current_node.clone())
} else {
@ -94,12 +119,12 @@ impl Iterator for Source {
self.sort();
}
let current_file = self.nodes.lock().unwrap()[0].clone();
self.current_node = Media::new(self.index, current_file);
self.current_node = self.nodes.lock().unwrap()[0].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.begin = Some(get_sec());
self.index = 1;
*self.index.lock().unwrap() = 1;
Some(self.current_node.clone())
}
@ -110,31 +135,37 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
pub async fn watch_folder(
pub async fn file_worker(
receiver: Receiver<notify::DebouncedEvent>,
sources: Arc<Mutex<Vec<String>>>,
sources: Arc<Mutex<Vec<Media>>>,
) {
while let Ok(res) = receiver.recv() {
match res {
Create(new_path) => {
sources.lock().unwrap().push(new_path.display().to_string());
let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: {:?}", new_path);
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x != &old_path.display().to_string());
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: {:?}", old_path);
}
Rename(old_path, new_path) => {
let i = sources
let index = sources
.lock()
.unwrap()
.iter()
.position(|x| *x == old_path.display().to_string())
.position(|x| *x.source == old_path.display().to_string())
.unwrap();
sources.lock().unwrap()[i] = new_path.display().to_string();
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: {:?} to {:?}", old_path, new_path);
}
_ => (),

View File

@ -3,5 +3,5 @@ pub mod ingest;
pub mod playlist;
pub use ingest::ingest_server;
pub use folder::{watch_folder, Source};
pub use folder::{file_worker, Source};
pub use playlist::CurrentProgram;

View File

@ -38,7 +38,7 @@ impl CurrentProgram {
json_path: json.current_file,
json_date: json.date,
nodes: json.program,
current_node: Media::new(0, "".to_string()),
current_node: Media::new(0, "".to_string(), false),
init: Arc::new(Mutex::new(true)),
index: 0,
rt_handle,
@ -92,7 +92,7 @@ impl CurrentProgram {
"Playlist <b><magenta>{}</></b> not exists!",
self.json_path.clone().unwrap()
);
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, "".to_string(), false);
media.begin = Some(get_sec());
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
@ -236,7 +236,7 @@ impl Iterator for CurrentProgram {
if self.config.playlist.start_sec.unwrap() > current_time {
current_time += self.config.playlist.length_sec.unwrap() + 1.0;
}
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, "".to_string(), false);
media.begin = Some(current_time);
media.duration = duration;
media.out = duration;
@ -279,7 +279,7 @@ impl Iterator for CurrentProgram {
{
// Test if playlist is to early finish,
// and if we have to fill it with a placeholder.
self.current_node = Media::new(self.index, "".to_string());
self.current_node = Media::new(self.index, "".to_string(), false);
self.current_node.begin = Some(get_sec());
let mut duration = total_delta.abs();

View File

@ -30,7 +30,7 @@ fn main() {
}
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, proc_control.is_terminated.clone());
write_hls(rt_handle, proc_control);
} else {
player(rt_handle, proc_control);
}

View File

@ -29,7 +29,7 @@ pub fn output(log_format: String) -> process::Child {
);
let mut filter: String = "null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str());
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string(), false)).as_str());
enc_filter = vec!["-vf".to_string(), filter];
}

View File

@ -1,16 +1,3 @@
use std::{
process::{Command, Stdio},
sync::{
Arc, Mutex,
},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig};
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
@ -30,7 +17,15 @@ out:
*/
pub fn write_hls(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
use std::process::{Command, Stdio};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, ProcessControl};
pub fn write_hls(rt_handle: &Handle, proc_control: ProcessControl) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -38,10 +33,15 @@ pub fn write_hls(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let (get_source, _) = source_generator(
rt_handle,
config.clone(),
is_terminated.clone(),
proc_control.is_terminated.clone(),
proc_control.current_list.clone(),
proc_control.index.clone(),
);
for node in get_source {
*proc_control.current_media.lock().unwrap() = Some(node.clone());
*proc_control.index.lock().unwrap() = node.index.clone().unwrap();
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,

View File

@ -22,14 +22,19 @@ mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::input::{file_worker, ingest_server, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media, ProcessControl};
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
is_terminated: Arc<Mutex<bool>>,
) -> (Box<dyn Iterator<Item = Media>>, Arc<Mutex<bool>>) {
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
) -> (
Box<dyn Iterator<Item = Media>>,
Arc<Mutex<bool>>,
) {
let mut init_playlist: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let get_source = match config.processing.clone().mode.as_str() {
@ -42,17 +47,17 @@ pub fn source_generator(
info!("Playout in folder mode.");
let folder_source = Source::new();
let (sender, receiver) = channel();
let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
let folder_source = Source::new(current_list, index);
watcher
let (sender, receiver) = channel();
let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap();
watchman
.watch(path.clone(), RecursiveMode::Recursive)
.unwrap();
debug!("Monitor folder: <b><magenta>{}</></b>", path);
rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes)));
rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone()));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
@ -81,8 +86,13 @@ pub fn player(rt_handle: &Handle, proc_control: ProcessControl) {
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let (get_source, init_playlist) =
source_generator(rt_handle, config.clone(), proc_control.is_terminated.clone());
let (get_source, init_playlist) = source_generator(
rt_handle,
config.clone(),
proc_control.is_terminated.clone(),
proc_control.current_list.clone(),
proc_control.index.clone(),
);
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),

View File

@ -32,7 +32,7 @@ pub fn output(log_format: String) -> process::Child {
);
let mut filter: String = "[0:v]null,".to_string();
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string())).as_str());
filter.push_str(v_drawtext::filter_node(&mut Media::new(0, "".to_string(), false)).as_str());
if config.out.preview {
filter.push_str(",split=2[v_out1][v_out2]");

View File

@ -23,7 +23,7 @@ pub struct Playlist {
impl Playlist {
fn new(date: String, start: f64) -> Self {
let mut media = Media::new(0, "".to_string());
let mut media = Media::new(0, "".to_string(), false);
media.begin = Some(start);
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;

View File

@ -44,6 +44,8 @@ pub struct ProcessControl {
pub is_terminated: Arc<Mutex<bool>>,
pub is_alive: Arc<RwLock<bool>>,
pub current_media: Arc<Mutex<Option<Media>>>,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub index: Arc<Mutex<usize>>,
}
impl ProcessControl {
@ -57,6 +59,8 @@ impl ProcessControl {
is_terminated: Arc::new(Mutex::new(false)),
is_alive: Arc::new(RwLock::new(true)),
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec!(Media::new(0, "".to_string(), false)))),
index: Arc::new(Mutex::new(0)),
}
}
}
@ -124,11 +128,11 @@ pub struct Media {
}
impl Media {
pub fn new(index: usize, src: String) -> Self {
pub fn new(index: usize, src: String, do_probe: bool) -> Self {
let mut duration: f64 = 0.0;
let mut probe = None;
if Path::new(&src).is_file() {
if do_probe && Path::new(&src).is_file() {
probe = Some(MediaProbe::new(src.clone()));
duration = match probe.clone().unwrap().format.unwrap().duration {
@ -155,7 +159,18 @@ impl Media {
}
pub fn add_probe(&mut self) {
self.probe = Some(MediaProbe::new(self.source.clone()))
let probe = MediaProbe::new(self.source.clone());
self.probe = Some(probe.clone());
if self.duration == 0.0 {
let duration = match probe.format.unwrap().duration {
Some(dur) => dur.parse().unwrap(),
None => 0.0,
};
self.out = duration;
self.duration = duration;
}
}
pub fn add_filter(&mut self) {

View File

@ -1,4 +1,4 @@
use serde_json::{Map, Number};
use serde_json::{Map, json};
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
@ -6,7 +6,39 @@ use jsonrpc_http_server::{
};
use simplelog::*;
use crate::utils::{GlobalConfig, ProcessControl};
use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, ProcessControl};
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
pub async fn run_rpc(proc_control: ProcessControl) {
let config = GlobalConfig::global();
@ -20,23 +52,51 @@ pub async fn run_rpc(proc_control: ProcessControl) {
if let Some(decoder) = &*proc.decoder_term.lock().unwrap() {
unsafe {
if let Ok(_) = decoder.terminate() {
info!("Skip current clip");
return Ok(Value::String(format!("Skip current clip")));
info!("Move to next clip");
if let Some(media) = proc.current_media.lock().unwrap().clone() {
let mut data_map = Map::new();
data_map.insert("operation".to_string(), json!("Move to next clip"));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
};
return Ok(Value::String(format!("Move failed")));
}
}
}
}
if map.contains_key("control") && map["control"] == "back".to_string() {
if let Some(decoder) = &*proc.decoder_term.lock().unwrap() {
let index = *proc.index.lock().unwrap();
if index > 1 && proc.current_list.lock().unwrap().len() > 1 {
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = proc.current_list.lock().unwrap()[index - 2].clone();
*proc.index.lock().unwrap() = index - 2;
media.add_probe();
data_map.insert("operation".to_string(), json!("Move to last clip"));
data_map.insert("media".to_string(), get_media_map(media));
unsafe {
if let Ok(_) = decoder.terminate() {
return Ok(Value::Object(data_map));
}
}
}
return Ok(Value::String(format!("Move failed")));
}
}
if map.contains_key("media") && map["media"] == "current".to_string() {
if let Some(media) = proc.current_media.lock().unwrap().clone() {
let mut media_map = Map::new();
media_map.insert(
"begin".to_string(),
Value::Number(Number::from_f64(media.begin.unwrap_or(0.0)).unwrap()),
);
media_map.insert("source".to_string(), Value::String(media.source));
let data_map = get_data_map(config, media);
return Ok(Value::Object(media_map));
return Ok(Value::Object(data_map));
};
}
}