From c239e952b8d6a5d8c12f0711d740201963b1ee7d Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 6 Apr 2022 22:29:46 +0200 Subject: [PATCH] work on process status control --- assets/ffplayout.yml | 1 + src/filter/mod.rs | 9 +++--- src/input/folder.rs | 4 +-- src/input/playlist.rs | 57 ++++++++++++++++++++------------ src/main.rs | 5 +-- src/output/hls.rs | 12 +++++-- src/output/mod.rs | 10 ++++-- src/utils/config.rs | 4 +-- src/utils/mod.rs | 72 ++++++++++++++++++++++++++++++----------- src/utils/rpc_server.rs | 11 +++++-- 10 files changed, 129 insertions(+), 56 deletions(-) diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 29551e12..52be5b88 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -25,6 +25,7 @@ mail: sender_pass: "abc123" recipient: mail_level: "ERROR" + interval: 30 logging: help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count' diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 97006783..26ec10c8 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -290,6 +290,7 @@ fn realtime_filter( chain: &mut Filters, config: &GlobalConfig, codec_type: String, + json_date: &String ) { //this realtime filter is important for HLS output to stay in sync @@ -301,7 +302,7 @@ fn realtime_filter( if config.out.mode.to_lowercase() == "hls".to_string() { let mut speed_filter = format!("{t}realtime=speed=1"); - let (delta, _) = get_delta(&node.begin.unwrap()); + let (delta, _) = get_delta(&node.begin.unwrap(), &json_date, true); let duration = node.out - node.seek; if delta < 0.0 { @@ -316,7 +317,7 @@ fn realtime_filter( } } -pub fn filter_chains(node: &mut Media) -> Vec { +pub fn filter_chains(node: &mut Media, json_date: &String) -> Vec { let config = GlobalConfig::global(); let mut filters = Filters::new(); @@ -354,12 +355,12 @@ pub fn filter_chains(node: &mut Media) -> Vec { add_text(node, &mut filters, &config); fade(node, &mut filters, "video".into()); overlay(node, &mut filters, &config); - realtime_filter(node, &mut filters, &config, "video".into()); + realtime_filter(node, &mut filters, &config, "video".into(), &json_date); add_loudnorm(node, &mut filters, &config); fade(node, &mut filters, "audio".into()); audio_volume(&mut filters, &config); - realtime_filter(node, &mut filters, &config, "audio".into()); + realtime_filter(node, &mut filters, &config, "audio".into(), &json_date); let mut filter_cmd = vec![]; let mut filter_str: String = "".to_string(); diff --git a/src/input/folder.rs b/src/input/folder.rs index c0a3bfc8..b7614b53 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -104,7 +104,7 @@ impl Iterator for Source { let i = *self.index.lock().unwrap(); self.current_node = self.nodes.lock().unwrap()[i].clone(); self.current_node.add_probe(); - self.current_node.add_filter(); + self.current_node.add_filter(&"".to_string()); self.current_node.begin = Some(get_sec()); *self.index.lock().unwrap() += 1; @@ -121,7 +121,7 @@ impl Iterator for Source { self.current_node = self.nodes.lock().unwrap()[0].clone(); self.current_node.add_probe(); - self.current_node.add_filter(); + self.current_node.add_filter(&"".to_string()); self.current_node.begin = Some(get_sec()); *self.index.lock().unwrap() = 1; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 0c8ed6f4..e99ff835 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -8,7 +8,7 @@ use tokio::runtime::Handle; use crate::utils::{ check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, - seek_and_length, GlobalConfig, Media, DUMMY_LEN, + seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN, }; #[derive(Debug)] @@ -34,10 +34,15 @@ impl CurrentProgram { global_index: Arc>, ) -> Self { let config = GlobalConfig::global(); + let status = PlayoutStatus::global(); let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0); *current_list.lock().unwrap() = json.program; + if status.date != json.date { + status.clone().write(json.date.clone(), 0.0) + } + Self { config: config.clone(), start_sec: json.start_sec.unwrap(), @@ -116,7 +121,7 @@ impl CurrentProgram { let current_time = get_sec(); let start_sec = self.config.playlist.start_sec.unwrap(); let target_length = self.config.playlist.length_sec.unwrap(); - let (delta, total_delta) = get_delta(¤t_time); + let (delta, total_delta) = get_delta(¤t_time, &self.json_date, true); let mut duration = self.current_node.out.clone(); if self.current_node.duration > self.current_node.out { @@ -137,6 +142,9 @@ impl CurrentProgram { next_start, ); + let status = PlayoutStatus::global(); + status.clone().write(json.date.clone(), 0.0); + self.json_path = json.current_file.clone(); self.json_mod = json.modified; self.json_date = json.date; @@ -202,7 +210,7 @@ impl CurrentProgram { *self.index.lock().unwrap() += 1; node_clone.seek = time_sec - node_clone.begin.unwrap(); - self.current_node = handle_list_init(node_clone); + self.current_node = handle_list_init(node_clone, &self.json_date); } } } @@ -227,7 +235,7 @@ impl Iterator for CurrentProgram { self.current_node = self.nodes.lock().unwrap()[list_length - 1].clone(); self.check_for_next_playlist(); - let new_node = self.nodes.lock().unwrap()[list_length - 1].clone(); + let new_node = self.nodes.lock().unwrap()[list_length - 1].clone(); let new_length = new_node.begin.unwrap() + new_node.duration; if new_length @@ -237,7 +245,7 @@ impl Iterator for CurrentProgram { self.init_clip(); } else { let mut current_time = get_sec(); - let (_, total_delta) = get_delta(¤t_time); + let (_, total_delta) = get_delta(¤t_time, &self.json_date, true); let mut duration = DUMMY_LEN; if DUMMY_LEN > total_delta { @@ -253,7 +261,7 @@ impl Iterator for CurrentProgram { media.duration = duration; media.out = duration; - self.current_node = gen_source(media); + self.current_node = gen_source(media, &self.json_date); self.nodes.lock().unwrap().push(self.current_node.clone()); *self.index.lock().unwrap() = self.nodes.lock().unwrap().len(); } @@ -273,7 +281,12 @@ impl Iterator for CurrentProgram { is_last = true } - self.current_node = timed_source(self.nodes.lock().unwrap()[index].clone(), &self.config, is_last); + self.current_node = timed_source( + self.nodes.lock().unwrap()[index].clone(), + &self.config, + is_last, + &self.json_date, + ); self.last_next_ad(); *self.index.lock().unwrap() += 1; @@ -285,7 +298,11 @@ impl Iterator for CurrentProgram { let last_playlist = self.json_path.clone(); let last_ad = self.current_node.last_ad.clone(); self.check_for_next_playlist(); - let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap()); + let (_, total_delta) = get_delta( + &self.config.playlist.start_sec.unwrap(), + &self.json_date, + true, + ); if last_playlist == self.json_path && total_delta.abs() > self.config.general.stop_threshold @@ -302,12 +319,12 @@ impl Iterator for CurrentProgram { } self.current_node.duration = duration; self.current_node.out = duration; - self.current_node = gen_source(self.current_node.clone()); + self.current_node = gen_source(self.current_node.clone(), &self.json_date); self.nodes.lock().unwrap().push(self.current_node.clone()); self.last_next_ad(); self.current_node.last_ad = last_ad; - self.current_node.add_filter(); + self.current_node.add_filter(&self.json_date); *self.index.lock().unwrap() += 1; @@ -315,7 +332,7 @@ impl Iterator for CurrentProgram { } *self.index.lock().unwrap() = 0; - self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone()); + self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone(), &self.json_date); self.last_next_ad(); self.current_node.last_ad = last_ad; @@ -326,12 +343,12 @@ impl Iterator for CurrentProgram { } } -fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media { +fn timed_source(node: Media, config: &GlobalConfig, last: bool, json_date: &String) -> Media { // prepare input clip // check begin and length from clip // return clip only if we are in 24 hours time range - let (delta, total_delta) = get_delta(&node.begin.unwrap()); + let (delta, total_delta) = get_delta(&node.begin.unwrap(), &json_date, true); let mut new_node = node.clone(); new_node.process = Some(false); @@ -352,7 +369,7 @@ fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media { || !config.playlist.length.contains(":") { // when we are in the 24 hour range, get the clip - new_node = gen_source(node); + new_node = gen_source(node, &json_date); new_node.process = Some(true); } else if total_delta <= 0.0 { info!("Begin is over play time, skip: {}", node.source); @@ -363,7 +380,7 @@ fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media { new_node } -fn gen_source(mut node: Media) -> Media { +fn gen_source(mut node: Media, json_date: &String) -> Media { if Path::new(&node.source).is_file() { node.add_probe(); node.cmd = Some(seek_and_length( @@ -372,7 +389,7 @@ fn gen_source(mut node: Media) -> Media { node.out, node.duration, )); - node.add_filter(); + node.add_filter(&json_date); } else { if node.source.chars().count() == 0 { warn!( @@ -385,17 +402,17 @@ fn gen_source(mut node: Media) -> Media { let (source, cmd) = gen_dummy(node.out - node.seek); node.source = source; node.cmd = Some(cmd); - node.add_filter(); + node.add_filter(&json_date); } node } -fn handle_list_init(mut node: Media) -> Media { +fn handle_list_init(mut node: Media, json_date: &String) -> Media { // handle init clip, but this clip can be the last one in playlist, // this we have to figure out and calculate the right length - let (_, total_delta) = get_delta(&node.begin.unwrap()); + let (_, total_delta) = get_delta(&node.begin.unwrap(), &json_date, true); let mut out = node.out; if node.out - node.seek > total_delta { @@ -404,7 +421,7 @@ fn handle_list_init(mut node: Media) -> Media { node.out = out; - let new_node = gen_source(node); + let new_node = gen_source(node, &json_date); new_node } diff --git a/src/main.rs b/src/main.rs index e774fd28..938fb9de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ mod utils; use crate::output::{player, write_hls}; use crate::utils::{ - init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl, + init_config, init_logging, init_status, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; @@ -19,8 +19,8 @@ fn main() { init_config(); let config = GlobalConfig::global(); let play_control = PlayerControl::new(); + let _ = PlayoutStatus::new(); let proc_control = ProcessControl::new(); - let _playout_stat = PlayoutStatus::new(); let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let rt_handle = runtime.handle(); @@ -28,6 +28,7 @@ fn main() { let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone()); CombinedLogger::init(logging).unwrap(); + init_status(); validate_ffmpeg(); if config.rpc_server.enable { diff --git a/src/output/hls.rs b/src/output/hls.rs index 6f367198..515ff393 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -23,9 +23,15 @@ use simplelog::*; use tokio::runtime::Handle; use crate::output::source_generator; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, PlayerControl, ProcessControl}; +use crate::utils::{ + sec_to_time, stderr_reader, GlobalConfig, PlayerControl, ProcessControl, +}; -pub fn write_hls(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) { +pub fn write_hls( + rt_handle: &Handle, + play_control: PlayerControl, + proc_control: ProcessControl, +) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -33,9 +39,9 @@ pub fn write_hls(rt_handle: &Handle, play_control: PlayerControl, proc_control: let (get_source, _) = source_generator( rt_handle, config.clone(), - proc_control.is_terminated.clone(), play_control.current_list.clone(), play_control.index.clone(), + proc_control.is_terminated.clone(), ); for node in get_source { diff --git a/src/output/mod.rs b/src/output/mod.rs index 3dd4d4b2..a774c989 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -30,9 +30,9 @@ use crate::utils::{ pub fn source_generator( rt_handle: &Handle, config: GlobalConfig, - is_terminated: Arc>, current_list: Arc>>, index: Arc>, + is_terminated: Arc>, ) -> (Box>, Arc>) { let mut init_playlist: Arc> = Arc::new(Mutex::new(false)); @@ -81,7 +81,11 @@ pub fn source_generator( (get_source, init_playlist) } -pub fn player(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) { +pub fn player( + rt_handle: &Handle, + play_control: PlayerControl, + proc_control: ProcessControl, +) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -93,9 +97,9 @@ pub fn player(rt_handle: &Handle, play_control: PlayerControl, proc_control: Pro let (get_source, init_playlist) = source_generator( rt_handle, config.clone(), - proc_control.is_terminated.clone(), play_control.current_list.clone(), play_control.index.clone(), + proc_control.is_terminated.clone(), ); let mut enc_proc = match config.out.mode.as_str() { diff --git a/src/utils/config.rs b/src/utils/config.rs index 57207fe1..0ef61529 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -127,8 +127,6 @@ pub struct Out { pub output_cmd: Option>, } -static INSTANCE: OnceCell = OnceCell::new(); - impl GlobalConfig { fn new() -> Self { let args = get_args(); @@ -253,6 +251,8 @@ impl GlobalConfig { } } +static INSTANCE: OnceCell = OnceCell::new(); + fn pre_audio_codec(add_loudnorm: bool) -> Vec { // when add_loudnorm is False we use a different audio encoder, // s302m has higher quality, but is experimental diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 04076dfe..410d2a7f 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use std::{ env::temp_dir, fs, - fs::metadata, + fs::{metadata, File}, io::{BufRead, BufReader, Error}, path::Path, process::exit, @@ -14,6 +14,7 @@ use std::{ time, time::UNIX_EPOCH, }; +use once_cell::sync::OnceCell; use jsonrpc_http_server::CloseHandle; use process_control::Terminator; @@ -105,7 +106,7 @@ impl Drop for ProcessControl { } } -#[derive(Serialize, Clone)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct PlayoutStatus { pub time_shift: f64, pub date: String, @@ -113,23 +114,51 @@ pub struct PlayoutStatus { impl PlayoutStatus { pub fn new() -> Self { - let stat_file = temp_dir().join("ffplayout.json"); + let stat_file = temp_dir().join("ffplayout_status.json"); - if !stat_file.exists() { - let data = Self { - time_shift: 0.0, - date: "".to_string(), - }; - - let json: String = serde_json::to_string(&data).expect("Serde read data failed"); - fs::write(stat_file, &json).expect("Unable to write file"); - } - - Self { + let mut data: PlayoutStatus = Self { time_shift: 0.0, date: "".to_string(), + }; + + if !stat_file.exists() { + + } else { + let file = File::options() + .read(true) + .write(false) + .open(&stat_file.display().to_string()) + .expect("Could not open status file"); + + data = serde_json::from_reader(file).expect("Could not read status file."); } + + data } + + pub fn write(mut self, date: String, time_shift: f64) { + let stat_file = temp_dir().join("ffplayout_status.json"); + + self.date = date.clone(); + self.time_shift = time_shift.clone(); + + if let Ok (json) = serde_json::to_string(&self) { + if let Err(e) = fs::write(stat_file, &json) { + error!("Unable to write status file: {e}") + }; + }; + } + + pub fn global() -> &'static PlayoutStatus { + STATUS_CELL.get().expect("Config is not initialized") + } +} + +static STATUS_CELL: OnceCell = OnceCell::new(); + +pub fn init_status() { + let status = PlayoutStatus::new(); + STATUS_CELL.set(status).unwrap(); } #[derive(Clone)] @@ -213,9 +242,9 @@ impl Media { } } - pub fn add_filter(&mut self) { + pub fn add_filter(&mut self, json_date: &String) { let mut node = self.clone(); - self.filter = Some(filter_chains(&mut node)) + self.filter = Some(filter_chains(&mut node, &json_date)) } } @@ -349,13 +378,15 @@ pub fn is_close(a: f64, b: f64, to: f64) -> bool { false } -pub fn get_delta(begin: &f64) -> (f64, f64) { +pub fn get_delta(begin: &f64, json_date: &String, shift: bool) -> (f64, f64) { let config = GlobalConfig::global(); + let status = PlayoutStatus::global(); + let mut current_time = get_sec(); let start = config.playlist.start_sec.unwrap(); let length = time_to_sec(&config.playlist.length); let mut target_length = 86400.0; - let total_delta; + let mut total_delta; if length > 0.0 && length != target_length { target_length = length @@ -378,6 +409,11 @@ pub fn get_delta(begin: &f64) -> (f64, f64) { total_delta = target_length + start - current_time; } + if shift && json_date == &status.date && status.time_shift != 0.0 { + current_delta -= status.time_shift; + total_delta -= status.time_shift; + } + (current_delta, total_delta) } diff --git a/src/utils/rpc_server.rs b/src/utils/rpc_server.rs index 126e52ba..b4e57581 100644 --- a/src/utils/rpc_server.rs +++ b/src/utils/rpc_server.rs @@ -6,7 +6,9 @@ use jsonrpc_http_server::{ }; use simplelog::*; -use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, PlayerControl, ProcessControl}; +use crate::utils::{ + get_delta, get_sec, sec_to_time, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl, +}; fn get_media_map(media: Media) -> Value { json!({ @@ -55,12 +57,17 @@ pub async fn run_rpc(play_control: PlayerControl, proc_control: ProcessControl) if let Ok(_) = decoder.terminate() { info!("Move to next clip"); let index = *play.index.lock().unwrap(); + let status = PlayoutStatus::global(); if index < play.current_list.lock().unwrap().len() { let mut data_map = Map::new(); let mut media = - play.current_list.lock().unwrap()[index].clone(); + play.current_list.lock().unwrap()[index].clone(); media.add_probe(); + + let (delta, _) = get_delta(&media.begin.unwrap_or(0.0), &status.date, false); + status.clone().write(status.date.clone(), delta); + data_map.insert( "operation".to_string(), json!("Move to next clip"),