diff --git a/src/input/playlist.rs b/src/input/playlist.rs index d04f8446..0c8ed6f4 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -18,29 +18,36 @@ pub struct CurrentProgram { json_mod: Option, json_path: Option, json_date: String, - nodes: Vec, + pub nodes: Arc>>, current_node: Media, pub init: Arc>, - index: usize, + index: Arc>, rt_handle: Handle, is_terminated: Arc>, } impl CurrentProgram { - pub fn new(rt_handle: Handle, is_terminated: Arc>) -> Self { + pub fn new( + rt_handle: Handle, + is_terminated: Arc>, + current_list: Arc>>, + global_index: Arc>, + ) -> Self { let config = GlobalConfig::global(); let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0); + *current_list.lock().unwrap() = json.program; + Self { config: config.clone(), start_sec: json.start_sec.unwrap(), json_mod: json.modified, json_path: json.current_file, json_date: json.date, - nodes: json.program, + nodes: current_list, current_node: Media::new(0, "".to_string(), false), init: Arc::new(Mutex::new(true)), - index: 0, + index: global_index, rt_handle, is_terminated, } @@ -58,7 +65,7 @@ impl CurrentProgram { self.json_path = json.current_file; self.json_mod = json.modified; - self.nodes = json.program; + *self.nodes.lock().unwrap() = json.program; } else if Path::new(&self.json_path.clone().unwrap()).is_file() { let mod_time = modified_time(&self.json_path.clone().unwrap()); @@ -82,10 +89,10 @@ impl CurrentProgram { ); self.json_mod = json.modified; - self.nodes = json.program; + *self.nodes.lock().unwrap() = json.program; self.get_current_clip(); - self.index += 1; + *self.index.lock().unwrap() += 1; } } else { error!( @@ -98,10 +105,10 @@ impl CurrentProgram { media.out = DUMMY_LEN; self.json_path = None; - self.nodes = vec![media.clone()]; + *self.nodes.lock().unwrap() = vec![media.clone()]; self.current_node = media; *self.init.lock().unwrap() = true; - self.index = 0; + *self.index.lock().unwrap() = 0; } } @@ -133,8 +140,8 @@ impl CurrentProgram { self.json_path = json.current_file.clone(); self.json_mod = json.modified; self.json_date = json.date; - self.nodes = json.program; - self.index = 0; + *self.nodes.lock().unwrap() = json.program; + *self.index.lock().unwrap() = 0; if json.current_file.is_none() { *self.init.lock().unwrap() = true; @@ -143,15 +150,18 @@ impl CurrentProgram { } fn last_next_ad(&mut self) { - if self.index + 1 < self.nodes.len() - && self.nodes[self.index + 1].category == "advertisement".to_string() + let index = *self.index.lock().unwrap(); + let current_list = self.nodes.lock().unwrap(); + + if index + 1 < current_list.len() + && current_list[index + 1].category == "advertisement".to_string() { self.current_node.next_ad = Some(true); } - if self.index > 0 - && self.index < self.nodes.len() - && self.nodes[self.index - 1].category == "advertisement".to_string() + if index > 0 + && index < current_list.len() + && current_list[index - 1].category == "advertisement".to_string() { self.current_node.last_ad = Some(true); } @@ -170,10 +180,10 @@ impl CurrentProgram { fn get_current_clip(&mut self) { let time_sec = self.get_current_time(); - for (i, item) in self.nodes.iter_mut().enumerate() { + for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { if item.begin.unwrap() + item.out - item.seek > time_sec { *self.init.lock().unwrap() = false; - self.index = i; + *self.index.lock().unwrap() = i; break; } @@ -185,10 +195,11 @@ impl CurrentProgram { if !*self.init.lock().unwrap() { let time_sec = self.get_current_time(); + let index = *self.index.lock().unwrap(); // de-instance node to preserve original values in list - let mut node_clone = self.nodes[self.index].clone(); - self.index += 1; + let mut node_clone = self.nodes.lock().unwrap()[index].clone(); + *self.index.lock().unwrap() += 1; node_clone.seek = time_sec - node_clone.begin.unwrap(); self.current_node = handle_list_init(node_clone); @@ -212,10 +223,11 @@ impl Iterator for CurrentProgram { // on init load playlist, could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. - self.current_node = self.nodes[self.nodes.len() - 1].clone(); + let list_length = self.nodes.lock().unwrap().len(); + self.current_node = self.nodes.lock().unwrap()[list_length - 1].clone(); self.check_for_next_playlist(); - let new_node = self.nodes[self.nodes.len() - 1].clone(); + let new_node = self.nodes.lock().unwrap()[list_length - 1].clone(); let new_length = new_node.begin.unwrap() + new_node.duration; if new_length @@ -242,8 +254,8 @@ impl Iterator for CurrentProgram { media.out = duration; self.current_node = gen_source(media); - self.nodes.push(self.current_node.clone()); - self.index = self.nodes.len(); + self.nodes.lock().unwrap().push(self.current_node.clone()); + *self.index.lock().unwrap() = self.nodes.lock().unwrap().len(); } } @@ -252,17 +264,18 @@ impl Iterator for CurrentProgram { return Some(self.current_node.clone()); } - if self.index < self.nodes.len() { + if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { self.check_for_next_playlist(); let mut is_last = false; + let index = *self.index.lock().unwrap(); - if self.index == self.nodes.len() - 1 { + if index == self.nodes.lock().unwrap().len() - 1 { is_last = true } - self.current_node = timed_source(self.nodes[self.index].clone(), &self.config, is_last); + self.current_node = timed_source(self.nodes.lock().unwrap()[index].clone(), &self.config, is_last); self.last_next_ad(); - self.index += 1; + *self.index.lock().unwrap() += 1; // update playlist should happen after current clip, // to prevent unknown behaviors. @@ -279,7 +292,8 @@ impl Iterator for CurrentProgram { { // Test if playlist is to early finish, // and if we have to fill it with a placeholder. - self.current_node = Media::new(self.index, "".to_string(), false); + let index = *self.index.lock().unwrap(); + self.current_node = Media::new(index, "".to_string(), false); self.current_node.begin = Some(get_sec()); let mut duration = total_delta.abs(); @@ -289,23 +303,23 @@ impl Iterator for CurrentProgram { self.current_node.duration = duration; self.current_node.out = duration; self.current_node = gen_source(self.current_node.clone()); - self.nodes.push(self.current_node.clone()); + self.nodes.lock().unwrap().push(self.current_node.clone()); self.last_next_ad(); self.current_node.last_ad = last_ad; self.current_node.add_filter(); - self.index += 1; + *self.index.lock().unwrap() += 1; return Some(self.current_node.clone()); } - self.index = 0; - self.current_node = gen_source(self.nodes[self.index].clone()); + *self.index.lock().unwrap() = 0; + self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone()); self.last_next_ad(); self.current_node.last_ad = last_ad; - self.index = 1; + *self.index.lock().unwrap() = 1; Some(self.current_node.clone()) } diff --git a/src/main.rs b/src/main.rs index 77a1d76a..e774fd28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,12 +10,17 @@ mod output; mod utils; use crate::output::{player, write_hls}; -use crate::utils::{init_config, init_logging, validate_ffmpeg, run_rpc, GlobalConfig, ProcessControl}; +use crate::utils::{ + init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl, + PlayoutStatus, ProcessControl, +}; fn main() { init_config(); let config = GlobalConfig::global(); + let play_control = PlayerControl::new(); let proc_control = ProcessControl::new(); + let _playout_stat = PlayoutStatus::new(); let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); let rt_handle = runtime.handle(); @@ -26,13 +31,13 @@ fn main() { validate_ffmpeg(); if config.rpc_server.enable { - rt_handle.spawn(run_rpc(proc_control.clone())); + rt_handle.spawn(run_rpc(play_control.clone(), proc_control.clone())); } if config.out.mode.to_lowercase() == "hls".to_string() { - write_hls(rt_handle, proc_control); + write_hls(rt_handle, play_control, proc_control); } else { - player(rt_handle, proc_control); + player(rt_handle, play_control, proc_control); } info!("Playout done..."); diff --git a/src/output/hls.rs b/src/output/hls.rs index 95926c1b..6f367198 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -23,9 +23,9 @@ use simplelog::*; use tokio::runtime::Handle; use crate::output::source_generator; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, ProcessControl}; +use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, PlayerControl, ProcessControl}; -pub fn write_hls(rt_handle: &Handle, proc_control: ProcessControl) { +pub fn write_hls(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) { let config = GlobalConfig::global(); let dec_settings = config.out.clone().output_cmd.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -34,13 +34,12 @@ pub fn write_hls(rt_handle: &Handle, proc_control: ProcessControl) { rt_handle, config.clone(), proc_control.is_terminated.clone(), - proc_control.current_list.clone(), - proc_control.index.clone(), + play_control.current_list.clone(), + play_control.index.clone(), ); for node in get_source { - *proc_control.current_media.lock().unwrap() = Some(node.clone()); - *proc_control.index.lock().unwrap() = node.index.clone().unwrap(); + *play_control.current_media.lock().unwrap() = Some(node.clone()); let cmd = match node.cmd { Some(cmd) => cmd, diff --git a/src/output/mod.rs b/src/output/mod.rs index 83337e6a..3dd4d4b2 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -23,7 +23,9 @@ mod stream; pub use hls::write_hls; use crate::input::{file_worker, ingest_server, CurrentProgram, Source}; -use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media, ProcessControl}; +use crate::utils::{ + sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, ProcessControl, +}; pub fn source_generator( rt_handle: &Handle, @@ -31,10 +33,7 @@ pub fn source_generator( is_terminated: Arc>, current_list: Arc>>, index: Arc>, -) -> ( - Box>, - Arc>, -) { +) -> (Box>, Arc>) { let mut init_playlist: Arc> = Arc::new(Mutex::new(false)); let get_source = match config.processing.clone().mode.as_str() { @@ -63,7 +62,12 @@ pub fn source_generator( } "playlist" => { info!("Playout in playlist mode"); - let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone()); + let program = CurrentProgram::new( + rt_handle.clone(), + is_terminated.clone(), + current_list, + index, + ); init_playlist = program.init.clone(); Box::new(program) as Box> @@ -77,7 +81,7 @@ pub fn source_generator( (get_source, init_playlist) } -pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { +pub fn player(rt_handle: &Handle, play_control: PlayerControl, proc_control: ProcessControl) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); @@ -90,8 +94,8 @@ pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { rt_handle, config.clone(), proc_control.is_terminated.clone(), - proc_control.current_list.clone(), - proc_control.index.clone(), + play_control.current_list.clone(), + play_control.index.clone(), ); let mut enc_proc = match config.out.mode.as_str() { @@ -122,7 +126,7 @@ pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { } 'source_iter: for node in get_source { - *proc_control.current_media.lock().unwrap() = Some(node.clone()); + *play_control.current_media.lock().unwrap() = Some(node.clone()); let cmd = match node.cmd { Some(cmd) => cmd, @@ -246,6 +250,10 @@ pub fn player(rt_handle: &Handle, proc_control: ProcessControl) { sleep(Duration::from_secs(1)); + if let Err(e) = enc_proc.kill() { + panic!("Encoder error: {:?}", e) + }; + if let Err(e) = enc_proc.wait() { panic!("Encoder error: {:?}", e) }; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index a9f097b7..04076dfe 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -3,6 +3,8 @@ use chrono::Duration; use ffprobe::{ffprobe, Format, Stream}; use serde::{Deserialize, Serialize}; use std::{ + env::temp_dir, + fs, fs::metadata, io::{BufRead, BufReader, Error}, path::Path, @@ -43,9 +45,6 @@ pub struct ProcessControl { pub rpc_handle: Arc>>, pub is_terminated: Arc>, pub is_alive: Arc>, - pub current_media: Arc>>, - pub current_list: Arc>>, - pub index: Arc>, } impl ProcessControl { @@ -58,9 +57,6 @@ impl ProcessControl { rpc_handle: Arc::new(Mutex::new(None)), is_terminated: Arc::new(Mutex::new(false)), is_alive: Arc::new(RwLock::new(true)), - current_media: Arc::new(Mutex::new(None)), - current_list: Arc::new(Mutex::new(vec!(Media::new(0, "".to_string(), false)))), - index: Arc::new(Mutex::new(0)), } } } @@ -78,7 +74,7 @@ impl ProcessControl { if let Some(server) = &*self.server_term.lock().unwrap() { unsafe { - if let Err(e)= server.terminate() { + if let Err(e) = server.terminate() { error!("Ingest server: {:?}", e); } } @@ -109,6 +105,50 @@ impl Drop for ProcessControl { } } +#[derive(Serialize, Clone)] +pub struct PlayoutStatus { + pub time_shift: f64, + pub date: String, +} + +impl PlayoutStatus { + pub fn new() -> Self { + let stat_file = temp_dir().join("ffplayout.json"); + + if !stat_file.exists() { + let data = Self { + time_shift: 0.0, + date: "".to_string(), + }; + + let json: String = serde_json::to_string(&data).expect("Serde read data failed"); + fs::write(stat_file, &json).expect("Unable to write file"); + } + + Self { + time_shift: 0.0, + date: "".to_string(), + } + } +} + +#[derive(Clone)] +pub struct PlayerControl { + pub current_media: Arc>>, + pub current_list: Arc>>, + pub index: Arc>, +} + +impl PlayerControl { + pub fn new() -> Self { + Self { + current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)])), + index: Arc::new(Mutex::new(0)), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { pub begin: Option, diff --git a/src/utils/rpc_server.rs b/src/utils/rpc_server.rs index a704ce95..126e52ba 100644 --- a/src/utils/rpc_server.rs +++ b/src/utils/rpc_server.rs @@ -1,4 +1,4 @@ -use serde_json::{Map, json}; +use serde_json::{json, Map}; use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; use jsonrpc_http_server::{ @@ -6,7 +6,7 @@ use jsonrpc_http_server::{ }; use simplelog::*; -use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, ProcessControl}; +use crate::utils::{get_sec, sec_to_time, GlobalConfig, Media, PlayerControl, ProcessControl}; fn get_media_map(media: Media) -> Value { json!({ @@ -40,9 +40,10 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } -pub async fn run_rpc(proc_control: ProcessControl) { +pub async fn run_rpc(play_control: PlayerControl, proc_control: ProcessControl) { let config = GlobalConfig::global(); let mut io = IoHandler::default(); + let play = play_control.clone(); let proc = proc_control.clone(); io.add_sync_method("player", move |params: Params| { @@ -53,30 +54,36 @@ pub async fn run_rpc(proc_control: ProcessControl) { unsafe { if let Ok(_) = decoder.terminate() { info!("Move to next clip"); + let index = *play.index.lock().unwrap(); - if let Some(media) = proc.current_media.lock().unwrap().clone() { + if index < play.current_list.lock().unwrap().len() { let mut data_map = Map::new(); - data_map.insert("operation".to_string(), json!("Move to next clip")); + let mut media = + play.current_list.lock().unwrap()[index].clone(); + media.add_probe(); + data_map.insert( + "operation".to_string(), + json!("Move to next clip"), + ); data_map.insert("media".to_string(), get_media_map(media)); return Ok(Value::Object(data_map)); - }; - - return Ok(Value::String(format!("Move failed"))); + } } } } + return Ok(Value::String(format!("Move failed"))); } if map.contains_key("control") && map["control"] == "back".to_string() { if let Some(decoder) = &*proc.decoder_term.lock().unwrap() { - let index = *proc.index.lock().unwrap(); + let index = *play.index.lock().unwrap(); - if index > 1 && proc.current_list.lock().unwrap().len() > 1 { + if index > 1 && play.current_list.lock().unwrap().len() > 1 { info!("Move to last clip"); let mut data_map = Map::new(); - let mut media = proc.current_list.lock().unwrap()[index - 2].clone(); - *proc.index.lock().unwrap() = index - 2; + let mut media = play.current_list.lock().unwrap()[index - 2].clone(); + *play.index.lock().unwrap() = index - 2; media.add_probe(); data_map.insert("operation".to_string(), json!("Move to last clip")); data_map.insert("media".to_string(), get_media_map(media)); @@ -87,13 +94,12 @@ pub async fn run_rpc(proc_control: ProcessControl) { } } } - - return Ok(Value::String(format!("Move failed"))); } + return Ok(Value::String(format!("Move failed"))); } if map.contains_key("media") && map["media"] == "current".to_string() { - if let Some(media) = proc.current_media.lock().unwrap().clone() { + if let Some(media) = play.current_media.lock().unwrap().clone() { let data_map = get_data_map(config, media); return Ok(Value::Object(data_map));