use tokio runtime gobally, add validation thread

This commit is contained in:
jb-alvarado 2022-03-16 20:12:12 +01:00
parent 555978aa12
commit 6644271baf
7 changed files with 86 additions and 40 deletions

View File

@ -6,14 +6,20 @@ mod output;
mod utils; mod utils;
use simplelog::*; use simplelog::*;
use tokio::runtime::Runtime;
use crate::output::play; use crate::output::play;
use crate::utils::{init_config, init_logging}; use crate::utils::{init_config, init_logging};
fn main() { fn main() {
init_config(); init_config();
let logging = init_logging();
let runtime = Runtime::new().unwrap();
let rt_handle = runtime.handle();
let logging = init_logging(rt_handle.clone());
CombinedLogger::init(logging).unwrap(); CombinedLogger::init(logging).unwrap();
play(); play(rt_handle);
} }

View File

@ -10,7 +10,7 @@ use std::{
}; };
use simplelog::*; use simplelog::*;
use tokio::runtime::Builder; use tokio::runtime::Handle;
mod desktop; mod desktop;
mod stream; mod stream;
@ -19,21 +19,10 @@ use crate::utils::{
sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media, Source, sec_to_time, stderr_reader, watch_folder, CurrentProgram, GlobalConfig, Media, Source,
}; };
pub fn play() { pub fn play(rt_handle: &Handle) {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let dec_pid: Arc<Mutex<u32>> = Arc::new(Mutex::new(0)); let dec_pid: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));
let mut thread_count = 2;
if config.processing.mode.as_str() == "folder" {
thread_count += 1;
}
let runtime = Builder::new_multi_thread()
.worker_threads(thread_count)
.enable_all()
.build()
.unwrap();
let get_source = match config.processing.clone().mode.as_str() { let get_source = match config.processing.clone().mode.as_str() {
"folder" => { "folder" => {
@ -55,13 +44,13 @@ pub fn play() {
debug!("Monitor folder: <b><magenta>{}</></b>", path); debug!("Monitor folder: <b><magenta>{}</></b>", path);
runtime.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); rt_handle.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes)));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>> Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
} }
"playlist" => { "playlist" => {
info!("Playout in playlist mode"); info!("Playout in playlist mode");
Box::new(CurrentProgram::new()) as Box<dyn Iterator<Item = Media>> Box::new(CurrentProgram::new(rt_handle.clone())) as Box<dyn Iterator<Item = Media>>
} }
_ => { _ => {
error!("Process Mode not exists!"); error!("Process Mode not exists!");
@ -70,7 +59,7 @@ pub fn play() {
}; };
let dec_settings = config.processing.clone().settings.unwrap(); let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut enc_proc = match config.out.mode.as_str() { let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()), "desktop" => desktop::output(ff_log_format.clone()),
@ -78,7 +67,7 @@ pub fn play() {
_ => panic!("Output mode doesn't exists!"), _ => panic!("Output mode doesn't exists!"),
}; };
runtime.spawn(stderr_reader( rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(), enc_proc.stderr.take().unwrap(),
"Encoder".to_string(), "Encoder".to_string(),
)); ));
@ -134,7 +123,7 @@ pub fn play() {
// debug!("Decoder PID: <yellow>{}</>", dec_pid.lock().unwrap()); // debug!("Decoder PID: <yellow>{}</>", dec_pid.lock().unwrap());
runtime.spawn(stderr_reader( rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(), dec_proc.stderr.take().unwrap(),
"Decoder".to_string(), "Decoder".to_string(),
)); ));

View File

@ -2,12 +2,13 @@ use serde::{Deserialize, Serialize};
use std::{fs::File, path::Path}; use std::{fs::File, path::Path};
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{get_date, modified_time, time_to_sec, GlobalConfig, Media}; use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media};
pub const DUMMY_LEN: f64 = 20.0; pub const DUMMY_LEN: f64 = 20.0;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Playlist { pub struct Playlist {
pub date: String, pub date: String,
pub start_sec: Option<f64>, pub start_sec: Option<f64>,
@ -32,12 +33,11 @@ impl Playlist {
} }
} }
pub fn read_json(seek: bool, next_start: f64) -> Playlist { pub fn read_json(rt_handle: Handle, seek: bool, next_start: f64) -> Playlist {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let mut playlist_path = Path::new(&config.playlist.path).to_owned(); let mut playlist_path = Path::new(&config.playlist.path).to_owned();
let start = &config.playlist.day_start; let mut start_sec = config.playlist.start_sec.unwrap();
let mut start_sec = time_to_sec(start);
let date = get_date(seek, start_sec, next_start); let date = get_date(seek, start_sec, next_start);
if playlist_path.is_dir() { if playlist_path.is_dir() {
@ -82,5 +82,7 @@ pub fn read_json(seek: bool, next_start: f64) -> Playlist {
start_sec += item.out - item.seek; start_sec += item.out - item.seek;
} }
rt_handle.spawn(validate_playlist(playlist.clone(), config.clone()));
playlist playlist
} }

View File

@ -0,0 +1,45 @@
use std::path::Path;
use simplelog::*;
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
pub async fn validate_playlist(playlist: Playlist, config: GlobalConfig) -> Result<(), String> {
let _count = 0;
let date = playlist.date;
let length = config.playlist.length_sec.unwrap();
let mut start_sec = 0.0;
debug!("validate playlist from: <yellow>{date}</>");
for item in playlist.program.iter() {
if Path::new(&item.source).is_file() {
let probe = MediaProbe::new(item.source.clone());
if probe.format.is_none() {
error!(
"No Metadata from file <b><magenta>{}</></b> at <yellow>{}</>",
sec_to_time(start_sec),
item.source
);
}
} else {
error!(
"File on time <yellow>{}</> not exists: <b><magenta>{}</></b>",
sec_to_time(start_sec),
item.source
);
}
start_sec += item.out - item.seek;
}
if length > start_sec {
error!(
"Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!",
sec_to_time(length - start_sec),
);
}
Ok(())
}

View File

@ -8,22 +8,22 @@ use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, F
use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport}; use lettre::{transport::smtp::authentication::Credentials, Message, SmtpTransport, Transport};
use log::{Level, LevelFilter, Log, Metadata, Record}; use log::{Level, LevelFilter, Log, Metadata, Record};
use simplelog::*; use simplelog::*;
use tokio::runtime::Runtime; use tokio::runtime::Handle;
use crate::utils::GlobalConfig; use crate::utils::GlobalConfig;
pub struct LogMailer { pub struct LogMailer {
level: LevelFilter, level: LevelFilter,
config: Config, config: Config,
runtime: Runtime, handle: Handle,
} }
impl LogMailer { impl LogMailer {
pub fn new(log_level: LevelFilter, config: Config, runtime: Runtime) -> Box<LogMailer> { pub fn new(log_level: LevelFilter, config: Config, handle: Handle) -> Box<LogMailer> {
Box::new(LogMailer { Box::new(LogMailer {
level: log_level, level: log_level,
config, config,
runtime, handle,
}) })
} }
} }
@ -37,10 +37,10 @@ impl Log for LogMailer {
if self.enabled(record.metadata()) { if self.enabled(record.metadata()) {
match record.level() { match record.level() {
Level::Error => { Level::Error => {
self.runtime.spawn(send_mail(record.args().to_string())); self.handle.spawn(send_mail(record.args().to_string()));
}, },
Level::Warn => { Level::Warn => {
self.runtime.spawn(send_mail(record.args().to_string())); self.handle.spawn(send_mail(record.args().to_string()));
}, },
_ => (), _ => (),
} }
@ -100,7 +100,7 @@ async fn send_mail(msg: String) {
} }
} }
pub fn init_logging() -> Vec<Box<dyn SharedLogger>> { pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let app_config = config.logging.clone(); let app_config = config.logging.clone();
let mut app_logger: Vec<Box<dyn SharedLogger>> = vec![]; let mut app_logger: Vec<Box<dyn SharedLogger>> = vec![];
@ -160,7 +160,6 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
if config.mail.recipient.len() > 3 { if config.mail.recipient.len() > 3 {
let mut filter = LevelFilter::Error; let mut filter = LevelFilter::Error;
let runtime = Runtime::new().unwrap();
let mail_config = log_config let mail_config = log_config
.clone() .clone()
@ -171,7 +170,7 @@ pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
filter = LevelFilter::Warn filter = LevelFilter::Warn
} }
app_logger.push(LogMailer::new(filter, mail_config, runtime)); app_logger.push(LogMailer::new(filter, mail_config, rt_handle));
} }
app_logger app_logger

View File

@ -17,13 +17,15 @@ mod arg_parse;
mod config; mod config;
mod folder; mod folder;
mod json_reader; mod json_reader;
mod json_validate;
mod logging; mod logging;
mod playlist; mod playlist;
pub use arg_parse::get_args; pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig}; pub use config::{init_config, GlobalConfig};
pub use folder::{watch_folder, Source}; pub use folder::{watch_folder, Source};
pub use json_reader::{read_json, DUMMY_LEN}; pub use json_reader::{read_json, DUMMY_LEN, Playlist};
pub use json_validate::validate_playlist;
pub use logging::init_logging; pub use logging::init_logging;
pub use playlist::CurrentProgram; pub use playlist::CurrentProgram;

View File

@ -1,6 +1,7 @@
use std::path::Path; use std::path::Path;
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{ use crate::utils::{
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time, check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json, modified_time,
@ -17,12 +18,13 @@ pub struct CurrentProgram {
current_node: Media, current_node: Media,
init: bool, init: bool,
index: usize, index: usize,
rt_handle: Handle,
} }
impl CurrentProgram { impl CurrentProgram {
pub fn new() -> Self { pub fn new(rt_handle: Handle) -> Self {
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let json = read_json(true, 0.0); let json = read_json(rt_handle.clone(), true, 0.0);
Self { Self {
config: config.clone(), config: config.clone(),
@ -33,12 +35,13 @@ impl CurrentProgram {
current_node: Media::new(0, "".to_string()), current_node: Media::new(0, "".to_string()),
init: true, init: true,
index: 0, index: 0,
rt_handle,
} }
} }
fn check_update(&mut self, seek: bool) { fn check_update(&mut self, seek: bool) {
if self.json_path.is_none() { if self.json_path.is_none() {
let json = read_json(seek, 0.0); let json = read_json(self.rt_handle.clone(), seek, 0.0);
self.json_path = json.current_file; self.json_path = json.current_file;
self.json_mod = json.modified; self.json_mod = json.modified;
@ -52,7 +55,7 @@ impl CurrentProgram {
.eq(&self.json_mod.clone().unwrap()) .eq(&self.json_mod.clone().unwrap())
{ {
// when playlist has changed, reload it // when playlist has changed, reload it
let json = read_json(false, 0.0); let json = read_json(self.rt_handle.clone(), false, 0.0);
self.json_mod = json.modified; self.json_mod = json.modified;
self.nodes = json.program; self.nodes = json.program;
@ -92,7 +95,7 @@ impl CurrentProgram {
|| is_close(total_delta, 0.0, 2.0) || is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0) || is_close(total_delta, target_length, 2.0)
{ {
let json = read_json(false, next_start); let json = read_json(self.rt_handle.clone(), false, next_start);
self.json_path = json.current_file.clone(); self.json_path = json.current_file.clone();
self.json_mod = json.modified; self.json_mod = json.modified;