diff --git a/Cargo.lock b/Cargo.lock index a125ecaf..35c8ca0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1314,18 +1314,24 @@ dependencies = [ "argon2", "chrono", "clap", + "crossbeam-channel", "derive_more", "faccess", "ffplayout-lib", + "ffprobe", "flexi_logger", "futures-util", "home", + "itertools", "jsonwebtoken", "lazy_static", "lettre", "lexical-sort", "local-ip-address", "log", + "notify", + "notify-debouncer-full", + "num-traits", "once_cell", "paris", "parking_lot", diff --git a/ffplayout/Cargo.toml b/ffplayout/Cargo.toml index e260070c..acd897b5 100644 --- a/ffplayout/Cargo.toml +++ b/ffplayout/Cargo.toml @@ -24,17 +24,23 @@ actix-web-static-files = "4.0" argon2 = "0.5" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } clap = { version = "4.3", features = ["derive"] } +crossbeam-channel = "0.5" derive_more = "0.99" faccess = "0.2" +ffprobe = "0.4" flexi_logger = { version = "0.28", features = ["kv", "colors"] } futures-util = { version = "0.3", default-features = false, features = ["std"] } home = "0.5" +itertools = "0.12" jsonwebtoken = "9" lazy_static = "1.4" lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1", "tokio1-rustls-tls"], default-features = false } lexical-sort = "0.3" local-ip-address = "0.6" log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] } +notify = "6.0" +notify-debouncer-full = { version = "*", default-features = false } +num-traits = "0.2" once_cell = "1.18" paris = "1.5" parking_lot = "0.12" diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index a42ee577..0f87f130 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -23,7 +23,7 @@ use path_clean::PathClean; use ffplayout::{ api::{auth, routes::*}, db::{db_pool, handles, models::LoginUser}, - player::controller::ChannelController, + player::controller::{self, ChannelController, ChannelManager}, sse::{broadcast::Broadcaster, routes::*, AuthState}, utils::{ config::PlayoutConfig, @@ -69,7 +69,7 @@ async fn main() -> std::io::Result<()> { .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let _channel_controller = ChannelController::new(); + let channel_controller = Arc::new(Mutex::new(ChannelController::new())); let channels = handles::select_all_channels(&pool) .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; @@ -80,9 +80,6 @@ async fn main() -> std::io::Result<()> { init_logging(mail_queues.clone())?; for channel in channels.iter() { - println!("channel: {channel:?}"); - let _channel_clone = channel.clone(); - let config_path = PathBuf::from(&channel.config_path); let config = match web::block(move || PlayoutConfig::new(Some(config_path), None)).await { Ok(config) => config, @@ -92,26 +89,27 @@ async fn main() -> std::io::Result<()> { } }; + let channel_manager = Arc::new(Mutex::new(ChannelManager::new( + channel.clone(), + config.clone(), + ))); + + channel_controller + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .add(channel_manager); + let control_clone = channel_controller.clone(); let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail))); if let Ok(mut mqs) = mail_queues.lock() { mqs.push(m_queue.clone()); } - warn!("This logs to console"); - if channel.active { + info!(target: Target::file(), channel = channel.id; "Start Playout"); + thread::spawn(move || { - info!(target: Target::file(), channel = 1; "Start Playout"); - - thread::sleep(std::time::Duration::from_secs(1)); - - error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1"); - error!(target: Target::file_mail(), channel = 2; "This logs to File and Mail, channel 2"); - error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1"); - error!(target: Target::file_mail(), channel = 3; "This logs to File and Mail, channel 3"); - error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1"); - error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1"); + controller::start(control_clone); }); } } diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index 01948bd4..ad04b551 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -1,17 +1,21 @@ use std::{ fmt, process::Child, - sync::{atomic::AtomicBool, Arc, Mutex}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, }; #[cfg(not(windows))] use signal_child::Signalable; +use log::*; use serde::{Deserialize, Serialize}; -// use simplelog::*; use crate::db::models::Channel; -use crate::utils::errors::ProcessError; +use crate::player::output::{player, write_hls}; +use crate::utils::{config::PlayoutConfig, errors::ProcessError}; /// Defined process units. #[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)] @@ -36,6 +40,7 @@ use ProcessUnit::*; #[derive(Clone, Debug, Default)] pub struct ChannelManager { + pub config: Arc>, pub channel: Arc>, pub decoder: Arc>>, pub encoder: Arc>>, @@ -46,13 +51,144 @@ pub struct ChannelManager { } impl ChannelManager { - pub fn new(channel: Channel) -> Self { + pub fn new(channel: Channel, config: PlayoutConfig) -> Self { Self { + is_alive: Arc::new(AtomicBool::new(channel.active)), channel: Arc::new(Mutex::new(channel)), - is_alive: Arc::new(AtomicBool::new(true)), + config: Arc::new(Mutex::new(config)), ..Default::default() } } + + pub fn update_channel(self, other: &Channel) { + let mut channel = self.channel.lock().unwrap(); + + channel.name = other.name.clone(); + channel.preview_url = other.preview_url.clone(); + channel.config_path = other.config_path.clone(); + channel.extra_extensions = other.extra_extensions.clone(); + channel.active = other.active.clone(); + channel.modified = other.modified.clone(); + channel.time_shift = other.time_shift.clone(); + channel.utc_offset = other.utc_offset.clone(); + } + + pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> { + let mut channel = self.channel.lock()?; + + match unit { + Decoder => { + if let Some(proc) = self.decoder.lock()?.as_mut() { + #[cfg(not(windows))] + proc.term() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + + #[cfg(windows)] + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + } + } + Encoder => { + if let Some(proc) = self.encoder.lock()?.as_mut() { + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; + } + } + Ingest => { + if let Some(proc) = self.ingest.lock()?.as_mut() { + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; + } + } + } + + channel.active = false; + + self.wait(unit)?; + + Ok(()) + } + + /// Wait for process to proper close. + /// This prevents orphaned/zombi processes in system + pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> { + match unit { + Decoder => { + if let Some(proc) = self.decoder.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + } + } + Encoder => { + if let Some(proc) = self.encoder.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; + } + } + Ingest => { + if let Some(proc) = self.ingest.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; + } + } + } + + Ok(()) + } + + /// No matter what is running, terminate them all. + pub fn stop_all(&self) { + debug!("Stop all child processes"); + self.is_terminated.store(true, Ordering::SeqCst); + self.ingest_is_running.store(false, Ordering::SeqCst); + + if self.is_alive.load(Ordering::SeqCst) { + self.is_alive.store(false, Ordering::SeqCst); + + trace!("Playout is alive and processes are terminated"); + + for unit in [Decoder, Encoder, Ingest] { + if let Err(e) = self.stop(unit) { + if !e.to_string().contains("exited process") { + error!("{e}") + } + } + if let Err(e) = self.wait(unit) { + if !e.to_string().contains("exited process") { + error!("{e}") + } + } + } + } + } +} + +/// Global playout control, for move forward/backward clip, or resetting playlist/state. +#[derive(Clone, Debug)] +pub struct PlayoutStatus { + pub chain: Option>>>, + pub current_date: Arc>, + pub date: Arc>, + pub list_init: Arc, + pub time_shift: Arc>, +} + +impl PlayoutStatus { + pub fn new() -> Self { + Self { + chain: None, + current_date: Arc::new(Mutex::new(String::new())), + date: Arc::new(Mutex::new(String::new())), + list_init: Arc::new(AtomicBool::new(true)), + time_shift: Arc::new(Mutex::new(0.0)), + } + } +} + +impl Default for PlayoutStatus { + fn default() -> Self { + Self::new() + } } #[derive(Clone, Debug, Default)] @@ -75,98 +211,17 @@ impl ChannelController { channel.id != channel_id }); } - - pub fn update_from(&mut self, other: &Channel, channel_id: i32) { - self.channels.iter_mut().for_each(|c| { - let mut channel = c.channel.lock().unwrap(); - - if channel.id == channel_id { - channel.name = other.name.clone(); - channel.preview_url = other.preview_url.clone(); - channel.config_path = other.config_path.clone(); - channel.extra_extensions = other.extra_extensions.clone(); - channel.active = other.active.clone(); - channel.utc_offset = other.utc_offset.clone(); - } - }) - } - - pub fn stop(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> { - for manager in self.channels.iter_mut() { - let mut channel = manager.channel.lock().unwrap(); - - if channel.id == channel_id { - match unit { - Decoder => { - if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { - #[cfg(not(windows))] - proc.term() - .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; - - #[cfg(windows)] - proc.kill() - .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; - } - } - Encoder => { - if let Some(proc) = manager.encoder.lock().unwrap().as_mut() { - proc.kill() - .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; - } - } - Ingest => { - if let Some(proc) = manager.ingest.lock().unwrap().as_mut() { - proc.kill() - .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; - } - } - } - - channel.active = false; - } - } - - self.wait(channel_id, unit)?; - - Ok(()) - } - - /// Wait for process to proper close. - /// This prevents orphaned/zombi processes in system - pub fn wait(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> { - for manager in self.channels.iter_mut() { - let channel = manager.channel.lock().unwrap(); - - if channel.id == channel_id { - match unit { - Decoder => { - if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; - } - } - Encoder => { - if let Some(proc) = manager.encoder.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; - } - } - Ingest => { - if let Some(proc) = manager.ingest.lock().unwrap().as_mut() { - proc.wait() - .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; - } - } - } - } - } - - Ok(()) - } } -pub fn play(controller: &mut ChannelController, channel: Channel) { - let manager = ChannelManager::new(channel); +pub fn start(controller: Arc>) -> Result<(), ProcessError> { + let config = controller.lock()?.config.lock()?.clone(); - controller.add(manager); + match config.out.mode { + // write files/playlist to HLS m3u8 playlist + HLS => write_hls(&config, play_control, playout_stat, proc_control), + // play on desktop or stream to a remote target + _ => player(&config, &play_control, playout_stat, proc_control), + }; + + Ok(()) } diff --git a/ffplayout/src/player/filter/custom.rs b/ffplayout/src/player/filter/custom.rs new file mode 100644 index 00000000..851d83ef --- /dev/null +++ b/ffplayout/src/player/filter/custom.rs @@ -0,0 +1,39 @@ +use regex::Regex; +use simplelog::*; + +/// Apply custom filters +pub fn filter_node(filter: &str) -> (String, String) { + let re = Regex::new(r"^;?(\[[0-9]:[^\[]+\])?|\[[^\[]+\]$").unwrap(); // match start/end link + let mut video_filter = String::new(); + let mut audio_filter = String::new(); + + // match chain with audio and video filter + if filter.contains("[c_v_out]") && filter.contains("[c_a_out]") { + let v_pos = filter.find("[c_v_out]").unwrap(); + let a_pos = filter.find("[c_a_out]").unwrap(); + let mut delimiter = "[c_v_out]"; + + // split delimiter should be first filter output link + if v_pos > a_pos { + delimiter = "[c_a_out]"; + } + + if let Some((f_1, f_2)) = filter.split_once(delimiter) { + if f_2.contains("[c_a_out]") { + video_filter = re.replace_all(f_1, "").to_string(); + audio_filter = re.replace_all(f_2, "").to_string(); + } else { + video_filter = re.replace_all(f_2, "").to_string(); + audio_filter = re.replace_all(f_1, "").to_string(); + } + } + } else if filter.contains("[c_v_out]") { + video_filter = re.replace_all(filter, "").to_string(); + } else if filter.contains("[c_a_out]") { + audio_filter = re.replace_all(filter, "").to_string(); + } else if !filter.is_empty() && filter != "~" { + error!("Custom filter is not well formatted, use correct out link names (\"[c_v_out]\" and/or \"[c_a_out]\"). Filter skipped!") + } + + (video_filter, audio_filter) +} diff --git a/ffplayout/src/player/filter/mod.rs b/ffplayout/src/player/filter/mod.rs new file mode 100644 index 00000000..910575d0 --- /dev/null +++ b/ffplayout/src/player/filter/mod.rs @@ -0,0 +1,794 @@ +use std::{ + fmt, + path::Path, + sync::{Arc, Mutex}, +}; + +use regex::Regex; +use simplelog::*; + +mod custom; +pub mod v_drawtext; + +use crate::player::{ + controller::ProcessUnit::*, + utils::{custom_format, fps_calc, is_close, Media}, +}; +use crate::utils::config::{OutputMode::*, PlayoutConfig}; + +use crate::vec_strings; + +#[derive(Clone, Debug, Copy, Eq, PartialEq)] +pub enum FilterType { + Audio, + Video, +} + +impl fmt::Display for FilterType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + FilterType::Audio => write!(f, "a"), + FilterType::Video => write!(f, "v"), + } + } +} + +use FilterType::*; + +#[derive(Debug, Clone)] +pub struct Filters { + pub audio_chain: String, + pub video_chain: String, + pub output_chain: Vec, + pub audio_map: Vec, + pub video_map: Vec, + pub audio_out_link: Vec, + pub video_out_link: Vec, + pub output_map: Vec, + config: PlayoutConfig, + audio_position: i32, + video_position: i32, + audio_last: i32, + video_last: i32, +} + +impl Filters { + pub fn new(config: PlayoutConfig, audio_position: i32) -> Self { + Self { + audio_chain: String::new(), + video_chain: String::new(), + output_chain: vec![], + audio_map: vec![], + video_map: vec![], + audio_out_link: vec![], + video_out_link: vec![], + output_map: vec![], + config, + audio_position, + video_position: 0, + audio_last: -1, + video_last: -1, + } + } + + pub fn add_filter(&mut self, filter: &str, track_nr: i32, filter_type: FilterType) { + let (map, chain, position, last) = match filter_type { + Audio => ( + &mut self.audio_map, + &mut self.audio_chain, + self.audio_position, + &mut self.audio_last, + ), + Video => ( + &mut self.video_map, + &mut self.video_chain, + self.video_position, + &mut self.video_last, + ), + }; + + if *last != track_nr { + // start new filter chain + let mut selector = String::new(); + let mut sep = String::new(); + if !chain.is_empty() { + selector = format!("[{filter_type}out{last}]"); + sep = ";".to_string() + } + + chain.push_str(&selector); + + if filter.starts_with("aevalsrc") || filter.starts_with("movie") { + chain.push_str(&format!("{sep}{filter}")); + } else { + chain.push_str(&format!( + // build audio/video selector like [0:a:0] + "{sep}[{position}:{filter_type}:{track_nr}]{filter}", + )); + } + + let m = format!("[{filter_type}out{track_nr}]"); + map.push(m.clone()); + self.output_map.append(&mut vec_strings!["-map", m]); + *last = track_nr; + } else if filter.starts_with(';') || filter.starts_with('[') { + chain.push_str(filter); + } else { + chain.push_str(&format!(",{filter}")) + } + } + + pub fn cmd(&mut self) -> Vec { + if !self.output_chain.is_empty() { + return self.output_chain.clone(); + } + + let mut v_chain = self.video_chain.clone(); + let mut a_chain = self.audio_chain.clone(); + + if self.video_last >= 0 && !v_chain.ends_with(']') { + v_chain.push_str(&format!("[vout{}]", self.video_last)); + } + + if self.audio_last >= 0 && !a_chain.ends_with(']') { + a_chain.push_str(&format!("[aout{}]", self.audio_last)); + } + + let mut f_chain = v_chain; + let mut cmd = vec![]; + + if !a_chain.is_empty() { + if !f_chain.is_empty() { + f_chain.push(';'); + } + + f_chain.push_str(&a_chain); + } + + if !f_chain.is_empty() { + cmd.push("-filter_complex".to_string()); + cmd.push(f_chain); + } + + cmd + } + + pub fn map(&mut self) -> Vec { + let mut o_map = self.output_map.clone(); + + if self.video_last == -1 && !self.config.processing.audio_only { + let v_map = "0:v".to_string(); + + if !o_map.contains(&v_map) { + o_map.append(&mut vec_strings!["-map", v_map]); + }; + } + + if self.audio_last == -1 { + for i in 0..self.config.processing.audio_tracks { + let a_map = format!("{}:a:{i}", self.audio_position); + + if !o_map.contains(&a_map) { + o_map.append(&mut vec_strings!["-map", a_map]); + }; + } + } + + o_map + } +} + +impl Default for Filters { + fn default() -> Self { + Self::new(PlayoutConfig::new(None, None), 0) + } +} + +fn deinterlace(field_order: &Option, chain: &mut Filters, config: &PlayoutConfig) { + if let Some(order) = field_order { + if order != "progressive" { + let deinterlace = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.deinterlace.clone()) + { + Some(deinterlace) => deinterlace, + None => "yadif=0:-1:0".to_string(), + }; + + chain.add_filter(&deinterlace, 0, Video); + } + } +} + +fn pad(aspect: f64, chain: &mut Filters, v_stream: &ffprobe::Stream, config: &PlayoutConfig) { + if !is_close(aspect, config.processing.aspect, 0.03) { + let mut scale = String::new(); + + if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) { + if w > config.processing.width && aspect > config.processing.aspect { + scale = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.pad_scale_w.clone()) + { + Some(pad_scale_w) => { + custom_format(&format!("{pad_scale_w},"), &[&config.processing.width]) + } + None => format!("scale={}:-1,", config.processing.width), + }; + } else if h > config.processing.height && aspect < config.processing.aspect { + scale = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.pad_scale_h.clone()) + { + Some(pad_scale_h) => { + custom_format(&format!("{pad_scale_h},"), &[&config.processing.width]) + } + None => format!("scale=-1:{},", config.processing.height), + }; + } + } + + let pad = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.pad_video.clone()) + { + Some(pad_video) => custom_format( + &format!("{scale}{pad_video}"), + &[ + &config.processing.width.to_string(), + &config.processing.height.to_string(), + ], + ), + None => format!( + "{}pad=max(iw\\,ih*({1}/{2})):ow/({1}/{2}):(ow-iw)/2:(oh-ih)/2", + scale, config.processing.width, config.processing.height + ), + }; + + chain.add_filter(&pad, 0, Video) + } +} + +fn fps(fps: f64, chain: &mut Filters, config: &PlayoutConfig) { + if fps != config.processing.fps { + let fps_filter = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.fps.clone()) + { + Some(fps) => custom_format(&fps, &[&config.processing.fps]), + None => format!("fps={}", config.processing.fps), + }; + + chain.add_filter(&fps_filter, 0, Video) + } +} + +fn scale( + width: Option, + height: Option, + aspect: f64, + chain: &mut Filters, + config: &PlayoutConfig, +) { + // width: i64, height: i64 + if let (Some(w), Some(h)) = (width, height) { + if w != config.processing.width || h != config.processing.height { + let scale = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.scale.clone()) + { + Some(scale) => custom_format( + &scale, + &[&config.processing.width, &config.processing.height], + ), + None => format!( + "scale={}:{}", + config.processing.width, config.processing.height + ), + }; + + chain.add_filter(&scale, 0, Video); + } else { + chain.add_filter("null", 0, Video); + } + + if !is_close(aspect, config.processing.aspect, 0.03) { + let dar = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.set_dar.clone()) + { + Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]), + None => format!("setdar=dar={}", config.processing.aspect), + }; + + chain.add_filter(&dar, 0, Video); + } + } else { + let scale = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.scale.clone()) + { + Some(scale) => custom_format( + &scale, + &[&config.processing.width, &config.processing.height], + ), + None => format!( + "scale={}:{}", + config.processing.width, config.processing.height + ), + }; + chain.add_filter(&scale, 0, Video); + + let dar = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.set_dar.clone()) + { + Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]), + None => format!("setdar=dar={}", config.processing.aspect), + }; + + chain.add_filter(&dar, 0, Video); + } +} + +fn fade( + node: &mut Media, + chain: &mut Filters, + nr: i32, + filter_type: FilterType, + config: &PlayoutConfig, +) { + let mut t = ""; + let mut fade_audio = false; + + if filter_type == Audio { + t = "a"; + + if node.duration_audio > 0.0 && node.duration_audio != node.duration { + fade_audio = true; + } + } + + if node.seek > 0.0 || node.unit == Ingest { + let mut fade_in = format!("{t}fade=in:st=0:d=0.5"); + + if t == "a" { + if let Some(fade) = config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.afade_in.clone()) + { + fade_in = custom_format(&fade, &[t]); + } + } else if let Some(fade) = config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.fade_in.clone()) + { + fade_in = custom_format(&fade, &[t]); + }; + + chain.add_filter(&fade_in, nr, filter_type); + } + + if (node.out != node.duration && node.out - node.seek > 1.0) || fade_audio { + let mut fade_out = format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0)); + + if t == "a" { + if let Some(fade) = config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.afade_out.clone()) + { + fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]); + } + } else if let Some(fade) = config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.fade_out.clone()) + .clone() + { + fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]); + }; + + chain.add_filter(&fade_out, nr, filter_type); + } +} + +fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { + if config.processing.add_logo + && Path::new(&config.processing.logo).is_file() + && &node.category != "advertisement" + { + let mut logo_chain = format!( + "null[v];movie={}:loop=0,setpts=N/(FRAME_RATE*TB),format=rgba,colorchannelmixer=aa={}", + config + .processing + .logo + .replace('\\', "/") + .replace(':', "\\\\:"), + config.processing.logo_opacity, + ); + + if node.last_ad { + match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.overlay_logo_fade_in.clone()) + { + Some(fade_in) => logo_chain.push_str(&format!(",{fade_in}")), + None => logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1"), + }; + } + + if node.next_ad { + let length = node.out - node.seek - 1.0; + + match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.overlay_logo_fade_out.clone()) + { + Some(fade_out) => { + logo_chain.push_str(&custom_format(&format!(",{fade_out}"), &[length])) + } + None => logo_chain.push_str(&format!(",fade=out:st={length}:d=1.0:alpha=1")), + } + } + + if !config.processing.logo_scale.is_empty() { + match &config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.overlay_logo_scale.clone()) + { + Some(logo_scale) => logo_chain.push_str(&custom_format( + &format!(",{logo_scale}"), + &[&config.processing.logo_scale], + )), + None => logo_chain.push_str(&format!(",scale={}", config.processing.logo_scale)), + } + } + + match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.overlay_logo.clone()) + { + Some(overlay) => { + if !overlay.starts_with(',') { + logo_chain.push(','); + } + + logo_chain.push_str(&custom_format( + &overlay, + &[&config.processing.logo_position], + )) + } + None => logo_chain.push_str(&format!( + "[l];[v][l]overlay={}:shortest=1", + config.processing.logo_position + )), + }; + + chain.add_filter(&logo_chain, 0, Video); + } +} + +fn extend_video(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) { + if let Some(video_duration) = node + .probe + .as_ref() + .and_then(|p| p.video_streams.first()) + .and_then(|v| v.duration.as_ref()) + .and_then(|v| v.parse::().ok()) + { + if node.out - node.seek > video_duration - node.seek + 0.1 && node.duration >= node.out { + let duration = (node.out - node.seek) - (video_duration - node.seek); + + let tpad = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.tpad.clone()) + { + Some(pad) => custom_format(&pad, &[duration]), + None => format!("tpad=stop_mode=add:stop_duration={duration}"), + }; + + chain.add_filter(&tpad, 0, Video) + } + } +} + +/// add drawtext filter for lower thirds messages +fn add_text( + node: &mut Media, + chain: &mut Filters, + config: &PlayoutConfig, + filter_chain: &Option>>>, +) { + if config.text.add_text + && (config.text.text_from_filename || config.out.mode == HLS || node.unit == Encoder) + { + let filter = v_drawtext::filter_node(config, Some(node), filter_chain); + + chain.add_filter(&filter, 0, Video); + } +} + +fn add_audio(node: &Media, chain: &mut Filters, nr: i32, config: &PlayoutConfig) { + let audio = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.aevalsrc.clone()) + { + Some(aevalsrc) => custom_format(&aevalsrc, &[node.out - node.seek]), + None => format!( + "aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000", + node.out - node.seek + ), + }; + + chain.add_filter(&audio, nr, Audio); +} + +fn extend_audio(node: &mut Media, chain: &mut Filters, nr: i32, config: &PlayoutConfig) { + if !Path::new(&node.audio).is_file() { + if let Some(audio_duration) = node + .probe + .as_ref() + .and_then(|p| p.audio_streams.first()) + .and_then(|a| a.duration.clone()) + .and_then(|a| a.parse::().ok()) + { + if node.out - node.seek > audio_duration - node.seek + 0.1 && node.duration >= node.out + { + let apad = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.apad.clone()) + { + Some(apad) => custom_format(&apad, &[node.out - node.seek]), + None => format!("apad=whole_dur={}", node.out - node.seek), + }; + + chain.add_filter(&apad, nr, Audio) + } + } + } +} + +fn audio_volume(chain: &mut Filters, config: &PlayoutConfig, nr: i32) { + if config.processing.volume != 1.0 { + let volume = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.volume.clone()) + { + Some(volume) => custom_format(&volume, &[config.processing.volume]), + None => format!("volume={}", config.processing.volume), + }; + + chain.add_filter(&volume, nr, Audio) + } +} + +fn aspect_calc(aspect_string: &Option, config: &PlayoutConfig) -> f64 { + let mut source_aspect = config.processing.aspect; + + if let Some(aspect) = aspect_string { + let aspect_vec: Vec<&str> = aspect.split(':').collect(); + let w = aspect_vec[0].parse::().unwrap(); + let h = aspect_vec[1].parse::().unwrap(); + source_aspect = w / h; + } + + source_aspect +} + +pub fn split_filter( + chain: &mut Filters, + count: usize, + nr: i32, + filter_type: FilterType, + config: &PlayoutConfig, +) { + if count > 1 { + let out_link = match filter_type { + Audio => &mut chain.audio_out_link, + Video => &mut chain.video_out_link, + }; + + for i in 0..count { + let link = format!("[{filter_type}out_{nr}_{i}]"); + if !out_link.contains(&link) { + out_link.push(link) + } + } + + let split = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.split.clone()) + { + Some(split) => custom_format(&split, &[count.to_string(), out_link.join("")]), + None => format!("split={count}{}", out_link.join("")), + }; + + chain.add_filter(&split, nr, filter_type); + } +} + +/// Process output filter chain and add new filters to existing ones. +fn process_output_filters(config: &PlayoutConfig, chain: &mut Filters, custom_filter: &str) { + let filter = + if (config.text.add_text && !config.text.text_from_filename) || config.out.mode == HLS { + let re_v = Regex::new(r"\[[0:]+[v^\[]+([:0]+)?\]").unwrap(); // match video filter input link + let _re_a = Regex::new(r"\[[0:]+[a^\[]+([:0]+)?\]").unwrap(); // match video filter input link + let mut cf = custom_filter.to_string(); + + if !chain.video_chain.is_empty() { + cf = re_v + .replace(&cf, &format!("{},", chain.video_chain)) + .to_string() + } + + if !chain.audio_chain.is_empty() { + let audio_split = chain + .audio_chain + .split(';') + .enumerate() + .map(|(i, p)| p.replace(&format!("[aout{i}]"), "")) + .collect::>(); + + for i in 0..config.processing.audio_tracks { + cf = cf.replace( + &format!("[0:a:{i}]"), + &format!("{},", &audio_split[i as usize]), + ) + } + } + + cf + } else { + custom_filter.to_string() + }; + + chain.output_chain = vec_strings!["-filter_complex", filter] +} + +fn custom(filter: &str, chain: &mut Filters, nr: i32, filter_type: FilterType) { + if !filter.is_empty() { + chain.add_filter(filter, nr, filter_type); + } +} + +pub fn filter_chains( + config: &PlayoutConfig, + node: &mut Media, + filter_chain: &Option>>>, +) -> Filters { + let mut filters = Filters::new(config.clone(), 0); + + if node.unit == Encoder { + if !config.processing.audio_only { + add_text(node, &mut filters, config, filter_chain); + } + + if let Some(f) = config.out.output_filter.clone() { + process_output_filters(config, &mut filters, &f) + } else if config.out.output_count > 1 && !config.processing.audio_only { + split_filter(&mut filters, config.out.output_count, 0, Video, config); + } + + return filters; + } + + if !config.processing.audio_only && !config.processing.copy_video { + if let Some(probe) = node.probe.as_ref() { + if Path::new(&node.audio).is_file() { + filters.audio_position = 1; + } + + if let Some(v_stream) = &probe.video_streams.first() { + let aspect = aspect_calc(&v_stream.display_aspect_ratio, config); + let frame_per_sec = fps_calc(&v_stream.r_frame_rate, 1.0); + + deinterlace(&v_stream.field_order, &mut filters, config); + pad(aspect, &mut filters, v_stream, config); + fps(frame_per_sec, &mut filters, config); + scale( + v_stream.width, + v_stream.height, + aspect, + &mut filters, + config, + ); + } + + extend_video(node, &mut filters, config); + } else { + fps(0.0, &mut filters, config); + scale(None, None, 1.0, &mut filters, config); + } + + add_text(node, &mut filters, config, filter_chain); + fade(node, &mut filters, 0, Video, config); + overlay(node, &mut filters, config); + } + + let (proc_vf, proc_af) = if node.unit == Ingest { + custom::filter_node(&config.ingest.custom_filter) + } else { + custom::filter_node(&config.processing.custom_filter) + }; + + let (list_vf, list_af) = custom::filter_node(&node.custom_filter); + + if !config.processing.copy_video { + custom(&proc_vf, &mut filters, 0, Video); + custom(&list_vf, &mut filters, 0, Video); + } + + let mut audio_indexes = vec![]; + + if config.processing.audio_track_index == -1 { + for i in 0..config.processing.audio_tracks { + audio_indexes.push(i) + } + } else { + audio_indexes.push(config.processing.audio_track_index) + } + + if !config.processing.copy_audio { + for i in audio_indexes { + if node + .probe + .as_ref() + .and_then(|p| p.audio_streams.get(i as usize)) + .is_some() + || Path::new(&node.audio).is_file() + { + extend_audio(node, &mut filters, i, config); + } else if node.unit == Decoder { + if !node.source.contains("color=c=") { + warn!( + "Missing audio track (id {i}) from {}", + node.source + ); + } + + add_audio(node, &mut filters, i, config); + } + + // add at least anull filter, for correct filter construction, + // is important for split filter in HLS mode + filters.add_filter("anull", i, Audio); + + fade(node, &mut filters, i, Audio, config); + audio_volume(&mut filters, config, i); + + custom(&proc_af, &mut filters, i, Audio); + custom(&list_af, &mut filters, i, Audio); + } + } else if config.processing.audio_track_index > -1 { + error!("Setting 'audio_track_index' other than '-1' is not allowed in audio copy mode!") + } + + if config.out.mode == HLS { + if let Some(f) = config.out.output_filter.clone() { + process_output_filters(config, &mut filters, &f) + } + } + + filters +} diff --git a/ffplayout/src/player/filter/v_drawtext.rs b/ffplayout/src/player/filter/v_drawtext.rs new file mode 100644 index 00000000..80a53997 --- /dev/null +++ b/ffplayout/src/player/filter/v_drawtext.rs @@ -0,0 +1,82 @@ +use std::{ + ffi::OsStr, + path::Path, + sync::{Arc, Mutex}, +}; + +use regex::Regex; + +use crate::player::{ + controller::ProcessUnit::*, + utils::{custom_format, Media}, +}; +use crate::utils::config::PlayoutConfig; + +pub fn filter_node( + config: &PlayoutConfig, + node: Option<&Media>, + filter_chain: &Option>>>, +) -> String { + let mut filter = String::new(); + let mut font = String::new(); + + if Path::new(&config.text.fontfile).is_file() { + font = format!(":fontfile='{}'", config.text.fontfile) + } + + let zmq_socket = match node.map(|n| n.unit) { + Some(Ingest) => config.text.zmq_server_socket.clone(), + _ => config.text.zmq_stream_socket.clone(), + }; + + if config.text.text_from_filename && node.is_some() { + let source = node.unwrap_or(&Media::new(0, "", false)).source.clone(); + let text = match Regex::new(&config.text.regex) + .ok() + .and_then(|r| r.captures(&source)) + { + Some(t) => t[1].to_string(), + None => Path::new(&source) + .file_stem() + .unwrap_or_else(|| OsStr::new(&source)) + .to_string_lossy() + .to_string(), + }; + + let escaped_text = text + .replace('\'', "'\\\\\\''") + .replace('%', "\\\\\\%") + .replace(':', "\\:"); + + filter = match &config + .advanced + .clone() + .and_then(|a| a.decoder.filters.drawtext_from_file) + { + Some(drawtext) => custom_format(drawtext, &[&escaped_text, &config.text.style, &font]), + None => format!("drawtext=text='{escaped_text}':{}{font}", config.text.style), + }; + } else if let Some(socket) = zmq_socket { + let mut filter_cmd = format!("text=''{font}"); + + if let Some(chain) = filter_chain { + if let Some(link) = chain.lock().unwrap().iter().find(|&l| l.contains("text")) { + filter_cmd = link.to_string(); + } + } + + filter = match config + .advanced + .as_ref() + .and_then(|a| a.decoder.filters.drawtext_from_zmq.clone()) + { + Some(drawtext) => custom_format(&drawtext, &[&socket.replace(':', "\\:"), &filter_cmd]), + None => format!( + "zmq=b=tcp\\\\://'{}',drawtext@dyntext={filter_cmd}", + socket.replace(':', "\\:") + ), + }; + } + + filter +} diff --git a/ffplayout/src/player/input/folder.rs b/ffplayout/src/player/input/folder.rs new file mode 100644 index 00000000..ebf28bad --- /dev/null +++ b/ffplayout/src/player/input/folder.rs @@ -0,0 +1,102 @@ +use std::{ + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + {Arc, Mutex}, + }, + thread::sleep, + time::Duration, +}; + +use notify::{ + event::{CreateKind, ModifyKind, RemoveKind, RenameMode}, + EventKind::{Create, Modify, Remove}, + RecursiveMode, Watcher, +}; +use notify_debouncer_full::new_debouncer; +use simplelog::*; + +use ffplayout_lib::utils::{include_file_extension, Media, PlayoutConfig}; + +/// Create a watcher, which monitor file changes. +/// When a change is register, update the current file list. +/// This makes it possible, to play infinitely and and always new files to it. +pub fn watchman( + config: PlayoutConfig, + is_terminated: Arc, + sources: Arc>>, +) { + let path = Path::new(&config.storage.path); + + if !path.exists() { + error!("Folder path not exists: '{path:?}'"); + panic!("Folder path not exists: '{path:?}'"); + } + + // let (tx, rx) = channel(); + let (tx, rx) = channel(); + + let mut debouncer = new_debouncer(Duration::from_secs(1), None, tx).unwrap(); + + debouncer + .watcher() + .watch(path, RecursiveMode::Recursive) + .unwrap(); + debouncer.cache().add_root(path, RecursiveMode::Recursive); + + while !is_terminated.load(Ordering::SeqCst) { + if let Ok(result) = rx.try_recv() { + match result { + Ok(events) => events.iter().for_each(|event| match event.kind { + Create(CreateKind::File) | Modify(ModifyKind::Name(RenameMode::To)) => { + let new_path = &event.paths[0]; + + if new_path.is_file() && include_file_extension(&config, new_path) { + let index = sources.lock().unwrap().len(); + let media = Media::new(index, &new_path.to_string_lossy(), false); + + sources.lock().unwrap().push(media); + info!("Create new file: {new_path:?}"); + } + } + Remove(RemoveKind::File) | Modify(ModifyKind::Name(RenameMode::From)) => { + let old_path = &event.paths[0]; + + if !old_path.is_file() && include_file_extension(&config, old_path) { + sources + .lock() + .unwrap() + .retain(|x| x.source != old_path.to_string_lossy()); + info!("Remove file: {old_path:?}"); + } + } + Modify(ModifyKind::Name(RenameMode::Both)) => { + let old_path = &event.paths[0]; + let new_path = &event.paths[1]; + + let mut media_list = sources.lock().unwrap(); + + if let Some(index) = media_list + .iter() + .position(|x| *x.source == old_path.display().to_string()) { + let media = Media::new(index, &new_path.to_string_lossy(), false); + media_list[index] = media; + info!("Move file: {old_path:?} to {new_path:?}"); + } else if include_file_extension(&config, new_path) { + let index = media_list.len(); + let media = Media::new(index, &new_path.to_string_lossy(), false); + + media_list.push(media); + info!("Create new file: {new_path:?}"); + } + } + _ => debug!("Not tracked file event: {event:?}") + }), + Err(errors) => errors.iter().for_each(|error| error!("{error:?}")), + } + } + + sleep(Duration::from_secs(3)); + } +} diff --git a/ffplayout/src/player/input/ingest.rs b/ffplayout/src/player/input/ingest.rs new file mode 100644 index 00000000..096f5c36 --- /dev/null +++ b/ffplayout/src/player/input/ingest.rs @@ -0,0 +1,168 @@ +use std::{ + io::{BufRead, BufReader, Error, Read}, + process::{exit, ChildStderr, Command, Stdio}, + sync::atomic::Ordering, + thread, +}; + +use crossbeam_channel::Sender; +use simplelog::*; + +use crate::player::utils::valid_stream; +use crate::utils::logging::log_line; +use ffplayout_lib::{ + utils::{ + controller::ProcessUnit::*, test_tcp_port, Media, PlayoutConfig, ProcessControl, + FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS, + }, + vec_strings, +}; + +fn server_monitor( + level: &str, + ignore: Vec, + buffer: BufReader, + proc_ctl: ProcessControl, +) -> Result<(), Error> { + for line in buffer.lines() { + let line = line?; + + if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i)) + && !ignore.iter().any(|i| line.contains(i)) + { + log_line(&line, level); + } + + if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) { + if let Err(e) = proc_ctl.stop(Ingest) { + error!("{e}"); + }; + } + + if FFMPEG_UNRECOVERABLE_ERRORS + .iter() + .any(|i| line.contains(*i)) + { + proc_ctl.stop_all(); + } + } + + Ok(()) +} + +/// ffmpeg Ingest Server +/// +/// Start ffmpeg in listen mode, and wait for input. +pub fn ingest_server( + config: PlayoutConfig, + ingest_sender: Sender<(usize, [u8; 65088])>, + proc_control: ProcessControl, +) -> Result<(), Error> { + let mut buffer: [u8; 65088] = [0; 65088]; + let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; + let stream_input = config.ingest.input_cmd.clone().unwrap(); + let mut dummy_media = Media::new(0, "Live Stream", false); + dummy_media.unit = Ingest; + dummy_media.add_filter(&config, &None); + + if let Some(ingest_input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.ingest.input_cmd.clone()) + { + server_cmd.append(&mut ingest_input_cmd.clone()); + } + + server_cmd.append(&mut stream_input.clone()); + + if let Some(mut filter) = dummy_media.filter { + server_cmd.append(&mut filter.cmd()); + server_cmd.append(&mut filter.map()); + } + + if let Some(mut cmd) = config.processing.cmd { + server_cmd.append(&mut cmd); + } + + let mut is_running; + + if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { + if !test_tcp_port(url) { + proc_control.stop_all(); + exit(1); + } + + info!("Start ingest server, listening on: {url}",); + }; + + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); + + while !proc_control.is_terminated.load(Ordering::SeqCst) { + let proc_ctl = proc_control.clone(); + let level = config.logging.ingest_level.clone().unwrap(); + let ignore = config.logging.ignore_lines.clone(); + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn ingest server: {e}"); + panic!("couldn't spawn ingest server: {e}") + } + Ok(proc) => proc, + }; + let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); + let server_err = BufReader::new(server_proc.stderr.take().unwrap()); + let error_reader_thread = + thread::spawn(move || server_monitor(&level, ignore, server_err, proc_ctl)); + + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; + + loop { + let bytes_len = match ingest_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => { + debug!("Ingest server read {e:?}"); + break; + } + }; + + if !is_running { + proc_control.server_is_running.store(true, Ordering::SeqCst); + is_running = true; + } + + if bytes_len > 0 { + if let Err(e) = ingest_sender.send((bytes_len, buffer)) { + error!("Ingest server write error: {e:?}"); + + proc_control.is_terminated.store(true, Ordering::SeqCst); + break; + } + } else { + break; + } + } + + drop(ingest_reader); + proc_control + .server_is_running + .store(false, Ordering::SeqCst); + + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } + + if let Err(e) = error_reader_thread.join() { + error!("{e:?}"); + }; + } + + Ok(()) +} diff --git a/ffplayout/src/player/input/mod.rs b/ffplayout/src/player/input/mod.rs new file mode 100644 index 00000000..ce69eaff --- /dev/null +++ b/ffplayout/src/player/input/mod.rs @@ -0,0 +1,51 @@ +use std::{ + sync::{atomic::AtomicBool, Arc}, + thread, +}; + +use simplelog::*; + +pub mod folder; +pub mod ingest; +pub mod playlist; + +pub use folder::watchman; +pub use ingest::ingest_server; +pub use playlist::CurrentProgram; + +use crate::utils::config::PlayoutConfig; +use ffplayout_lib::utils::{controller::PlayerControl, folder::FolderSource}; +use ffplayout_lib::utils::{Media, PlayoutStatus, ProcessMode::*}; + +/// Create a source iterator from playlist, or from folder. +pub fn source_generator( + config: PlayoutConfig, + player_control: &PlayerControl, + playout_stat: PlayoutStatus, + is_terminated: Arc, +) -> Box> { + match config.processing.mode { + Folder => { + info!("Playout in folder mode"); + debug!( + "Monitor folder: {:?}", + config.storage.path + ); + + let config_clone = config.clone(); + let folder_source = FolderSource::new(&config, playout_stat.chain, player_control); + let node_clone = folder_source.player_control.current_list.clone(); + + // Spawn a thread to monitor folder for file changes. + thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone)); + + Box::new(folder_source) as Box> + } + Playlist => { + info!("Playout in playlist mode"); + let program = CurrentProgram::new(&config, playout_stat, is_terminated, player_control); + + Box::new(program) as Box> + } + } +} diff --git a/ffplayout/src/player/input/playlist.rs b/ffplayout/src/player/input/playlist.rs new file mode 100644 index 00000000..f91fb249 --- /dev/null +++ b/ffplayout/src/player/input/playlist.rs @@ -0,0 +1,848 @@ +use std::{ + fs, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use serde_json::json; +use simplelog::*; + +use ffplayout_lib::utils::{ + controller::PlayerControl, + gen_dummy, get_delta, is_close, is_remote, + json_serializer::{read_json, set_defaults}, + loop_filler, loop_image, modified_time, seek_and_length, time_in_seconds, JsonPlaylist, Media, + MediaProbe, PlayoutConfig, PlayoutStatus, IMAGE_FORMAT, +}; + +/// Struct for current playlist. +/// +/// Here we prepare the init clip and build a iterator where we pull our clips. +#[derive(Debug)] +pub struct CurrentProgram { + config: PlayoutConfig, + start_sec: f64, + end_sec: f64, + json_playlist: JsonPlaylist, + player_control: PlayerControl, + current_node: Media, + is_terminated: Arc, + playout_stat: PlayoutStatus, + last_json_path: Option, + last_node_ad: bool, +} + +/// Prepare a playlist iterator. +impl CurrentProgram { + pub fn new( + config: &PlayoutConfig, + playout_stat: PlayoutStatus, + is_terminated: Arc, + player_control: &PlayerControl, + ) -> Self { + Self { + config: config.clone(), + start_sec: config.playlist.start_sec.unwrap(), + end_sec: config.playlist.length_sec.unwrap(), + json_playlist: JsonPlaylist::new( + "1970-01-01".to_string(), + config.playlist.start_sec.unwrap(), + ), + player_control: player_control.clone(), + current_node: Media::new(0, "", false), + is_terminated, + playout_stat, + last_json_path: None, + last_node_ad: false, + } + } + + // Check if there is no current playlist or file got updated, + // and when is so load/reload it. + fn load_or_update_playlist(&mut self, seek: bool) { + let mut get_current = false; + let mut reload = false; + + if let Some(path) = self.json_playlist.path.clone() { + if (Path::new(&path).is_file() || is_remote(&path)) + && self.json_playlist.modified != modified_time(&path) + { + info!("Reload playlist {path}"); + self.playout_stat.list_init.store(true, Ordering::SeqCst); + get_current = true; + reload = true; + } + } else { + get_current = true; + } + + if get_current { + self.json_playlist = read_json( + &mut self.config, + &self.player_control, + self.json_playlist.path.clone(), + self.is_terminated.clone(), + seek, + false, + ); + + if !reload { + if let Some(file) = &self.json_playlist.path { + info!("Read playlist: {file}"); + } + + if *self.playout_stat.date.lock().unwrap() != self.json_playlist.date { + self.set_status(self.json_playlist.date.clone()); + } + + self.playout_stat + .current_date + .lock() + .unwrap() + .clone_from(&self.json_playlist.date); + } + + self.player_control + .current_list + .lock() + .unwrap() + .clone_from(&self.json_playlist.program); + + if self.json_playlist.path.is_none() { + trace!("missing playlist"); + + self.current_node = Media::new(0, "", false); + self.playout_stat.list_init.store(true, Ordering::SeqCst); + self.player_control.current_index.store(0, Ordering::SeqCst); + } + } + } + + // Check if day is past and it is time for a new playlist. + fn check_for_playlist(&mut self, seek: bool) -> bool { + let (delta, total_delta) = get_delta(&self.config, &time_in_seconds()); + let mut next = false; + + let duration = if self.current_node.duration >= self.current_node.out { + self.current_node.duration + } else { + // maybe out is longer to be able to loop + self.current_node.out + }; + + let node_index = self.current_node.index.unwrap_or_default(); + + let mut next_start = + self.current_node.begin.unwrap_or_default() - self.start_sec + duration + delta; + + if node_index > 0 + && node_index == self.player_control.current_list.lock().unwrap().len() - 1 + { + next_start += self.config.general.stop_threshold; + } + + trace!( + "delta: {delta} | total_delta: {total_delta}, index: {node_index} \nnext_start: {next_start} | end_sec: {} | source {}", + self.end_sec, + self.current_node.source + ); + + // Check if we over the target length or we are close to it, if so we load the next playlist. + if !self.config.playlist.infinit + && (next_start >= self.end_sec + || is_close(total_delta, 0.0, 2.0) + || is_close(total_delta, self.end_sec, 2.0)) + { + trace!("get next day"); + next = true; + + self.json_playlist = read_json( + &mut self.config, + &self.player_control, + None, + self.is_terminated.clone(), + false, + true, + ); + + if let Some(file) = &self.json_playlist.path { + info!("Read next playlist: {file}"); + } + + self.playout_stat.list_init.store(false, Ordering::SeqCst); + self.set_status(self.json_playlist.date.clone()); + + self.player_control + .current_list + .lock() + .unwrap() + .clone_from(&self.json_playlist.program); + self.player_control.current_index.store(0, Ordering::SeqCst); + } else { + self.load_or_update_playlist(seek) + } + + next + } + + fn set_status(&mut self, date: String) { + if *self.playout_stat.date.lock().unwrap() != date + && *self.playout_stat.time_shift.lock().unwrap() != 0.0 + { + info!("Reset playout status"); + } + + self.playout_stat + .current_date + .lock() + .unwrap() + .clone_from(&date); + *self.playout_stat.time_shift.lock().unwrap() = 0.0; + + if let Err(e) = fs::write( + &self.config.general.stat_file, + serde_json::to_string(&json!({ + "time_shift": 0.0, + "date": date, + })) + .unwrap(), + ) { + error!("Unable to write status file: {e}"); + }; + } + + // Check if last and/or next clip is a advertisement. + fn last_next_ad(&mut self, node: &mut Media) { + let index = self.player_control.current_index.load(Ordering::SeqCst); + let current_list = self.player_control.current_list.lock().unwrap(); + + if index + 1 < current_list.len() && ¤t_list[index + 1].category == "advertisement" { + node.next_ad = true; + } + + if index > 0 + && index < current_list.len() + && ¤t_list[index - 1].category == "advertisement" + { + node.last_ad = true; + } + } + + // Get current time and when we are before start time, + // we add full seconds of a day to it. + fn get_current_time(&mut self) -> f64 { + let mut time_sec = time_in_seconds(); + + if time_sec < self.start_sec { + time_sec += 86400.0 // self.config.playlist.length_sec.unwrap(); + } + + time_sec + } + + // On init or reload we need to seek for the current clip. + fn get_current_clip(&mut self) { + let mut time_sec = self.get_current_time(); + let shift = *self.playout_stat.time_shift.lock().unwrap(); + + if shift != 0.0 { + info!("Shift playlist start for {shift:.3} seconds"); + time_sec += shift; + } + + if self.config.playlist.infinit + && self.json_playlist.length.unwrap() < 86400.0 + && time_sec > self.json_playlist.length.unwrap() + self.start_sec + { + self.recalculate_begin(true) + } + + for (i, item) in self + .player_control + .current_list + .lock() + .unwrap() + .iter() + .enumerate() + { + if item.begin.unwrap() + item.out - item.seek > time_sec { + self.playout_stat.list_init.store(false, Ordering::SeqCst); + self.player_control.current_index.store(i, Ordering::SeqCst); + + break; + } + } + } + + // Prepare init clip. + fn init_clip(&mut self) -> bool { + trace!("init_clip"); + self.get_current_clip(); + let mut is_filler = false; + + if !self.playout_stat.list_init.load(Ordering::SeqCst) { + let time_sec = self.get_current_time(); + let index = self.player_control.current_index.load(Ordering::SeqCst); + let nodes = self.player_control.current_list.lock().unwrap(); + let last_index = nodes.len() - 1; + + // de-instance node to preserve original values in list + let mut node_clone = nodes[index].clone(); + + // Important! When no manual drop is happen here, lock is still active in handle_list_init + drop(nodes); + + trace!("Clip from init: {}", node_clone.source); + + node_clone.seek += time_sec + - (node_clone.begin.unwrap() - *self.playout_stat.time_shift.lock().unwrap()); + + self.last_next_ad(&mut node_clone); + + self.player_control + .current_index + .fetch_add(1, Ordering::SeqCst); + + self.current_node = handle_list_init( + &self.config, + node_clone, + &self.playout_stat, + &self.player_control, + last_index, + ); + + if self + .current_node + .source + .contains(&self.config.storage.path.to_string_lossy().to_string()) + || self.current_node.source.contains("color=c=#121212") + { + is_filler = true; + } + } + + is_filler + } + + fn fill_end(&mut self, total_delta: f64) { + // Fill end from playlist + let index = self.player_control.current_index.load(Ordering::SeqCst); + let mut media = Media::new(index, "", false); + media.begin = Some(time_in_seconds()); + media.duration = total_delta; + media.out = total_delta; + + self.last_next_ad(&mut media); + + self.current_node = gen_source( + &self.config, + media, + &self.playout_stat, + &self.player_control, + 0, + ); + + self.player_control + .current_list + .lock() + .unwrap() + .push(self.current_node.clone()); + + self.current_node.last_ad = self.last_node_ad; + self.current_node + .add_filter(&self.config, &self.playout_stat.chain); + + self.player_control + .current_index + .fetch_add(1, Ordering::SeqCst); + } + + fn recalculate_begin(&mut self, extend: bool) { + debug!("Infinit playlist reaches end, recalculate clip begins."); + + let mut time_sec = time_in_seconds(); + + if extend { + time_sec = self.start_sec + self.json_playlist.length.unwrap(); + } + + self.json_playlist.start_sec = Some(time_sec); + set_defaults(&mut self.json_playlist); + self.player_control + .current_list + .lock() + .unwrap() + .clone_from(&self.json_playlist.program); + } +} + +/// Build the playlist iterator +impl Iterator for CurrentProgram { + type Item = Media; + + fn next(&mut self) -> Option { + self.last_json_path.clone_from(&self.json_playlist.path); + self.last_node_ad = self.current_node.last_ad; + self.check_for_playlist(self.playout_stat.list_init.load(Ordering::SeqCst)); + + if self.playout_stat.list_init.load(Ordering::SeqCst) { + trace!("Init playlist, from next iterator"); + let mut init_clip_is_filler = false; + + if self.json_playlist.path.is_some() { + init_clip_is_filler = self.init_clip(); + } + + if self.playout_stat.list_init.load(Ordering::SeqCst) && !init_clip_is_filler { + // On init load, playlist could be not long enough, or clips are not found + // so we fill the gap with a dummy. + trace!("Init clip is no filler"); + + let mut current_time = time_in_seconds(); + let (_, total_delta) = get_delta(&self.config, ¤t_time); + + if self.start_sec > current_time { + current_time += self.end_sec + 1.0; + } + + let mut last_index = 0; + let length = self.player_control.current_list.lock().unwrap().len(); + + if length > 0 { + last_index = length - 1; + } + + let mut media = Media::new(length, "", false); + media.begin = Some(current_time); + media.duration = total_delta; + media.out = total_delta; + + self.last_next_ad(&mut media); + + self.current_node = gen_source( + &self.config, + media, + &self.playout_stat, + &self.player_control, + last_index, + ); + } + + return Some(self.current_node.clone()); + } + + if self.player_control.current_index.load(Ordering::SeqCst) + < self.player_control.current_list.lock().unwrap().len() + { + // get next clip from current playlist + + let mut is_last = false; + let index = self.player_control.current_index.load(Ordering::SeqCst); + let node_list = self.player_control.current_list.lock().unwrap(); + let mut node = node_list[index].clone(); + let last_index = node_list.len() - 1; + + drop(node_list); + + if index == last_index { + is_last = true + } + + self.last_next_ad(&mut node); + + self.current_node = timed_source( + node, + &self.config, + is_last, + &self.playout_stat, + &self.player_control, + last_index, + ); + + self.player_control + .current_index + .fetch_add(1, Ordering::SeqCst); + + Some(self.current_node.clone()) + } else { + let (_, total_delta) = get_delta(&self.config, &self.start_sec); + + if !self.config.playlist.infinit + && self.last_json_path == self.json_playlist.path + && total_delta.abs() > 1.0 + { + // Playlist is to early finish, + // and if we have to fill it with a placeholder. + trace!("Total delta on list end: {total_delta}"); + + self.fill_end(total_delta); + + return Some(self.current_node.clone()); + } + // Get first clip from next playlist. + + let c_list = self.player_control.current_list.lock().unwrap(); + let mut first_node = c_list[0].clone(); + + drop(c_list); + + if self.config.playlist.infinit { + self.recalculate_begin(false) + } + + self.player_control.current_index.store(0, Ordering::SeqCst); + self.last_next_ad(&mut first_node); + first_node.last_ad = self.last_node_ad; + + self.current_node = gen_source( + &self.config, + first_node, + &self.playout_stat, + &self.player_control, + 0, + ); + + self.player_control.current_index.store(1, Ordering::SeqCst); + + Some(self.current_node.clone()) + } + } +} + +/// Prepare input clip: +/// +/// - check begin and length from clip +/// - return clip only if we are in 24 hours time range +fn timed_source( + node: Media, + config: &PlayoutConfig, + last: bool, + playout_stat: &PlayoutStatus, + player_control: &PlayerControl, + last_index: usize, +) -> Media { + let (delta, total_delta) = get_delta(config, &node.begin.unwrap()); + let mut shifted_delta = delta; + let mut new_node = node.clone(); + new_node.process = Some(false); + + trace!("Node begin: {}", node.begin.unwrap()); + trace!("timed source is last: {last}"); + + if config.playlist.length.contains(':') { + let time_shift = playout_stat.time_shift.lock().unwrap(); + + if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap() + && *time_shift != 0.0 + { + shifted_delta = delta - *time_shift; + + debug!("Delta: {shifted_delta:.3}, shifted: {delta:.3}"); + } else { + debug!("Delta: {shifted_delta:.3}"); + } + + if config.general.stop_threshold > 0.0 + && shifted_delta.abs() > config.general.stop_threshold + { + error!("Clip begin out of sync for {delta:.3} seconds."); + + new_node.cmd = None; + + return new_node; + } + } + + if (total_delta > node.out - node.seek && !last) + || node.index.unwrap() < 2 + || !config.playlist.length.contains(':') + || config.playlist.infinit + { + // when we are in the 24 hour range, get the clip + new_node.process = Some(true); + new_node = gen_source(config, node, playout_stat, player_control, last_index); + } else if total_delta <= 0.0 { + info!("Begin is over play time, skip: {}", node.source); + } else if total_delta < node.duration - node.seek || last { + new_node = handle_list_end( + config, + node, + total_delta, + playout_stat, + player_control, + last_index, + ); + } + + new_node +} + +fn duplicate_for_seek_and_loop(node: &mut Media, player_control: &PlayerControl) { + warn!("Clip loops and has seek value: duplicate clip to separate loop and seek."); + let mut nodes = player_control.current_list.lock().unwrap(); + let index = node.index.unwrap_or_default(); + + let mut node_duplicate = node.clone(); + node_duplicate.seek = 0.0; + let orig_seek = node.seek; + node.out = node.duration; + + if node.seek > node.duration { + node.seek %= node.duration; + + node_duplicate.out = node_duplicate.out - orig_seek - (node.out - node.seek); + } else { + node_duplicate.out -= node_duplicate.duration; + } + + if node.seek == node.out { + node.seek = node_duplicate.seek; + node.out = node_duplicate.out; + } else if node_duplicate.out - node_duplicate.seek > 1.2 { + node_duplicate.begin = + Some(node_duplicate.begin.unwrap_or_default() + (node.out - node.seek)); + + nodes.insert(index + 1, node_duplicate); + + for (i, item) in nodes.iter_mut().enumerate() { + item.index = Some(i); + } + } +} + +/// Generate the source CMD, or when clip not exist, get a dummy. +pub fn gen_source( + config: &PlayoutConfig, + mut node: Media, + playout_stat: &PlayoutStatus, + player_control: &PlayerControl, + last_index: usize, +) -> Media { + let node_index = node.index.unwrap_or_default(); + let mut duration = node.out - node.seek; + + if duration < 1.0 { + warn!("Clip is less then 1 second long ({duration:.3}), adjust length."); + + duration = 1.2; + + if node.seek > 1.0 { + node.seek -= 1.2; + } else { + node.out = 1.2; + } + } + + trace!("Clip new length: {duration}, duration: {}", node.duration); + + if node.probe.is_none() && !node.source.is_empty() { + if let Err(e) = node.add_probe(true) { + trace!("{e:?}"); + }; + } else { + trace!("Node has a probe...") + } + + // separate if condition, because of node.add_probe() in last condition + if node.probe.is_some() { + if node + .source + .rsplit_once('.') + .map(|(_, e)| e.to_lowercase()) + .filter(|c| IMAGE_FORMAT.contains(&c.as_str())) + .is_some() + { + node.cmd = Some(loop_image(&node)); + } else { + if node.seek > 0.0 && node.out > node.duration { + duplicate_for_seek_and_loop(&mut node, player_control); + } + + node.cmd = Some(seek_and_length(&mut node)); + } + } else { + trace!("clip index: {node_index} | last index: {last_index}"); + + // Last index is the index from the last item from the node list. + if node_index < last_index { + error!("Source not found: {}", node.source); + } + + let mut filler_list = vec![]; + + match player_control.filler_list.try_lock() { + Ok(list) => filler_list = list.to_vec(), + Err(e) => error!("Lock filler list error: {e}"), + } + + // Set list_init to true, to stay in sync. + playout_stat.list_init.store(true, Ordering::SeqCst); + + if config.storage.filler.is_dir() && !filler_list.is_empty() { + let filler_index = player_control.filler_index.fetch_add(1, Ordering::SeqCst); + let mut filler_media = filler_list[filler_index].clone(); + + trace!("take filler: {}", filler_media.source); + + if filler_index == filler_list.len() - 1 { + // reset index for next round + player_control.filler_index.store(0, Ordering::SeqCst) + } + + if filler_media.probe.is_none() { + if let Err(e) = filler_media.add_probe(false) { + error!("{e:?}"); + }; + } + + if filler_media.duration > duration { + filler_media.out = duration; + } + + node.source = filler_media.source; + node.seek = 0.0; + node.out = filler_media.out; + node.duration = filler_media.duration; + node.cmd = Some(loop_filler(&node)); + node.probe = filler_media.probe; + } else { + match MediaProbe::new(&config.storage.filler.to_string_lossy()) { + Ok(probe) => { + if config + .storage + .filler + .to_string_lossy() + .to_string() + .rsplit_once('.') + .map(|(_, e)| e.to_lowercase()) + .filter(|c| IMAGE_FORMAT.contains(&c.as_str())) + .is_some() + { + node.source = config.storage.filler.clone().to_string_lossy().to_string(); + node.cmd = Some(loop_image(&node)); + node.probe = Some(probe); + } else if let Some(filler_duration) = probe + .clone() + .format + .duration + .and_then(|d| d.parse::().ok()) + { + // Create placeholder from config filler. + let mut filler_out = filler_duration; + + if filler_duration > duration { + filler_out = duration; + } + + node.source = config.storage.filler.clone().to_string_lossy().to_string(); + node.seek = 0.0; + node.out = filler_out; + node.duration = filler_duration; + node.cmd = Some(loop_filler(&node)); + node.probe = Some(probe); + } else { + // Create colored placeholder. + let (source, cmd) = gen_dummy(config, duration); + node.source = source; + node.cmd = Some(cmd); + } + } + Err(e) => { + // Create colored placeholder. + error!("Filler error: {e}"); + + let mut dummy_duration = 60.0; + + if dummy_duration > duration { + dummy_duration = duration; + } + + let (source, cmd) = gen_dummy(config, dummy_duration); + node.seek = 0.0; + node.out = dummy_duration; + node.duration = dummy_duration; + node.source = source; + node.cmd = Some(cmd); + } + } + } + + warn!( + "Generate filler with {:.2} seconds length!", + node.out + ); + } + + node.add_filter(config, &playout_stat.chain); + + trace!( + "return gen_source: {}, seek: {}, out: {}", + node.source, + node.seek, + node.out, + ); + + node +} + +/// Handle init clip, but this clip can be the last one in playlist, +/// this we have to figure out and calculate the right length. +fn handle_list_init( + config: &PlayoutConfig, + mut node: Media, + playout_stat: &PlayoutStatus, + player_control: &PlayerControl, + last_index: usize, +) -> Media { + debug!("Playlist init"); + let (_, total_delta) = get_delta(config, &node.begin.unwrap()); + + if !config.playlist.infinit && node.out - node.seek > total_delta { + node.out = total_delta + node.seek; + } + + gen_source(config, node, playout_stat, player_control, last_index) +} + +/// when we come to last clip in playlist, +/// or when we reached total playtime, +/// we end up here +fn handle_list_end( + config: &PlayoutConfig, + mut node: Media, + total_delta: f64, + playout_stat: &PlayoutStatus, + player_control: &PlayerControl, + last_index: usize, +) -> Media { + debug!("Last clip from day"); + + let mut out = if node.seek > 0.0 { + node.seek + total_delta + } else { + if node.duration > total_delta { + warn!("Adjust clip duration to: {total_delta:.2}"); + } + + total_delta + }; + + // out can't be longer then duration + if out > node.duration { + out = node.duration + } + + if node.duration > total_delta && total_delta > 1.0 && node.duration - node.seek >= total_delta + { + node.out = out; + } else { + warn!("Playlist is not long enough: {total_delta:.2} seconds needed"); + } + + node.process = Some(true); + + gen_source(config, node, playout_stat, player_control, last_index) +} diff --git a/ffplayout/src/player/mod.rs b/ffplayout/src/player/mod.rs index cb9e0ac5..5447c332 100644 --- a/ffplayout/src/player/mod.rs +++ b/ffplayout/src/player/mod.rs @@ -1 +1,5 @@ pub mod controller; +pub mod filter; +pub mod input; +pub mod output; +pub mod utils; diff --git a/ffplayout/src/player/output/desktop.rs b/ffplayout/src/player/output/desktop.rs new file mode 100644 index 00000000..b76054d2 --- /dev/null +++ b/ffplayout/src/player/output/desktop.rs @@ -0,0 +1,88 @@ +use std::process::{self, Command, Stdio}; + +use simplelog::*; + +use ffplayout_lib::{filter::v_drawtext, utils::PlayoutConfig, vec_strings}; + +/// Desktop Output +/// +/// Instead of streaming, we run a ffplay instance and play on desktop. +pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { + let mut enc_filter: Vec = vec![]; + + let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; + + if let Some(encoder_input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.encoder.input_cmd.clone()) + { + enc_cmd.append(&mut encoder_input_cmd.clone()); + } + + enc_cmd.append(&mut vec_strings![ + "-autoexit", + "-i", + "pipe:0", + "-window_title", + "ffplayout" + ]); + + if let Some(mut cmd) = config.out.output_cmd.clone() { + if !cmd.iter().any(|i| { + [ + "-c", + "-c:v", + "-c:v:0", + "-b:v", + "-b:v:0", + "-vcodec", + "-c:a", + "-acodec", + "-crf", + "-map", + "-filter_complex", + ] + .contains(&i.as_str()) + }) { + enc_cmd.append(&mut cmd); + } else { + warn!("ffplay doesn't support given output parameters, they will be skipped!"); + } + } + + if config.text.add_text && !config.text.text_from_filename && !config.processing.audio_only { + if let Some(socket) = config.text.zmq_stream_socket.clone() { + debug!( + "Using drawtext filter, listening on address: {}", + socket + ); + + let mut filter: String = "null,".to_string(); + filter.push_str(v_drawtext::filter_node(config, None, &None).as_str()); + enc_filter = vec!["-vf".to_string(), filter]; + } + } + + enc_cmd.append(&mut enc_filter); + + debug!( + "Encoder CMD: \"ffplay {}\"", + enc_cmd.join(" ") + ); + + let enc_proc = match Command::new("ffplay") + .args(enc_cmd) + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn encoder process: {e}"); + panic!("couldn't spawn encoder process: {e}") + } + Ok(proc) => proc, + }; + + enc_proc +} diff --git a/ffplayout/src/player/output/hls.rs b/ffplayout/src/player/output/hls.rs new file mode 100644 index 00000000..883073f6 --- /dev/null +++ b/ffplayout/src/player/output/hls.rs @@ -0,0 +1,276 @@ +/* +This module write the files compression directly to a hls (m3u8) playlist, +without pre- and post-processing. + +Example config: + +out: + output_param: >- + ... + + -flags +cgop + -f hls + -hls_time 6 + -hls_list_size 600 + -hls_flags append_list+delete_segments+omit_endlist+program_date_time + -hls_segment_filename /var/www/html/live/stream-%d.ts /var/www/html/live/stream.m3u8 + +*/ + +use std::{ + io::{BufRead, BufReader, Error}, + process::{exit, Command, Stdio}, + sync::atomic::Ordering, + thread::{self, sleep}, + time::Duration, +}; + +use simplelog::*; + +use crate::player::{ + controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*}, + input::source_generator, + utils::{ + get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream, + Media, + }, +}; +use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner}; +use crate::vec_strings; + +/// Ingest Server for HLS +fn ingest_to_hls_server( + config: PlayoutConfig, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) -> Result<(), Error> { + let playlist_init = playout_stat.list_init; + + let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; + let stream_input = config.ingest.input_cmd.clone().unwrap(); + let mut dummy_media = Media::new(0, "Live Stream", false); + dummy_media.unit = Ingest; + + if let Some(ingest_input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.ingest.input_cmd.clone()) + { + server_prefix.append(&mut ingest_input_cmd.clone()); + } + + server_prefix.append(&mut stream_input.clone()); + + let mut is_running; + + if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { + if !test_tcp_port(url) { + proc_control.stop_all(); + exit(1); + } + + info!("Start ingest server, listening on: {url}"); + }; + + loop { + dummy_media.add_filter(&config, &playout_stat.chain); + let server_cmd = prepare_output_cmd(&config, server_prefix.clone(), &dummy_media.filter); + + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); + + let proc_ctl = proc_control.clone(); + let mut server_proc = match Command::new("ffmpeg") + .args(server_cmd.clone()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn ingest server: {e}"); + panic!("couldn't spawn ingest server: {e}"); + } + Ok(proc) => proc, + }; + + let server_err = BufReader::new(server_proc.stderr.take().unwrap()); + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; + + for line in server_err.lines() { + let line = line?; + + if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) { + if let Err(e) = proc_ctl.stop(Ingest) { + error!("{e}"); + }; + } + + if !is_running { + proc_control.server_is_running.store(true, Ordering::SeqCst); + playlist_init.store(true, Ordering::SeqCst); + is_running = true; + + info!("Switch from {} to live ingest", config.processing.mode); + + if let Err(e) = proc_control.stop(Decoder) { + error!("{e}"); + } + } + + log_line(&line, &config.logging.ffmpeg_level); + } + + if proc_control.server_is_running.load(Ordering::SeqCst) { + info!("Switch from live ingest to {}", config.processing.mode); + } + + proc_control + .server_is_running + .store(false, Ordering::SeqCst); + + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } + + if proc_control.is_terminated.load(Ordering::SeqCst) { + break; + } + } + + Ok(()) +} + +/// HLS Writer +/// +/// Write with single ffmpeg instance directly to a HLS playlist. +pub fn write_hls( + config: &PlayoutConfig, + player_control: PlayerControl, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) { + let config_clone = config.clone(); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); + let play_stat = playout_stat.clone(); + let play_stat2 = playout_stat.clone(); + let proc_control_c = proc_control.clone(); + + let get_source = source_generator( + config.clone(), + &player_control, + playout_stat, + proc_control.is_terminated.clone(), + ); + + // spawn a thread for ffmpeg ingest server and create a channel for package sending + if config.ingest.enable { + thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, proc_control_c)); + } + + for node in get_source { + *player_control.current_media.lock().unwrap() = Some(node.clone()); + let ignore = config.logging.ignore_lines.clone(); + + let mut cmd = match &node.cmd { + Some(cmd) => cmd.clone(), + None => break, + }; + + if !node.process.unwrap() { + continue; + } + + info!( + "Play for {}: {}", + sec_to_time(node.out - node.seek), + node.source + ); + + if config.task.enable { + if config.task.path.is_file() { + let task_config = config.clone(); + let task_node = node.clone(); + let server_running = proc_control.server_is_running.load(Ordering::SeqCst); + let stat = play_stat2.clone(); + + thread::spawn(move || { + task_runner::run(task_config, task_node, stat, server_running) + }); + } else { + error!( + "{:?} executable not exists!", + config.task.path + ); + } + } + + let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; + + if let Some(encoder_input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.encoder.input_cmd.clone()) + { + enc_prefix.append(&mut encoder_input_cmd.clone()); + } + + let mut read_rate = 1.0; + + if let Some(begin) = &node.begin { + let (delta, _) = get_delta(config, begin); + let duration = node.out - node.seek; + let speed = duration / (duration + delta); + + if node.seek == 0.0 + && speed > 0.0 + && speed < 1.3 + && delta < config.general.stop_threshold + { + read_rate = speed; + } + } + + enc_prefix.append(&mut vec_strings!["-readrate", read_rate]); + + enc_prefix.append(&mut cmd); + let enc_cmd = prepare_output_cmd(config, enc_prefix, &node.filter); + + debug!( + "HLS writer CMD: \"ffmpeg {}\"", + enc_cmd.join(" ") + ); + + let mut dec_proc = match Command::new("ffmpeg") + .args(enc_cmd) + .stderr(Stdio::piped()) + .spawn() + { + Ok(proc) => proc, + Err(e) => { + error!("couldn't spawn ffmpeg process: {e}"); + panic!("couldn't spawn ffmpeg process: {e}") + } + }; + + let enc_err = BufReader::new(dec_proc.stderr.take().unwrap()); + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); + + if let Err(e) = stderr_reader(enc_err, ignore, Decoder, proc_control.clone()) { + error!("{e:?}") + }; + + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}"); + } + + while proc_control.server_is_running.load(Ordering::SeqCst) { + sleep(Duration::from_secs(1)); + } + } + + sleep(Duration::from_secs(1)); + + proc_control.stop_all(); +} diff --git a/ffplayout/src/player/output/mod.rs b/ffplayout/src/player/output/mod.rs new file mode 100644 index 00000000..8a97d145 --- /dev/null +++ b/ffplayout/src/player/output/mod.rs @@ -0,0 +1,263 @@ +use std::{ + io::{prelude::*, BufReader, BufWriter, Read}, + process::{Command, Stdio}, + sync::atomic::Ordering, + thread::{self, sleep}, + time::Duration, +}; + +use crossbeam_channel::bounded; +use simplelog::*; + +mod desktop; +mod hls; +mod null; +mod stream; + +pub use hls::write_hls; + +use crate::player::input::{ingest_server, source_generator}; +use crate::utils::{config::PlayoutConfig, task_runner}; + +use ffplayout_lib::utils::{ + sec_to_time, stderr_reader, OutputMode::*, PlayerControl, PlayoutStatus, ProcessControl, + ProcessUnit::*, +}; +use ffplayout_lib::vec_strings; + +/// Player +/// +/// Here we create the input file loop, from playlist, or folder source. +/// Then we read the stdout from the reader ffmpeg instance +/// and write it to the stdin from the streamer ffmpeg instance. +/// If it is configured we also fire up a ffmpeg ingest server instance, +/// for getting live feeds. +/// When a live ingest arrive, it stops the current playing and switch to the live source. +/// When ingest stops, it switch back to playlist/folder mode. +pub fn player( + config: &PlayoutConfig, + play_control: &PlayerControl, + playout_stat: PlayoutStatus, + proc_control: ProcessControl, +) { + let config_clone = config.clone(); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); + let ignore_enc = config.logging.ignore_lines.clone(); + let mut buffer = [0; 65088]; + let mut live_on = false; + let playlist_init = playout_stat.list_init.clone(); + let play_stat = playout_stat.clone(); + + // get source iterator + let node_sources = source_generator( + config.clone(), + play_control, + playout_stat, + proc_control.is_terminated.clone(), + ); + + // get ffmpeg output instance + let mut enc_proc = match config.out.mode { + Desktop => desktop::output(config, &ff_log_format), + Null => null::output(config, &ff_log_format), + Stream => stream::output(config, &ff_log_format), + _ => panic!("Output mode doesn't exists!"), + }; + + let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); + let enc_err = BufReader::new(enc_proc.stderr.take().unwrap()); + + *proc_control.encoder_term.lock().unwrap() = Some(enc_proc); + let enc_p_ctl = proc_control.clone(); + + // spawn a thread to log ffmpeg output error messages + let error_encoder_thread = + thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl)); + + let proc_control_c = proc_control.clone(); + let mut ingest_receiver = None; + + // spawn a thread for ffmpeg ingest server and create a channel for package sending + if config.ingest.enable { + let (ingest_sender, rx) = bounded(96); + ingest_receiver = Some(rx); + thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c)); + } + + 'source_iter: for node in node_sources { + *play_control.current_media.lock().unwrap() = Some(node.clone()); + let ignore_dec = config.logging.ignore_lines.clone(); + + if proc_control.is_terminated.load(Ordering::SeqCst) { + debug!("Playout is terminated, break out from source loop"); + break; + } + + trace!("Decoder CMD: {:?}", node.cmd); + + let mut cmd = match &node.cmd { + Some(cmd) => cmd.clone(), + None => break, + }; + + if !node.process.unwrap() { + // process true/false differs from node.cmd = None in that way, + // that source is valid but to show for playing, + // so better skip it and jump to the next one. + continue; + } + + let c_index = if cfg!(debug_assertions) { + format!( + " ({}/{})", + node.index.unwrap() + 1, + play_control.current_list.lock().unwrap().len() + ) + } else { + String::new() + }; + + info!( + "Play for {}{c_index}: {} {}", + sec_to_time(node.out - node.seek), + node.source, + node.audio + ); + + if config.task.enable { + if config.task.path.is_file() { + let task_config = config.clone(); + let task_node = node.clone(); + let server_running = proc_control.server_is_running.load(Ordering::SeqCst); + let stat = play_stat.clone(); + + thread::spawn(move || { + task_runner::run(task_config, task_node, stat, server_running) + }); + } else { + error!( + "{:?} executable not exists!", + config.task.path + ); + } + } + + let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format]; + + if let Some(decoder_input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.decoder.input_cmd.clone()) + { + dec_cmd.append(&mut decoder_input_cmd.clone()); + } + + dec_cmd.append(&mut cmd); + + if let Some(mut filter) = node.filter { + dec_cmd.append(&mut filter.cmd()); + dec_cmd.append(&mut filter.map()); + } + + if let Some(mut cmd) = config.processing.cmd.clone() { + dec_cmd.append(&mut cmd); + } + + debug!( + "Decoder CMD: \"ffmpeg {}\"", + dec_cmd.join(" ") + ); + + // create ffmpeg decoder instance, for reading the input files + let mut dec_proc = match Command::new("ffmpeg") + .args(dec_cmd) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Ok(proc) => proc, + Err(e) => { + error!("couldn't spawn decoder process: {e}"); + panic!("couldn't spawn decoder process: {e}") + } + }; + + let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); + let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); + + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); + let dec_p_ctl = proc_control.clone(); + + let error_decoder_thread = + thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, dec_p_ctl)); + + loop { + // when server is running, read from it + if proc_control.server_is_running.load(Ordering::SeqCst) { + if !live_on { + info!("Switch from {} to live ingest", config.processing.mode); + + if let Err(e) = proc_control.stop(Decoder) { + error!("{e}") + } + + live_on = true; + playlist_init.store(true, Ordering::SeqCst); + } + + for rx in ingest_receiver.as_ref().unwrap().try_iter() { + if let Err(e) = enc_writer.write(&rx.1[..rx.0]) { + error!("Error from Ingest: {:?}", e); + + break 'source_iter; + }; + } + // read from decoder instance + } else { + if live_on { + info!("Switch from live ingest to {}", config.processing.mode); + + live_on = false; + break; + } + + let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => { + error!("Reading error from decoder: {e:?}"); + + break 'source_iter; + } + }; + + if dec_bytes_len > 0 { + if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { + error!("Encoder write error: {}", e.kind()); + + break 'source_iter; + }; + } else { + break; + } + } + } + + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}") + } + + if let Err(e) = error_decoder_thread.join() { + error!("{e:?}"); + }; + } + + trace!("Out of source loop"); + + sleep(Duration::from_secs(1)); + + proc_control.stop_all(); + + if let Err(e) = error_encoder_thread.join() { + error!("{e:?}"); + }; +} diff --git a/ffplayout/src/player/output/null.rs b/ffplayout/src/player/output/null.rs new file mode 100644 index 00000000..0023ee0f --- /dev/null +++ b/ffplayout/src/player/output/null.rs @@ -0,0 +1,52 @@ +use std::process::{self, Command, Stdio}; + +use simplelog::*; + +use crate::player::utils::prepare_output_cmd; +use ffplayout_lib::{ + utils::{Media, PlayoutConfig, ProcessUnit::*}, + vec_strings, +}; + +/// Desktop Output +/// +/// Instead of streaming, we run a ffplay instance and play on desktop. +pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { + let mut media = Media::new(0, "", false); + media.unit = Encoder; + media.add_filter(config, &None); + + let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; + + if let Some(input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.encoder.input_cmd.clone()) + { + enc_prefix.append(&mut input_cmd.clone()); + } + + enc_prefix.append(&mut vec_strings!["-re", "-i", "pipe:0"]); + + let enc_cmd = prepare_output_cmd(config, enc_prefix, &media.filter); + + debug!( + "Encoder CMD: \"ffmpeg {}\"", + enc_cmd.join(" ") + ); + + let enc_proc = match Command::new("ffmpeg") + .args(enc_cmd) + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn encoder process: {e}"); + panic!("couldn't spawn encoder process: {e}") + } + Ok(proc) => proc, + }; + + enc_proc +} diff --git a/ffplayout/src/player/output/stream.rs b/ffplayout/src/player/output/stream.rs new file mode 100644 index 00000000..d0a85af1 --- /dev/null +++ b/ffplayout/src/player/output/stream.rs @@ -0,0 +1,52 @@ +use std::process::{self, Command, Stdio}; + +use simplelog::*; + +use crate::player::utils::prepare_output_cmd; +use ffplayout_lib::{ + utils::{Media, PlayoutConfig, ProcessUnit::*}, + vec_strings, +}; + +/// Streaming Output +/// +/// Prepare the ffmpeg command for streaming output +pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child { + let mut media = Media::new(0, "", false); + media.unit = Encoder; + media.add_filter(config, &None); + + let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; + + if let Some(input_cmd) = config + .advanced + .as_ref() + .and_then(|a| a.encoder.input_cmd.clone()) + { + enc_prefix.append(&mut input_cmd.clone()); + } + + enc_prefix.append(&mut vec_strings!["-re", "-i", "pipe:0"]); + + let enc_cmd = prepare_output_cmd(config, enc_prefix, &media.filter); + + debug!( + "Encoder CMD: \"ffmpeg {}\"", + enc_cmd.join(" ") + ); + + let enc_proc = match Command::new("ffmpeg") + .args(enc_cmd) + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn encoder process: {e}"); + panic!("couldn't spawn encoder process: {e}") + } + Ok(proc) => proc, + }; + + enc_proc +} diff --git a/ffplayout/src/player/utils/json_serializer.rs b/ffplayout/src/player/utils/json_serializer.rs new file mode 100644 index 00000000..3e0878c6 --- /dev/null +++ b/ffplayout/src/player/utils/json_serializer.rs @@ -0,0 +1,205 @@ +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + path::Path, + sync::{atomic::AtomicBool, Arc}, + thread, +}; + +use simplelog::*; + +use crate::utils::{ + get_date, is_remote, modified_time, time_from_header, validate_playlist, Media, PlayerControl, + PlayoutConfig, DUMMY_LEN, +}; + +/// This is our main playlist object, it holds all necessary information for the current day. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct JsonPlaylist { + #[serde(default = "default_channel")] + pub channel: String, + pub date: String, + + #[serde(skip_serializing, skip_deserializing)] + pub start_sec: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub length: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub path: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub modified: Option, + + pub program: Vec, +} + +impl JsonPlaylist { + pub fn new(date: String, start: f64) -> Self { + let mut media = Media::new(0, "", false); + media.begin = Some(start); + media.title = None; + media.duration = DUMMY_LEN; + media.out = DUMMY_LEN; + Self { + channel: "Channel 1".into(), + date, + start_sec: Some(start), + length: Some(86400.0), + path: None, + modified: None, + program: vec![media], + } + } +} + +impl PartialEq for JsonPlaylist { + fn eq(&self, other: &Self) -> bool { + self.channel == other.channel && self.date == other.date && self.program == other.program + } +} + +impl Eq for JsonPlaylist {} + +fn default_channel() -> String { + "Channel 1".to_string() +} + +pub fn set_defaults(playlist: &mut JsonPlaylist) { + let mut start_sec = playlist.start_sec.unwrap(); + let mut length = 0.0; + + // Add extra values to every media clip + for (i, item) in playlist.program.iter_mut().enumerate() { + item.begin = Some(start_sec); + item.index = Some(i); + item.last_ad = false; + item.next_ad = false; + item.process = Some(true); + item.filter = None; + + let dur = item.out - item.seek; + start_sec += dur; + length += dur; + } + + playlist.length = Some(length) +} + +/// Read json playlist file, fills JsonPlaylist struct and set some extra values, +/// which we need to process. +pub fn read_json( + config: &mut PlayoutConfig, + player_control: &PlayerControl, + path: Option, + is_terminated: Arc, + seek: bool, + get_next: bool, +) -> JsonPlaylist { + let config_clone = config.clone(); + let control_clone = player_control.clone(); + let mut playlist_path = config.playlist.path.clone(); + let start_sec = config.playlist.start_sec.unwrap(); + let date = get_date(seek, start_sec, get_next); + + if playlist_path.is_dir() || is_remote(&config.playlist.path.to_string_lossy()) { + let d: Vec<&str> = date.split('-').collect(); + playlist_path = playlist_path + .join(d[0]) + .join(d[1]) + .join(date.clone()) + .with_extension("json"); + } + + let mut current_file = playlist_path.as_path().display().to_string(); + + if let Some(p) = path { + Path::new(&p).clone_into(&mut playlist_path); + current_file = p + } + + if is_remote(¤t_file) { + let response = reqwest::blocking::Client::new().get(¤t_file).send(); + + if let Ok(resp) = response { + if resp.status().is_success() { + let headers = resp.headers().clone(); + + if let Ok(body) = resp.text() { + let mut playlist: JsonPlaylist = match serde_json::from_str(&body) { + Ok(p) => p, + Err(e) => { + error!("Could't read remote json playlist. {e:?}"); + JsonPlaylist::new(date.clone(), start_sec) + } + }; + + playlist.path = Some(current_file); + playlist.start_sec = Some(start_sec); + + if let Some(time) = time_from_header(&headers) { + playlist.modified = Some(time.to_string()); + } + + let list_clone = playlist.clone(); + + if !config.general.skip_validation { + thread::spawn(move || { + validate_playlist( + config_clone, + control_clone, + list_clone, + is_terminated, + ) + }); + } + + set_defaults(&mut playlist); + + return playlist; + } + } + } + } else if playlist_path.is_file() { + let modified = modified_time(¤t_file); + + let f = File::options() + .read(true) + .write(false) + .open(¤t_file) + .expect("Could not open json playlist file."); + let mut playlist: JsonPlaylist = match serde_json::from_reader(f) { + Ok(p) => p, + Err(e) => { + error!("Playlist file not readable! {e}"); + JsonPlaylist::new(date.clone(), start_sec) + } + }; + + // catch empty program list + if playlist.program.is_empty() { + playlist = JsonPlaylist::new(date, start_sec) + } + + playlist.path = Some(current_file); + playlist.start_sec = Some(start_sec); + playlist.modified = modified; + + let list_clone = playlist.clone(); + + if !config.general.skip_validation { + thread::spawn(move || { + validate_playlist(config_clone, control_clone, list_clone, is_terminated) + }); + } + + set_defaults(&mut playlist); + + return playlist; + } + + error!("Playlist {current_file} not exist!"); + + JsonPlaylist::new(date, start_sec) +} diff --git a/ffplayout/src/player/utils/mod.rs b/ffplayout/src/player/utils/mod.rs new file mode 100644 index 00000000..916b5945 --- /dev/null +++ b/ffplayout/src/player/utils/mod.rs @@ -0,0 +1,1137 @@ +use std::{ + ffi::OsStr, + fmt, + fs::{self, metadata, File}, + io::{BufRead, BufReader, Error}, + net::TcpListener, + path::{Path, PathBuf}, + process::{exit, ChildStderr, Command, Stdio}, + str::FromStr, + sync::{Arc, Mutex}, +}; + +use chrono::{prelude::*, TimeDelta}; +use ffprobe::{ffprobe, Stream as FFStream}; +use rand::prelude::*; +use regex::Regex; +use reqwest::header; +use serde::{de::Deserializer, Deserialize, Serialize}; +use serde_json::{json, Map, Value}; +use simplelog::*; + +pub mod json_serializer; + +use crate::db::models::Channel; +use crate::player::{ + controller::{ + PlayoutStatus, + ProcessUnit::{self, *}, + }, + filter::{filter_chains, Filters}, +}; +use crate::utils::{ + config::{OutputMode::HLS, PlayoutConfig, FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS}, + control::ProcessControl, + errors::ProcessError, +}; +pub use json_serializer::{read_json, JsonPlaylist}; + +use crate::vec_strings; + +/// Compare incoming stream name with expecting name, but ignore question mark. +pub fn valid_stream(msg: &str) -> bool { + if let Some((unexpected, expected)) = msg.split_once(',') { + let re = Regex::new(r".*Unexpected stream|expecting|[\s]+|\?$").unwrap(); + let unexpected = re.replace_all(unexpected, ""); + let expected = re.replace_all(expected, ""); + + if unexpected == expected { + return true; + } + } + + false +} + +/// Prepare output parameters +/// +/// Seek for multiple outputs and add mapping for it. +pub fn prepare_output_cmd( + config: &PlayoutConfig, + mut cmd: Vec, + filters: &Option, +) -> Vec { + let mut output_params = config.out.clone().output_cmd.unwrap(); + let mut new_params = vec![]; + let mut count = 0; + let re_v = Regex::new(r"\[?0:v(:0)?\]?").unwrap(); + + if let Some(mut filter) = filters.clone() { + for (i, param) in output_params.iter().enumerate() { + if filter.video_out_link.len() > count && re_v.is_match(param) { + // replace mapping with link from filter struct + new_params.push(filter.video_out_link[count].clone()); + } else { + new_params.push(param.clone()); + } + + // Check if parameter is a output + if i > 0 + && !param.starts_with('-') + && !output_params[i - 1].starts_with('-') + && i < output_params.len() - 1 + { + count += 1; + + if filter.video_out_link.len() > count + && !output_params.contains(&"-map".to_string()) + { + new_params.append(&mut vec_strings![ + "-map", + filter.video_out_link[count].clone() + ]); + + for i in 0..config.processing.audio_tracks { + new_params.append(&mut vec_strings!["-map", format!("0:a:{i}")]); + } + } + } + } + + output_params = new_params; + + cmd.append(&mut filter.cmd()); + + // add mapping at the begin, if needed + if !filter.map().iter().all(|item| output_params.contains(item)) + && filter.output_chain.is_empty() + && filter.video_out_link.is_empty() + { + cmd.append(&mut filter.map()) + } else if &output_params[0] != "-map" && !filter.video_out_link.is_empty() { + cmd.append(&mut vec_strings!["-map", filter.video_out_link[0].clone()]); + + for i in 0..config.processing.audio_tracks { + cmd.append(&mut vec_strings!["-map", format!("0:a:{i}")]); + } + } + } + + cmd.append(&mut output_params); + + cmd +} + +/// map media struct to json object +pub fn get_media_map(media: Media) -> Value { + let mut obj = json!({ + "in": media.seek, + "out": media.out, + "duration": media.duration, + "category": media.category, + "source": media.source, + }); + + if let Some(title) = media.title { + obj.as_object_mut() + .unwrap() + .insert("title".to_string(), Value::String(title)); + } + + obj +} + +/// prepare json object for response +pub fn get_data_map( + config: &PlayoutConfig, + channel: Channel, + media: Media, + playout_stat: &PlayoutStatus, + server_is_running: bool, +) -> Map { + let mut data_map = Map::new(); + let current_time = time_in_seconds(); + let shift = *playout_stat.time_shift.lock().unwrap(); + let begin = media.begin.unwrap_or(0.0) - shift; + let played_time = current_time - begin; + + data_map.insert("index".to_string(), json!(media.index)); + data_map.insert("ingest".to_string(), json!(server_is_running)); + data_map.insert("mode".to_string(), json!(config.processing.mode)); + data_map.insert( + "shift".to_string(), + json!((shift * 1000.0).round() / 1000.0), + ); + data_map.insert( + "elapsed".to_string(), + json!((played_time * 1000.0).round() / 1000.0), + ); + data_map.insert("media".to_string(), get_media_map(media)); + + data_map +} + +/// Video clip struct to hold some important states and comments for current media. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Media { + #[serde(skip_serializing, skip_deserializing)] + pub begin: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub index: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub title: Option, + #[serde(rename = "in")] + pub seek: f64, + pub out: f64, + pub duration: f64, + + #[serde(skip_serializing, skip_deserializing)] + pub duration_audio: f64, + + #[serde( + default, + deserialize_with = "null_string", + skip_serializing_if = "is_empty_string" + )] + pub category: String, + #[serde(deserialize_with = "null_string")] + pub source: String, + + #[serde( + default, + deserialize_with = "null_string", + skip_serializing_if = "is_empty_string" + )] + pub audio: String, + + #[serde(skip_serializing, skip_deserializing)] + pub cmd: Option>, + + #[serde(skip_serializing, skip_deserializing)] + pub filter: Option, + + #[serde(default, skip_serializing_if = "is_empty_string")] + pub custom_filter: String, + + #[serde(skip_serializing, skip_deserializing)] + pub probe: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub probe_audio: Option, + + #[serde(skip_serializing, skip_deserializing)] + pub last_ad: bool, + + #[serde(skip_serializing, skip_deserializing)] + pub next_ad: bool, + + #[serde(skip_serializing, skip_deserializing)] + pub process: Option, + + #[serde(default, skip_serializing)] + pub unit: ProcessUnit, +} + +impl Media { + pub fn new(index: usize, src: &str, do_probe: bool) -> Self { + let mut duration = 0.0; + let mut probe = None; + + if do_probe && (is_remote(src) || Path::new(src).is_file()) { + if let Ok(p) = MediaProbe::new(src) { + probe = Some(p.clone()); + + duration = p + .format + .duration + .unwrap_or_default() + .parse() + .unwrap_or_default(); + } + } + + Self { + begin: None, + index: Some(index), + title: None, + seek: 0.0, + out: duration, + duration, + duration_audio: 0.0, + category: String::new(), + source: src.to_string(), + audio: String::new(), + cmd: Some(vec_strings!["-i", src]), + filter: None, + custom_filter: String::new(), + probe, + probe_audio: None, + last_ad: false, + next_ad: false, + process: Some(true), + unit: Decoder, + } + } + + pub fn add_probe(&mut self, check_audio: bool) -> Result<(), String> { + let mut errors = vec![]; + + if self.probe.is_none() { + match MediaProbe::new(&self.source) { + Ok(probe) => { + self.probe = Some(probe.clone()); + + if let Some(dur) = probe + .format + .duration + .map(|d| d.parse().unwrap_or_default()) + .filter(|d| !is_close(*d, self.duration, 0.5)) + { + self.duration = dur; + + if self.out == 0.0 { + self.out = dur; + } + } + } + Err(e) => errors.push(e.to_string()), + }; + + if check_audio && Path::new(&self.audio).is_file() { + match MediaProbe::new(&self.audio) { + Ok(probe) => { + self.probe_audio = Some(probe.clone()); + + if !probe.audio_streams.is_empty() { + self.duration_audio = probe.audio_streams[0] + .duration + .clone() + .and_then(|d| d.parse::().ok()) + .unwrap_or_default() + } + } + Err(e) => errors.push(e.to_string()), + } + } + } + + if !errors.is_empty() { + return Err(errors.join(", ")); + } + + Ok(()) + } + + pub fn add_filter( + &mut self, + config: &PlayoutConfig, + filter_chain: &Option>>>, + ) { + let mut node = self.clone(); + self.filter = Some(filter_chains(config, &mut node, filter_chain)) + } +} + +impl PartialEq for Media { + fn eq(&self, other: &Self) -> bool { + self.title == other.title + && self.seek == other.seek + && self.out == other.out + && self.duration == other.duration + && self.source == other.source + && self.category == other.category + && self.audio == other.audio + && self.custom_filter == other.custom_filter + } +} + +impl Eq for Media {} + +fn null_string<'de, D>(d: D) -> Result +where + D: Deserializer<'de>, +{ + Deserialize::deserialize(d).map(|x: Option<_>| x.unwrap_or_default()) +} + +#[allow(clippy::trivially_copy_pass_by_ref)] +fn is_empty_string(st: &String) -> bool { + *st == String::new() +} + +/// We use the ffprobe crate, but we map the metadata to our needs. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct MediaProbe { + pub format: ffprobe::Format, + pub audio_streams: Vec, + pub video_streams: Vec, +} + +impl MediaProbe { + pub fn new(input: &str) -> Result { + let probe = ffprobe(input); + let mut a_stream = vec![]; + let mut v_stream = vec![]; + + match probe { + Ok(obj) => { + for stream in obj.streams { + let cp_stream = stream.clone(); + + if let Some(c_type) = cp_stream.codec_type { + match c_type.as_str() { + "audio" => a_stream.push(stream), + "video" => v_stream.push(stream), + _ => {} + } + } else { + error!("No codec type found for stream: {stream:?}") + } + } + + Ok(MediaProbe { + format: obj.format, + audio_streams: a_stream, + video_streams: v_stream, + }) + } + Err(e) => { + if !Path::new(input).is_file() && !is_remote(input) { + Err(ProcessError::Custom(format!( + "File {input} not exist!" + ))) + } else { + Err(ProcessError::Ffprobe(e)) + } + } + } + } +} + +/// Calculate fps from rate/factor string +pub fn fps_calc(r_frame_rate: &str, default: f64) -> f64 { + if let Some((r, f)) = r_frame_rate.split_once('/') { + if let (Ok(r_value), Ok(f_value)) = (r.parse::(), f.parse::()) { + return r_value / f_value; + } + } + + default +} + +pub fn json_reader(path: &PathBuf) -> Result { + let f = File::options().read(true).write(false).open(path)?; + let p = serde_json::from_reader(f)?; + + Ok(p) +} + +pub fn json_writer(path: &PathBuf, data: JsonPlaylist) -> Result<(), Error> { + let f = File::options() + .write(true) + .truncate(true) + .create(true) + .open(path)?; + serde_json::to_writer_pretty(f, &data)?; + + Ok(()) +} + +/// Write current status to status file in temp folder. +/// +/// The status file is init in main function and mostly modified in RPC server. +pub fn write_status(config: &PlayoutConfig, date: &str, shift: f64) { + let data = json!({ + "time_shift": shift, + "date": date, + }); + + match serde_json::to_string(&data) { + Ok(status) => { + if let Err(e) = fs::write(&config.general.stat_file, status) { + error!( + "Unable to write to status file {}: {e}", + config.general.stat_file + ) + }; + } + Err(e) => error!("Serialize status data failed: {e}"), + }; +} + +// pub fn get_timestamp() -> i32 { +// let local: DateTime = time_now(); + +// local.timestamp_millis() as i32 +// } + +/// Get current time in seconds. +pub fn time_in_seconds() -> f64 { + let local: DateTime = time_now(); + + (local.hour() * 3600 + local.minute() * 60 + local.second()) as f64 + + (local.nanosecond() as f64 / 1000000000.0) +} + +/// Get current date for playlist, but check time with conditions: +/// +/// - When time is before playlist start, get date from yesterday. +/// - When given next_start is over target length (normally a full day), get date from tomorrow. +pub fn get_date(seek: bool, start: f64, get_next: bool) -> String { + let local: DateTime = time_now(); + + if seek && start > time_in_seconds() { + return (local - TimeDelta::try_days(1).unwrap()) + .format("%Y-%m-%d") + .to_string(); + } + + if start == 0.0 && get_next && time_in_seconds() > 86397.9 { + return (local + TimeDelta::try_days(1).unwrap()) + .format("%Y-%m-%d") + .to_string(); + } + + local.format("%Y-%m-%d").to_string() +} + +pub fn time_from_header(headers: &header::HeaderMap) -> Option> { + if let Some(time) = headers.get(header::LAST_MODIFIED) { + if let Ok(t) = time.to_str() { + let time = DateTime::parse_from_rfc2822(t); + let date_time: DateTime = time.unwrap().into(); + return Some(date_time); + }; + } + + None +} + +/// Get file modification time. +pub fn modified_time(path: &str) -> Option { + if is_remote(path) { + let response = reqwest::blocking::Client::new().head(path).send(); + + if let Ok(resp) = response { + if resp.status().is_success() { + if let Some(time) = time_from_header(resp.headers()) { + return Some(time.to_string()); + } + } + } + + return None; + } + + if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) { + let date_time: DateTime = time.into(); + return Some(date_time.to_string()); + } + + None +} + +/// Convert a formatted time string to seconds. +pub fn time_to_sec(time_str: &str) -> f64 { + if matches!(time_str, "now" | "" | "none") || !time_str.contains(':') { + return time_in_seconds(); + } + + let mut t = time_str.split(':').filter_map(|n| f64::from_str(n).ok()); + + t.next().unwrap_or(0.0) * 3600.0 + t.next().unwrap_or(0.0) * 60.0 + t.next().unwrap_or(0.0) +} + +/// Convert floating number (seconds) to a formatted time string. +pub fn sec_to_time(sec: f64) -> String { + let s = (sec * 1000.0).round() / 1000.0; + + format!( + "{:0>2}:{:0>2}:{:06.3}", + (s / 3600.0) as i32, + (s / 60.0 % 60.0) as i32, + (s % 60.0), + ) +} + +/// get file extension +pub fn file_extension(filename: &Path) -> Option<&str> { + filename.extension().and_then(OsStr::to_str) +} + +/// Test if given numbers are close to each other, +/// with a third number for setting the maximum range. +pub fn is_close(a: T, b: T, to: T) -> bool { + (a - b).abs() < to +} + +/// add duration from all media clips +pub fn sum_durations(clip_list: &[Media]) -> f64 { + clip_list.iter().map(|item| item.out).sum() +} + +/// Get delta between clip start and current time. This value we need to check, +/// if we still in sync. +/// +/// We also get here the global delta between clip start and time when a new playlist should start. +pub fn get_delta(config: &PlayoutConfig, begin: &f64) -> (f64, f64) { + let mut current_time = time_in_seconds(); + let start = config.playlist.start_sec.unwrap(); + let length = config.playlist.length_sec.unwrap_or(86400.0); + let mut target_length = 86400.0; + + if length > 0.0 && length != target_length { + target_length = length + } + + if begin == &start && start == 0.0 && 86400.0 - current_time < 4.0 { + current_time -= 86400.0 + } else if start >= current_time && begin != &start { + current_time += 86400.0 + } + + let mut current_delta = begin - current_time; + + if is_close( + current_delta.abs(), + 86400.0, + config.general.stop_threshold + 2.0, + ) { + current_delta = current_delta.abs() - 86400.0 + } + + let total_delta = if current_time < start { + start - current_time + } else { + target_length + start - current_time + }; + + (current_delta, total_delta) +} + +/// Loop image until target duration is reached. +pub fn loop_image(node: &Media) -> Vec { + let duration = node.out - node.seek; + let mut source_cmd: Vec = vec_strings!["-loop", "1", "-i", node.source.clone()]; + + info!( + "Loop image {}, total duration: {duration:.2}", + node.source + ); + + if Path::new(&node.audio).is_file() { + if node.seek > 0.0 { + source_cmd.append(&mut vec_strings!["-ss", node.seek]) + } + + source_cmd.append(&mut vec_strings!["-i", node.audio.clone()]); + } + + source_cmd.append(&mut vec_strings!["-t", duration]); + + source_cmd +} + +/// Loop filler until target duration is reached. +pub fn loop_filler(node: &Media) -> Vec { + let loop_count = (node.out / node.duration).ceil() as i32; + let mut source_cmd = vec![]; + + if loop_count > 1 { + info!("Loop {} {loop_count} times, total duration: {:.2}", node.source, node.out); + + source_cmd.append(&mut vec_strings!["-stream_loop", loop_count]); + } + + source_cmd.append(&mut vec_strings!["-i", node.source, "-t", node.out]); + + source_cmd +} + +/// Set clip seek in and length value. +pub fn seek_and_length(node: &mut Media) -> Vec { + let loop_count = (node.out / node.duration).ceil() as i32; + let mut source_cmd = vec![]; + let mut cut_audio = false; + let mut loop_audio = false; + let remote_source = is_remote(&node.source); + + if remote_source && node.probe.clone().and_then(|f| f.format.duration).is_none() { + node.out -= node.seek; + node.seek = 0.0; + } else if node.seek > 0.5 { + source_cmd.append(&mut vec_strings!["-ss", node.seek]) + } + + if loop_count > 1 { + info!("Loop {} {loop_count} times, total duration: {:.2}", node.source, node.out); + + source_cmd.append(&mut vec_strings!["-stream_loop", loop_count]); + } + + source_cmd.append(&mut vec_strings!["-i", node.source.clone()]); + + if node.duration > node.out || remote_source || loop_count > 1 { + source_cmd.append(&mut vec_strings!["-t", node.out - node.seek]); + } + + if !node.audio.is_empty() { + if node.seek > 0.5 { + source_cmd.append(&mut vec_strings!["-ss", node.seek]); + } + + if node.duration_audio > node.out { + cut_audio = true; + } else if node.duration_audio < node.out { + source_cmd.append(&mut vec_strings!["-stream_loop", -1]); + loop_audio = true; + } + + source_cmd.append(&mut vec_strings!["-i", node.audio.clone()]); + + if cut_audio || loop_audio || remote_source { + source_cmd.append(&mut vec_strings!["-t", node.out - node.seek]); + } + } + + source_cmd +} + +/// Create a dummy clip as a placeholder for missing video files. +pub fn gen_dummy(config: &PlayoutConfig, duration: f64) -> (String, Vec) { + let color = "#121212"; + let source = format!( + "color=c={color}:s={}x{}:d={duration}", + config.processing.width, config.processing.height + ); + let cmd: Vec = vec_strings![ + "-f", + "lavfi", + "-i", + format!( + "{source}:r={},format=pix_fmts=yuv420p", + config.processing.fps + ), + "-f", + "lavfi", + "-i", + format!("anoisesrc=d={duration}:c=pink:r=48000:a=0.3") + ]; + + (source, cmd) +} + +// fn get_output_count(cmd: &[String]) -> i32 { +// let mut count = 0; + +// if let Some(index) = cmd.iter().position(|c| c == "-var_stream_map") { +// if let Some(mapping) = cmd.get(index + 1) { +// return mapping.split(' ').count() as i32; +// }; +// }; + +// for (i, param) in cmd.iter().enumerate() { +// if i > 0 && !param.starts_with('-') && !cmd[i - 1].starts_with('-') { +// count += 1; +// } +// } + +// count +// } + +pub fn is_remote(path: &str) -> bool { + Regex::new(r"^(https?|rtmps?|rts?p|udp|tcp|srt)://.*") + .unwrap() + .is_match(&path.to_lowercase()) +} + +/// Check if file can include or has to exclude. +/// For example when a file is on given HLS output path, it should exclude. +/// Or when the file extension is set under storage config it can be include. +pub fn include_file_extension(config: &PlayoutConfig, file_path: &Path) -> bool { + let mut include = false; + + if let Some(ext) = file_extension(file_path) { + if config.storage.extensions.contains(&ext.to_lowercase()) { + include = true; + } + } + + if config.out.mode == HLS { + if let Some(ts_path) = config + .out + .output_cmd + .clone() + .unwrap_or_else(|| vec![String::new()]) + .iter() + .find(|s| s.contains(".ts")) + { + if let Some(p) = Path::new(ts_path).parent() { + if file_path.starts_with(p) { + include = false; + } + } + } + + if let Some(m3u8_path) = config + .out + .output_cmd + .clone() + .unwrap_or_else(|| vec![String::new()]) + .iter() + .find(|s| s.contains(".m3u8") && !s.contains("master.m3u8")) + { + if let Some(p) = Path::new(m3u8_path).parent() { + if file_path.starts_with(p) { + include = false; + } + } + } + } + + include +} + +/// Read ffmpeg stderr decoder and encoder instance +/// and log the output. +pub fn stderr_reader( + buffer: BufReader, + ignore: Vec, + suffix: ProcessUnit, + proc_control: ProcessControl, +) -> Result<(), Error> { + for line in buffer.lines() { + let line = line?; + + if FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i)) + || ignore.iter().any(|i| line.contains(i)) + { + continue; + } + + if line.contains("[info]") { + info!( + "[{suffix}] {}", + line.replace("[info] ", "") + ) + } else if line.contains("[warning]") { + warn!( + "[{suffix}] {}", + line.replace("[warning] ", "") + ) + } else if line.contains("[error]") || line.contains("[fatal]") { + error!( + "[{suffix}] {}", + line.replace("[error] ", "").replace("[fatal] ", "") + ); + + if FFMPEG_UNRECOVERABLE_ERRORS + .iter() + .any(|i| line.contains(*i)) + || (line.contains("No such file or directory") + && !line.contains("failed to delete old segment")) + { + proc_control.stop_all(); + exit(1); + } + } + } + + Ok(()) +} + +/// Run program to test if it is in system. +fn is_in_system(name: &str) -> Result<(), String> { + match Command::new(name) + .stderr(Stdio::null()) + .stdout(Stdio::null()) + .spawn() + { + Ok(mut proc) => { + if let Err(e) = proc.wait() { + return Err(format!("{e}")); + }; + } + Err(e) => return Err(format!("{name} not found on system! {e}")), + } + + Ok(()) +} + +fn ffmpeg_filter_and_libs(config: &mut PlayoutConfig) -> Result<(), String> { + let ignore_flags = [ + "--enable-gpl", + "--enable-version3", + "--enable-runtime-cpudetect", + "--enable-avfilter", + "--enable-zlib", + "--enable-pic", + "--enable-nonfree", + ]; + + let mut ff_proc = match Command::new("ffmpeg") + .args(["-filters"]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + return Err(format!("couldn't spawn ffmpeg process: {e}")); + } + Ok(proc) => proc, + }; + + let out_buffer = BufReader::new(ff_proc.stdout.take().unwrap()); + let err_buffer = BufReader::new(ff_proc.stderr.take().unwrap()); + + // stderr shows only the ffmpeg configuration + // get codec library's + for line in err_buffer.lines().map_while(Result::ok) { + if line.contains("configuration:") { + let configs = line.split_whitespace(); + + for flag in configs { + if flag.contains("--enable") && !ignore_flags.contains(&flag) { + config + .general + .ffmpeg_libs + .push(flag.replace("--enable-", "")); + } + } + break; + } + } + + // stdout shows filter from ffmpeg + // get filters + for line in out_buffer.lines().map_while(Result::ok) { + if line.contains('>') { + let filter_line = line.split_whitespace().collect::>(); + + if filter_line.len() > 2 { + config + .general + .ffmpeg_filters + .push(filter_line[1].to_string()) + } + } + } + + if let Err(e) = ff_proc.wait() { + error!("{e}") + }; + + Ok(()) +} + +/// Validate ffmpeg/ffprobe/ffplay. +/// +/// Check if they are in system and has all libs and codecs we need. +pub fn validate_ffmpeg(config: &mut PlayoutConfig) -> Result<(), String> { + is_in_system("ffmpeg")?; + is_in_system("ffprobe")?; + + if config.out.mode == Desktop { + is_in_system("ffplay")?; + } + + ffmpeg_filter_and_libs(config)?; + + if config + .out + .output_cmd + .as_ref() + .unwrap() + .contains(&"libx264".to_string()) + && !config.general.ffmpeg_libs.contains(&"libx264".to_string()) + { + return Err("ffmpeg contains no libx264!".to_string()); + } + + if config.text.add_text + && !config.text.text_from_filename + && !config.general.ffmpeg_libs.contains(&"libzmq".to_string()) + { + return Err( + "ffmpeg contains no libzmq! Disable add_text in config or compile ffmpeg with libzmq." + .to_string(), + ); + } + + if config + .out + .output_cmd + .as_ref() + .unwrap() + .contains(&"libfdk_aac".to_string()) + && !config + .general + .ffmpeg_libs + .contains(&"libfdk-aac".to_string()) + { + return Err("ffmpeg contains no libfdk-aac!".to_string()); + } + + Ok(()) +} + +/// get a free tcp socket +pub fn free_tcp_socket(exclude_socket: String) -> Option { + for _ in 0..100 { + let port = rand::thread_rng().gen_range(45321..54268); + let socket = format!("127.0.0.1:{port}"); + + if socket != exclude_socket && TcpListener::bind(("127.0.0.1", port)).is_ok() { + return Some(socket); + } + } + + None +} + +/// check if tcp port is free +pub fn test_tcp_port(url: &str) -> bool { + let re = Regex::new(r"^[\w]+\://").unwrap(); + let mut addr = url.to_string(); + + if re.is_match(url) { + addr = re.replace(url, "").to_string(); + } + + if let Some(socket) = addr.split_once(':') { + if TcpListener::bind(( + socket.0, + socket.1.to_string().parse::().unwrap_or_default(), + )) + .is_ok() + { + return true; + } + }; + + error!("Address {url} already in use!"); + + false +} + +/// Generate a vector with dates, from given range. +pub fn get_date_range(date_range: &[String]) -> Vec { + let mut range = vec![]; + + let start = match NaiveDate::parse_from_str(&date_range[0], "%Y-%m-%d") { + Ok(s) => s, + Err(_) => { + error!("date format error in: {:?}", date_range[0]); + exit(1); + } + }; + + let end = match NaiveDate::parse_from_str(&date_range[2], "%Y-%m-%d") { + Ok(e) => e, + Err(_) => { + error!("date format error in: {:?}", date_range[2]); + exit(1); + } + }; + + let duration = end.signed_duration_since(start); + let days = duration.num_days() + 1; + + for day in 0..days { + range.push( + (start + TimeDelta::try_days(day).unwrap()) + .format("%Y-%m-%d") + .to_string(), + ); + } + + range +} + +pub fn parse_log_level_filter(s: &str) -> Result { + match s.to_lowercase().as_str() { + "debug" => Ok(LevelFilter::Debug), + "error" => Ok(LevelFilter::Error), + "info" => Ok(LevelFilter::Info), + "trace" => Ok(LevelFilter::Trace), + "warning" => Ok(LevelFilter::Warn), + "off" => Ok(LevelFilter::Off), + _ => Err("Error level not exists!"), + } +} + +pub fn custom_format(template: &str, args: &[T]) -> String { + let mut filled_template = String::new(); + let mut arg_iter = args.iter().map(|x| format!("{}", x)); + let mut template_iter = template.chars(); + + while let Some(c) = template_iter.next() { + if c == '{' { + if let Some(nc) = template_iter.next() { + if nc == '{' { + filled_template.push('{'); + } else if nc == '}' { + if let Some(arg) = arg_iter.next() { + filled_template.push_str(&arg); + } else { + filled_template.push(c); + filled_template.push(nc); + } + } else if let Some(n) = nc.to_digit(10) { + filled_template.push_str(&args[n as usize].to_string()); + } else { + filled_template.push(nc); + } + } + } else if c == '}' { + if let Some(nc) = template_iter.next() { + if nc == '}' { + filled_template.push('}'); + continue; + } else { + filled_template.push(nc); + } + } + } else { + filled_template.push(c); + } + } + + filled_template +} + +/// Get system time, in non test/debug case. +#[cfg(not(any(test, debug_assertions)))] +pub fn time_now() -> DateTime { + Local::now() +} + +/// Get mocked system time, in test/debug case. +#[cfg(any(test, debug_assertions))] +pub mod mock_time { + use super::*; + use std::cell::RefCell; + + thread_local! { + static DATE_TIME_DIFF: RefCell> = const { RefCell::new(None) }; + } + + pub fn time_now() -> DateTime { + DATE_TIME_DIFF.with(|cell| match cell.borrow().as_ref().cloned() { + Some(diff) => Local::now() - diff, + None => Local::now(), + }) + } + + pub fn set_mock_time(date_time: &str) { + if let Ok(d) = NaiveDateTime::parse_from_str(date_time, "%Y-%m-%dT%H:%M:%S") { + let time = Local.from_local_datetime(&d).unwrap(); + + DATE_TIME_DIFF.with(|cell| *cell.borrow_mut() = Some(Local::now() - time)); + } + } +} + +#[cfg(any(test, debug_assertions))] +pub use mock_time::time_now; diff --git a/ffplayout/src/utils/errors.rs b/ffplayout/src/utils/errors.rs index 09ba63f2..e66bb617 100644 --- a/ffplayout/src/utils/errors.rs +++ b/ffplayout/src/utils/errors.rs @@ -2,6 +2,7 @@ use std::io; use actix_web::{error::ResponseError, Error, HttpResponse}; use derive_more::Display; +use ffprobe::FfProbeError; #[derive(Debug, Display)] pub enum ServiceError { @@ -113,6 +114,8 @@ pub enum ProcessError { #[display(fmt = "IO error: {}", _0)] IO(io::Error), #[display(fmt = "{}", _0)] + Ffprobe(FfProbeError), + #[display(fmt = "{}", _0)] Custom(String), } @@ -122,6 +125,12 @@ impl From for ProcessError { } } +impl From for ProcessError { + fn from(err: FfProbeError) -> Self { + Self::Ffprobe(err) + } +} + impl From for ProcessError { fn from(err: lettre::address::AddressError) -> ProcessError { ProcessError::Custom(err.to_string()) @@ -139,3 +148,9 @@ impl From for ProcessError { ProcessError::Custom(err.to_string()) } } + +impl From> for ProcessError { + fn from(err: std::sync::PoisonError) -> ProcessError { + ProcessError::Custom(err.to_string()) + } +} diff --git a/ffplayout/src/utils/logging.rs b/ffplayout/src/utils/logging.rs index 20fd612e..2dee700e 100644 --- a/ffplayout/src/utils/logging.rs +++ b/ffplayout/src/utils/logging.rs @@ -407,3 +407,24 @@ pub fn init_logging(mail_queues: Arc>>>>) -> io:: Ok(()) } + +/// Format ingest and HLS logging output +pub fn log_line(line: &str, level: &str) { + if line.contains("[info]") && level.to_lowercase() == "info" { + info!("[Server] {}", line.replace("[info] ", "")) + } else if line.contains("[warning]") + && (level.to_lowercase() == "warning" || level.to_lowercase() == "info") + { + warn!( + "[Server] {}", + line.replace("[warning] ", "") + ) + } else if line.contains("[error]") + && !line.contains("Input/output error") + && !line.contains("Broken pipe") + { + error!("[Server] {}", line.replace("[error] ", "")); + } else if line.contains("[fatal]") { + error!("[Server] {}", line.replace("[fatal] ", "")) + } +} diff --git a/ffplayout/src/utils/mod.rs b/ffplayout/src/utils/mod.rs index bd24322a..23ec6406 100644 --- a/ffplayout/src/utils/mod.rs +++ b/ffplayout/src/utils/mod.rs @@ -31,6 +31,7 @@ pub mod files; pub mod logging; pub mod playlist; pub mod system; +pub mod task_runner; use crate::db::{ db_pool, diff --git a/ffplayout/src/utils/task_runner.rs b/ffplayout/src/utils/task_runner.rs new file mode 100644 index 00000000..2e64429d --- /dev/null +++ b/ffplayout/src/utils/task_runner.rs @@ -0,0 +1,26 @@ +use std::process::Command; + +use log::*; + +use crate::player::utils::get_data_map; + +use ffplayout_lib::utils::{config::PlayoutConfig, Media, PlayoutStatus}; + +pub fn run(config: PlayoutConfig, node: Media, playout_stat: PlayoutStatus, server_running: bool) { + let obj = + serde_json::to_string(&get_data_map(&config, node, &playout_stat, server_running)).unwrap(); + trace!("Run task: {obj}"); + + match Command::new(config.task.path).arg(obj).spawn() { + Ok(mut c) => { + let status = c.wait().expect("Error in waiting for the task process!"); + + if !status.success() { + error!("Process stops with error."); + } + } + Err(e) => { + error!("Couldn't spawn task runner: {e}") + } + } +}