remove some locks, replace loop in threads with while condition

This commit is contained in:
jb-alvarado 2022-05-18 11:17:08 +02:00
parent d02cf98c60
commit 3b214d42fe
8 changed files with 43 additions and 30 deletions

6
Cargo.lock generated
View File

@ -1148,12 +1148,12 @@ dependencies = [
[[package]]
name = "schannel"
version = "0.1.19"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2"
dependencies = [
"lazy_static",
"winapi 0.3.9",
"windows-sys",
]
[[package]]

View File

@ -3,7 +3,7 @@ use std::{
path::Path,
process::exit,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::channel,
{Arc, Mutex},
},
@ -162,7 +162,11 @@ fn file_extension(filename: &Path) -> Option<&str> {
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(config: GlobalConfig, sources: Arc<Mutex<Vec<Media>>>) {
pub fn watchman(
config: GlobalConfig,
is_terminated: Arc<AtomicBool>,
sources: Arc<Mutex<Vec<Media>>>,
) {
let (tx, rx) = channel();
let path = config.storage.path;
@ -175,7 +179,7 @@ pub fn watchman(config: GlobalConfig, sources: Arc<Mutex<Vec<Media>>>) {
let mut watcher = watcher(tx, Duration::from_secs(1)).unwrap();
watcher.watch(path, RecursiveMode::Recursive).unwrap();
loop {
while !is_terminated.load(Ordering::SeqCst) {
if let Ok(res) = rx.try_recv() {
match res {
Create(new_path) => {

View File

@ -40,7 +40,7 @@ pub fn source_generator(
let node_clone = folder_source.nodes.clone();
// Spawn a thread to monitor folder for file changes.
thread::spawn(move || watchman(config_clone, node_clone));
thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}

View File

@ -209,14 +209,14 @@ impl CurrentProgram {
// On init or reload we need to seek for the current clip.
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
let shift = self.playout_stat.time_shift.lock().unwrap();
if *self.playout_stat.current_date.lock().unwrap()
== *self.playout_stat.date.lock().unwrap()
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
&& *shift != 0.0
{
let shift = *self.playout_stat.time_shift.lock().unwrap();
info!("Shift playlist start for <yellow>{shift}</> seconds");
time_sec += shift;
info!("Shift playlist start for <yellow>{}</> seconds", *shift);
time_sec += *shift;
}
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {

View File

@ -71,9 +71,10 @@ fn main() {
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let messages = Arc::new(Mutex::new(Vec::new()));
let logging = init_logging(&config, messages.clone());
let logging = init_logging(&config, proc_ctl, messages.clone());
CombinedLogger::init(logging).unwrap();
validate_ffmpeg(&config);

View File

@ -71,12 +71,13 @@ pub fn json_rpc_server(
let mut time_shift = playout_stat.time_shift.lock().unwrap();
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
let current_list = play_control.current_list.lock().unwrap();
// get next clip
if map.contains_key("control") && &map["control"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < play_control.current_list.lock().unwrap().len() {
if index < current_list.len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -89,7 +90,7 @@ pub fn json_rpc_server(
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = play_control.current_list.lock().unwrap()[index].clone();
let mut media = current_list[index].clone();
media.add_probe();
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
@ -114,7 +115,7 @@ pub fn json_rpc_server(
if map.contains_key("control") && &map["control"] == "back" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && play_control.current_list.lock().unwrap().len() > 1 {
if index > 1 && current_list.len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
@ -126,8 +127,7 @@ pub fn json_rpc_server(
info!("Move to last clip");
let mut data_map = Map::new();
let mut media =
play_control.current_list.lock().unwrap()[index - 2].clone();
let mut media = current_list[index - 2].clone();
play_control.index.fetch_sub(2, Ordering::SeqCst);
media.add_probe();
@ -189,8 +189,8 @@ pub fn json_rpc_server(
if map.contains_key("media") && &map["media"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index].clone();
if index < current_list.len() {
let media = current_list[index].clone();
let data_map = get_data_map(&config, media);
@ -204,8 +204,8 @@ pub fn json_rpc_server(
if map.contains_key("media") && &map["media"] == "last" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && index - 2 < play_control.current_list.lock().unwrap().len() {
let media = play_control.current_list.lock().unwrap()[index - 2].clone();
if index > 1 && index - 2 < current_list.len() {
let media = current_list[index - 2].clone();
let data_map = get_data_map(&config, media);

View File

@ -34,13 +34,14 @@ fn playlist_change_at_midnight() {
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, messages);
let logging = init_logging(&config, proc_ctl, messages);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T23:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(30, proc_ctl2));
player(&config, play_control, playout_stat, proc_control);
}
@ -60,13 +61,14 @@ fn playlist_change_at_six() {
let playout_stat = PlayoutStatus::new();
let proc_control = ProcessControl::new();
let proc_ctl = proc_control.clone();
let proc_ctl2 = proc_control.clone();
let logging = init_logging(&config, messages);
let logging = init_logging(&config, proc_ctl, messages);
CombinedLogger::init(logging).unwrap();
mock_time::set_mock_time("2022-05-09T05:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(30, proc_ctl2));
player(&config, play_control, playout_stat, proc_control);
}

View File

@ -3,7 +3,7 @@ extern crate simplelog;
use std::{
path::Path,
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
@ -22,7 +22,7 @@ use log::{Level, LevelFilter, Log, Metadata, Record};
use regex::Regex;
use simplelog::*;
use crate::utils::GlobalConfig;
use crate::utils::{GlobalConfig, ProcessControl};
/// send log messages to mail recipient
pub fn send_mail(cfg: &GlobalConfig, msg: String) {
@ -56,8 +56,13 @@ pub fn send_mail(cfg: &GlobalConfig, msg: String) {
/// Basic Mail Queue
///
/// Check every give seconds for messages and send them.
fn mail_queue(cfg: GlobalConfig, messages: Arc<Mutex<Vec<String>>>, interval: u64) {
loop {
fn mail_queue(
cfg: GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
interval: u64,
) {
while !proc_ctl.is_terminated.load(Ordering::SeqCst) {
if messages.lock().unwrap().len() > 0 {
let msg = messages.lock().unwrap().join("\n");
send_mail(&cfg, msg);
@ -149,6 +154,7 @@ fn clean_string(text: &str) -> String {
/// - mail logger
pub fn init_logging(
config: &GlobalConfig,
proc_ctl: ProcessControl,
messages: Arc<Mutex<Vec<String>>>,
) -> Vec<Box<dyn SharedLogger>> {
let config_clone = config.clone();
@ -231,7 +237,7 @@ pub fn init_logging(
let messages_clone = messages.clone();
let interval = config.mail.interval;
thread::spawn(move || mail_queue(config_clone, messages_clone, interval));
thread::spawn(move || mail_queue(config_clone, proc_ctl, messages_clone, interval));
let mail_config = log_config.build();