diff --git a/src/main.rs b/src/main.rs index 1d495df9..17ade671 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,14 +6,20 @@ mod output; mod utils; use simplelog::*; +use tokio::runtime::Runtime; use crate::output::play; use crate::utils::{init_config, init_logging}; fn main() { init_config(); - let logging = init_logging(); + + let runtime = Runtime::new().unwrap(); + let rt_handle = runtime.handle(); + + + let logging = init_logging(rt_handle.clone()); CombinedLogger::init(logging).unwrap(); - play(); + play(rt_handle); } diff --git a/src/output/mod.rs b/src/output/mod.rs index 5ad828ff..291b4b85 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -10,7 +10,7 @@ use std::{ }; use simplelog::*; -use tokio::runtime::Builder; +use tokio::runtime::Handle; mod desktop; mod stream; @@ -19,21 +19,10 @@ use crate::utils::{ sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media, Source, }; -pub fn play() { +pub fn play(rt_handle: &Handle) { let config = GlobalConfig::global(); let dec_pid: Arc> = Arc::new(Mutex::new(0)); - let mut thread_count = 2; - - if config.processing.mode.as_str() == "folder" { - thread_count += 1; - } - - let runtime = Builder::new_multi_thread() - .worker_threads(thread_count) - .enable_all() - .build() - .unwrap(); let get_source = match config.processing.clone().mode.as_str() { "folder" => { @@ -55,13 +44,13 @@ pub fn play() { debug!("Monitor folder: {}", path); - runtime.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); + rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); Box::new(folder_source) as Box> } "playlist" => { info!("Playout in playlist mode"); - Box::new(CurrentProgram::new()) as Box> + Box::new(CurrentProgram::new(rt_handle.clone())) as Box> } _ => { error!("Process Mode not exists!"); @@ -70,7 +59,7 @@ pub fn play() { }; let dec_settings = config.processing.clone().settings.unwrap(); - let ff_log_format = format!("level+{}", config.logging.ffmpeg_level); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(ff_log_format.clone()), @@ -78,7 +67,7 @@ pub fn play() { _ => panic!("Output mode doesn't exists!"), }; - runtime.spawn(stderr_reader( + rt_handle.spawn(stderr_reader( enc_proc.stderr.take().unwrap(), "Encoder".to_string(), )); @@ -134,7 +123,7 @@ pub fn play() { // debug!("Decoder PID: {}", dec_pid.lock().unwrap()); - runtime.spawn(stderr_reader( + rt_handle.spawn(stderr_reader( dec_proc.stderr.take().unwrap(), "Decoder".to_string(), )); diff --git a/src/utils/json_reader.rs b/src/utils/json_reader.rs index 99362f1b..59501c12 100644 --- a/src/utils/json_reader.rs +++ b/src/utils/json_reader.rs @@ -2,12 +2,13 @@ use serde::{Deserialize, Serialize}; use std::{fs::File, path::Path}; use simplelog::*; +use tokio::runtime::Handle; -use crate::utils::{get_date, modified_time, time_to_sec, GlobalConfig, Media}; +use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media}; pub const DUMMY_LEN: f64 = 20.0; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct Playlist { pub date: String, pub start_sec: Option, @@ -32,12 +33,11 @@ impl Playlist { } } -pub fn read_json(seek: bool, next_start: f64) -> Playlist { +pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist { let config = GlobalConfig::global(); let mut playlist_path = Path::new(&config.playlist.path).to_owned(); - let start = &config.playlist.day_start; - let mut start_sec = time_to_sec(start); + let mut start_sec = config.playlist.start_sec.unwrap(); let date = get_date(seek, start_sec, next_start); if playlist_path.is_dir() { @@ -82,5 +82,7 @@ pub fn read_json(seek: bool, next_start: f64) -> Playlist { start_sec += item.out - item.seek; } + rt_handle.spawn(validate_playlist(playlist.clone(), config.clone())); + playlist } diff --git a/src/utils/json_validate.rs b/src/utils/json_validate.rs new file mode 100644 index 00000000..8c440b9c --- /dev/null +++ b/src/utils/json_validate.rs @@ -0,0 +1,45 @@ +use std::path::Path; + +use simplelog::*; + +use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; + +pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) -> Result<(), String> { + let _count = 0; + let date = playlist.date; + let length = config.playlist.length_sec.unwrap(); + let mut start_sec = 0.0; + + debug!("validate playlist from: {date}"); + + for item in playlist.program.iter() { + if Path::new(&item.source).is_file() { + let probe = MediaProbe::new(item.source.clone()); + + if probe.format.is_none() { + error!( + "No Metadata from file {} at {}", + sec_to_time(start_sec), + item.source + ); + } + } else { + error!( + "File on time {} not exists: {}", + sec_to_time(start_sec), + item.source + ); + } + + start_sec += item.out - item.seek; + } + + if length > start_sec { + error!( + "Playlist from {date} not long enough, {} needed!", + sec_to_time(length - start_sec), + ); + } + + Ok(()) +} diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 84c1a1f4..ca3f0e96 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -8,22 +8,22 @@ use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, F use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport}; use log::{Level, LevelFilter, Log, Metadata, Record}; use simplelog::*; -use tokio::runtime::Runtime; +use tokio::runtime::Handle; use crate::utils::GlobalConfig; pub struct LogMailer { level: LevelFilter, config: Config, - runtime: Runtime, + handle: Handle, } impl LogMailer { - pub fn new(log_level: LevelFilter, config: Config, runtime: Runtime) -> Box { + pub fn new(log_level: LevelFilter, config: Config, handle: Handle) -> Box { Box::new(LogMailer { level: log_level, config, - runtime, + handle, }) } } @@ -37,10 +37,10 @@ impl Log for LogMailer { if self.enabled(record.metadata()) { match record.level() { Level::Error => { - self.runtime.spawn(send_mail(record.args().to_string())); + self.handle.spawn(send_mail(record.args().to_string())); }, Level::Warn => { - self.runtime.spawn(send_mail(record.args().to_string())); + self.handle.spawn(send_mail(record.args().to_string())); }, _ => (), } @@ -100,7 +100,7 @@ async fn send_mail(msg: String) { } } -pub fn init_logging() -> Vec> { +pub fn init_logging(rt_handle: Handle) -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); let mut app_logger: Vec> = vec![]; @@ -160,7 +160,6 @@ pub fn init_logging() -> Vec> { if config.mail.recipient.len() > 3 { let mut filter = LevelFilter::Error; - let runtime = Runtime::new().unwrap(); let mail_config = log_config .clone() @@ -171,7 +170,7 @@ pub fn init_logging() -> Vec> { filter = LevelFilter::Warn } - app_logger.push(LogMailer::new(filter, mail_config, runtime)); + app_logger.push(LogMailer::new(filter, mail_config, rt_handle)); } app_logger diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 254b59bb..ed3b4360 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -17,13 +17,15 @@ mod arg_parse; mod config; mod folder; mod json_reader; +mod json_validate; mod logging; mod playlist; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; pub use folder::{watch_folder, Source}; -pub use json_reader::{read_json, DUMMY_LEN}; +pub use json_reader::{read_json, DUMMY_LEN, Playlist}; +pub use json_validate::validate_playlist; pub use logging::init_logging; pub use playlist::CurrentProgram; diff --git a/src/utils/playlist.rs b/src/utils/playlist.rs index 8ed16547..2e3b8295 100644 --- a/src/utils/playlist.rs +++ b/src/utils/playlist.rs @@ -1,6 +1,7 @@ use std::path::Path; use simplelog::*; +use tokio::runtime::Handle; use crate::utils::{ check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, @@ -17,12 +18,13 @@ pub struct CurrentProgram { current_node: Media, init: bool, index: usize, + rt_handle: Handle, } impl CurrentProgram { - pub fn new() -> Self { + pub fn new(rt_handle: Handle) -> Self { let config = GlobalConfig::global(); - let json = read_json(true, 0.0); + let json = read_json(rt_handle.clone(), true, 0.0); Self { config: config.clone(), @@ -33,12 +35,13 @@ impl CurrentProgram { current_node: Media::new(0, "".to_string()), init: true, index: 0, + rt_handle, } } fn check_update(&mut self, seek: bool) { if self.json_path.is_none() { - let json = read_json(seek, 0.0); + let json = read_json(self.rt_handle.clone(), seek, 0.0); self.json_path = json.current_file; self.json_mod = json.modified; @@ -52,7 +55,7 @@ impl CurrentProgram { .eq(&self.json_mod.clone().unwrap()) { // when playlist has changed, reload it - let json = read_json(false, 0.0); + let json = read_json(self.rt_handle.clone(), false, 0.0); self.json_mod = json.modified; self.nodes = json.program; @@ -92,7 +95,7 @@ impl CurrentProgram { || is_close(total_delta, 0.0, 2.0) || is_close(total_delta, target_length, 2.0) { - let json = read_json(false, next_start); + let json = read_json(self.rt_handle.clone(), false, next_start); self.json_path = json.current_file.clone(); self.json_mod = json.modified;