test standard threads instead of tokio
This commit is contained in:
parent
372c3ba773
commit
9fb120dabe
13
Cargo.lock
generated
13
Cargo.lock
generated
@ -94,9 +94,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "3.1.9"
|
version = "3.1.12"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6aad2534fad53df1cc12519c5cda696dd3e20e6118a027e24054aea14a0bdcbe"
|
checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atty",
|
"atty",
|
||||||
"bitflags",
|
"bitflags",
|
||||||
@ -216,15 +216,14 @@ dependencies = [
|
|||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
"shlex",
|
"shlex",
|
||||||
"simplelog",
|
"simplelog",
|
||||||
"tokio",
|
|
||||||
"walkdir",
|
"walkdir",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ffprobe"
|
name = "ffprobe"
|
||||||
version = "0.3.1"
|
version = "0.3.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e52fe7c1991d1d0f77383e9f3e584860a2e916fa22b834176b84a411fac7107a"
|
checksum = "4151d364a3709c400c4aaca1988324f02dfde8d3e2e8543176e596d39eb414ac"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
@ -1328,9 +1327,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tinyvec"
|
name = "tinyvec"
|
||||||
version = "1.5.1"
|
version = "1.6.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2"
|
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"tinyvec_macros",
|
"tinyvec_macros",
|
||||||
]
|
]
|
||||||
|
@ -25,7 +25,6 @@ serde_json = "1.0"
|
|||||||
serde_yaml = "0.8"
|
serde_yaml = "0.8"
|
||||||
shlex = "1.1"
|
shlex = "1.1"
|
||||||
simplelog = { version = "^0.11", features = ["paris"] }
|
simplelog = { version = "^0.11", features = ["paris"] }
|
||||||
tokio = { version = "1.16", features = ["rt-multi-thread"] }
|
|
||||||
walkdir = "2"
|
walkdir = "2"
|
||||||
|
|
||||||
[target.x86_64-unknown-linux-musl.dependencies]
|
[target.x86_64-unknown-linux-musl.dependencies]
|
||||||
|
@ -6,6 +6,8 @@ use std::{
|
|||||||
mpsc::channel,
|
mpsc::channel,
|
||||||
{Arc, Mutex},
|
{Arc, Mutex},
|
||||||
},
|
},
|
||||||
|
thread::sleep,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use notify::{
|
use notify::{
|
||||||
@ -14,7 +16,6 @@ use notify::{
|
|||||||
};
|
};
|
||||||
use rand::{seq::SliceRandom, thread_rng};
|
use rand::{seq::SliceRandom, thread_rng};
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::utils::{get_sec, GlobalConfig, Media};
|
use crate::utils::{get_sec, GlobalConfig, Media};
|
||||||
@ -157,7 +158,7 @@ fn file_extension(filename: &Path) -> Option<&str> {
|
|||||||
filename.extension().and_then(OsStr::to_str)
|
filename.extension().and_then(OsStr::to_str)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
|
pub fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
|
||||||
@ -205,6 +206,6 @@ pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,11 +2,11 @@ use std::{
|
|||||||
io::{BufReader, Error, Read},
|
io::{BufReader, Error, Read},
|
||||||
path::Path,
|
path::Path,
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
|
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
|
||||||
|
|
||||||
@ -50,10 +50,9 @@ fn audio_filter(config: &GlobalConfig) -> String {
|
|||||||
audio_chain
|
audio_chain
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ingest_server(
|
pub fn ingest_server(
|
||||||
log_format: String,
|
log_format: String,
|
||||||
ingest_sender: Sender<(usize, [u8; 65088])>,
|
ingest_sender: Sender<(usize, [u8; 65088])>,
|
||||||
rt_handle: Handle,
|
|
||||||
mut proc_control: ProcessControl,
|
mut proc_control: ProcessControl,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
@ -111,10 +110,10 @@ pub async fn ingest_server(
|
|||||||
}
|
}
|
||||||
Ok(proc) => proc,
|
Ok(proc) => proc,
|
||||||
};
|
};
|
||||||
|
|
||||||
rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server"));
|
|
||||||
|
|
||||||
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
|
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
|
||||||
|
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
|
||||||
|
let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server"));
|
||||||
|
|
||||||
*proc_control.server_term.lock().unwrap() = Some(server_proc);
|
*proc_control.server_term.lock().unwrap() = Some(server_proc);
|
||||||
|
|
||||||
is_running = false;
|
is_running = false;
|
||||||
@ -151,6 +150,14 @@ pub async fn ingest_server(
|
|||||||
if let Err(e) = proc_control.wait(Ingest) {
|
if let Err(e) = proc_control.wait(Ingest) {
|
||||||
error!("{e}")
|
error!("{e}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Err(e) = error_reader_thread.join() {
|
||||||
|
error!("{e:?}");
|
||||||
|
};
|
||||||
|
|
||||||
|
if *proc_control.is_terminated.lock().unwrap() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use std::{
|
use std::{
|
||||||
process,
|
process,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
|
use crate::utils::{GlobalConfig, Media, PlayoutStatus};
|
||||||
|
|
||||||
@ -17,7 +17,6 @@ pub use ingest::ingest_server;
|
|||||||
pub use playlist::CurrentProgram;
|
pub use playlist::CurrentProgram;
|
||||||
|
|
||||||
pub fn source_generator(
|
pub fn source_generator(
|
||||||
rt_handle: &Handle,
|
|
||||||
config: GlobalConfig,
|
config: GlobalConfig,
|
||||||
current_list: Arc<Mutex<Vec<Media>>>,
|
current_list: Arc<Mutex<Vec<Media>>>,
|
||||||
index: Arc<Mutex<usize>>,
|
index: Arc<Mutex<usize>>,
|
||||||
@ -30,14 +29,14 @@ pub fn source_generator(
|
|||||||
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
|
debug!("Monitor folder: <b><magenta>{}</></b>", &config.storage.path);
|
||||||
|
|
||||||
let folder_source = Source::new(current_list, index);
|
let folder_source = Source::new(current_list, index);
|
||||||
rt_handle.spawn(watchman(folder_source.nodes.clone()));
|
let node_clone = folder_source.nodes.clone();
|
||||||
|
thread::spawn(move || watchman(node_clone));
|
||||||
|
|
||||||
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
|
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
|
||||||
}
|
}
|
||||||
"playlist" => {
|
"playlist" => {
|
||||||
info!("Playout in playlist mode");
|
info!("Playout in playlist mode");
|
||||||
let program = CurrentProgram::new(
|
let program = CurrentProgram::new(
|
||||||
rt_handle.clone(),
|
|
||||||
playout_stat,
|
playout_stat,
|
||||||
is_terminated.clone(),
|
is_terminated.clone(),
|
||||||
current_list,
|
current_list,
|
||||||
|
@ -6,7 +6,6 @@ use std::{
|
|||||||
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
check_sync, gen_dummy, get_delta, get_sec, is_close, json_serializer::read_json, modified_time,
|
check_sync, gen_dummy, get_delta, get_sec, is_close, json_serializer::read_json, modified_time,
|
||||||
@ -23,21 +22,19 @@ pub struct CurrentProgram {
|
|||||||
pub nodes: Arc<Mutex<Vec<Media>>>,
|
pub nodes: Arc<Mutex<Vec<Media>>>,
|
||||||
current_node: Media,
|
current_node: Media,
|
||||||
index: Arc<Mutex<usize>>,
|
index: Arc<Mutex<usize>>,
|
||||||
rt_handle: Handle,
|
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<Mutex<bool>>,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CurrentProgram {
|
impl CurrentProgram {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
rt_handle: Handle,
|
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<Mutex<bool>>,
|
||||||
current_list: Arc<Mutex<Vec<Media>>>,
|
current_list: Arc<Mutex<Vec<Media>>>,
|
||||||
global_index: Arc<Mutex<usize>>,
|
global_index: Arc<Mutex<usize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0);
|
let json = read_json(None, is_terminated.clone(), true, 0.0);
|
||||||
|
|
||||||
*current_list.lock().unwrap() = json.program;
|
*current_list.lock().unwrap() = json.program;
|
||||||
*playout_stat.current_date.lock().unwrap() = json.date.clone();
|
*playout_stat.current_date.lock().unwrap() = json.date.clone();
|
||||||
@ -61,7 +58,6 @@ impl CurrentProgram {
|
|||||||
nodes: current_list,
|
nodes: current_list,
|
||||||
current_node: Media::new(0, String::new(), false),
|
current_node: Media::new(0, String::new(), false),
|
||||||
index: global_index,
|
index: global_index,
|
||||||
rt_handle,
|
|
||||||
is_terminated,
|
is_terminated,
|
||||||
playout_stat,
|
playout_stat,
|
||||||
}
|
}
|
||||||
@ -71,7 +67,6 @@ impl CurrentProgram {
|
|||||||
if self.json_path.is_none() {
|
if self.json_path.is_none() {
|
||||||
let json = read_json(
|
let json = read_json(
|
||||||
None,
|
None,
|
||||||
self.rt_handle.clone(),
|
|
||||||
self.is_terminated.clone(),
|
self.is_terminated.clone(),
|
||||||
seek,
|
seek,
|
||||||
0.0,
|
0.0,
|
||||||
@ -96,7 +91,6 @@ impl CurrentProgram {
|
|||||||
|
|
||||||
let json = read_json(
|
let json = read_json(
|
||||||
self.json_path.clone(),
|
self.json_path.clone(),
|
||||||
self.rt_handle.clone(),
|
|
||||||
self.is_terminated.clone(),
|
self.is_terminated.clone(),
|
||||||
false,
|
false,
|
||||||
0.0,
|
0.0,
|
||||||
@ -145,7 +139,6 @@ impl CurrentProgram {
|
|||||||
{
|
{
|
||||||
let json = read_json(
|
let json = read_json(
|
||||||
None,
|
None,
|
||||||
self.rt_handle.clone(),
|
|
||||||
self.is_terminated.clone(),
|
self.is_terminated.clone(),
|
||||||
false,
|
false,
|
||||||
next_start,
|
next_start,
|
||||||
@ -438,7 +431,7 @@ fn gen_source(mut node: Media) -> Media {
|
|||||||
node.out - node.seek
|
node.out - node.seek
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
error!("File not found: {}", node.source);
|
error!("File not found: <b><magenta>{}</></b>", node.source);
|
||||||
}
|
}
|
||||||
let (source, cmd) = gen_dummy(node.out - node.seek);
|
let (source, cmd) = gen_dummy(node.out - node.seek);
|
||||||
node.source = source;
|
node.source = source;
|
||||||
|
26
src/main.rs
26
src/main.rs
@ -2,15 +2,16 @@ extern crate log;
|
|||||||
extern crate simplelog;
|
extern crate simplelog;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
{fs, fs::File},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
process::exit,
|
process::exit,
|
||||||
{fs, fs::File},
|
thread,
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Builder;
|
|
||||||
|
|
||||||
mod filter;
|
mod filter;
|
||||||
mod input;
|
mod input;
|
||||||
@ -60,10 +61,7 @@ fn main() {
|
|||||||
*playout_stat.date.lock().unwrap() = data.date;
|
*playout_stat.date.lock().unwrap() = data.date;
|
||||||
}
|
}
|
||||||
|
|
||||||
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
|
let logging = init_logging();
|
||||||
let rt_handle = runtime.handle();
|
|
||||||
|
|
||||||
let logging = init_logging(rt_handle.clone());
|
|
||||||
CombinedLogger::init(logging).unwrap();
|
CombinedLogger::init(logging).unwrap();
|
||||||
|
|
||||||
validate_ffmpeg();
|
validate_ffmpeg();
|
||||||
@ -74,18 +72,22 @@ fn main() {
|
|||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let play_ctl = play_control.clone();
|
||||||
|
let play_stat = playout_stat.clone();
|
||||||
|
let proc_ctl = proc_control.clone();
|
||||||
|
|
||||||
if config.rpc_server.enable {
|
if config.rpc_server.enable {
|
||||||
rt_handle.spawn(json_rpc_server(
|
thread::spawn( move || json_rpc_server(
|
||||||
play_control.clone(),
|
play_ctl,
|
||||||
playout_stat.clone(),
|
play_stat,
|
||||||
proc_control.clone(),
|
proc_ctl,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if &config.out.mode.to_lowercase() == "hls" {
|
if &config.out.mode.to_lowercase() == "hls" {
|
||||||
write_hls(rt_handle, play_control, playout_stat, proc_control);
|
write_hls(play_control, playout_stat, proc_control);
|
||||||
} else {
|
} else {
|
||||||
player(rt_handle, play_control, playout_stat, proc_control);
|
player(play_control, playout_stat, proc_control);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Playout done...");
|
info!("Playout done...");
|
||||||
|
@ -18,11 +18,12 @@ out:
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
io::BufReader,
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::input::source_generator;
|
use crate::input::source_generator;
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
@ -30,7 +31,6 @@ use crate::utils::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub fn write_hls(
|
pub fn write_hls(
|
||||||
rt_handle: &Handle,
|
|
||||||
play_control: PlayerControl,
|
play_control: PlayerControl,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
proc_control: ProcessControl,
|
proc_control: ProcessControl,
|
||||||
@ -40,7 +40,6 @@ pub fn write_hls(
|
|||||||
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
|
||||||
|
|
||||||
let get_source = source_generator(
|
let get_source = source_generator(
|
||||||
rt_handle,
|
|
||||||
config.clone(),
|
config.clone(),
|
||||||
play_control.current_list.clone(),
|
play_control.current_list.clone(),
|
||||||
play_control.index.clone(),
|
play_control.index.clone(),
|
||||||
@ -93,13 +92,15 @@ pub fn write_hls(
|
|||||||
Ok(proc) => proc,
|
Ok(proc) => proc,
|
||||||
};
|
};
|
||||||
|
|
||||||
rt_handle.spawn(stderr_reader(
|
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
|
||||||
dec_proc.stderr.take().unwrap(),
|
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer"));
|
||||||
"Writer",
|
|
||||||
));
|
|
||||||
|
|
||||||
if let Err(e) = dec_proc.wait() {
|
if let Err(e) = dec_proc.wait() {
|
||||||
error!("Writer: {e}")
|
error!("Writer: {e}")
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Err(e) = error_decoder_thread.join() {
|
||||||
|
error!("{e:?}");
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
use std::{
|
use std::{
|
||||||
io::{prelude::*, BufReader, BufWriter, Read},
|
io::{prelude::*, BufReader, BufWriter, Read},
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
thread::sleep,
|
thread::{self, sleep},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crossbeam_channel::bounded;
|
use crossbeam_channel::bounded;
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
// use tokio::runtime::Handle;
|
||||||
|
|
||||||
mod desktop;
|
mod desktop;
|
||||||
mod hls;
|
mod hls;
|
||||||
@ -22,7 +22,6 @@ use crate::utils::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
pub fn player(
|
pub fn player(
|
||||||
rt_handle: &Handle,
|
|
||||||
play_control: PlayerControl,
|
play_control: PlayerControl,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
mut proc_control: ProcessControl,
|
mut proc_control: ProcessControl,
|
||||||
@ -35,7 +34,6 @@ pub fn player(
|
|||||||
let playlist_init = playout_stat.list_init.clone();
|
let playlist_init = playout_stat.list_init.clone();
|
||||||
|
|
||||||
let get_source = source_generator(
|
let get_source = source_generator(
|
||||||
rt_handle,
|
|
||||||
config.clone(),
|
config.clone(),
|
||||||
play_control.current_list.clone(),
|
play_control.current_list.clone(),
|
||||||
play_control.index.clone(),
|
play_control.index.clone(),
|
||||||
@ -50,17 +48,21 @@ pub fn player(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
|
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
|
||||||
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
|
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
|
||||||
|
let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder"));
|
||||||
|
|
||||||
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
|
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
|
||||||
|
|
||||||
let (ingest_sender, ingest_receiver) = bounded(96);
|
let (ingest_sender, ingest_receiver) = bounded(96);
|
||||||
|
|
||||||
|
let ff_log_format_c = ff_log_format.clone();
|
||||||
|
let proc_control_c = proc_control.clone();
|
||||||
|
|
||||||
if config.ingest.enable {
|
if config.ingest.enable {
|
||||||
rt_handle.spawn(ingest_server(
|
thread::spawn(move || ingest_server(
|
||||||
ff_log_format.clone(),
|
ff_log_format_c,
|
||||||
ingest_sender.clone(),
|
ingest_sender,
|
||||||
rt_handle.clone(),
|
proc_control_c,
|
||||||
proc_control.clone(),
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,7 +113,9 @@ pub fn player(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
||||||
rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder"));
|
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
|
||||||
|
let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Encoder"));
|
||||||
|
|
||||||
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -173,6 +177,10 @@ pub fn player(
|
|||||||
if let Err(e) = proc_control.wait(Decoder) {
|
if let Err(e) = proc_control.wait(Decoder) {
|
||||||
error!("{e}")
|
error!("{e}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Err(e) = error_decoder_thread.join() {
|
||||||
|
error!("{e:?}");
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(Duration::from_secs(1));
|
sleep(Duration::from_secs(1));
|
||||||
@ -180,4 +188,8 @@ pub fn player(
|
|||||||
if let Err(e) = proc_control.kill(Encoder) {
|
if let Err(e) = proc_control.kill(Encoder) {
|
||||||
error!("{e}")
|
error!("{e}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Err(e) = error_encoder_thread.join() {
|
||||||
|
error!("{e:?}");
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
|
|||||||
data_map
|
data_map
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn json_rpc_server(
|
pub fn json_rpc_server(
|
||||||
play_control: PlayerControl,
|
play_control: PlayerControl,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
proc_control: ProcessControl,
|
proc_control: ProcessControl,
|
||||||
|
@ -3,10 +3,10 @@ use std::{
|
|||||||
fs::File,
|
fs::File,
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media};
|
use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media};
|
||||||
|
|
||||||
@ -46,7 +46,6 @@ impl Playlist {
|
|||||||
|
|
||||||
pub fn read_json(
|
pub fn read_json(
|
||||||
path: Option<String>,
|
path: Option<String>,
|
||||||
rt_handle: Handle,
|
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<Mutex<bool>>,
|
||||||
seek: bool,
|
seek: bool,
|
||||||
next_start: f64,
|
next_start: f64,
|
||||||
@ -108,8 +107,10 @@ pub fn read_json(
|
|||||||
start_sec += item.out - item.seek;
|
start_sec += item.out - item.seek;
|
||||||
}
|
}
|
||||||
|
|
||||||
rt_handle.spawn(validate_playlist(
|
let list_clone = playlist.clone();
|
||||||
playlist.clone(),
|
|
||||||
|
thread::spawn(move || validate_playlist(
|
||||||
|
list_clone,
|
||||||
is_terminated,
|
is_terminated,
|
||||||
config.clone(),
|
config.clone(),
|
||||||
));
|
));
|
||||||
|
@ -4,7 +4,7 @@ use simplelog::*;
|
|||||||
|
|
||||||
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
|
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
|
||||||
|
|
||||||
pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
|
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
|
||||||
let date = playlist.date;
|
let date = playlist.date;
|
||||||
let mut length = config.playlist.length_sec.unwrap();
|
let mut length = config.playlist.length_sec.unwrap();
|
||||||
let mut begin = config.playlist.start_sec.unwrap();
|
let mut begin = config.playlist.start_sec.unwrap();
|
||||||
|
@ -4,6 +4,8 @@ extern crate simplelog;
|
|||||||
use std::{
|
use std::{
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
|
thread::{self, sleep},
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
|
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
|
||||||
@ -16,10 +18,6 @@ use chrono::prelude::*;
|
|||||||
use log::{Level, LevelFilter, Log, Metadata, Record};
|
use log::{Level, LevelFilter, Log, Metadata, Record};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
use tokio::{
|
|
||||||
runtime::Handle,
|
|
||||||
time::{sleep, Duration},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::utils::GlobalConfig;
|
use crate::utils::GlobalConfig;
|
||||||
|
|
||||||
@ -54,7 +52,7 @@ fn send_mail(msg: String) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
|
fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
|
||||||
// check every give seconds for messages and send them
|
// check every give seconds for messages and send them
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -65,7 +63,7 @@ async fn mail_queue(messages: Arc<Mutex<Vec<String>>>, interval: u64) {
|
|||||||
messages.lock().unwrap().clear();
|
messages.lock().unwrap().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(Duration::from_secs(interval)).await;
|
sleep(Duration::from_secs(interval));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,7 +127,7 @@ fn clean_string(text: &str) -> String {
|
|||||||
regex.replace_all(text, "").to_string()
|
regex.replace_all(text, "").to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
|
pub fn init_logging() -> Vec<Box<dyn SharedLogger>> {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let app_config = config.logging.clone();
|
let app_config = config.logging.clone();
|
||||||
let mut time_level = LevelFilter::Off;
|
let mut time_level = LevelFilter::Off;
|
||||||
@ -195,9 +193,10 @@ pub fn init_logging(rt_handle: Handle) -> Vec<Box<dyn SharedLogger>> {
|
|||||||
|
|
||||||
if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") {
|
if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") {
|
||||||
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
let messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let messages_clone = messages.clone();
|
||||||
let interval = config.mail.interval.clone();
|
let interval = config.mail.interval.clone();
|
||||||
|
|
||||||
rt_handle.spawn(mail_queue(messages.clone(), interval));
|
thread::spawn(move || mail_queue(messages_clone, interval));
|
||||||
|
|
||||||
let mail_config = log_config.clone().build();
|
let mail_config = log_config.clone().build();
|
||||||
|
|
||||||
|
@ -353,7 +353,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
|
|||||||
source_cmd
|
source_cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(), Error> {
|
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
|
||||||
// read ffmpeg stderr decoder, encoder and server instance
|
// read ffmpeg stderr decoder, encoder and server instance
|
||||||
// and log the output
|
// and log the output
|
||||||
|
|
||||||
@ -361,7 +361,7 @@ pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(),
|
|||||||
line.replace(&format!("[{level: >5}] "), "")
|
line.replace(&format!("[{level: >5}] "), "")
|
||||||
}
|
}
|
||||||
|
|
||||||
let buffer = BufReader::new(std_errors);
|
// let buffer = BufReader::new(std_errors);
|
||||||
|
|
||||||
for line in buffer.lines() {
|
for line in buffer.lines() {
|
||||||
let line = line?;
|
let line = line?;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user