From b8d041be57cba0f00832d3bcd0c2c279a4d9fd4e Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 31 Jan 2024 17:52:17 +0100 Subject: [PATCH] add skip validation argument, fix case where clips not exist (introduced in v.20), fix lock deadlock, add more tests --- Cargo.lock | 30 +- ffplayout-engine/src/input/playlist.rs | 172 +- ffplayout-engine/src/output/mod.rs | 4 +- ffplayout-engine/src/utils/arg_parse.rs | 3 + ffplayout-engine/src/utils/mod.rs | 2 + lib/src/utils/config.rs | 3 + lib/src/utils/json_serializer.rs | 25 +- lib/src/utils/json_validate.rs | 33 +- lib/src/utils/mod.rs | 1 - .../assets/playlists/2024/01/2024-01-30.json | 22526 ++++++++++++++++ .../assets/playlists/2024/01/2024-01-31.json | 22526 ++++++++++++++++ tests/src/engine_cmd.rs | 38 +- tests/src/engine_playlist.rs | 133 + 13 files changed, 45369 insertions(+), 127 deletions(-) create mode 100644 tests/assets/playlists/2024/01/2024-01-30.json create mode 100644 tests/assets/playlists/2024/01/2024-01-31.json diff --git a/Cargo.lock b/Cargo.lock index c8c268da..c7daf071 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,11 +4,11 @@ version = 3 [[package]] name = "actix-codec" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "617a8268e3537fe1d8c9ead925fca49ef6400927ee7bc26750e90ecee14ce4b8" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.2", "bytes", "futures-core", "futures-sink", @@ -368,9 +368,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" +checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" [[package]] name = "anstyle-parse" @@ -997,9 +997,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "fc5d6b04b3fd0ba9926f945895de7d806260a2d7431ba82e7edaecb043c4c6b8" dependencies = [ "darling_core", "darling_macro", @@ -1007,9 +1007,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "04e48a959bcd5c761246f5d090ebc2fbf7b9cd527a492b07a67510c108f1e7e3" dependencies = [ "fnv", "ident_case", @@ -1021,9 +1021,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "1d1545d67a2149e1d93b7e5c7752dce5a7426eb5d1357ddcfd89336b94444f77" dependencies = [ "darling_core", "quote", @@ -1823,9 +1823,9 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" dependencies = [ "either", ] @@ -3733,9 +3733,9 @@ checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" [[package]] name = "value-bag" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cdbaf5e132e593e9fc1de6a15bbec912395b11fb9719e061cf64f804524c503" +checksum = "126e423afe2dd9ac52142e7e9d5ce4135d7e13776c529d27fd6bc49f19e3280b" [[package]] name = "vcpkg" diff --git a/ffplayout-engine/src/input/playlist.rs b/ffplayout-engine/src/input/playlist.rs index f9ae57ea..efaaf8a4 100644 --- a/ffplayout-engine/src/input/playlist.rs +++ b/ffplayout-engine/src/input/playlist.rs @@ -3,7 +3,7 @@ use std::{ path::Path, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, }, }; @@ -160,7 +160,10 @@ impl CurrentProgram { duration = self.current_node.duration } - trace!("delta: {delta}, total_delta: {total_delta}"); + trace!( + "delta: {delta}, total_delta: {total_delta}, current index: {}", + self.current_node.index.unwrap_or_default() + ); let mut next_start = self.current_node.begin.unwrap_or_default() - start_sec + duration + delta; @@ -284,9 +287,10 @@ impl CurrentProgram { } // Prepare init clip. - fn init_clip(&mut self) { + fn init_clip(&mut self) -> bool { trace!("init_clip"); self.get_current_clip(); + let mut is_filler = false; if !self.playout_stat.list_init.load(Ordering::SeqCst) { let time_sec = self.get_current_time(); @@ -299,17 +303,31 @@ impl CurrentProgram { // de-instance node to preserve original values in list let mut node_clone = nodes[index].clone(); + + // Important! When no manual drop is happen here, lock is still active in handle_list_init + drop(nodes); + node_clone.seek = time_sec - (node_clone.begin.unwrap() - *self.playout_stat.time_shift.lock().unwrap()); self.current_node = handle_list_init( &self.config, node_clone, - &self.playout_stat.chain, + &self.playout_stat, &self.player_control, last_index, ); + + if self + .current_node + .source + .contains(&self.config.storage.path.to_string_lossy().to_string()) + { + is_filler = true; + } } + + is_filler } } @@ -322,20 +340,47 @@ impl Iterator for CurrentProgram { if self.playout_stat.list_init.load(Ordering::SeqCst) { trace!("Init playlist, from next iterator"); + let mut init_clip_is_filler = false; + if self.json_path.is_some() { - self.init_clip(); + init_clip_is_filler = self.init_clip(); } if self.playout_stat.list_init.load(Ordering::SeqCst) { // On init load, playlist could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. - let last_index = self.player_control.current_list.lock().unwrap().len() - 1; - self.current_node = - self.player_control.current_list.lock().unwrap()[last_index].clone(); - let new_node = self.player_control.current_list.lock().unwrap()[last_index].clone(); - let new_length = new_node.begin.unwrap_or_default() + new_node.duration; - trace!("Init playlist after playlist end"); + + let (last_index, new_length) = if let Ok(c_list) = + self.player_control.current_list.try_lock() + { + let last_index = c_list.len() - 1; + let mut new_length = + self.current_node.begin.unwrap_or_default() + self.current_node.duration; + + if self.current_node.index.unwrap_or_default() >= last_index { + // Only take last clip when index match or over + trace!( + "change current node, because index ({}) is over last item index: {last_index}", + self.current_node.index.unwrap_or_default() + ); + + self.current_node = c_list[last_index].clone(); + new_length = c_list[last_index].begin.unwrap_or_default() + + c_list[last_index].duration; + } + + (last_index, new_length) + } else { + trace!("Lock from current_list not possible!"); + + (0, 0.0) + }; + + trace!( + "Init playlist after playlist end, or missing files, index: {}", + self.current_node.index.unwrap_or_default() + ); let next_playlist = self.check_for_next_playlist(); @@ -352,16 +397,23 @@ impl Iterator for CurrentProgram { .current_index .fetch_add(1, Ordering::SeqCst); + let nodes = self.player_control.current_list.lock().unwrap(); + let node_clone = nodes[index].clone(); + + drop(nodes); + self.current_node = gen_source( &self.config, - self.player_control.current_list.lock().unwrap()[index].clone(), - &self.playout_stat.chain, + node_clone, + &self.playout_stat, &self.player_control, 0, ); return Some(self.current_node.clone()); - } else { + } else if !init_clip_is_filler { + // TODO: maybe this should be never true + // fill missing length from playlist let mut current_time = get_sec(); let (_, total_delta) = get_delta(&self.config, ¤t_time); @@ -396,7 +448,7 @@ impl Iterator for CurrentProgram { self.current_node = gen_source( &self.config, media, - &self.playout_stat.chain, + &self.playout_stat, &self.player_control, last_index, ); @@ -475,7 +527,7 @@ impl Iterator for CurrentProgram { self.current_node = gen_source( &self.config, self.current_node.clone(), - &self.playout_stat.chain, + &self.playout_stat, &self.player_control, 0, ); @@ -502,7 +554,7 @@ impl Iterator for CurrentProgram { self.current_node = gen_source( &self.config, self.player_control.current_list.lock().unwrap()[0].clone(), - &self.playout_stat.chain, + &self.playout_stat, &self.player_control, 0, ); @@ -566,13 +618,7 @@ fn timed_source( { // when we are in the 24 hour range, get the clip new_node.process = Some(true); - new_node = gen_source( - config, - node, - &playout_stat.chain, - player_control, - last_index, - ); + new_node = gen_source(config, node, &playout_stat, player_control, last_index); } else if total_delta <= 0.0 { info!("Begin is over play time, skip: {}", node.source); } else if total_delta < node.duration - node.seek || last { @@ -580,7 +626,7 @@ fn timed_source( config, node, total_delta, - &playout_stat.chain, + &playout_stat, player_control, last_index, ); @@ -593,11 +639,18 @@ fn timed_source( pub fn gen_source( config: &PlayoutConfig, mut node: Media, - filter_chain: &Option>>>, + playout_stat: &PlayoutStatus, player_control: &PlayerControl, last_index: usize, ) -> Media { - let duration = node.out - node.seek; + let node_index = node.index.unwrap_or_default(); + let mut duration = node.out - node.seek; + + if duration < 1.0 { + warn!("Clip is less then 1 second long ({duration:.3}), adjust length."); + + duration = 1.2; + } trace!("Clip out: {duration}, duration: {}", node.duration); @@ -609,6 +662,11 @@ pub fn gen_source( // separate if condition, because of node.add_probe() in last condition if node.probe.is_some() { + if node.seek > 0.0 { + node.seek = node.seek - 1.0; + } else { + node.out = node.out + 1.0; + } if node .source .rsplit_once('.') @@ -621,14 +679,10 @@ pub fn gen_source( node.cmd = Some(seek_and_length(&mut node)); } } else { - trace!( - "clip index: {} | last index: {}", - node.index.unwrap_or_default(), - last_index - ); + trace!("clip index: {node_index} | last index: {last_index}"); - // last index is the index from the last item from the node list. - if node.index.unwrap_or_default() < last_index { + // Last index is the index from the last item from the node list. + if node_index < last_index { error!("Source not found: \"{}\"", node.source); } @@ -638,6 +692,11 @@ pub fn gen_source( let filler_index = player_control.filler_index.fetch_add(1, Ordering::SeqCst); let mut filler_media = player_control.filler_list.lock().unwrap()[filler_index].clone(); + trace!("take filler: {}", filler_media.source); + + // Set list_init to true, to stay in sync. + playout_stat.list_init.store(true, Ordering::SeqCst); + if filler_index == player_control.filler_list.lock().unwrap().len() - 1 { player_control.filler_index.store(0, Ordering::SeqCst) } @@ -650,38 +709,12 @@ pub fn gen_source( filler_media.out = duration; } - // If necessary filler clip will be injected to the current list, - // original clip get new begin and seek value, to keep everything in sync. - - if node.index.unwrap_or_default() < last_index { - player_control.current_list.lock().unwrap()[node.index.unwrap_or_default()].begin = - Some(node.begin.unwrap_or_default() + filler_media.out); - - player_control.current_list.lock().unwrap()[node.index.unwrap_or_default()].seek = - node.seek + filler_media.out; - } - node.source = filler_media.source; node.seek = 0.0; node.out = filler_media.out; node.duration = filler_media.duration; node.cmd = Some(loop_filler(&node)); node.probe = filler_media.probe; - - if node.out < duration - 1.0 && node.index.unwrap_or_default() < last_index { - player_control - .current_list - .lock() - .unwrap() - .insert(node.index.unwrap_or_default(), node.clone()); - - for (i, item) in (*player_control.current_list.lock().unwrap()) - .iter_mut() - .enumerate() - { - item.index = Some(i); - } - } } else { match MediaProbe::new(&config.storage.filler.to_string_lossy()) { Ok(probe) => { @@ -734,16 +767,9 @@ pub fn gen_source( ); } - node.add_filter(config, filter_chain); + node.add_filter(config, &playout_stat.chain); - if duration < 1.0 { - warn!( - "Clip is less then 1 second long ({duration:.3}), skip: {}", - node.source - ); - - node.process = Some(false); - } + trace!("return gen_source: {}", node.source); node } @@ -753,7 +779,7 @@ pub fn gen_source( fn handle_list_init( config: &PlayoutConfig, mut node: Media, - filter_chain: &Option>>>, + playout_stat: &PlayoutStatus, player_control: &PlayerControl, last_index: usize, ) -> Media { @@ -767,7 +793,7 @@ fn handle_list_init( node.out = out; - gen_source(config, node, filter_chain, player_control, last_index) + gen_source(config, node, playout_stat, player_control, last_index) } /// when we come to last clip in playlist, @@ -777,7 +803,7 @@ fn handle_list_end( config: &PlayoutConfig, mut node: Media, total_delta: f64, - filter_chain: &Option>>>, + playout_stat: &PlayoutStatus, player_control: &PlayerControl, last_index: usize, ) -> Media { @@ -807,5 +833,5 @@ fn handle_list_end( node.process = Some(true); - gen_source(config, node, filter_chain, player_control, last_index) + gen_source(config, node, playout_stat, player_control, last_index) } diff --git a/ffplayout-engine/src/output/mod.rs b/ffplayout-engine/src/output/mod.rs index de1e2b5e..76c9b5d3 100644 --- a/ffplayout-engine/src/output/mod.rs +++ b/ffplayout-engine/src/output/mod.rs @@ -48,7 +48,7 @@ pub fn player( let play_stat = playout_stat.clone(); // get source iterator - let get_source = source_generator( + let node_sources = source_generator( config.clone(), play_control, playout_stat, @@ -82,7 +82,7 @@ pub fn player( thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c)); } - 'source_iter: for node in get_source { + 'source_iter: for node in node_sources { *play_control.current_media.lock().unwrap() = Some(node.clone()); if proc_control.is_terminated.load(Ordering::SeqCst) { diff --git a/ffplayout-engine/src/utils/arg_parse.rs b/ffplayout-engine/src/utils/arg_parse.rs index 21ff05b8..0aa43f09 100644 --- a/ffplayout-engine/src/utils/arg_parse.rs +++ b/ffplayout-engine/src/utils/arg_parse.rs @@ -87,6 +87,9 @@ pub struct Args { #[clap(short, long, help = "Set audio volume")] pub volume: Option, + #[clap(long, help = "Skip validation process")] + pub skip_validation: bool, + #[clap(long, help = "validate given playlist")] pub validate: bool, } diff --git a/ffplayout-engine/src/utils/mod.rs b/ffplayout-engine/src/utils/mod.rs index 5c556319..dc741e9a 100644 --- a/ffplayout-engine/src/utils/mod.rs +++ b/ffplayout-engine/src/utils/mod.rs @@ -121,6 +121,8 @@ pub fn get_config(args: Args) -> Result { } } + config.general.skip_validation = args.skip_validation; + if let Some(volume) = args.volume { config.processing.volume = volume; } diff --git a/lib/src/utils/config.rs b/lib/src/utils/config.rs index 2beb101c..3fa053e0 100644 --- a/lib/src/utils/config.rs +++ b/lib/src/utils/config.rs @@ -180,6 +180,9 @@ pub struct General { #[serde(skip_serializing, skip_deserializing)] pub template: Option