Merge pull request #120 from jb-alvarado/master
Optimize CPU and Ram usage
This commit is contained in:
commit
8f1dae6550
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -94,9 +94,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.1.9"
|
||||
version = "3.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6aad2534fad53df1cc12519c5cda696dd3e20e6118a027e24054aea14a0bdcbe"
|
||||
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"bitflags",
|
||||
@ -216,15 +216,14 @@ dependencies = [
|
||||
"serde_yaml",
|
||||
"shlex",
|
||||
"simplelog",
|
||||
"tokio",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ffprobe"
|
||||
version = "0.3.1"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e52fe7c1991d1d0f77383e9f3e584860a2e916fa22b834176b84a411fac7107a"
|
||||
checksum = "4151d364a3709c400c4aaca1988324f02dfde8d3e2e8543176e596d39eb414ac"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
@ -1328,9 +1327,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tinyvec"
|
||||
version = "1.5.1"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2"
|
||||
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
|
||||
dependencies = [
|
||||
"tinyvec_macros",
|
||||
]
|
||||
|
@ -25,7 +25,6 @@ serde_json = "1.0"
|
||||
serde_yaml = "0.8"
|
||||
shlex = "1.1"
|
||||
simplelog = { version = "^0.11", features = ["paris"] }
|
||||
tokio = { version = "1.16", features = ["rt-multi-thread"] }
|
||||
walkdir = "2"
|
||||
|
||||
[target.x86_64-unknown-linux-musl.dependencies]
|
||||
|
@ -3,6 +3,7 @@ use std::{
|
||||
path::Path,
|
||||
process::exit,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
mpsc::channel,
|
||||
{Arc, Mutex},
|
||||
},
|
||||
@ -25,11 +26,11 @@ pub struct Source {
|
||||
config: GlobalConfig,
|
||||
pub nodes: Arc<Mutex<Vec<Media>>>,
|
||||
current_node: Media,
|
||||
index: Arc<Mutex<usize>>,
|
||||
index: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl Source {
|
||||
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<Mutex<usize>>) -> Self {
|
||||
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<AtomicUsize>) -> Self {
|
||||
let config = GlobalConfig::global();
|
||||
let mut media_list = vec![];
|
||||
let mut index: usize = 0;
|
||||
@ -117,14 +118,14 @@ impl Iterator for Source {
|
||||
type Item = Media;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
|
||||
let i = *self.index.lock().unwrap();
|
||||
if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() {
|
||||
let i = self.index.load(Ordering::SeqCst);
|
||||
self.current_node = self.nodes.lock().unwrap()[i].clone();
|
||||
self.current_node.add_probe();
|
||||
self.current_node.add_filter();
|
||||
self.current_node.begin = Some(get_sec());
|
||||
|
||||
*self.index.lock().unwrap() += 1;
|
||||
self.index.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
Some(self.current_node.clone())
|
||||
} else {
|
||||
@ -147,7 +148,7 @@ impl Iterator for Source {
|
||||
self.current_node.add_filter();
|
||||
self.current_node.begin = Some(get_sec());
|
||||
|
||||
*self.index.lock().unwrap() = 1;
|
||||
self.index.store(1, Ordering::SeqCst);
|
||||
|
||||
Some(self.current_node.clone())
|
||||
}
|
||||
@ -158,7 +159,7 @@ fn file_extension(filename: &Path) -> Option<&str> {
|
||||
filename.extension().and_then(OsStr::to_str)
|
||||
}
|
||||
|
||||
pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<bool>>) {
|
||||
pub fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
|
||||
let config = GlobalConfig::global();
|
||||
let (tx, rx) = channel();
|
||||
|
||||
@ -169,14 +170,10 @@ pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<
|
||||
panic!("Folder path not exists: '{path}'");
|
||||
}
|
||||
|
||||
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
|
||||
let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
|
||||
watcher.watch(path, RecursiveMode::Recursive).unwrap();
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
if let Ok(res) = rx.try_recv() {
|
||||
match res {
|
||||
Create(new_path) => {
|
||||
@ -210,6 +207,6 @@ pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(4));
|
||||
sleep(Duration::from_secs(5));
|
||||
}
|
||||
}
|
||||
|
@ -2,13 +2,12 @@ use std::{
|
||||
io::{BufReader, Error, Read},
|
||||
path::Path,
|
||||
process::{Command, Stdio},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
sync::atomic::Ordering,
|
||||
thread,
|
||||
};
|
||||
|
||||
use crossbeam_channel::Sender;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
|
||||
|
||||
@ -52,10 +51,9 @@ fn audio_filter(config: &GlobalConfig) -> String {
|
||||
audio_chain
|
||||
}
|
||||
|
||||
pub async fn ingest_server(
|
||||
pub fn ingest_server(
|
||||
log_format: String,
|
||||
ingest_sender: Sender<(usize, [u8; 65088])>,
|
||||
rt_handle: Handle,
|
||||
mut proc_control: ProcessControl,
|
||||
) -> Result<(), Error> {
|
||||
let config = GlobalConfig::global();
|
||||
@ -100,10 +98,7 @@ pub async fn ingest_server(
|
||||
server_cmd.join(" ")
|
||||
);
|
||||
|
||||
loop {
|
||||
if *proc_control.is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
'ingest_iter: loop {
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stdout(Stdio::piped())
|
||||
@ -116,10 +111,10 @@ pub async fn ingest_server(
|
||||
}
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server"));
|
||||
|
||||
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
|
||||
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
|
||||
let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server"));
|
||||
|
||||
*proc_control.server_term.lock().unwrap() = Some(server_proc);
|
||||
|
||||
is_running = false;
|
||||
@ -134,7 +129,7 @@ pub async fn ingest_server(
|
||||
};
|
||||
|
||||
if !is_running {
|
||||
*proc_control.server_is_running.lock().unwrap() = true;
|
||||
proc_control.server_is_running.store(true, Ordering::SeqCst);
|
||||
is_running = true;
|
||||
}
|
||||
|
||||
@ -142,8 +137,8 @@ pub async fn ingest_server(
|
||||
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
|
||||
error!("Ingest server write error: {e:?}");
|
||||
|
||||
*proc_control.is_terminated.lock().unwrap() = true;
|
||||
break;
|
||||
proc_control.is_terminated.store(true, Ordering::SeqCst);
|
||||
break 'ingest_iter;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
@ -151,14 +146,19 @@ pub async fn ingest_server(
|
||||
}
|
||||
|
||||
drop(ingest_reader);
|
||||
|
||||
*proc_control.server_is_running.lock().unwrap() = false;
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
proc_control.server_is_running.store(false, Ordering::SeqCst);
|
||||
|
||||
if let Err(e) = proc_control.wait(Ingest) {
|
||||
error!("{e}")
|
||||
}
|
||||
|
||||
if let Err(e) = error_reader_thread.join() {
|
||||
error!("{e:?}");
|
||||
};
|
||||
|
||||
if proc_control.is_terminated.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1,10 +1,10 @@
|
||||
use std::{
|
||||
process,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize}},
|
||||
thread,
|
||||
};
|
||||
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
|
||||
|
||||
@ -17,12 +17,11 @@ pub use ingest::ingest_server;
|
||||
pub use playlist::CurrentProgram;
|
||||
|
||||
pub fn source_generator(
|
||||
rt_handle: &Handle,
|
||||
config: GlobalConfig,
|
||||
current_list: Arc<Mutex<Vec<Media>>>,
|
||||
index: Arc<Mutex<usize>>,
|
||||
index: Arc<AtomicUsize>,
|
||||
playout_stat: PlayoutStatus,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
is_terminated: Arc<AtomicBool>,
|
||||
) -> Box<dyn Iterator<Item = Media>> {
|
||||
let get_source = match config.processing.clone().mode.as_str() {
|
||||
"folder" => {
|
||||
@ -30,14 +29,14 @@ pub fn source_generator(
|
||||
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
|
||||
|
||||
let folder_source = Source::new(current_list, index);
|
||||
rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone()));
|
||||
let node_clone = folder_source.nodes.clone();
|
||||
thread::spawn(move || watchman(node_clone));
|
||||
|
||||
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
|
||||
}
|
||||
"playlist" => {
|
||||
info!("Playout in playlist mode");
|
||||
let program = CurrentProgram::new(
|
||||
rt_handle.clone(),
|
||||
playout_stat,
|
||||
is_terminated.clone(),
|
||||
current_list,
|
||||
|
@ -1,12 +1,11 @@
|
||||
use std::{
|
||||
fs,
|
||||
path::Path,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex},
|
||||
};
|
||||
|
||||
use serde_json::json;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{
|
||||
check_sync, gen_dummy, get_delta, get_sec, is_close, json_serializer::read_json, modified_time,
|
||||
@ -22,22 +21,20 @@ pub struct CurrentProgram {
|
||||
json_date: String,
|
||||
pub nodes: Arc<Mutex<Vec<Media>>>,
|
||||
current_node: Media,
|
||||
index: Arc<Mutex<usize>>,
|
||||
rt_handle: Handle,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
index: Arc<AtomicUsize>,
|
||||
is_terminated: Arc<AtomicBool>,
|
||||
playout_stat: PlayoutStatus,
|
||||
}
|
||||
|
||||
impl CurrentProgram {
|
||||
pub fn new(
|
||||
rt_handle: Handle,
|
||||
playout_stat: PlayoutStatus,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
is_terminated: Arc<AtomicBool>,
|
||||
current_list: Arc<Mutex<Vec<Media>>>,
|
||||
global_index: Arc<Mutex<usize>>,
|
||||
global_index: Arc<AtomicUsize>,
|
||||
) -> Self {
|
||||
let config = GlobalConfig::global();
|
||||
let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0);
|
||||
let json = read_json(None, is_terminated.clone(), true, 0.0);
|
||||
|
||||
*current_list.lock().unwrap() = json.program;
|
||||
*playout_stat.current_date.lock().unwrap() = json.date.clone();
|
||||
@ -61,7 +58,6 @@ impl CurrentProgram {
|
||||
nodes: current_list,
|
||||
current_node: Media::new(0, String::new(), false),
|
||||
index: global_index,
|
||||
rt_handle,
|
||||
is_terminated,
|
||||
playout_stat,
|
||||
}
|
||||
@ -69,13 +65,7 @@ impl CurrentProgram {
|
||||
|
||||
fn check_update(&mut self, seek: bool) {
|
||||
if self.json_path.is_none() {
|
||||
let json = read_json(
|
||||
None,
|
||||
self.rt_handle.clone(),
|
||||
self.is_terminated.clone(),
|
||||
seek,
|
||||
0.0,
|
||||
);
|
||||
let json = read_json(None, self.is_terminated.clone(), seek, 0.0);
|
||||
|
||||
self.json_path = json.current_file;
|
||||
self.json_mod = json.modified;
|
||||
@ -96,7 +86,6 @@ impl CurrentProgram {
|
||||
|
||||
let json = read_json(
|
||||
self.json_path.clone(),
|
||||
self.rt_handle.clone(),
|
||||
self.is_terminated.clone(),
|
||||
false,
|
||||
0.0,
|
||||
@ -106,7 +95,7 @@ impl CurrentProgram {
|
||||
*self.nodes.lock().unwrap() = json.program;
|
||||
|
||||
self.get_current_clip();
|
||||
*self.index.lock().unwrap() += 1;
|
||||
self.index.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
} else {
|
||||
error!(
|
||||
@ -121,8 +110,8 @@ impl CurrentProgram {
|
||||
self.json_path = None;
|
||||
*self.nodes.lock().unwrap() = vec![media.clone()];
|
||||
self.current_node = media;
|
||||
*self.playout_stat.list_init.lock().unwrap() = true;
|
||||
*self.index.lock().unwrap() = 0;
|
||||
self.playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||
self.index.store(0, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
@ -143,13 +132,7 @@ impl CurrentProgram {
|
||||
|| is_close(total_delta, 0.0, 2.0)
|
||||
|| is_close(total_delta, target_length, 2.0)
|
||||
{
|
||||
let json = read_json(
|
||||
None,
|
||||
self.rt_handle.clone(),
|
||||
self.is_terminated.clone(),
|
||||
false,
|
||||
next_start,
|
||||
);
|
||||
let json = read_json(None, self.is_terminated.clone(), false, next_start);
|
||||
|
||||
let data = json!({
|
||||
"time_shift": 0.0,
|
||||
@ -167,27 +150,35 @@ impl CurrentProgram {
|
||||
self.json_mod = json.modified;
|
||||
self.json_date = json.date;
|
||||
*self.nodes.lock().unwrap() = json.program;
|
||||
*self.index.lock().unwrap() = 0;
|
||||
self.index.store(0, Ordering::SeqCst);
|
||||
|
||||
if json.current_file.is_none() {
|
||||
*self.playout_stat.list_init.lock().unwrap() = true;
|
||||
self.playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn last_next_ad(&mut self) {
|
||||
let index = *self.index.lock().unwrap();
|
||||
let index = self.index.load(Ordering::SeqCst);
|
||||
let current_list = self.nodes.lock().unwrap();
|
||||
|
||||
if index + 1 < current_list.len()
|
||||
&& ¤t_list[index + 1].category.clone().unwrap_or(String::new()) == "advertisement"
|
||||
&& ¤t_list[index + 1]
|
||||
.category
|
||||
.clone()
|
||||
.unwrap_or(String::new())
|
||||
== "advertisement"
|
||||
{
|
||||
self.current_node.next_ad = Some(true);
|
||||
}
|
||||
|
||||
if index > 0
|
||||
&& index < current_list.len()
|
||||
&& ¤t_list[index - 1].category.clone().unwrap_or(String::new()) == "advertisement"
|
||||
&& ¤t_list[index - 1]
|
||||
.category
|
||||
.clone()
|
||||
.unwrap_or(String::new())
|
||||
== "advertisement"
|
||||
{
|
||||
self.current_node.last_ad = Some(true);
|
||||
}
|
||||
@ -217,8 +208,8 @@ impl CurrentProgram {
|
||||
|
||||
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
|
||||
if item.begin.unwrap() + item.out - item.seek > time_sec {
|
||||
*self.playout_stat.list_init.lock().unwrap() = false;
|
||||
*self.index.lock().unwrap() = i;
|
||||
self.playout_stat.list_init.store(false, Ordering::SeqCst);
|
||||
self.index.store(i, Ordering::SeqCst);
|
||||
|
||||
break;
|
||||
}
|
||||
@ -228,13 +219,12 @@ impl CurrentProgram {
|
||||
fn init_clip(&mut self) {
|
||||
self.get_current_clip();
|
||||
|
||||
if !*self.playout_stat.list_init.lock().unwrap() {
|
||||
if !self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||
let time_sec = self.get_current_time();
|
||||
let index = *self.index.lock().unwrap();
|
||||
let index = self.index.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// de-instance node to preserve original values in list
|
||||
let mut node_clone = self.nodes.lock().unwrap()[index].clone();
|
||||
*self.index.lock().unwrap() += 1;
|
||||
|
||||
node_clone.seek = time_sec - node_clone.begin.unwrap();
|
||||
self.current_node = handle_list_init(node_clone);
|
||||
@ -246,7 +236,7 @@ impl Iterator for CurrentProgram {
|
||||
type Item = Media;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if *self.playout_stat.list_init.lock().unwrap() {
|
||||
if self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||
debug!("Playlist init");
|
||||
self.check_update(true);
|
||||
|
||||
@ -254,7 +244,7 @@ impl Iterator for CurrentProgram {
|
||||
self.init_clip();
|
||||
}
|
||||
|
||||
if *self.playout_stat.list_init.lock().unwrap() {
|
||||
if self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||
// on init load playlist, could be not long enough,
|
||||
// so we check if we can take the next playlist already,
|
||||
// or we fill the gap with a dummy.
|
||||
@ -277,12 +267,13 @@ impl Iterator for CurrentProgram {
|
||||
|
||||
if DUMMY_LEN > total_delta {
|
||||
duration = total_delta;
|
||||
*self.playout_stat.list_init.lock().unwrap() = false;
|
||||
self.playout_stat.list_init.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
if self.config.playlist.start_sec.unwrap() > current_time {
|
||||
current_time += self.config.playlist.length_sec.unwrap() + 1.0;
|
||||
}
|
||||
|
||||
let mut media = Media::new(0, String::new(), false);
|
||||
media.begin = Some(current_time);
|
||||
media.duration = duration;
|
||||
@ -290,7 +281,7 @@ impl Iterator for CurrentProgram {
|
||||
|
||||
self.current_node = gen_source(media);
|
||||
self.nodes.lock().unwrap().push(self.current_node.clone());
|
||||
*self.index.lock().unwrap() = self.nodes.lock().unwrap().len();
|
||||
self.index.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
@ -299,10 +290,10 @@ impl Iterator for CurrentProgram {
|
||||
return Some(self.current_node.clone());
|
||||
}
|
||||
|
||||
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
|
||||
if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() {
|
||||
self.check_for_next_playlist();
|
||||
let mut is_last = false;
|
||||
let index = *self.index.lock().unwrap();
|
||||
let index = self.index.load(Ordering::SeqCst);
|
||||
|
||||
if index == self.nodes.lock().unwrap().len() - 1 {
|
||||
is_last = true
|
||||
@ -315,7 +306,7 @@ impl Iterator for CurrentProgram {
|
||||
&self.playout_stat,
|
||||
);
|
||||
self.last_next_ad();
|
||||
*self.index.lock().unwrap() += 1;
|
||||
self.index.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// update playlist should happen after current clip,
|
||||
// to prevent unknown behaviors.
|
||||
@ -332,7 +323,7 @@ impl Iterator for CurrentProgram {
|
||||
{
|
||||
// Test if playlist is to early finish,
|
||||
// and if we have to fill it with a placeholder.
|
||||
let index = *self.index.lock().unwrap();
|
||||
let index = self.index.load(Ordering::SeqCst);
|
||||
self.current_node = Media::new(index, String::new(), false);
|
||||
self.current_node.begin = Some(get_sec());
|
||||
let mut duration = total_delta.abs();
|
||||
@ -349,17 +340,17 @@ impl Iterator for CurrentProgram {
|
||||
self.current_node.last_ad = last_ad;
|
||||
self.current_node.add_filter();
|
||||
|
||||
*self.index.lock().unwrap() += 1;
|
||||
self.index.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
return Some(self.current_node.clone());
|
||||
}
|
||||
|
||||
*self.index.lock().unwrap() = 0;
|
||||
self.index.store(0, Ordering::SeqCst);
|
||||
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
|
||||
self.last_next_ad();
|
||||
self.current_node.last_ad = last_ad;
|
||||
|
||||
*self.index.lock().unwrap() = 1;
|
||||
self.index.store(1, Ordering::SeqCst);
|
||||
|
||||
Some(self.current_node.clone())
|
||||
}
|
||||
@ -438,7 +429,7 @@ fn gen_source(mut node: Media) -> Media {
|
||||
node.out - node.seek
|
||||
);
|
||||
} else {
|
||||
error!("File not found: {}", node.source);
|
||||
error!("File not found: <b><magenta>{}</></b>", node.source);
|
||||
}
|
||||
let (source, cmd) = gen_dummy(node.out - node.seek);
|
||||
node.source = source;
|
||||
|
26
src/main.rs
26
src/main.rs
@ -2,15 +2,16 @@ extern crate log;
|
||||
extern crate simplelog;
|
||||
|
||||
use std::{
|
||||
{fs, fs::File},
|
||||
path::PathBuf,
|
||||
process::exit,
|
||||
{fs, fs::File},
|
||||
thread,
|
||||
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
mod filter;
|
||||
mod input;
|
||||
@ -60,10 +61,7 @@ fn main() {
|
||||
*playout_stat.date.lock().unwrap() = data.date;
|
||||
}
|
||||
|
||||
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
|
||||
let rt_handle = runtime.handle();
|
||||
|
||||
let logging = init_logging(rt_handle.clone(), proc_control.is_terminated.clone());
|
||||
let logging = init_logging();
|
||||
CombinedLogger::init(logging).unwrap();
|
||||
|
||||
validate_ffmpeg();
|
||||
@ -74,18 +72,22 @@ fn main() {
|
||||
exit(0);
|
||||
}
|
||||
|
||||
let play_ctl = play_control.clone();
|
||||
let play_stat = playout_stat.clone();
|
||||
let proc_ctl = proc_control.clone();
|
||||
|
||||
if config.rpc_server.enable {
|
||||
rt_handle.spawn(json_rpc_server(
|
||||
play_control.clone(),
|
||||
playout_stat.clone(),
|
||||
proc_control.clone(),
|
||||
thread::spawn( move || json_rpc_server(
|
||||
play_ctl,
|
||||
play_stat,
|
||||
proc_ctl,
|
||||
));
|
||||
}
|
||||
|
||||
if &config.out.mode.to_lowercase() == "hls" {
|
||||
write_hls(rt_handle, play_control, playout_stat, proc_control);
|
||||
write_hls(play_control, playout_stat, proc_control);
|
||||
} else {
|
||||
player(rt_handle, play_control, playout_stat, proc_control);
|
||||
player(play_control, playout_stat, proc_control);
|
||||
}
|
||||
|
||||
info!("Playout done...");
|
||||
|
@ -1,7 +1,4 @@
|
||||
use std::{
|
||||
process,
|
||||
process::{Command, Stdio},
|
||||
};
|
||||
use std::process::{self, Command, Stdio};
|
||||
|
||||
use simplelog::*;
|
||||
|
||||
@ -13,14 +10,7 @@ pub fn output(log_format: &str) -> process::Child {
|
||||
|
||||
let mut enc_filter: Vec<String> = vec![];
|
||||
|
||||
let mut enc_cmd = vec![
|
||||
"-hide_banner",
|
||||
"-nostats",
|
||||
"-v",
|
||||
log_format,
|
||||
"-i",
|
||||
"pipe:0",
|
||||
];
|
||||
let mut enc_cmd = vec!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"];
|
||||
|
||||
if config.text.add_text && !config.text.over_pre {
|
||||
info!(
|
||||
@ -35,7 +25,10 @@ pub fn output(log_format: &str) -> process::Child {
|
||||
|
||||
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
|
||||
|
||||
debug!("Encoder CMD: <bright-blue>\"ffplay {}\"</>", enc_cmd.join(" "));
|
||||
debug!(
|
||||
"Encoder CMD: <bright-blue>\"ffplay {}\"</>",
|
||||
enc_cmd.join(" ")
|
||||
);
|
||||
|
||||
let enc_proc = match Command::new("ffplay")
|
||||
.args(enc_cmd)
|
||||
|
@ -18,11 +18,12 @@ out:
|
||||
*/
|
||||
|
||||
use std::{
|
||||
io::BufReader,
|
||||
process::{Command, Stdio},
|
||||
thread,
|
||||
};
|
||||
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::input::source_generator;
|
||||
use crate::utils::{
|
||||
@ -30,7 +31,6 @@ use crate::utils::{
|
||||
};
|
||||
|
||||
pub fn write_hls(
|
||||
rt_handle: &Handle,
|
||||
play_control: PlayerControl,
|
||||
playout_stat: PlayoutStatus,
|
||||
proc_control: ProcessControl,
|
||||
@ -40,7 +40,6 @@ pub fn write_hls(
|
||||
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
||||
|
||||
let get_source = source_generator(
|
||||
rt_handle,
|
||||
config.clone(),
|
||||
play_control.current_list.clone(),
|
||||
play_control.index.clone(),
|
||||
@ -93,13 +92,15 @@ pub fn write_hls(
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
dec_proc.stderr.take().unwrap(),
|
||||
"Writer",
|
||||
));
|
||||
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
|
||||
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer"));
|
||||
|
||||
if let Err(e) = dec_proc.wait() {
|
||||
error!("Writer: {e}")
|
||||
};
|
||||
|
||||
if let Err(e) = error_decoder_thread.join() {
|
||||
error!("{e:?}");
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,13 @@
|
||||
use std::{
|
||||
io::{prelude::*, BufReader, BufWriter, Read},
|
||||
process::{Command, Stdio},
|
||||
thread::sleep,
|
||||
sync::atomic::Ordering,
|
||||
thread::{self, sleep},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crossbeam_channel::bounded;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
mod desktop;
|
||||
mod hls;
|
||||
@ -22,7 +22,6 @@ use crate::utils::{
|
||||
};
|
||||
|
||||
pub fn player(
|
||||
rt_handle: &Handle,
|
||||
play_control: PlayerControl,
|
||||
playout_stat: PlayoutStatus,
|
||||
mut proc_control: ProcessControl,
|
||||
@ -35,7 +34,6 @@ pub fn player(
|
||||
let playlist_init = playout_stat.list_init.clone();
|
||||
|
||||
let get_source = source_generator(
|
||||
rt_handle,
|
||||
config.clone(),
|
||||
play_control.current_list.clone(),
|
||||
play_control.index.clone(),
|
||||
@ -50,18 +48,18 @@ pub fn player(
|
||||
};
|
||||
|
||||
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
|
||||
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
|
||||
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
|
||||
let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder"));
|
||||
|
||||
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
|
||||
|
||||
let (ingest_sender, ingest_receiver) = bounded(96);
|
||||
|
||||
let ff_log_format_c = ff_log_format.clone();
|
||||
let proc_control_c = proc_control.clone();
|
||||
|
||||
if config.ingest.enable {
|
||||
rt_handle.spawn(ingest_server(
|
||||
ff_log_format.clone(),
|
||||
ingest_sender.clone(),
|
||||
rt_handle.clone(),
|
||||
proc_control.clone(),
|
||||
));
|
||||
thread::spawn(move || ingest_server(ff_log_format_c, ingest_sender, proc_control_c));
|
||||
}
|
||||
|
||||
'source_iter: for node in get_source {
|
||||
@ -111,11 +109,13 @@ pub fn player(
|
||||
};
|
||||
|
||||
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
||||
rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder"));
|
||||
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
|
||||
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Encoder"));
|
||||
|
||||
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
||||
|
||||
loop {
|
||||
if *proc_control.server_is_running.lock().unwrap() {
|
||||
if proc_control.server_is_running.load(Ordering::SeqCst) {
|
||||
if !live_on {
|
||||
info!("Switch from {} to live ingest", config.processing.mode);
|
||||
|
||||
@ -128,7 +128,7 @@ pub fn player(
|
||||
}
|
||||
|
||||
live_on = true;
|
||||
*playlist_init.lock().unwrap() = true;
|
||||
playlist_init.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
for rx in ingest_receiver.try_iter() {
|
||||
@ -173,6 +173,10 @@ pub fn player(
|
||||
if let Err(e) = proc_control.wait(Decoder) {
|
||||
error!("{e}")
|
||||
}
|
||||
|
||||
if let Err(e) = error_decoder_thread.join() {
|
||||
error!("{e:?}");
|
||||
};
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
@ -180,4 +184,8 @@ pub fn player(
|
||||
if let Err(e) = proc_control.kill(Encoder) {
|
||||
error!("{e}")
|
||||
}
|
||||
|
||||
if let Err(e) = error_encoder_thread.join() {
|
||||
error!("{e:?}");
|
||||
};
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use jsonrpc_http_server::{
|
||||
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
|
||||
hyper,
|
||||
jsonrpc_core::{IoHandler, Params, Value},
|
||||
AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
|
||||
};
|
||||
use serde_json::{json, Map};
|
||||
use simplelog::*;
|
||||
@ -42,7 +45,7 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
|
||||
data_map
|
||||
}
|
||||
|
||||
pub async fn json_rpc_server(
|
||||
pub fn json_rpc_server(
|
||||
play_control: PlayerControl,
|
||||
playout_stat: PlayoutStatus,
|
||||
proc_control: ProcessControl,
|
||||
@ -59,7 +62,7 @@ pub async fn json_rpc_server(
|
||||
let mut date = playout_stat.date.lock().unwrap();
|
||||
|
||||
if map.contains_key("control") && &map["control"] == "next" {
|
||||
let index = *play.index.lock().unwrap();
|
||||
let index = play.index.load(Ordering::SeqCst);
|
||||
|
||||
if index < play.current_list.lock().unwrap().len() {
|
||||
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
||||
@ -96,7 +99,7 @@ pub async fn json_rpc_server(
|
||||
}
|
||||
|
||||
if map.contains_key("control") && &map["control"] == "back" {
|
||||
let index = *play.index.lock().unwrap();
|
||||
let index = play.index.load(Ordering::SeqCst);
|
||||
|
||||
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
|
||||
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
||||
@ -111,7 +114,7 @@ pub async fn json_rpc_server(
|
||||
info!("Move to last clip");
|
||||
let mut data_map = Map::new();
|
||||
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
|
||||
*play.index.lock().unwrap() = index - 2;
|
||||
play.index.fetch_sub(2, Ordering::SeqCst);
|
||||
media.add_probe();
|
||||
|
||||
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
|
||||
@ -146,7 +149,7 @@ pub async fn json_rpc_server(
|
||||
let mut data_map = Map::new();
|
||||
*time_shift = 0.0;
|
||||
*date = current_date.clone();
|
||||
*playout_stat.list_init.lock().unwrap() = true;
|
||||
playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||
|
||||
write_status(¤t_date, 0.0);
|
||||
|
||||
@ -167,7 +170,7 @@ pub async fn json_rpc_server(
|
||||
}
|
||||
|
||||
if map.contains_key("media") && &map["media"] == "next" {
|
||||
let index = *play.index.lock().unwrap();
|
||||
let index = play.index.load(Ordering::SeqCst);
|
||||
|
||||
if index < play.current_list.lock().unwrap().len() {
|
||||
let media = play.current_list.lock().unwrap()[index].clone();
|
||||
@ -181,7 +184,7 @@ pub async fn json_rpc_server(
|
||||
}
|
||||
|
||||
if map.contains_key("media") && &map["media"] == "last" {
|
||||
let index = *play.index.lock().unwrap();
|
||||
let index = play.index.load(Ordering::SeqCst);
|
||||
|
||||
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
|
||||
let media = play.current_list.lock().unwrap()[index - 2].clone();
|
||||
|
@ -1,6 +1,3 @@
|
||||
use once_cell::sync::OnceCell;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml::{self};
|
||||
use std::{
|
||||
env,
|
||||
fs::File,
|
||||
@ -8,6 +5,9 @@ use std::{
|
||||
process,
|
||||
};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml::{self};
|
||||
use shlex::split;
|
||||
|
||||
use crate::utils::{get_args, time_to_sec};
|
||||
@ -51,7 +51,7 @@ pub struct Mail {
|
||||
pub sender_pass: String,
|
||||
pub recipient: String,
|
||||
pub mail_level: String,
|
||||
pub interval: i32,
|
||||
pub interval: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
|
@ -1,8 +1,10 @@
|
||||
use std::{
|
||||
fmt,
|
||||
process::Child,
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
|
||||
use jsonrpc_http_server::CloseHandle;
|
||||
@ -33,10 +35,10 @@ pub struct ProcessControl {
|
||||
pub decoder_term: Arc<Mutex<Option<Child>>>,
|
||||
pub encoder_term: Arc<Mutex<Option<Child>>>,
|
||||
pub server_term: Arc<Mutex<Option<Child>>>,
|
||||
pub server_is_running: Arc<Mutex<bool>>,
|
||||
pub server_is_running: Arc<AtomicBool>,
|
||||
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
|
||||
pub is_terminated: Arc<Mutex<bool>>,
|
||||
pub is_alive: Arc<RwLock<bool>>,
|
||||
pub is_terminated: Arc<AtomicBool>,
|
||||
pub is_alive: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl ProcessControl {
|
||||
@ -45,10 +47,10 @@ impl ProcessControl {
|
||||
decoder_term: Arc::new(Mutex::new(None)),
|
||||
encoder_term: Arc::new(Mutex::new(None)),
|
||||
server_term: Arc::new(Mutex::new(None)),
|
||||
server_is_running: Arc::new(Mutex::new(false)),
|
||||
server_is_running: Arc::new(AtomicBool::new(false)),
|
||||
rpc_handle: Arc::new(Mutex::new(None)),
|
||||
is_terminated: Arc::new(Mutex::new(false)),
|
||||
is_alive: Arc::new(RwLock::new(true)),
|
||||
is_terminated: Arc::new(AtomicBool::new(false)),
|
||||
is_alive: Arc::new(AtomicBool::new(true)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -115,20 +117,16 @@ impl ProcessControl {
|
||||
}
|
||||
|
||||
pub fn kill_all(&mut self) {
|
||||
*self.is_terminated.lock().unwrap() = true;
|
||||
self.is_terminated.store(true, Ordering::SeqCst);
|
||||
|
||||
if *self.is_alive.read().unwrap() {
|
||||
*self.is_alive.write().unwrap() = false;
|
||||
if self.is_alive.load(Ordering::SeqCst) {
|
||||
self.is_alive.store(false, Ordering::SeqCst);
|
||||
|
||||
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
|
||||
rpc.clone().close()
|
||||
};
|
||||
|
||||
for unit in [
|
||||
Decoder,
|
||||
Encoder,
|
||||
Ingest,
|
||||
] {
|
||||
for unit in [Decoder, Encoder, Ingest] {
|
||||
if let Err(e) = self.kill(unit) {
|
||||
error!("{e}")
|
||||
}
|
||||
@ -147,7 +145,7 @@ impl Drop for ProcessControl {
|
||||
pub struct PlayerControl {
|
||||
pub current_media: Arc<Mutex<Option<Media>>>,
|
||||
pub current_list: Arc<Mutex<Vec<Media>>>,
|
||||
pub index: Arc<Mutex<usize>>,
|
||||
pub index: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl PlayerControl {
|
||||
@ -155,7 +153,7 @@ impl PlayerControl {
|
||||
Self {
|
||||
current_media: Arc::new(Mutex::new(None)),
|
||||
current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])),
|
||||
index: Arc::new(Mutex::new(0)),
|
||||
index: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -165,7 +163,7 @@ pub struct PlayoutStatus {
|
||||
pub time_shift: Arc<Mutex<f64>>,
|
||||
pub date: Arc<Mutex<String>>,
|
||||
pub current_date: Arc<Mutex<String>>,
|
||||
pub list_init: Arc<Mutex<bool>>,
|
||||
pub list_init: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl PlayoutStatus {
|
||||
@ -174,8 +172,7 @@ impl PlayoutStatus {
|
||||
time_shift: Arc::new(Mutex::new(0.0)),
|
||||
date: Arc::new(Mutex::new(String::new())),
|
||||
current_date: Arc::new(Mutex::new(String::new())),
|
||||
list_init: Arc::new(Mutex::new(true)),
|
||||
list_init: Arc::new(AtomicBool::new(true)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@ use std::{
|
||||
fs::{create_dir_all, write},
|
||||
path::Path,
|
||||
process::exit,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{atomic::AtomicUsize, Arc, Mutex},
|
||||
};
|
||||
|
||||
use chrono::{Duration, NaiveDate};
|
||||
@ -50,7 +50,7 @@ pub fn generate_playlist(mut date_range: Vec<String>) {
|
||||
let config = GlobalConfig::global();
|
||||
let total_length = config.playlist.length_sec.unwrap().clone();
|
||||
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
|
||||
let index = Arc::new(Mutex::new(0));
|
||||
let index = Arc::new(AtomicUsize::new(0));
|
||||
let playlist_root = Path::new(&config.playlist.path);
|
||||
|
||||
if !playlist_root.is_dir() {
|
||||
|
@ -2,11 +2,11 @@ use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
fs::File,
|
||||
path::Path,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
thread,
|
||||
};
|
||||
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media};
|
||||
|
||||
@ -46,8 +46,7 @@ impl Playlist {
|
||||
|
||||
pub fn read_json(
|
||||
path: Option<String>,
|
||||
rt_handle: Handle,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
is_terminated: Arc<AtomicBool>,
|
||||
seek: bool,
|
||||
next_start: f64,
|
||||
) -> Playlist {
|
||||
@ -108,11 +107,9 @@ pub fn read_json(
|
||||
start_sec += item.out - item.seek;
|
||||
}
|
||||
|
||||
rt_handle.spawn(validate_playlist(
|
||||
playlist.clone(),
|
||||
is_terminated,
|
||||
config.clone(),
|
||||
));
|
||||
let list_clone = playlist.clone();
|
||||
|
||||
thread::spawn(move || validate_playlist(list_clone, is_terminated, config.clone()));
|
||||
|
||||
playlist
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
use std::{path::Path, sync::{Arc, Mutex},};
|
||||
use std::{path::Path, sync::{atomic::{AtomicBool, Ordering}, Arc}};
|
||||
|
||||
use simplelog::*;
|
||||
|
||||
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
|
||||
|
||||
pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
|
||||
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<AtomicBool>, config: GlobalConfig) {
|
||||
let date = playlist.date;
|
||||
let mut length = config.playlist.length_sec.unwrap();
|
||||
let mut begin = config.playlist.start_sec.unwrap();
|
||||
@ -14,7 +14,7 @@ pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool
|
||||
debug!("validate playlist from: <yellow>{date}</>");
|
||||
|
||||
for item in playlist.program.iter() {
|
||||
if *is_terminated.lock().unwrap() {
|
||||
if is_terminated.load(Ordering::SeqCst) {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,10 @@
|
||||
extern crate log;
|
||||
extern crate simplelog;
|
||||
|
||||
use chrono::prelude::*;
|
||||
use regex::Regex;
|
||||
use std::{
|
||||
path::Path,
|
||||
sync::{Arc, Mutex},
|
||||
thread::sleep,
|
||||
thread::{self, sleep},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
@ -15,9 +13,11 @@ use lettre::{
|
||||
message::header, transport::smtp::authentication::Credentials, Message, SmtpTransport,
|
||||
Transport,
|
||||
};
|
||||
|
||||
use chrono::prelude::*;
|
||||
use log::{Level, LevelFilter, Log, Metadata, Record};
|
||||
use regex::Regex;
|
||||
use simplelog::*;
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
use crate::utils::GlobalConfig;
|
||||
|
||||
@ -52,16 +52,10 @@ fn send_mail(msg: String) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn mail_queue(
|
||||
messages: Arc<Mutex<Vec<String>>>,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
interval: i32,
|
||||
) {
|
||||
let mut count: i32 = 0;
|
||||
fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
|
||||
// check every give seconds for messages and send them
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() || count == interval {
|
||||
// check every 30 seconds for messages and send them
|
||||
if messages.lock().unwrap().len() > 0 {
|
||||
let msg = messages.lock().unwrap().join("\n");
|
||||
send_mail(msg);
|
||||
@ -69,15 +63,7 @@ async fn mail_queue(
|
||||
messages.lock().unwrap().clear();
|
||||
}
|
||||
|
||||
count = 0;
|
||||
}
|
||||
|
||||
if *is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
count += 1;
|
||||
sleep(Duration::from_secs(interval));
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,10 +127,7 @@ fn clean_string(text: &str) -> String {
|
||||
regex.replace_all(text, "").to_string()
|
||||
}
|
||||
|
||||
pub fn init_logging(
|
||||
rt_handle: Handle,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
) -> Vec<Box<dyn SharedLogger>> {
|
||||
pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
|
||||
let config = GlobalConfig::global();
|
||||
let app_config = config.logging.clone();
|
||||
let mut time_level = LevelFilter::Off;
|
||||
@ -210,17 +193,12 @@ pub fn init_logging(
|
||||
|
||||
if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") {
|
||||
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
let messages_clone = messages.clone();
|
||||
let interval = config.mail.interval.clone();
|
||||
|
||||
rt_handle.spawn(mail_queue(
|
||||
messages.clone(),
|
||||
is_terminated.clone(),
|
||||
interval,
|
||||
));
|
||||
thread::spawn(move || mail_queue(messages_clone, interval));
|
||||
|
||||
let mail_config = log_config
|
||||
.clone()
|
||||
.build();
|
||||
let mail_config = log_config.clone().build();
|
||||
|
||||
let filter = match config.mail.mail_level.to_lowercase().as_str() {
|
||||
"info" => LevelFilter::Info,
|
||||
|
@ -353,7 +353,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
|
||||
source_cmd
|
||||
}
|
||||
|
||||
pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(), Error> {
|
||||
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
|
||||
// read ffmpeg stderr decoder, encoder and server instance
|
||||
// and log the output
|
||||
|
||||
@ -361,7 +361,7 @@ pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(),
|
||||
line.replace(&format!("[{level: >5}] "), "")
|
||||
}
|
||||
|
||||
let buffer = BufReader::new(std_errors);
|
||||
// let buffer = BufReader::new(std_errors);
|
||||
|
||||
for line in buffer.lines() {
|
||||
let line = line?;
|
||||
|
Loading…
Reference in New Issue
Block a user