From 52ce6197f0898c393fdf31e9599aa6a8651b82ea Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Fri, 13 May 2022 12:17:50 +0200 Subject: [PATCH 01/12] cleanup imports, allow config path in assets --- src/output/stream.rs | 5 +---- src/utils/config.rs | 19 +++++++++++-------- src/utils/mod.rs | 14 +++++--------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/output/stream.rs b/src/output/stream.rs index 2f74d3bc..cacf0777 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -1,7 +1,4 @@ -use std::{ - process, - process::{Command, Stdio}, -}; +use std::process::{self, Command, Stdio}; use simplelog::*; diff --git a/src/utils/config.rs b/src/utils/config.rs index 08886368..5ce0c25a 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -138,22 +138,25 @@ impl GlobalConfig { /// Read config from YAML file, and set some extra config values. pub fn new() -> Self { let args = get_args(); - let mut config_path = match env::current_exe() { - Ok(path) => path.parent().unwrap().join("ffplayout.yml"), - Err(_) => PathBuf::from("./ffplayout.yml"), - }; + let mut config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml"); if let Some(cfg) = args.config { config_path = PathBuf::from(cfg); - } else if Path::new("/etc/ffplayout/ffplayout.yml").is_file() { - config_path = PathBuf::from("/etc/ffplayout/ffplayout.yml"); + } + + if !config_path.is_file() { + if Path::new("./assets/ffplayout.yml").is_file() { + config_path = PathBuf::from("./assets/ffplayout.yml") + } else if let Some(p) = env::current_exe().ok().as_ref().and_then(|op| op.parent()) { + config_path = p.join("ffplayout.yml") + }; } let f = match File::open(&config_path) { Ok(file) => file, - Err(err) => { + Err(_) => { println!( - "{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!\n\nSystem error: {err}" + "{config_path:?} doesn't exists!\nPut \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!" ); process::exit(0x0100); } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index e3e11af3..f688f2be 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,17 +1,13 @@ -use chrono::prelude::*; -use chrono::Duration; -use ffprobe::{ffprobe, Format, Stream}; use std::{ - fs, - fs::metadata, + fs::{self, metadata}, io::{BufRead, BufReader, Error}, path::Path, - process::exit, - process::{ChildStderr, Command, Stdio}, - time, - time::UNIX_EPOCH, + process::{exit, ChildStderr, Command, Stdio}, + time::{self, UNIX_EPOCH}, }; +use chrono::{prelude::*, Duration}; +use ffprobe::{ffprobe, Format, Stream}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::json; From 2902647200c20d67a6307f37caa58ebe8c6b4527 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Fri, 13 May 2022 15:59:09 +0200 Subject: [PATCH 02/12] remove some unstable unwarps, clode cleanup --- src/filter/a_loudnorm.rs | 2 +- src/filter/ingest_filter.rs | 1 + src/filter/mod.rs | 145 ++++++++++++++++++++---------------- src/input/folder.rs | 18 ++--- src/input/playlist.rs | 40 +++++----- src/rpc/mod.rs | 1 - src/utils/config.rs | 1 + src/utils/logging.rs | 28 +++---- src/utils/mod.rs | 66 ++++++++-------- 9 files changed, 157 insertions(+), 145 deletions(-) diff --git a/src/filter/a_loudnorm.rs b/src/filter/a_loudnorm.rs index 2d503372..efc6d3b6 100644 --- a/src/filter/a_loudnorm.rs +++ b/src/filter/a_loudnorm.rs @@ -5,7 +5,7 @@ use crate::utils::GlobalConfig; /// Add loudness normalization. pub fn filter_node(config: &GlobalConfig) -> String { format!( - ",loudnorm=I={}:TP={}:LRA={}", + "loudnorm=I={}:TP={}:LRA={}", config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra ) } diff --git a/src/filter/ingest_filter.rs b/src/filter/ingest_filter.rs index fb03c8db..9838ee89 100644 --- a/src/filter/ingest_filter.rs +++ b/src/filter/ingest_filter.rs @@ -8,6 +8,7 @@ fn audio_filter(config: &GlobalConfig) -> String { let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); if config.processing.loudnorm_ingest { + audio_chain.push(','); audio_chain.push_str(&a_loudnorm::filter_node(config)); } diff --git a/src/filter/mod.rs b/src/filter/mod.rs index aeae829a..b85d5d46 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -13,8 +13,8 @@ use crate::utils::{get_delta, is_close, GlobalConfig, Media}; struct Filters { audio_chain: Option, video_chain: Option, - audio_map: Option, - video_map: Option, + audio_map: String, + video_map: String, } impl Filters { @@ -22,8 +22,8 @@ impl Filters { Filters { audio_chain: None, video_chain: None, - audio_map: Some("0:a".to_string()), - video_map: Some("0:v".to_string()), + audio_map: "1:a".to_string(), + video_map: "0:v".to_string(), } } @@ -41,10 +41,9 @@ impl Filters { if filter.contains("aevalsrc") || filter.contains("anoisesrc") { self.audio_chain = Some(filter.to_string()); } else { - self.audio_chain = - Some(format!("[{}]{filter}", self.audio_map.clone().unwrap())); + self.audio_chain = Some(format!("[{}]{filter}", self.audio_map.clone())); } - self.audio_map = Some("[aout1]".to_string()); + self.audio_map = "[aout1]".to_string(); } }, "video" => match &self.video_chain { @@ -57,7 +56,7 @@ impl Filters { } None => { self.video_chain = Some(format!("[0:v]{filter}")); - self.video_map = Some("[vout1]".to_string()); + self.video_map = "[vout1]".to_string(); } }, _ => (), @@ -101,18 +100,30 @@ fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) { } } -fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &GlobalConfig) { - if width != config.processing.width || height != config.processing.height { +fn scale(v_stream: &ffprobe::Stream, aspect: f64, chain: &mut Filters, config: &GlobalConfig) { + // width: i64, height: i64 + if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) { + if w != config.processing.width || h != config.processing.height { + chain.add_filter( + &format!( + "scale={}:{}", + config.processing.width, config.processing.height + ), + "video", + ) + } + + if !is_close(aspect, config.processing.aspect, 0.03) { + chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video") + } + } else { chain.add_filter( &format!( "scale={}:{}", config.processing.width, config.processing.height ), "video", - ) - } - - if !is_close(aspect, config.processing.aspect, 0.03) { + ); chain.add_filter(&format!("setdar=dar={}", config.processing.aspect), "video") } } @@ -161,20 +172,22 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { } fn extend_video(node: &mut Media, chain: &mut Filters) { - let video_streams = node.probe.clone().unwrap().video_streams.unwrap(); - if !video_streams.is_empty() { - if let Some(duration) = &video_streams[0].duration { - let duration_float = duration.clone().parse::().unwrap(); + if let Some(duration) = node + .probe + .as_ref() + .and_then(|p| p.video_streams.as_ref()) + .and_then(|v| v[0].duration.as_ref()) + { + let duration_float = duration.clone().parse::().unwrap(); - if node.out - node.seek > duration_float - node.seek + 0.1 { - chain.add_filter( - &format!( - "tpad=stop_mode=add:stop_duration={}", - (node.out - node.seek) - (duration_float - node.seek) - ), - "video", - ) - } + if node.out - node.seek > duration_float - node.seek + 0.1 { + chain.add_filter( + &format!( + "tpad=stop_mode=add:stop_duration={}", + (node.out - node.seek) - (duration_float - node.seek) + ), + "video", + ) } } } @@ -198,8 +211,13 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { } fn add_audio(node: &mut Media, chain: &mut Filters) { - let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap(); - if audio_streams.is_empty() { + if node + .probe + .as_ref() + .and_then(|p| p.audio_streams.as_ref()) + .unwrap_or(&vec![]) + .is_empty() + { warn!("Clip: '{}' has no audio!", node.source); let audio = format!( "aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000", @@ -210,29 +228,29 @@ fn add_audio(node: &mut Media, chain: &mut Filters) { } fn extend_audio(node: &mut Media, chain: &mut Filters) { - let audio_streams = node.probe.clone().unwrap().audio_streams.unwrap(); - if !audio_streams.is_empty() { - if let Some(duration) = &audio_streams[0].duration { - let duration_float = duration.clone().parse::().unwrap(); + if let Some(duration) = node + .probe + .as_ref() + .and_then(|p| p.audio_streams.as_ref()) + .and_then(|a| a[0].duration.as_ref()) + { + let duration_float = duration.clone().parse::().unwrap(); - if node.out - node.seek > duration_float - node.seek + 0.1 { - chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio") - } + if node.out - node.seek > duration_float - node.seek + 0.1 { + chain.add_filter(&format!("apad=whole_dur={}", node.out - node.seek), "audio") } } } /// Add single pass loudnorm filter to audio line. fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { - if node.probe.is_some() + if config.processing.add_loudnorm && !node .probe - .clone() - .unwrap() - .audio_streams - .unwrap() + .as_ref() + .and_then(|p| p.audio_streams.as_ref()) + .unwrap_or(&vec![]) .is_empty() - && config.processing.add_loudnorm { let loud_filter = a_loudnorm::filter_node(config); chain.add_filter(&loud_filter, "audio"); @@ -259,7 +277,7 @@ fn aspect_calc(aspect_string: &Option, config: &GlobalConfig) -> f64 { } fn fps_calc(r_frame_rate: &str) -> f64 { - let frame_rate_vec: Vec<&str> = r_frame_rate.split('/').collect(); + let frame_rate_vec = r_frame_rate.split('/').collect::>(); let rate: f64 = frame_rate_vec[0].parse().unwrap(); let factor: f64 = frame_rate_vec[1].parse().unwrap(); let fps: f64 = rate / factor; @@ -294,29 +312,24 @@ fn realtime_filter(node: &mut Media, chain: &mut Filters, config: &GlobalConfig, pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec { let mut filters = Filters::new(); - let mut audio_map = "1:a".to_string(); - filters.audio_map = Some(audio_map); - if let Some(probe) = node.probe.clone() { + if let Some(probe) = node.probe.as_ref() { if probe.audio_streams.is_some() { - audio_map = "0:a".to_string(); - filters.audio_map = Some(audio_map); + filters.audio_map = "0:a".to_string(); } - let v_stream = &probe.video_streams.unwrap()[0]; - let aspect = aspect_calc(&v_stream.display_aspect_ratio, config); - let frame_per_sec = fps_calc(&v_stream.r_frame_rate); + if let Some(v_streams) = &probe.video_streams.as_ref() { + let v_stream = &v_streams[0]; + + let aspect = aspect_calc(&v_stream.display_aspect_ratio, config); + let frame_per_sec = fps_calc(&v_stream.r_frame_rate); + + deinterlace(&v_stream.field_order, &mut filters); + pad(aspect, &mut filters, config); + fps(frame_per_sec, &mut filters, config); + scale(v_stream, aspect, &mut filters, config); + } - deinterlace(&v_stream.field_order, &mut filters); - pad(aspect, &mut filters, config); - fps(frame_per_sec, &mut filters, config); - scale( - v_stream.width.unwrap(), - v_stream.height.unwrap(), - aspect, - &mut filters, - config, - ); extend_video(node, &mut filters); add_audio(node, &mut filters); @@ -339,8 +352,8 @@ pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec { if let Some(v_filters) = filters.video_chain { filter_str.push_str(v_filters.as_str()); - filter_str.push_str(filters.video_map.clone().unwrap().as_str()); - filter_map.append(&mut vec!["-map".to_string(), filters.video_map.unwrap()]); + filter_str.push_str(filters.video_map.clone().as_str()); + filter_map.append(&mut vec!["-map".to_string(), filters.video_map]); } else { filter_map.append(&mut vec!["-map".to_string(), "0:v".to_string()]); } @@ -350,10 +363,10 @@ pub fn filter_chains(config: &GlobalConfig, node: &mut Media) -> Vec { filter_str.push(';') } filter_str.push_str(a_filters.as_str()); - filter_str.push_str(filters.audio_map.clone().unwrap().as_str()); - filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]); + filter_str.push_str(filters.audio_map.clone().as_str()); + filter_map.append(&mut vec!["-map".to_string(), filters.audio_map]); } else { - filter_map.append(&mut vec!["-map".to_string(), filters.audio_map.unwrap()]); + filter_map.append(&mut vec!["-map".to_string(), filters.audio_map]); } if filter_str.len() > 10 { diff --git a/src/input/folder.rs b/src/input/folder.rs index aa6454e0..3172922a 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -51,17 +51,15 @@ impl FolderSource { for entry in WalkDir::new(config.storage.path.clone()) .into_iter() - .filter_map(|e| e.ok()) + .flat_map(|e| e.ok()) + .filter(|f| f.path().is_file()) { - if entry.path().is_file() { - let ext = file_extension(entry.path()); - - if ext.is_some() - && config - .storage - .extensions - .clone() - .contains(&ext.unwrap().to_lowercase()) + if let Some(ext) = file_extension(entry.path()) { + if config + .storage + .extensions + .clone() + .contains(&ext.to_lowercase()) { let media = Media::new(0, entry.path().display().to_string(), false); media_list.push(media); diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 2b7bbad3..983d5d46 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -80,30 +80,28 @@ impl CurrentProgram { } else if Path::new(&self.json_path.clone().unwrap()).is_file() { let mod_time = modified_time(&self.json_path.clone().unwrap()); - if !mod_time - .unwrap() - .to_string() - .eq(&self.json_mod.clone().unwrap()) - { - // when playlist has changed, reload it - info!( - "Reload playlist {}", - self.json_path.clone().unwrap() - ); + if let Some(m) = mod_time { + if !m.to_string().eq(&self.json_mod.clone().unwrap()) { + // when playlist has changed, reload it + info!( + "Reload playlist {}", + self.json_path.clone().unwrap() + ); - let json = read_json( - &self.config, - self.json_path.clone(), - self.is_terminated.clone(), - false, - 0.0, - ); + let json = read_json( + &self.config, + self.json_path.clone(), + self.is_terminated.clone(), + false, + 0.0, + ); - self.json_mod = json.modified; - *self.nodes.lock().unwrap() = json.program; + self.json_mod = json.modified; + *self.nodes.lock().unwrap() = json.program; - self.get_current_clip(); - self.index.fetch_add(1, Ordering::SeqCst); + self.get_current_clip(); + self.index.fetch_add(1, Ordering::SeqCst); + } } } else { error!( diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 75d2a645..71c300c7 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -230,7 +230,6 @@ pub fn json_rpc_server( && request.headers()["authorization"] == auth { if request.uri() == "/status" { - println!("{:?}", request.headers().contains_key("authorization")); Response::ok("Server running OK.").into() } else { request.into() diff --git a/src/utils/config.rs b/src/utils/config.rs index 5ce0c25a..0bdc7c41 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -240,6 +240,7 @@ impl GlobalConfig { if let Some(folder) = args.folder { config.storage.path = folder; + config.processing.mode = "folder".into(); } if let Some(start) = args.start { diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 4abaa7b9..19f239c6 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -26,28 +26,30 @@ use crate::utils::GlobalConfig; /// send log messages to mail recipient fn send_mail(cfg: &GlobalConfig, msg: String) { - let email = Message::builder() + if let Ok(email) = Message::builder() .from(cfg.mail.sender_addr.parse().unwrap()) .to(cfg.mail.recipient.parse().unwrap()) .subject(cfg.mail.subject.clone()) .header(header::ContentType::TEXT_PLAIN) .body(clean_string(&msg)) - .unwrap(); + { + let credentials = + Credentials::new(cfg.mail.sender_addr.clone(), cfg.mail.sender_pass.clone()); - let credentials = Credentials::new(cfg.mail.sender_addr.clone(), cfg.mail.sender_pass.clone()); + let mut transporter = SmtpTransport::relay(cfg.mail.smtp_server.clone().as_str()); - let mut transporter = SmtpTransport::relay(cfg.mail.smtp_server.clone().as_str()); + if cfg.mail.starttls { + transporter = SmtpTransport::starttls_relay(cfg.mail.smtp_server.clone().as_str()) + } - if cfg.mail.starttls { - transporter = SmtpTransport::starttls_relay(cfg.mail.smtp_server.clone().as_str()) - } + let mailer = transporter.unwrap().credentials(credentials).build(); - let mailer = transporter.unwrap().credentials(credentials).build(); - - // Send the email - match mailer.send(&email) { - Ok(_) => (), - Err(e) => info!("Could not send email: {:?}", e), + // Send the email + if let Err(e) = mailer.send(&email) { + error!("Could not send email: {:?}", e) + } + } else { + error!("Mail Message failed!") } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f688f2be..638c2a98 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -69,16 +69,19 @@ pub struct Media { impl Media { pub fn new(index: usize, src: String, do_probe: bool) -> Self { - let mut duration: f64 = 0.0; + let mut duration = 0.0; let mut probe = None; if do_probe && Path::new(&src).is_file() { probe = Some(MediaProbe::new(src.clone())); - duration = match probe.clone().unwrap().format.unwrap().duration { - Some(dur) => dur.parse().unwrap(), - None => 0.0, - }; + if let Some(dur) = probe + .as_ref() + .and_then(|p| p.format.as_ref()) + .and_then(|f| f.duration.as_ref()) + { + duration = dur.parse().unwrap() + } } Self { @@ -103,14 +106,17 @@ impl Media { let probe = MediaProbe::new(self.source.clone()); self.probe = Some(probe.clone()); - if self.duration == 0.0 { - let duration = match probe.format.unwrap().duration { - Some(dur) => dur.parse().unwrap(), - None => 0.0, - }; + if let Some(dur) = probe + .format + .and_then(|f| f.duration) + .map(|d| d.parse().unwrap()) + .filter(|d| !is_close(*d, self.duration, 0.5)) + { + self.duration = dur; - self.out = duration; - self.duration = duration; + if self.out == 0.0 { + self.out = dur; + } } } } @@ -132,25 +138,22 @@ pub struct MediaProbe { impl MediaProbe { fn new(input: String) -> Self { let probe = ffprobe(&input); - let mut a_stream: Vec = vec![]; - let mut v_stream: Vec = vec![]; + let mut a_stream = vec![]; + let mut v_stream = vec![]; match probe { Ok(obj) => { for stream in obj.streams { let cp_stream = stream.clone(); - match cp_stream.codec_type { - Some(codec_type) => { - if codec_type == "audio" { - a_stream.push(stream) - } else if codec_type == "video" { - v_stream.push(stream) - } - } - _ => { - error!("No codec type found for stream: {stream:?}") + if let Some(c_type) = cp_stream.codec_type { + match c_type.as_str() { + "audio" => a_stream.push(stream), + "video" => v_stream.push(stream), + _ => {} } + } else { + error!("No codec type found for stream: {stream:?}") } } @@ -187,15 +190,13 @@ impl MediaProbe { /// /// The status file is init in main function and mostly modified in RPC server. pub fn write_status(config: &GlobalConfig, date: &str, shift: f64) { - let stat_file = config.general.stat_file.clone(); - let data = json!({ "time_shift": shift, "date": date, }); let status_data: String = serde_json::to_string(&data).expect("Serialize status data failed"); - if let Err(e) = fs::write(stat_file, &status_data) { + if let Err(e) = fs::write(&config.general.stat_file, &status_data) { error!("Unable to write file: {e:?}") }; } @@ -234,9 +235,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String { /// Get file modification time. pub fn modified_time(path: &str) -> Option> { - let metadata = metadata(path).unwrap(); - - if let Ok(time) = metadata.modified() { + if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) { let date_time: DateTime = time.into(); return Some(date_time); } @@ -519,10 +518,11 @@ pub mod mock_time { } pub fn set_mock_time(date_time: &str) { - let date_obj = NaiveDateTime::parse_from_str(date_time, "%Y-%m-%dT%H:%M:%S"); - let time = Local.from_local_datetime(&date_obj.unwrap()).unwrap(); + if let Ok(d) = NaiveDateTime::parse_from_str(date_time, "%Y-%m-%dT%H:%M:%S") { + let time = Local.from_local_datetime(&d).unwrap(); - DATE_TIME_DIFF.with(|cell| *cell.borrow_mut() = Some(Local::now() - time)); + DATE_TIME_DIFF.with(|cell| *cell.borrow_mut() = Some(Local::now() - time)); + } } } From 5f7fa0541b2853b1348d1feb76f93656fe96f6dc Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 15 May 2022 19:26:57 +0200 Subject: [PATCH 03/12] run kill_all when go out from loop --- src/output/mod.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/output/mod.rs b/src/output/mod.rs index deffb339..7ad54712 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -17,8 +17,7 @@ pub use hls::write_hls; use crate::input::{ingest_server, source_generator}; use crate::utils::{ - sec_to_time, stderr_reader, Decoder, Encoder, GlobalConfig, PlayerControl, PlayoutStatus, - ProcessControl, + sec_to_time, stderr_reader, Decoder, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; use crate::vec_strings; @@ -202,9 +201,7 @@ pub fn player( sleep(Duration::from_secs(1)); - if let Err(e) = proc_control.kill(Encoder) { - error!("{e}") - } + proc_control.kill_all(); if let Err(e) = error_encoder_thread.join() { error!("{e:?}"); From e408a89fd0366e1c8806a1841036e4933ef08c93 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 15 May 2022 19:27:25 +0200 Subject: [PATCH 04/12] add some more infos to log --- src/rpc/mod.rs | 2 ++ src/utils/mod.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 71c300c7..d062eab1 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -219,6 +219,8 @@ pub fn json_rpc_server( Ok(Value::String("No, or wrong parameters set!".to_string())) }); + info!("Run JSON RPC server, listening on: http://{addr}"); + // build rpc server let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![ diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 638c2a98..d94131a7 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -313,7 +313,7 @@ pub fn get_delta(config: &GlobalConfig, begin: &f64) -> (f64, f64) { /// Check if clip in playlist is in sync with global time. pub fn check_sync(config: &GlobalConfig, delta: f64) -> bool { if delta.abs() > config.general.stop_threshold && config.general.stop_threshold > 0.0 { - error!("Clip begin out of sync for {} seconds", delta); + error!("Clip begin out of sync for {delta:.3} seconds. Stop playout!"); return false; } From e59d9b1f9e5279b9a44b83850cf179a62d38b0a9 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 15 May 2022 19:27:53 +0200 Subject: [PATCH 05/12] temporary dependency change --- Cargo.lock | 28 ++++++++++++++-------------- Cargo.toml | 6 +++--- assets/ffplayout-engine.service | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec5dc03e..a0981f13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,13 +82,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +source = "git+https://github.com/sbrocket/chrono?branch=parse-error-kind-public#edd600cc111162573bb81e1850100205849f957d" dependencies = [ "libc", "num-integer", "num-traits", - "time 0.1.43", + "time 0.1.44", "winapi 0.3.9", ] @@ -196,7 +195,7 @@ dependencies = [ [[package]] name = "ffplayout-engine" -version = "0.9.5" +version = "0.9.6" dependencies = [ "chrono", "clap", @@ -232,7 +231,7 @@ dependencies = [ [[package]] name = "file-rotate" version = "0.6.0" -source = "git+https://github.com/jb-alvarado/file-rotate.git#ee1dc1cea05885b8cb472191b50a044869da7e04" +source = "git+https://github.com/Ploppz/file-rotate.git?branch=timestamp-parse-fix#cb1874a15a7a18de820a57df48d3513e5a4076f4" dependencies = [ "chrono", "flate2", @@ -415,7 +414,7 @@ checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if 1.0.0", "libc", - "wasi 0.10.2+wasi-snapshot-preview1", + "wasi 0.10.0+wasi-snapshot-preview1", ] [[package]] @@ -960,9 +959,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.0.0" +version = "6.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" +checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435" [[package]] name = "paris" @@ -1277,9 +1276,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.93" +version = "1.0.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04066589568b72ec65f42d65a1a52436e954b168773148893c020269563decf2" +checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" dependencies = [ "proc-macro2", "quote", @@ -1317,11 +1316,12 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "time" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", + "wasi 0.10.0+wasi-snapshot-preview1", "winapi 0.3.9", ] @@ -1497,9 +1497,9 @@ dependencies = [ [[package]] name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" +version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasi" diff --git a/Cargo.toml b/Cargo.toml index 3290f6b0..e2bbd2d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,15 +4,15 @@ description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" authors = ["Jonathan Baecker jonbae77@gmail.com"] readme = "README.md" -version = "0.9.5" +version = "0.9.6" edition = "2021" [dependencies] -chrono = "0.4" +chrono = { git = "https://github.com/sbrocket/chrono", branch = "parse-error-kind-public" } clap = { version = "3.1", features = ["derive"] } crossbeam-channel = "0.5" ffprobe = "0.3" -file-rotate = { git = "https://github.com/jb-alvarado/file-rotate.git" } +file-rotate = { git = "https://github.com/Ploppz/file-rotate.git", branch = "timestamp-parse-fix" } jsonrpc-http-server = "18.0" lettre = "0.10.0-rc.6" log = "0.4" diff --git a/assets/ffplayout-engine.service b/assets/ffplayout-engine.service index 4deae5d5..a65eb9d8 100644 --- a/assets/ffplayout-engine.service +++ b/assets/ffplayout-engine.service @@ -1,5 +1,5 @@ [Unit] -Description=Rust based 24/7 playout solution +Description=Rust and ffmpeg based playout solution After=network.target remote-fs.target [Service] From 71c971a6e62c5451758abf2a6aeb2a5ed3664407 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Sun, 15 May 2022 21:15:32 +0200 Subject: [PATCH 06/12] change log level to warning --- src/input/playlist.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 983d5d46..be49643b 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -511,7 +511,7 @@ fn handle_list_end(mut node: Media, total_delta: f64) -> Media { return node; } else { - error!("Playlist is not long enough: {total_delta:.2} seconds needed"); + warn!("Playlist is not long enough: {total_delta:.2} seconds needed"); } node.process = Some(true); From 98cb1d875c0cbd7100ee1622b0b526886b522188 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 16 May 2022 10:05:38 +0200 Subject: [PATCH 07/12] send mails on program exit --- src/main.rs | 12 +++++++++--- src/tests/mod.rs | 7 +++++-- src/utils/logging.rs | 8 +++++--- src/utils/mod.rs | 2 +- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8ec7937a..13f3ea5f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::{ fs::{self, File}, path::PathBuf, process::exit, + sync::{Arc, Mutex}, thread, }; @@ -23,8 +24,8 @@ mod utils; use crate::output::{player, write_hls}; use crate::utils::{ - generate_playlist, init_logging, validate_ffmpeg, GlobalConfig, PlayerControl, PlayoutStatus, - ProcessControl, + generate_playlist, init_logging, send_mail, validate_ffmpeg, GlobalConfig, PlayerControl, + PlayoutStatus, ProcessControl, }; use rpc::json_rpc_server; @@ -70,8 +71,9 @@ fn main() { let play_control = PlayerControl::new(); let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); + let messages = Arc::new(Mutex::new(Vec::new())); - let logging = init_logging(&config); + let logging = init_logging(&config, messages.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(&config); @@ -101,5 +103,9 @@ fn main() { player(&config, play_control, playout_stat, proc_control); } + if messages.lock().unwrap().len() > 0 { + send_mail(&config, messages.lock().unwrap().join("\n")); + } + info!("Playout done..."); } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 49e87121..2d0999cc 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,4 +1,5 @@ use std::{ + sync::{Arc, Mutex}, thread::{self, sleep}, time::Duration, }; @@ -27,13 +28,14 @@ fn playlist_change_at_midnight() { config.playlist.day_start = "00:00:00".into(); config.playlist.length = "24:00:00".into(); config.logging.log_to_file = false; + let messages = Arc::new(Mutex::new(Vec::new())); let play_control = PlayerControl::new(); let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); let proc_ctl = proc_control.clone(); - let logging = init_logging(&config); + let logging = init_logging(&config, messages); CombinedLogger::init(logging).unwrap(); mock_time::set_mock_time("2022-05-09T23:59:45"); @@ -52,13 +54,14 @@ fn playlist_change_at_six() { config.playlist.day_start = "06:00:00".into(); config.playlist.length = "24:00:00".into(); config.logging.log_to_file = false; + let messages = Arc::new(Mutex::new(Vec::new())); let play_control = PlayerControl::new(); let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); let proc_ctl = proc_control.clone(); - let logging = init_logging(&config); + let logging = init_logging(&config, messages); CombinedLogger::init(logging).unwrap(); mock_time::set_mock_time("2022-05-09T05:59:45"); diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 19f239c6..857a0fdd 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -25,7 +25,7 @@ use simplelog::*; use crate::utils::GlobalConfig; /// send log messages to mail recipient -fn send_mail(cfg: &GlobalConfig, msg: String) { +pub fn send_mail(cfg: &GlobalConfig, msg: String) { if let Ok(email) = Message::builder() .from(cfg.mail.sender_addr.parse().unwrap()) .to(cfg.mail.recipient.parse().unwrap()) @@ -138,7 +138,10 @@ fn clean_string(text: &str) -> String { /// - console logger /// - file logger /// - mail logger -pub fn init_logging(config: &GlobalConfig) -> Vec> { +pub fn init_logging( + config: &GlobalConfig, + messages: Arc>>, +) -> Vec> { let config_clone = config.clone(); let app_config = config.logging.clone(); let mut time_level = LevelFilter::Off; @@ -216,7 +219,6 @@ pub fn init_logging(config: &GlobalConfig) -> Vec> { // set mail logger only the recipient is set in config if config.mail.recipient.contains('@') && config.mail.recipient.contains('.') { - let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); let messages_clone = messages.clone(); let interval = config.mail.interval; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d94131a7..ea007ad8 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -27,7 +27,7 @@ pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit:: pub use generator::generate_playlist; pub use json_serializer::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; -pub use logging::init_logging; +pub use logging::{init_logging, send_mail}; use crate::filter::filter_chains; From 48e06665ef054846f9e8c75d731706d856744526 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 17 May 2022 11:31:32 +0200 Subject: [PATCH 08/12] add message to mail queue only it differs to old one --- Cargo.lock | 40 ++++++++++++++++++++-------------------- src/utils/logging.rs | 12 ++++++++++-- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0981f13..bc423089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -580,9 +580,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "jsonrpc-core" @@ -678,9 +678,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.125" +version = "0.2.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" [[package]] name = "linked-hash-map" @@ -1044,11 +1044,11 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.38" +version = "1.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" +checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -1133,9 +1133,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" [[package]] name = "same-file" @@ -1276,13 +1276,13 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" +checksum = "fbaf6116ab8924f39d52792136fb74fd60a80194cf1b1c6ffa6453eef1c3f942" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -1388,9 +1388,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" +checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ "bytes", "futures-core", @@ -1447,6 +1447,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +[[package]] +name = "unicode-ident" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d22af068fba1eb5edcb4aea19d382b2a3deb4c8f9d475c589b6ada9e0fd493ee" + [[package]] name = "unicode-normalization" version = "0.1.19" @@ -1456,12 +1462,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-xid" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" - [[package]] name = "vcpkg" version = "0.2.15" diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 857a0fdd..b8fa2339 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -74,6 +74,7 @@ pub struct LogMailer { level: LevelFilter, pub config: Config, messages: Arc>>, + last_message: Arc>, } impl LogMailer { @@ -86,6 +87,7 @@ impl LogMailer { level: log_level, config, messages, + last_message: Arc::new(Mutex::new(String::new())), }) } } @@ -101,9 +103,15 @@ impl Log for LogMailer { let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]"); let level = record.level().to_string().to_uppercase(); let rec = record.args().to_string(); - let full_line: String = format!("{time_stamp} [{level: >5}] {rec}"); - self.messages.lock().unwrap().push(full_line); + // put message only to mail queue when it differs from last message + // this we do to prevent spamming the mail box + if *self.last_message.lock().unwrap() != rec { + *self.last_message.lock().unwrap() = rec.clone(); + let full_line: String = format!("{time_stamp} [{level: >5}] {rec}"); + + self.messages.lock().unwrap().push(full_line); + } } } From fb85b84e617b42bec307a6b79dc4fd8c7638dead Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 17 May 2022 21:29:20 +0200 Subject: [PATCH 09/12] add readme --- docs/README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 docs/README.md diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 00000000..272d85c6 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,26 @@ +**ffplayout-engine Documentation** +================ + +### **[For Developer](/docs/developer.md)** + +Learn how to setup a developer environment and to cross compile for different platforms. + +### **[Folder Mode](/docs/folder_mode.md)** + +Learn more about playing the content of a folder. + +### **[Live Ingest](/docs/live_ingest.md)** + +Using live ingest to inject a live stream. + +### **[Output Modes](/docs/output.md)** + +The different output modes. + +### **[Preview Stream](/docs/preview_stream.md)** + +Setup and use a preview stream. + +### **[Remove Sources](/docs/remote_source.md)** + +Use of remote sources, like https://example.org/video.mp4 From 536cceca78c9cc1798b3c20d0d3b84a52025ce56 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 17 May 2022 21:37:05 +0200 Subject: [PATCH 10/12] lock only ones --- src/utils/logging.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/utils/logging.rs b/src/utils/logging.rs index b8fa2339..af72aa22 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -103,11 +103,12 @@ impl Log for LogMailer { let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]"); let level = record.level().to_string().to_uppercase(); let rec = record.args().to_string(); + let mut last_msg = self.last_message.lock().unwrap(); // put message only to mail queue when it differs from last message // this we do to prevent spamming the mail box - if *self.last_message.lock().unwrap() != rec { - *self.last_message.lock().unwrap() = rec.clone(); + if *last_msg != rec { + *last_msg = rec.clone(); let full_line: String = format!("{time_stamp} [{level: >5}] {rec}"); self.messages.lock().unwrap().push(full_line); From d02cf98c607eeb83855c8b9213e71712864410dd Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 17 May 2022 21:45:35 +0200 Subject: [PATCH 11/12] lock only ones --- src/input/folder.rs | 15 ++++++++------- src/main.rs | 6 ++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/input/folder.rs b/src/input/folder.rs index 3172922a..00ee60a4 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -93,20 +93,21 @@ impl FolderSource { fn shuffle(&mut self) { let mut rng = thread_rng(); - self.nodes.lock().unwrap().shuffle(&mut rng); + let mut nodes = self.nodes.lock().unwrap(); - for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { + nodes.shuffle(&mut rng); + + for (index, item) in nodes.iter_mut().enumerate() { item.index = Some(index); } } fn sort(&mut self) { - self.nodes - .lock() - .unwrap() - .sort_by(|d1, d2| d1.source.cmp(&d2.source)); + let mut nodes = self.nodes.lock().unwrap(); - for (index, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { + nodes.sort_by(|d1, d2| d1.source.cmp(&d2.source)); + + for (index, item) in nodes.iter_mut().enumerate() { item.index = Some(index); } } diff --git a/src/main.rs b/src/main.rs index 13f3ea5f..1b4450ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -103,8 +103,10 @@ fn main() { player(&config, play_control, playout_stat, proc_control); } - if messages.lock().unwrap().len() > 0 { - send_mail(&config, messages.lock().unwrap().join("\n")); + let msg = messages.lock().unwrap(); + + if msg.len() > 0 { + send_mail(&config, msg.join("\n")); } info!("Playout done..."); From 3b214d42fe258f7dd84b5e7569f566e3f2d43222 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Wed, 18 May 2022 11:17:08 +0200 Subject: [PATCH 12/12] remove some locks, replace loop in threads with while condition --- Cargo.lock | 6 +++--- src/input/folder.rs | 10 +++++++--- src/input/mod.rs | 2 +- src/input/playlist.rs | 8 ++++---- src/main.rs | 3 ++- src/rpc/mod.rs | 18 +++++++++--------- src/tests/mod.rs | 10 ++++++---- src/utils/logging.rs | 16 +++++++++++----- 8 files changed, 43 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc423089..6729f8ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1148,12 +1148,12 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" dependencies = [ "lazy_static", - "winapi 0.3.9", + "windows-sys", ] [[package]] diff --git a/src/input/folder.rs b/src/input/folder.rs index 00ee60a4..9c9cf69c 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -3,7 +3,7 @@ use std::{ path::Path, process::exit, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, mpsc::channel, {Arc, Mutex}, }, @@ -162,7 +162,11 @@ fn file_extension(filename: &Path) -> Option<&str> { /// Create a watcher, which monitor file changes. /// When a change is register, update the current file list. /// This makes it possible, to play infinitely and and always new files to it. -pub fn watchman(config: GlobalConfig, sources: Arc>>) { +pub fn watchman( + config: GlobalConfig, + is_terminated: Arc, + sources: Arc>>, +) { let (tx, rx) = channel(); let path = config.storage.path; @@ -175,7 +179,7 @@ pub fn watchman(config: GlobalConfig, sources: Arc>>) { let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap(); watcher.watch(path, RecursiveMode::Recursive).unwrap(); - loop { + while !is_terminated.load(Ordering::SeqCst) { if let Ok(res) = rx.try_recv() { match res { Create(new_path) => { diff --git a/src/input/mod.rs b/src/input/mod.rs index 394a875a..a3dff4e5 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -40,7 +40,7 @@ pub fn source_generator( let node_clone = folder_source.nodes.clone(); // Spawn a thread to monitor folder for file changes. - thread::spawn(move || watchman(config_clone, node_clone)); + thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone)); Box::new(folder_source) as Box> } diff --git a/src/input/playlist.rs b/src/input/playlist.rs index be49643b..d4efd1a3 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -209,14 +209,14 @@ impl CurrentProgram { // On init or reload we need to seek for the current clip. fn get_current_clip(&mut self) { let mut time_sec = self.get_current_time(); + let shift = self.playout_stat.time_shift.lock().unwrap(); if *self.playout_stat.current_date.lock().unwrap() == *self.playout_stat.date.lock().unwrap() - && *self.playout_stat.time_shift.lock().unwrap() != 0.0 + && *shift != 0.0 { - let shift = *self.playout_stat.time_shift.lock().unwrap(); - info!("Shift playlist start for {shift} seconds"); - time_sec += shift; + info!("Shift playlist start for {} seconds", *shift); + time_sec += *shift; } for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { diff --git a/src/main.rs b/src/main.rs index 1b4450ef..42973747 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,9 +71,10 @@ fn main() { let play_control = PlayerControl::new(); let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); + let proc_ctl = proc_control.clone(); let messages = Arc::new(Mutex::new(Vec::new())); - let logging = init_logging(&config, messages.clone()); + let logging = init_logging(&config, proc_ctl, messages.clone()); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(&config); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d062eab1..fadb1834 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -71,12 +71,13 @@ pub fn json_rpc_server( let mut time_shift = playout_stat.time_shift.lock().unwrap(); let current_date = playout_stat.current_date.lock().unwrap().clone(); let mut date = playout_stat.date.lock().unwrap(); + let current_list = play_control.current_list.lock().unwrap(); // get next clip if map.contains_key("control") && &map["control"] == "next" { let index = play_control.index.load(Ordering::SeqCst); - if index < play_control.current_list.lock().unwrap().len() { + if index < current_list.len() { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { error!("Decoder {e:?}") @@ -89,7 +90,7 @@ pub fn json_rpc_server( info!("Move to next clip"); let mut data_map = Map::new(); - let mut media = play_control.current_list.lock().unwrap()[index].clone(); + let mut media = current_list[index].clone(); media.add_probe(); let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0)); @@ -114,7 +115,7 @@ pub fn json_rpc_server( if map.contains_key("control") && &map["control"] == "back" { let index = play_control.index.load(Ordering::SeqCst); - if index > 1 && play_control.current_list.lock().unwrap().len() > 1 { + if index > 1 && current_list.len() > 1 { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { error!("Decoder {e:?}") @@ -126,8 +127,7 @@ pub fn json_rpc_server( info!("Move to last clip"); let mut data_map = Map::new(); - let mut media = - play_control.current_list.lock().unwrap()[index - 2].clone(); + let mut media = current_list[index - 2].clone(); play_control.index.fetch_sub(2, Ordering::SeqCst); media.add_probe(); @@ -189,8 +189,8 @@ pub fn json_rpc_server( if map.contains_key("media") && &map["media"] == "next" { let index = play_control.index.load(Ordering::SeqCst); - if index < play_control.current_list.lock().unwrap().len() { - let media = play_control.current_list.lock().unwrap()[index].clone(); + if index < current_list.len() { + let media = current_list[index].clone(); let data_map = get_data_map(&config, media); @@ -204,8 +204,8 @@ pub fn json_rpc_server( if map.contains_key("media") && &map["media"] == "last" { let index = play_control.index.load(Ordering::SeqCst); - if index > 1 && index - 2 < play_control.current_list.lock().unwrap().len() { - let media = play_control.current_list.lock().unwrap()[index - 2].clone(); + if index > 1 && index - 2 < current_list.len() { + let media = current_list[index - 2].clone(); let data_map = get_data_map(&config, media); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 2d0999cc..d12fd964 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -34,13 +34,14 @@ fn playlist_change_at_midnight() { let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); let proc_ctl = proc_control.clone(); + let proc_ctl2 = proc_control.clone(); - let logging = init_logging(&config, messages); + let logging = init_logging(&config, proc_ctl, messages); CombinedLogger::init(logging).unwrap(); mock_time::set_mock_time("2022-05-09T23:59:45"); - thread::spawn(move || timed_kill(30, proc_ctl)); + thread::spawn(move || timed_kill(30, proc_ctl2)); player(&config, play_control, playout_stat, proc_control); } @@ -60,13 +61,14 @@ fn playlist_change_at_six() { let playout_stat = PlayoutStatus::new(); let proc_control = ProcessControl::new(); let proc_ctl = proc_control.clone(); + let proc_ctl2 = proc_control.clone(); - let logging = init_logging(&config, messages); + let logging = init_logging(&config, proc_ctl, messages); CombinedLogger::init(logging).unwrap(); mock_time::set_mock_time("2022-05-09T05:59:45"); - thread::spawn(move || timed_kill(30, proc_ctl)); + thread::spawn(move || timed_kill(30, proc_ctl2)); player(&config, play_control, playout_stat, proc_control); } diff --git a/src/utils/logging.rs b/src/utils/logging.rs index af72aa22..56ab8f47 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -3,7 +3,7 @@ extern crate simplelog; use std::{ path::Path, - sync::{Arc, Mutex}, + sync::{atomic::Ordering, Arc, Mutex}, thread::{self, sleep}, time::Duration, }; @@ -22,7 +22,7 @@ use log::{Level, LevelFilter, Log, Metadata, Record}; use regex::Regex; use simplelog::*; -use crate::utils::GlobalConfig; +use crate::utils::{GlobalConfig, ProcessControl}; /// send log messages to mail recipient pub fn send_mail(cfg: &GlobalConfig, msg: String) { @@ -56,8 +56,13 @@ pub fn send_mail(cfg: &GlobalConfig, msg: String) { /// Basic Mail Queue /// /// Check every give seconds for messages and send them. -fn mail_queue(cfg: GlobalConfig, messages: Arc>>, interval: u64) { - loop { +fn mail_queue( + cfg: GlobalConfig, + proc_ctl: ProcessControl, + messages: Arc>>, + interval: u64, +) { + while !proc_ctl.is_terminated.load(Ordering::SeqCst) { if messages.lock().unwrap().len() > 0 { let msg = messages.lock().unwrap().join("\n"); send_mail(&cfg, msg); @@ -149,6 +154,7 @@ fn clean_string(text: &str) -> String { /// - mail logger pub fn init_logging( config: &GlobalConfig, + proc_ctl: ProcessControl, messages: Arc>>, ) -> Vec> { let config_clone = config.clone(); @@ -231,7 +237,7 @@ pub fn init_logging( let messages_clone = messages.clone(); let interval = config.mail.interval; - thread::spawn(move || mail_queue(config_clone, messages_clone, interval)); + thread::spawn(move || mail_queue(config_clone, proc_ctl, messages_clone, interval)); let mail_config = log_config.build();