diff --git a/src/input/folder.rs b/src/input/folder.rs index f062cebe..7afa3012 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -3,6 +3,7 @@ use std::{ path::Path, process::exit, sync::{ + atomic::{AtomicUsize, Ordering}, mpsc::channel, {Arc, Mutex}, }, @@ -25,11 +26,11 @@ pub struct Source { config: GlobalConfig, pub nodes: Arc>>, current_node: Media, - index: Arc>, + index: Arc, } impl Source { - pub fn new(current_list: Arc>>, global_index: Arc>) -> Self { + pub fn new(current_list: Arc>>, global_index: Arc) -> Self { let config = GlobalConfig::global(); let mut media_list = vec![]; let mut index: usize = 0; @@ -117,14 +118,14 @@ impl Iterator for Source { type Item = Media; fn next(&mut self) -> Option { - if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { - let i = *self.index.lock().unwrap(); + if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() { + let i = self.index.load(Ordering::SeqCst); self.current_node = self.nodes.lock().unwrap()[i].clone(); self.current_node.add_probe(); self.current_node.add_filter(); self.current_node.begin = Some(get_sec()); - *self.index.lock().unwrap() += 1; + self.index.store(i + 1, Ordering::SeqCst); Some(self.current_node.clone()) } else { @@ -147,7 +148,7 @@ impl Iterator for Source { self.current_node.add_filter(); self.current_node.begin = Some(get_sec()); - *self.index.lock().unwrap() = 1; + self.index.store(1, Ordering::SeqCst); Some(self.current_node.clone()) } diff --git a/src/input/ingest.rs b/src/input/ingest.rs index f03eb07d..66ecc59d 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,6 +2,7 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, + sync::atomic::Ordering, thread, }; @@ -128,7 +129,7 @@ pub fn ingest_server( }; if !is_running { - *proc_control.server_is_running.lock().unwrap() = true; + proc_control.server_is_running.store(true, Ordering::SeqCst); is_running = true; } @@ -136,7 +137,7 @@ pub fn ingest_server( if let Err(e) = ingest_sender.send((bytes_len, buffer)) { error!("Ingest server write error: {e:?}"); - *proc_control.is_terminated.lock().unwrap() = true; + proc_control.is_terminated.store(true, Ordering::SeqCst); break 'ingest_iter; } } else { @@ -145,7 +146,7 @@ pub fn ingest_server( } drop(ingest_reader); - *proc_control.server_is_running.lock().unwrap() = false; + proc_control.server_is_running.store(false, Ordering::SeqCst); if let Err(e) = proc_control.wait(Ingest) { error!("{e}") @@ -155,7 +156,7 @@ pub fn ingest_server( error!("{e:?}"); }; - if *proc_control.is_terminated.lock().unwrap() { + if proc_control.is_terminated.load(Ordering::SeqCst) { break; } } diff --git a/src/input/mod.rs b/src/input/mod.rs index 93d6440a..f4112487 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,6 +1,6 @@ use std::{ process, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize}}, thread, }; @@ -19,9 +19,9 @@ pub use playlist::CurrentProgram; pub fn source_generator( config: GlobalConfig, current_list: Arc>>, - index: Arc>, + index: Arc, playout_stat: PlayoutStatus, - is_terminated: Arc>, + is_terminated: Arc, ) -> Box> { let get_source = match config.processing.clone().mode.as_str() { "folder" => { diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 82c09b80..e88e869a 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -1,7 +1,7 @@ use std::{ fs, path::Path, - sync::{Arc, Mutex}, + sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex}, }; use serde_json::json; @@ -21,17 +21,17 @@ pub struct CurrentProgram { json_date: String, pub nodes: Arc>>, current_node: Media, - index: Arc>, - is_terminated: Arc>, + index: Arc, + is_terminated: Arc, playout_stat: PlayoutStatus, } impl CurrentProgram { pub fn new( playout_stat: PlayoutStatus, - is_terminated: Arc>, + is_terminated: Arc, current_list: Arc>>, - global_index: Arc>, + global_index: Arc, ) -> Self { let config = GlobalConfig::global(); let json = read_json(None, is_terminated.clone(), true, 0.0); @@ -65,12 +65,7 @@ impl CurrentProgram { fn check_update(&mut self, seek: bool) { if self.json_path.is_none() { - let json = read_json( - None, - self.is_terminated.clone(), - seek, - 0.0, - ); + let json = read_json(None, self.is_terminated.clone(), seek, 0.0); self.json_path = json.current_file; self.json_mod = json.modified; @@ -100,7 +95,8 @@ impl CurrentProgram { *self.nodes.lock().unwrap() = json.program; self.get_current_clip(); - *self.index.lock().unwrap() += 1; + let idx = self.index.load(Ordering::SeqCst); + self.index.store(idx + 1, Ordering::SeqCst); } } else { error!( @@ -115,8 +111,8 @@ impl CurrentProgram { self.json_path = None; *self.nodes.lock().unwrap() = vec![media.clone()]; self.current_node = media; - *self.playout_stat.list_init.lock().unwrap() = true; - *self.index.lock().unwrap() = 0; + self.playout_stat.list_init.store(true, Ordering::SeqCst); + self.index.store(0, Ordering::SeqCst); } } @@ -137,12 +133,7 @@ impl CurrentProgram { || is_close(total_delta, 0.0, 2.0) || is_close(total_delta, target_length, 2.0) { - let json = read_json( - None, - self.is_terminated.clone(), - false, - next_start, - ); + let json = read_json(None, self.is_terminated.clone(), false, next_start); let data = json!({ "time_shift": 0.0, @@ -160,27 +151,35 @@ impl CurrentProgram { self.json_mod = json.modified; self.json_date = json.date; *self.nodes.lock().unwrap() = json.program; - *self.index.lock().unwrap() = 0; + self.index.store(0, Ordering::SeqCst); if json.current_file.is_none() { - *self.playout_stat.list_init.lock().unwrap() = true; + self.playout_stat.list_init.store(true, Ordering::SeqCst); } } } fn last_next_ad(&mut self) { - let index = *self.index.lock().unwrap(); + let index = self.index.load(Ordering::SeqCst); let current_list = self.nodes.lock().unwrap(); if index + 1 < current_list.len() - && ¤t_list[index + 1].category.clone().unwrap_or(String::new()) == "advertisement" + && ¤t_list[index + 1] + .category + .clone() + .unwrap_or(String::new()) + == "advertisement" { self.current_node.next_ad = Some(true); } if index > 0 && index < current_list.len() - && ¤t_list[index - 1].category.clone().unwrap_or(String::new()) == "advertisement" + && ¤t_list[index - 1] + .category + .clone() + .unwrap_or(String::new()) + == "advertisement" { self.current_node.last_ad = Some(true); } @@ -210,8 +209,8 @@ impl CurrentProgram { for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() { if item.begin.unwrap() + item.out - item.seek > time_sec { - *self.playout_stat.list_init.lock().unwrap() = false; - *self.index.lock().unwrap() = i; + self.playout_stat.list_init.store(false, Ordering::SeqCst); + self.index.store(i, Ordering::SeqCst); break; } @@ -221,13 +220,14 @@ impl CurrentProgram { fn init_clip(&mut self) { self.get_current_clip(); - if !*self.playout_stat.list_init.lock().unwrap() { + if !self.playout_stat.list_init.load(Ordering::SeqCst) { let time_sec = self.get_current_time(); - let index = *self.index.lock().unwrap(); + let index = self.index.load(Ordering::SeqCst); // de-instance node to preserve original values in list let mut node_clone = self.nodes.lock().unwrap()[index].clone(); - *self.index.lock().unwrap() += 1; + let idx = self.index.load(Ordering::SeqCst); + self.index.store(idx + 1, Ordering::SeqCst); node_clone.seek = time_sec - node_clone.begin.unwrap(); self.current_node = handle_list_init(node_clone); @@ -239,7 +239,7 @@ impl Iterator for CurrentProgram { type Item = Media; fn next(&mut self) -> Option { - if *self.playout_stat.list_init.lock().unwrap() { + if self.playout_stat.list_init.load(Ordering::SeqCst) { debug!("Playlist init"); self.check_update(true); @@ -247,7 +247,7 @@ impl Iterator for CurrentProgram { self.init_clip(); } - if *self.playout_stat.list_init.lock().unwrap() { + if self.playout_stat.list_init.load(Ordering::SeqCst) { // on init load playlist, could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. @@ -270,7 +270,7 @@ impl Iterator for CurrentProgram { if DUMMY_LEN > total_delta { duration = total_delta; - *self.playout_stat.list_init.lock().unwrap() = false; + self.playout_stat.list_init.store(false, Ordering::SeqCst); } if self.config.playlist.start_sec.unwrap() > current_time { @@ -283,7 +283,7 @@ impl Iterator for CurrentProgram { self.current_node = gen_source(media); self.nodes.lock().unwrap().push(self.current_node.clone()); - *self.index.lock().unwrap() = self.nodes.lock().unwrap().len(); + self.index.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst); } } @@ -292,10 +292,10 @@ impl Iterator for CurrentProgram { return Some(self.current_node.clone()); } - if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() { + if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() { self.check_for_next_playlist(); let mut is_last = false; - let index = *self.index.lock().unwrap(); + let index = self.index.load(Ordering::SeqCst); if index == self.nodes.lock().unwrap().len() - 1 { is_last = true @@ -308,7 +308,7 @@ impl Iterator for CurrentProgram { &self.playout_stat, ); self.last_next_ad(); - *self.index.lock().unwrap() += 1; + self.index.store(index + 1, Ordering::SeqCst); // update playlist should happen after current clip, // to prevent unknown behaviors. @@ -325,7 +325,7 @@ impl Iterator for CurrentProgram { { // Test if playlist is to early finish, // and if we have to fill it with a placeholder. - let index = *self.index.lock().unwrap(); + let index = self.index.load(Ordering::SeqCst); self.current_node = Media::new(index, String::new(), false); self.current_node.begin = Some(get_sec()); let mut duration = total_delta.abs(); @@ -342,17 +342,17 @@ impl Iterator for CurrentProgram { self.current_node.last_ad = last_ad; self.current_node.add_filter(); - *self.index.lock().unwrap() += 1; + self.index.store(index + 1, Ordering::SeqCst); return Some(self.current_node.clone()); } - *self.index.lock().unwrap() = 0; + self.index.store(0, Ordering::SeqCst); self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone()); self.last_next_ad(); self.current_node.last_ad = last_ad; - *self.index.lock().unwrap() = 1; + self.index.store(1, Ordering::SeqCst); Some(self.current_node.clone()) } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 7484aec5..3afd04b1 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -1,7 +1,4 @@ -use std::{ - process, - process::{Command, Stdio}, -}; +use std::process::{self, Command, Stdio}; use simplelog::*; @@ -13,14 +10,7 @@ pub fn output(log_format: &str) -> process::Child { let mut enc_filter: Vec = vec![]; - let mut enc_cmd = vec![ - "-hide_banner", - "-nostats", - "-v", - log_format, - "-i", - "pipe:0", - ]; + let mut enc_cmd = vec!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"]; if config.text.add_text && !config.text.over_pre { info!( @@ -35,7 +25,10 @@ pub fn output(log_format: &str) -> process::Child { enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect()); - debug!("Encoder CMD: \"ffplay {}\"", enc_cmd.join(" ")); + debug!( + "Encoder CMD: \"ffplay {}\"", + enc_cmd.join(" ") + ); let enc_proc = match Command::new("ffplay") .args(enc_cmd) diff --git a/src/output/mod.rs b/src/output/mod.rs index 3aab07c1..618e5da4 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,6 +1,7 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, process::{Command, Stdio}, + sync::atomic::Ordering, thread::{self, sleep}, time::Duration, }; @@ -58,11 +59,7 @@ pub fn player( let proc_control_c = proc_control.clone(); if config.ingest.enable { - thread::spawn(move || ingest_server( - ff_log_format_c, - ingest_sender, - proc_control_c, - )); + thread::spawn(move || ingest_server(ff_log_format_c, ingest_sender, proc_control_c)); } 'source_iter: for node in get_source { @@ -118,7 +115,7 @@ pub fn player( *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { - if *proc_control.server_is_running.lock().unwrap() { + if proc_control.server_is_running.load(Ordering::SeqCst) { if !live_on { info!("Switch from {} to live ingest", config.processing.mode); @@ -131,7 +128,7 @@ pub fn player( } live_on = true; - *playlist_init.lock().unwrap() = true; + playlist_init.store(true, Ordering::SeqCst); } for rx in ingest_receiver.try_iter() { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index fe3b4a27..19cb1767 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,6 +1,9 @@ -use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; +use std::sync::atomic::Ordering; + use jsonrpc_http_server::{ - hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, + hyper, + jsonrpc_core::{IoHandler, Params, Value}, + AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, }; use serde_json::{json, Map}; use simplelog::*; @@ -59,7 +62,7 @@ pub fn json_rpc_server( let mut date = playout_stat.date.lock().unwrap(); if map.contains_key("control") && &map["control"] == "next" { - let index = *play.index.lock().unwrap(); + let index = play.index.load(Ordering::SeqCst); if index < play.current_list.lock().unwrap().len() { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { @@ -96,7 +99,7 @@ pub fn json_rpc_server( } if map.contains_key("control") && &map["control"] == "back" { - let index = *play.index.lock().unwrap(); + let index = play.index.load(Ordering::SeqCst); if index > 1 && play.current_list.lock().unwrap().len() > 1 { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { @@ -111,7 +114,7 @@ pub fn json_rpc_server( info!("Move to last clip"); let mut data_map = Map::new(); let mut media = play.current_list.lock().unwrap()[index - 2].clone(); - *play.index.lock().unwrap() = index - 2; + play.index.store(index - 2, Ordering::SeqCst); media.add_probe(); let (delta, _) = get_delta(&media.begin.unwrap_or(0.0)); @@ -146,7 +149,7 @@ pub fn json_rpc_server( let mut data_map = Map::new(); *time_shift = 0.0; *date = current_date.clone(); - *playout_stat.list_init.lock().unwrap() = true; + playout_stat.list_init.store(true, Ordering::SeqCst); write_status(¤t_date, 0.0); @@ -167,7 +170,7 @@ pub fn json_rpc_server( } if map.contains_key("media") && &map["media"] == "next" { - let index = *play.index.lock().unwrap(); + let index = play.index.load(Ordering::SeqCst); if index < play.current_list.lock().unwrap().len() { let media = play.current_list.lock().unwrap()[index].clone(); @@ -181,7 +184,7 @@ pub fn json_rpc_server( } if map.contains_key("media") && &map["media"] == "last" { - let index = *play.index.lock().unwrap(); + let index = play.index.load(Ordering::SeqCst); if index > 1 && index - 2 < play.current_list.lock().unwrap().len() { let media = play.current_list.lock().unwrap()[index - 2].clone(); diff --git a/src/utils/config.rs b/src/utils/config.rs index 68aa288c..07489213 100644 --- a/src/utils/config.rs +++ b/src/utils/config.rs @@ -1,6 +1,3 @@ -use once_cell::sync::OnceCell; -use serde::{Deserialize, Serialize}; -use serde_yaml::{self}; use std::{ env, fs::File, @@ -8,6 +5,9 @@ use std::{ process, }; +use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use serde_yaml::{self}; use shlex::split; use crate::utils::{get_args, time_to_sec}; diff --git a/src/utils/controller.rs b/src/utils/controller.rs index e251eebf..c51b00ba 100644 --- a/src/utils/controller.rs +++ b/src/utils/controller.rs @@ -1,8 +1,10 @@ use std::{ fmt, process::Child, - sync::{Arc, Mutex, RwLock}, - + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use jsonrpc_http_server::CloseHandle; @@ -33,10 +35,10 @@ pub struct ProcessControl { pub decoder_term: Arc>>, pub encoder_term: Arc>>, pub server_term: Arc>>, - pub server_is_running: Arc>, + pub server_is_running: Arc, pub rpc_handle: Arc>>, - pub is_terminated: Arc>, - pub is_alive: Arc>, + pub is_terminated: Arc, + pub is_alive: Arc, } impl ProcessControl { @@ -45,10 +47,10 @@ impl ProcessControl { decoder_term: Arc::new(Mutex::new(None)), encoder_term: Arc::new(Mutex::new(None)), server_term: Arc::new(Mutex::new(None)), - server_is_running: Arc::new(Mutex::new(false)), + server_is_running: Arc::new(AtomicBool::new(false)), rpc_handle: Arc::new(Mutex::new(None)), - is_terminated: Arc::new(Mutex::new(false)), - is_alive: Arc::new(RwLock::new(true)), + is_terminated: Arc::new(AtomicBool::new(false)), + is_alive: Arc::new(AtomicBool::new(true)), } } } @@ -115,20 +117,16 @@ impl ProcessControl { } pub fn kill_all(&mut self) { - *self.is_terminated.lock().unwrap() = true; + self.is_terminated.store(true, Ordering::SeqCst); - if *self.is_alive.read().unwrap() { - *self.is_alive.write().unwrap() = false; + if self.is_alive.load(Ordering::SeqCst) { + self.is_alive.store(false, Ordering::SeqCst); if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { rpc.clone().close() }; - for unit in [ - Decoder, - Encoder, - Ingest, - ] { + for unit in [Decoder, Encoder, Ingest] { if let Err(e) = self.kill(unit) { error!("{e}") } @@ -147,7 +145,7 @@ impl Drop for ProcessControl { pub struct PlayerControl { pub current_media: Arc>>, pub current_list: Arc>>, - pub index: Arc>, + pub index: Arc, } impl PlayerControl { @@ -155,7 +153,7 @@ impl PlayerControl { Self { current_media: Arc::new(Mutex::new(None)), current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), - index: Arc::new(Mutex::new(0)), + index: Arc::new(AtomicUsize::new(0)), } } } @@ -165,7 +163,7 @@ pub struct PlayoutStatus { pub time_shift: Arc>, pub date: Arc>, pub current_date: Arc>, - pub list_init: Arc>, + pub list_init: Arc, } impl PlayoutStatus { @@ -174,8 +172,7 @@ impl PlayoutStatus { time_shift: Arc::new(Mutex::new(0.0)), date: Arc::new(Mutex::new(String::new())), current_date: Arc::new(Mutex::new(String::new())), - list_init: Arc::new(Mutex::new(true)), + list_init: Arc::new(AtomicBool::new(true)), } } } - diff --git a/src/utils/generator.rs b/src/utils/generator.rs index 43592996..49e86ed4 100644 --- a/src/utils/generator.rs +++ b/src/utils/generator.rs @@ -2,7 +2,7 @@ use std::{ fs::{create_dir_all, write}, path::Path, process::exit, - sync::{Arc, Mutex}, + sync::{atomic::AtomicUsize, Arc, Mutex}, }; use chrono::{Duration, NaiveDate}; @@ -50,7 +50,7 @@ pub fn generate_playlist(mut date_range: Vec) { let config = GlobalConfig::global(); let total_length = config.playlist.length_sec.unwrap().clone(); let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)])); - let index = Arc::new(Mutex::new(0)); + let index = Arc::new(AtomicUsize::new(0)); let playlist_root = Path::new(&config.playlist.path); if !playlist_root.is_dir() { diff --git a/src/utils/json_serializer.rs b/src/utils/json_serializer.rs index 0022c979..f963b1c3 100644 --- a/src/utils/json_serializer.rs +++ b/src/utils/json_serializer.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use std::{ fs::File, path::Path, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc}, thread, }; @@ -46,7 +46,7 @@ impl Playlist { pub fn read_json( path: Option, - is_terminated: Arc>, + is_terminated: Arc, seek: bool, next_start: f64, ) -> Playlist { @@ -109,11 +109,7 @@ pub fn read_json( let list_clone = playlist.clone(); - thread::spawn(move || validate_playlist( - list_clone, - is_terminated, - config.clone(), - )); + thread::spawn(move || validate_playlist(list_clone, is_terminated, config.clone())); playlist } diff --git a/src/utils/json_validate.rs b/src/utils/json_validate.rs index b70abe61..753df56f 100644 --- a/src/utils/json_validate.rs +++ b/src/utils/json_validate.rs @@ -1,10 +1,10 @@ -use std::{path::Path, sync::{Arc, Mutex},}; +use std::{path::Path, sync::{atomic::{AtomicBool, Ordering}, Arc}}; use simplelog::*; use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist}; -pub fn validate_playlist(playlist: Playlist, is_terminated: Arc>, config: GlobalConfig) { +pub fn validate_playlist(playlist: Playlist, is_terminated: Arc, config: GlobalConfig) { let date = playlist.date; let mut length = config.playlist.length_sec.unwrap(); let mut begin = config.playlist.start_sec.unwrap(); @@ -14,7 +14,7 @@ pub fn validate_playlist(playlist: Playlist, is_terminated: Arc>, co debug!("validate playlist from: {date}"); for item in playlist.program.iter() { - if *is_terminated.lock().unwrap() { + if is_terminated.load(Ordering::SeqCst) { return }