From 9fb120dabe55f3a91a4874d2b5706c7f82ed4a27 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 26 Apr 2022 13:47:52 +0200 Subject: [PATCH] test standard threads instead of tokio --- Cargo.lock | 13 ++++++------- Cargo.toml | 1 - src/input/folder.rs | 7 ++++--- src/input/ingest.rs | 19 +++++++++++++------ src/input/mod.rs | 7 +++---- src/input/playlist.rs | 11 ++--------- src/main.rs | 26 ++++++++++++++------------ src/output/hls.rs | 15 ++++++++------- src/output/mod.rs | 34 +++++++++++++++++++++++----------- src/rpc/mod.rs | 2 +- src/utils/json_serializer.rs | 9 +++++---- src/utils/json_validate.rs | 2 +- src/utils/logging.rs | 15 +++++++-------- src/utils/mod.rs | 4 ++-- 14 files changed, 89 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e077c98c..ac4216f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,9 +94,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.9" +version = "3.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aad2534fad53df1cc12519c5cda696dd3e20e6118a027e24054aea14a0bdcbe" +checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db" dependencies = [ "atty", "bitflags", @@ -216,15 +216,14 @@ dependencies = [ "serde_yaml", "shlex", "simplelog", - "tokio", "walkdir", ] [[package]] name = "ffprobe" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e52fe7c1991d1d0f77383e9f3e584860a2e916fa22b834176b84a411fac7107a" +checksum = "4151d364a3709c400c4aaca1988324f02dfde8d3e2e8543176e596d39eb414ac" dependencies = [ "serde", "serde_json", @@ -1328,9 +1327,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" dependencies = [ "tinyvec_macros", ] diff --git a/Cargo.toml b/Cargo.toml index 76027829..6fb0b364 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ serde_json = "1.0" serde_yaml = "0.8" shlex = "1.1" simplelog = { version = "^0.11", features = ["paris"] } -tokio = { version = "1.16", features = ["rt-multi-thread"] } walkdir = "2" [target.x86_64-unknown-linux-musl.dependencies] diff --git a/src/input/folder.rs b/src/input/folder.rs index 3c83bb1a..f062cebe 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -6,6 +6,8 @@ use std::{ mpsc::channel, {Arc, Mutex}, }, + thread::sleep, + time::Duration, }; use notify::{ @@ -14,7 +16,6 @@ use notify::{ }; use rand::{seq::SliceRandom, thread_rng}; use simplelog::*; -use tokio::time::{sleep, Duration}; use walkdir::WalkDir; use crate::utils::{get_sec, GlobalConfig, Media}; @@ -157,7 +158,7 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watchman(sources: Arc>>) { +pub fn watchman(sources: Arc>>) { let config = GlobalConfig::global(); let (tx, rx) = channel(); @@ -205,6 +206,6 @@ pub async fn watchman(sources: Arc>>) { } } - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(5)); } } diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 7a12c00d..f03eb07d 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,11 +2,11 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, + thread, }; use crossbeam_channel::Sender; use simplelog::*; -use tokio::runtime::Handle; use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; @@ -50,10 +50,9 @@ fn audio_filter(config: &GlobalConfig) -> String { audio_chain } -pub async fn ingest_server( +pub fn ingest_server( log_format: String, ingest_sender: Sender<(usize, [u8; 65088])>, - rt_handle: Handle, mut proc_control: ProcessControl, ) -> Result<(), Error> { let config = GlobalConfig::global(); @@ -111,10 +110,10 @@ pub async fn ingest_server( } Ok(proc) => proc, }; - - rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server")); - let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); + let server_err = BufReader::new(server_proc.stderr.take().unwrap()); + let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); + *proc_control.server_term.lock().unwrap() = Some(server_proc); is_running = false; @@ -151,6 +150,14 @@ pub async fn ingest_server( if let Err(e) = proc_control.wait(Ingest) { error!("{e}") } + + if let Err(e) = error_reader_thread.join() { + error!("{e:?}"); + }; + + if *proc_control.is_terminated.lock().unwrap() { + break; + } } Ok(()) diff --git a/src/input/mod.rs b/src/input/mod.rs index 0507b7b2..93d6440a 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,10 +1,10 @@ use std::{ process, sync::{Arc, Mutex}, + thread, }; use simplelog::*; -use tokio::runtime::Handle; use crate::utils::{GlobalConfig, Media, PlayoutStatus}; @@ -17,7 +17,6 @@ pub use ingest::ingest_server; pub use playlist::CurrentProgram; pub fn source_generator( - rt_handle: &Handle, config: GlobalConfig, current_list: Arc>>, index: Arc>, @@ -30,14 +29,14 @@ pub fn source_generator( debug!("Monitor folder: {}", &config.storage.path); let folder_source = Source::new(current_list, index); - rt_handle.spawn(watchman(folder_source.nodes.clone())); + let node_clone = folder_source.nodes.clone(); + thread::spawn(move || watchman(node_clone)); Box::new(folder_source) as Box> } "playlist" => { info!("Playout in playlist mode"); let program = CurrentProgram::new( - rt_handle.clone(), playout_stat, is_terminated.clone(), current_list, diff --git a/src/input/playlist.rs b/src/input/playlist.rs index c283884a..82c09b80 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -6,7 +6,6 @@ use std::{ use serde_json::json; use simplelog::*; -use tokio::runtime::Handle; use crate::utils::{ check_sync, gen_dummy, get_delta, get_sec, is_close, json_serializer::read_json, modified_time, @@ -23,21 +22,19 @@ pub struct CurrentProgram { pub nodes: Arc>>, current_node: Media, index: Arc>, - rt_handle: Handle, is_terminated: Arc>, playout_stat: PlayoutStatus, } impl CurrentProgram { pub fn new( - rt_handle: Handle, playout_stat: PlayoutStatus, is_terminated: Arc>, current_list: Arc>>, global_index: Arc>, ) -> Self { let config = GlobalConfig::global(); - let json = read_json(None, rt_handle.clone(), is_terminated.clone(), true, 0.0); + let json = read_json(None, is_terminated.clone(), true, 0.0); *current_list.lock().unwrap() = json.program; *playout_stat.current_date.lock().unwrap() = json.date.clone(); @@ -61,7 +58,6 @@ impl CurrentProgram { nodes: current_list, current_node: Media::new(0, String::new(), false), index: global_index, - rt_handle, is_terminated, playout_stat, } @@ -71,7 +67,6 @@ impl CurrentProgram { if self.json_path.is_none() { let json = read_json( None, - self.rt_handle.clone(), self.is_terminated.clone(), seek, 0.0, @@ -96,7 +91,6 @@ impl CurrentProgram { let json = read_json( self.json_path.clone(), - self.rt_handle.clone(), self.is_terminated.clone(), false, 0.0, @@ -145,7 +139,6 @@ impl CurrentProgram { { let json = read_json( None, - self.rt_handle.clone(), self.is_terminated.clone(), false, next_start, @@ -438,7 +431,7 @@ fn gen_source(mut node: Media) -> Media { node.out - node.seek ); } else { - error!("File not found: {}", node.source); + error!("File not found: {}", node.source); } let (source, cmd) = gen_dummy(node.out - node.seek); node.source = source; diff --git a/src/main.rs b/src/main.rs index 07fc434a..0427caf6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,15 +2,16 @@ extern crate log; extern crate simplelog; use std::{ + {fs, fs::File}, path::PathBuf, process::exit, - {fs, fs::File}, + thread, + }; use serde::{Deserialize, Serialize}; use serde_json::json; use simplelog::*; -use tokio::runtime::Builder; mod filter; mod input; @@ -60,10 +61,7 @@ fn main() { *playout_stat.date.lock().unwrap() = data.date; } - let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - let rt_handle = runtime.handle(); - - let logging = init_logging(rt_handle.clone()); + let logging = init_logging(); CombinedLogger::init(logging).unwrap(); validate_ffmpeg(); @@ -74,18 +72,22 @@ fn main() { exit(0); } + let play_ctl = play_control.clone(); + let play_stat = playout_stat.clone(); + let proc_ctl = proc_control.clone(); + if config.rpc_server.enable { - rt_handle.spawn(json_rpc_server( - play_control.clone(), - playout_stat.clone(), - proc_control.clone(), + thread::spawn( move || json_rpc_server( + play_ctl, + play_stat, + proc_ctl, )); } if &config.out.mode.to_lowercase() == "hls" { - write_hls(rt_handle, play_control, playout_stat, proc_control); + write_hls(play_control, playout_stat, proc_control); } else { - player(rt_handle, play_control, playout_stat, proc_control); + player(play_control, playout_stat, proc_control); } info!("Playout done..."); diff --git a/src/output/hls.rs b/src/output/hls.rs index 79079dbe..51d5869b 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -18,11 +18,12 @@ out: */ use std::{ + io::BufReader, process::{Command, Stdio}, + thread, }; use simplelog::*; -use tokio::runtime::Handle; use crate::input::source_generator; use crate::utils::{ @@ -30,7 +31,6 @@ use crate::utils::{ }; pub fn write_hls( - rt_handle: &Handle, play_control: PlayerControl, playout_stat: PlayoutStatus, proc_control: ProcessControl, @@ -40,7 +40,6 @@ pub fn write_hls( let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let get_source = source_generator( - rt_handle, config.clone(), play_control.current_list.clone(), play_control.index.clone(), @@ -93,13 +92,15 @@ pub fn write_hls( Ok(proc) => proc, }; - rt_handle.spawn(stderr_reader( - dec_proc.stderr.take().unwrap(), - "Writer", - )); + let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); + let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Writer")); if let Err(e) = dec_proc.wait() { error!("Writer: {e}") }; + + if let Err(e) = error_decoder_thread.join() { + error!("{e:?}"); + }; } } diff --git a/src/output/mod.rs b/src/output/mod.rs index 38d64174..04b20ac1 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,13 +1,13 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, process::{Command, Stdio}, - thread::sleep, + thread::{self, sleep}, time::Duration, }; use crossbeam_channel::bounded; use simplelog::*; -use tokio::runtime::Handle; +// use tokio::runtime::Handle; mod desktop; mod hls; @@ -22,7 +22,6 @@ use crate::utils::{ }; pub fn player( - rt_handle: &Handle, play_control: PlayerControl, playout_stat: PlayoutStatus, mut proc_control: ProcessControl, @@ -35,7 +34,6 @@ pub fn player( let playlist_init = playout_stat.list_init.clone(); let get_source = source_generator( - rt_handle, config.clone(), play_control.current_list.clone(), play_control.index.clone(), @@ -50,17 +48,21 @@ pub fn player( }; let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); - rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); + let enc_err = BufReader::new(enc_proc.stderr.take().unwrap()); + let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder")); + *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); let (ingest_sender, ingest_receiver) = bounded(96); + let ff_log_format_c = ff_log_format.clone(); + let proc_control_c = proc_control.clone(); + if config.ingest.enable { - rt_handle.spawn(ingest_server( - ff_log_format.clone(), - ingest_sender.clone(), - rt_handle.clone(), - proc_control.clone(), + thread::spawn(move || ingest_server( + ff_log_format_c, + ingest_sender, + proc_control_c, )); } @@ -111,7 +113,9 @@ pub fn player( }; let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder")); + let dec_err = BufReader::new(dec_proc.stderr.take().unwrap()); + let error_decoder_thread = thread::spawn(move || stderr_reader(dec_err, "Encoder")); + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { @@ -173,6 +177,10 @@ pub fn player( if let Err(e) = proc_control.wait(Decoder) { error!("{e}") } + + if let Err(e) = error_decoder_thread.join() { + error!("{e:?}"); + }; } sleep(Duration::from_secs(1)); @@ -180,4 +188,8 @@ pub fn player( if let Err(e) = proc_control.kill(Encoder) { error!("{e}") } + + if let Err(e) = error_encoder_thread.join() { + error!("{e:?}"); + }; } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 7957a648..fe3b4a27 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -42,7 +42,7 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } -pub async fn json_rpc_server( +pub fn json_rpc_server( play_control: PlayerControl, playout_stat: PlayoutStatus, proc_control: ProcessControl, diff --git a/src/utils/json_serializer.rs b/src/utils/json_serializer.rs index d1f12f69..0022c979 100644 --- a/src/utils/json_serializer.rs +++ b/src/utils/json_serializer.rs @@ -3,10 +3,10 @@ use std::{ fs::File, path::Path, sync::{Arc, Mutex}, + thread, }; use simplelog::*; -use tokio::runtime::Handle; use crate::utils::{get_date, modified_time, validate_playlist, GlobalConfig, Media}; @@ -46,7 +46,6 @@ impl Playlist { pub fn read_json( path: Option, - rt_handle: Handle, is_terminated: Arc>, seek: bool, next_start: f64, @@ -108,8 +107,10 @@ pub fn read_json( start_sec += item.out - item.seek; } - rt_handle.spawn(validate_playlist( - playlist.clone(), + let list_clone = playlist.clone(); + + thread::spawn(move || validate_playlist( + list_clone, is_terminated, config.clone(), )); diff --git a/src/utils/json_validate.rs b/src/utils/json_validate.rs index b96da6c9..b70abe61 100644 --- a/src/utils/json_validate.rs +++ b/src/utils/json_validate.rs @@ -4,7 +4,7 @@ use simplelog::*; use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; -pub async fn validate_playlist(playlist: Playlist, is_terminated: Arc>, config: GlobalConfig) { +pub fn validate_playlist(playlist: Playlist, is_terminated: Arc>, config: GlobalConfig) { let date = playlist.date; let mut length = config.playlist.length_sec.unwrap(); let mut begin = config.playlist.start_sec.unwrap(); diff --git a/src/utils/logging.rs b/src/utils/logging.rs index 5b00ef01..bf83d732 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -4,6 +4,8 @@ extern crate simplelog; use std::{ path::Path, sync::{Arc, Mutex}, + thread::{self, sleep}, + time::Duration, }; use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate}; @@ -16,10 +18,6 @@ use chrono::prelude::*; use log::{Level, LevelFilter, Log, Metadata, Record}; use regex::Regex; use simplelog::*; -use tokio::{ - runtime::Handle, - time::{sleep, Duration}, -}; use crate::utils::GlobalConfig; @@ -54,7 +52,7 @@ fn send_mail(msg: String) { } } -async fn mail_queue(messages: Arc>>, interval: u64) { +fn mail_queue(messages: Arc>>, interval: u64) { // check every give seconds for messages and send them loop { @@ -65,7 +63,7 @@ async fn mail_queue(messages: Arc>>, interval: u64) { messages.lock().unwrap().clear(); } - sleep(Duration::from_secs(interval)).await; + sleep(Duration::from_secs(interval)); } } @@ -129,7 +127,7 @@ fn clean_string(text: &str) -> String { regex.replace_all(text, "").to_string() } -pub fn init_logging(rt_handle: Handle) -> Vec> { +pub fn init_logging() -> Vec> { let config = GlobalConfig::global(); let app_config = config.logging.clone(); let mut time_level = LevelFilter::Off; @@ -195,9 +193,10 @@ pub fn init_logging(rt_handle: Handle) -> Vec> { if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") { let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + let messages_clone = messages.clone(); let interval = config.mail.interval.clone(); - rt_handle.spawn(mail_queue(messages.clone(), interval)); + thread::spawn(move || mail_queue(messages_clone, interval)); let mail_config = log_config.clone().build(); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index e778d9f1..ee26434b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -353,7 +353,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec Result<(), Error> { +pub fn stderr_reader(buffer: BufReader, suffix: &str) -> Result<(), Error> { // read ffmpeg stderr decoder, encoder and server instance // and log the output @@ -361,7 +361,7 @@ pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(), line.replace(&format!("[{level: >5}] "), "") } - let buffer = BufReader::new(std_errors); + // let buffer = BufReader::new(std_errors); for line in buffer.lines() { let line = line?;