work on process status control

This commit is contained in:
jb-alvarado 2022-04-06 22:29:46 +02:00
parent f1e92181d5
commit c239e952b8
10 changed files with 129 additions and 56 deletions

View File

@ -25,6 +25,7 @@ mail:
sender_pass: "abc123"
recipient:
mail_level: "ERROR"
interval: 30
logging:
help_text: Logging to file, if 'log_to_file' false log to console. 'backup_count'

View File

@ -290,6 +290,7 @@ fn realtime_filter(
chain: &mut Filters,
config: &GlobalConfig,
codec_type: String,
json_date: &String
) {
//this realtime filter is important for HLS output to stay in sync
@ -301,7 +302,7 @@ fn realtime_filter(
if config.out.mode.to_lowercase() == "hls".to_string() {
let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(&node.begin.unwrap());
let (delta, _) = get_delta(&node.begin.unwrap(), &json_date, true);
let duration = node.out - node.seek;
if delta < 0.0 {
@ -316,7 +317,7 @@ fn realtime_filter(
}
}
pub fn filter_chains(node: &mut Media) -> Vec<String> {
pub fn filter_chains(node: &mut Media, json_date: &String) -> Vec<String> {
let config = GlobalConfig::global();
let mut filters = Filters::new();
@ -354,12 +355,12 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
realtime_filter(node, &mut filters, &config, "video".into(), &json_date);
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
realtime_filter(node, &mut filters, &config, "audio".into(), &json_date);
let mut filter_cmd = vec![];
let mut filter_str: String = "".to_string();

View File

@ -104,7 +104,7 @@ impl Iterator for Source {
let i = *self.index.lock().unwrap();
self.current_node = self.nodes.lock().unwrap()[i].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.add_filter(&"".to_string());
self.current_node.begin = Some(get_sec());
*self.index.lock().unwrap() += 1;
@ -121,7 +121,7 @@ impl Iterator for Source {
self.current_node = self.nodes.lock().unwrap()[0].clone();
self.current_node.add_probe();
self.current_node.add_filter();
self.current_node.add_filter(&"".to_string());
self.current_node.begin = Some(get_sec());
*self.index.lock().unwrap() = 1;

View File

@ -8,7 +8,7 @@ use tokio::runtime::Handle;
use crate::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
seek_and_length, GlobalConfig, Media, DUMMY_LEN,
seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN,
};
#[derive(Debug)]
@ -34,10 +34,15 @@ impl CurrentProgram {
global_index: Arc<Mutex<usize>>,
) -> Self {
let config = GlobalConfig::global();
let status = PlayoutStatus::global();
let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0);
*current_list.lock().unwrap() = json.program;
if status.date != json.date {
status.clone().write(json.date.clone(), 0.0)
}
Self {
config: config.clone(),
start_sec: json.start_sec.unwrap(),
@ -116,7 +121,7 @@ impl CurrentProgram {
let current_time = get_sec();
let start_sec = self.config.playlist.start_sec.unwrap();
let target_length = self.config.playlist.length_sec.unwrap();
let (delta, total_delta) = get_delta(&current_time);
let (delta, total_delta) = get_delta(&current_time, &self.json_date, true);
let mut duration = self.current_node.out.clone();
if self.current_node.duration > self.current_node.out {
@ -137,6 +142,9 @@ impl CurrentProgram {
next_start,
);
let status = PlayoutStatus::global();
status.clone().write(json.date.clone(), 0.0);
self.json_path = json.current_file.clone();
self.json_mod = json.modified;
self.json_date = json.date;
@ -202,7 +210,7 @@ impl CurrentProgram {
*self.index.lock().unwrap() += 1;
node_clone.seek = time_sec - node_clone.begin.unwrap();
self.current_node = handle_list_init(node_clone);
self.current_node = handle_list_init(node_clone, &self.json_date);
}
}
}
@ -237,7 +245,7 @@ impl Iterator for CurrentProgram {
self.init_clip();
} else {
let mut current_time = get_sec();
let (_, total_delta) = get_delta(&current_time);
let (_, total_delta) = get_delta(&current_time, &self.json_date, true);
let mut duration = DUMMY_LEN;
if DUMMY_LEN > total_delta {
@ -253,7 +261,7 @@ impl Iterator for CurrentProgram {
media.duration = duration;
media.out = duration;
self.current_node = gen_source(media);
self.current_node = gen_source(media, &self.json_date);
self.nodes.lock().unwrap().push(self.current_node.clone());
*self.index.lock().unwrap() = self.nodes.lock().unwrap().len();
}
@ -273,7 +281,12 @@ impl Iterator for CurrentProgram {
is_last = true
}
self.current_node = timed_source(self.nodes.lock().unwrap()[index].clone(), &self.config, is_last);
self.current_node = timed_source(
self.nodes.lock().unwrap()[index].clone(),
&self.config,
is_last,
&self.json_date,
);
self.last_next_ad();
*self.index.lock().unwrap() += 1;
@ -285,7 +298,11 @@ impl Iterator for CurrentProgram {
let last_playlist = self.json_path.clone();
let last_ad = self.current_node.last_ad.clone();
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());
let (_, total_delta) = get_delta(
&self.config.playlist.start_sec.unwrap(),
&self.json_date,
true,
);
if last_playlist == self.json_path
&& total_delta.abs() > self.config.general.stop_threshold
@ -302,12 +319,12 @@ impl Iterator for CurrentProgram {
}
self.current_node.duration = duration;
self.current_node.out = duration;
self.current_node = gen_source(self.current_node.clone());
self.current_node = gen_source(self.current_node.clone(), &self.json_date);
self.nodes.lock().unwrap().push(self.current_node.clone());
self.last_next_ad();
self.current_node.last_ad = last_ad;
self.current_node.add_filter();
self.current_node.add_filter(&self.json_date);
*self.index.lock().unwrap() += 1;
@ -315,7 +332,7 @@ impl Iterator for CurrentProgram {
}
*self.index.lock().unwrap() = 0;
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone(), &self.json_date);
self.last_next_ad();
self.current_node.last_ad = last_ad;
@ -326,12 +343,12 @@ impl Iterator for CurrentProgram {
}
}
fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media {
fn timed_source(node: Media, config: &GlobalConfig, last: bool, json_date: &String) -> Media {
// prepare input clip
// check begin and length from clip
// return clip only if we are in 24 hours time range
let (delta, total_delta) = get_delta(&node.begin.unwrap());
let (delta, total_delta) = get_delta(&node.begin.unwrap(), &json_date, true);
let mut new_node = node.clone();
new_node.process = Some(false);
@ -352,7 +369,7 @@ fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media {
|| !config.playlist.length.contains(":")
{
// when we are in the 24 hour range, get the clip
new_node = gen_source(node);
new_node = gen_source(node, &json_date);
new_node.process = Some(true);
} else if total_delta <= 0.0 {
info!("Begin is over play time, skip: {}", node.source);
@ -363,7 +380,7 @@ fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media {
new_node
}
fn gen_source(mut node: Media) -> Media {
fn gen_source(mut node: Media, json_date: &String) -> Media {
if Path::new(&node.source).is_file() {
node.add_probe();
node.cmd = Some(seek_and_length(
@ -372,7 +389,7 @@ fn gen_source(mut node: Media) -> Media {
node.out,
node.duration,
));
node.add_filter();
node.add_filter(&json_date);
} else {
if node.source.chars().count() == 0 {
warn!(
@ -385,17 +402,17 @@ fn gen_source(mut node: Media) -> Media {
let (source, cmd) = gen_dummy(node.out - node.seek);
node.source = source;
node.cmd = Some(cmd);
node.add_filter();
node.add_filter(&json_date);
}
node
}
fn handle_list_init(mut node: Media) -> Media {
fn handle_list_init(mut node: Media, json_date: &String) -> Media {
// handle init clip, but this clip can be the last one in playlist,
// this we have to figure out and calculate the right length
let (_, total_delta) = get_delta(&node.begin.unwrap());
let (_, total_delta) = get_delta(&node.begin.unwrap(), &json_date, true);
let mut out = node.out;
if node.out - node.seek > total_delta {
@ -404,7 +421,7 @@ fn handle_list_init(mut node: Media) -> Media {
node.out = out;
let new_node = gen_source(node);
let new_node = gen_source(node, &json_date);
new_node
}

View File

@ -11,7 +11,7 @@ mod utils;
use crate::output::{player, write_hls};
use crate::utils::{
init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl,
init_config, init_logging, init_status, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
};
@ -19,8 +19,8 @@ fn main() {
init_config();
let config = GlobalConfig::global();
let play_control = PlayerControl::new();
let _ = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let _playout_stat = PlayoutStatus::new();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
@ -28,6 +28,7 @@ fn main() {
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone());
CombinedLogger::init(logging).unwrap();
init_status();
validate_ffmpeg();
if config.rpc_server.enable {

View File

@ -23,9 +23,15 @@ use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, PlayerControl, ProcessControl};
use crate::utils::{
sec_to_time, stderr_reader, GlobalConfig, PlayerControl, ProcessControl,
};
pub fn write_hls(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) {
pub fn write_hls(
rt_handle: &Handle,
play_control: PlayerControl,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -33,9 +39,9 @@ pub fn write_hls(rt_handle: &Handle, play_control: PlayerControl, proc_control:
let (get_source, _) = source_generator(
rt_handle,
config.clone(),
proc_control.is_terminated.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
proc_control.is_terminated.clone(),
);
for node in get_source {

View File

@ -30,9 +30,9 @@ use crate::utils::{
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
is_terminated: Arc<Mutex<bool>>,
current_list: Arc<Mutex<Vec<Media>>>,
index: Arc<Mutex<usize>>,
is_terminated: Arc<Mutex<bool>>,
) -> (Box<dyn Iterator<Item = Media>>, Arc<Mutex<bool>>) {
let mut init_playlist: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
@ -81,7 +81,11 @@ pub fn source_generator(
(get_source, init_playlist)
}
pub fn player(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) {
pub fn player(
rt_handle: &Handle,
play_control: PlayerControl,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -93,9 +97,9 @@ pub fn player(rt_handle: &Handle, play_control: PlayerControl, proc_control: Pro
let (get_source, init_playlist) = source_generator(
rt_handle,
config.clone(),
proc_control.is_terminated.clone(),
play_control.current_list.clone(),
play_control.index.clone(),
proc_control.is_terminated.clone(),
);
let mut enc_proc = match config.out.mode.as_str() {

View File

@ -127,8 +127,6 @@ pub struct Out {
pub output_cmd: Option<Vec<String>>,
}
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
impl GlobalConfig {
fn new() -> Self {
let args = get_args();
@ -253,6 +251,8 @@ impl GlobalConfig {
}
}
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
fn pre_audio_codec(add_loudnorm: bool) -> Vec<String> {
// when add_loudnorm is False we use a different audio encoder,
// s302m has higher quality, but is experimental

View File

@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use std::{
env::temp_dir,
fs,
fs::metadata,
fs::{metadata, File},
io::{BufRead, BufReader, Error},
path::Path,
process::exit,
@ -14,6 +14,7 @@ use std::{
time,
time::UNIX_EPOCH,
};
use once_cell::sync::OnceCell;
use jsonrpc_http_server::CloseHandle;
use process_control::Terminator;
@ -105,7 +106,7 @@ impl Drop for ProcessControl {
}
}
#[derive(Serialize, Clone)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PlayoutStatus {
pub time_shift: f64,
pub date: String,
@ -113,23 +114,51 @@ pub struct PlayoutStatus {
impl PlayoutStatus {
pub fn new() -> Self {
let stat_file = temp_dir().join("ffplayout.json");
let stat_file = temp_dir().join("ffplayout_status.json");
if !stat_file.exists() {
let data = Self {
let mut data: PlayoutStatus = Self {
time_shift: 0.0,
date: "".to_string(),
};
let json: String = serde_json::to_string(&data).expect("Serde read data failed");
fs::write(stat_file, &json).expect("Unable to write file");
if !stat_file.exists() {
} else {
let file = File::options()
.read(true)
.write(false)
.open(&stat_file.display().to_string())
.expect("Could not open status file");
data = serde_json::from_reader(file).expect("Could not read status file.");
}
Self {
time_shift: 0.0,
date: "".to_string(),
data
}
pub fn write(mut self, date: String, time_shift: f64) {
let stat_file = temp_dir().join("ffplayout_status.json");
self.date = date.clone();
self.time_shift = time_shift.clone();
if let Ok (json) = serde_json::to_string(&self) {
if let Err(e) = fs::write(stat_file, &json) {
error!("Unable to write status file: {e}")
};
};
}
pub fn global() -> &'static PlayoutStatus {
STATUS_CELL.get().expect("Config is not initialized")
}
}
static STATUS_CELL: OnceCell<PlayoutStatus> = OnceCell::new();
pub fn init_status() {
let status = PlayoutStatus::new();
STATUS_CELL.set(status).unwrap();
}
#[derive(Clone)]
@ -213,9 +242,9 @@ impl Media {
}
}
pub fn add_filter(&mut self) {
pub fn add_filter(&mut self, json_date: &String) {
let mut node = self.clone();
self.filter = Some(filter_chains(&mut node))
self.filter = Some(filter_chains(&mut node, &json_date))
}
}
@ -349,13 +378,15 @@ pub fn is_close(a: f64, b: f64, to: f64) -> bool {
false
}
pub fn get_delta(begin: &f64) -> (f64, f64) {
pub fn get_delta(begin: &f64, json_date: &String, shift: bool) -> (f64, f64) {
let config = GlobalConfig::global();
let status = PlayoutStatus::global();
let mut current_time = get_sec();
let start = config.playlist.start_sec.unwrap();
let length = time_to_sec(&config.playlist.length);
let mut target_length = 86400.0;
let total_delta;
let mut total_delta;
if length > 0.0 && length != target_length {
target_length = length
@ -378,6 +409,11 @@ pub fn get_delta(begin: &f64) -> (f64, f64) {
total_delta = target_length + start - current_time;
}
if shift && json_date == &status.date && status.time_shift != 0.0 {
current_delta -= status.time_shift;
total_delta -= status.time_shift;
}
(current_delta, total_delta)
}

View File

@ -6,7 +6,9 @@ use jsonrpc_http_server::{
};
use simplelog::*;
use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, PlayerControl, ProcessControl};
use crate::utils::{
get_delta, get_sec, sec_to_time, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl,
};
fn get_media_map(media: Media) -> Value {
json!({
@ -55,12 +57,17 @@ pub async fn run_rpc(play_control: PlayerControl, proc_control: ProcessControl)
if let Ok(_) = decoder.terminate() {
info!("Move to next clip");
let index = *play.index.lock().unwrap();
let status = PlayoutStatus::global();
if index < play.current_list.lock().unwrap().len() {
let mut data_map = Map::new();
let mut media =
play.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0), &status.date, false);
status.clone().write(status.date.clone(), delta);
data_map.insert(
"operation".to_string(),
json!("Move to next clip"),