Switch from some Mutex to Atomics
This commit is contained in:
parent
13c20846e3
commit
a50e195dca
@ -3,6 +3,7 @@ use std::{
|
|||||||
path::Path,
|
path::Path,
|
||||||
process::exit,
|
process::exit,
|
||||||
sync::{
|
sync::{
|
||||||
|
atomic::{AtomicUsize, Ordering},
|
||||||
mpsc::channel,
|
mpsc::channel,
|
||||||
{Arc, Mutex},
|
{Arc, Mutex},
|
||||||
},
|
},
|
||||||
@ -25,11 +26,11 @@ pub struct Source {
|
|||||||
config: GlobalConfig,
|
config: GlobalConfig,
|
||||||
pub nodes: Arc<Mutex<Vec<Media>>>,
|
pub nodes: Arc<Mutex<Vec<Media>>>,
|
||||||
current_node: Media,
|
current_node: Media,
|
||||||
index: Arc<Mutex<usize>>,
|
index: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Source {
|
impl Source {
|
||||||
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<Mutex<usize>>) -> Self {
|
pub fn new(current_list: Arc<Mutex<Vec<Media>>>, global_index: Arc<AtomicUsize>) -> Self {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let mut media_list = vec![];
|
let mut media_list = vec![];
|
||||||
let mut index: usize = 0;
|
let mut index: usize = 0;
|
||||||
@ -117,14 +118,14 @@ impl Iterator for Source {
|
|||||||
type Item = Media;
|
type Item = Media;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
|
if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() {
|
||||||
let i = *self.index.lock().unwrap();
|
let i = self.index.load(Ordering::SeqCst);
|
||||||
self.current_node = self.nodes.lock().unwrap()[i].clone();
|
self.current_node = self.nodes.lock().unwrap()[i].clone();
|
||||||
self.current_node.add_probe();
|
self.current_node.add_probe();
|
||||||
self.current_node.add_filter();
|
self.current_node.add_filter();
|
||||||
self.current_node.begin = Some(get_sec());
|
self.current_node.begin = Some(get_sec());
|
||||||
|
|
||||||
*self.index.lock().unwrap() += 1;
|
self.index.store(i + 1, Ordering::SeqCst);
|
||||||
|
|
||||||
Some(self.current_node.clone())
|
Some(self.current_node.clone())
|
||||||
} else {
|
} else {
|
||||||
@ -147,7 +148,7 @@ impl Iterator for Source {
|
|||||||
self.current_node.add_filter();
|
self.current_node.add_filter();
|
||||||
self.current_node.begin = Some(get_sec());
|
self.current_node.begin = Some(get_sec());
|
||||||
|
|
||||||
*self.index.lock().unwrap() = 1;
|
self.index.store(1, Ordering::SeqCst);
|
||||||
|
|
||||||
Some(self.current_node.clone())
|
Some(self.current_node.clone())
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ use std::{
|
|||||||
io::{BufReader, Error, Read},
|
io::{BufReader, Error, Read},
|
||||||
path::Path,
|
path::Path,
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
|
sync::atomic::Ordering,
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -128,7 +129,7 @@ pub fn ingest_server(
|
|||||||
};
|
};
|
||||||
|
|
||||||
if !is_running {
|
if !is_running {
|
||||||
*proc_control.server_is_running.lock().unwrap() = true;
|
proc_control.server_is_running.store(true, Ordering::SeqCst);
|
||||||
is_running = true;
|
is_running = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,7 +137,7 @@ pub fn ingest_server(
|
|||||||
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
|
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
|
||||||
error!("Ingest server write error: {e:?}");
|
error!("Ingest server write error: {e:?}");
|
||||||
|
|
||||||
*proc_control.is_terminated.lock().unwrap() = true;
|
proc_control.is_terminated.store(true, Ordering::SeqCst);
|
||||||
break 'ingest_iter;
|
break 'ingest_iter;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -145,7 +146,7 @@ pub fn ingest_server(
|
|||||||
}
|
}
|
||||||
|
|
||||||
drop(ingest_reader);
|
drop(ingest_reader);
|
||||||
*proc_control.server_is_running.lock().unwrap() = false;
|
proc_control.server_is_running.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
if let Err(e) = proc_control.wait(Ingest) {
|
if let Err(e) = proc_control.wait(Ingest) {
|
||||||
error!("{e}")
|
error!("{e}")
|
||||||
@ -155,7 +156,7 @@ pub fn ingest_server(
|
|||||||
error!("{e:?}");
|
error!("{e:?}");
|
||||||
};
|
};
|
||||||
|
|
||||||
if *proc_control.is_terminated.lock().unwrap() {
|
if proc_control.is_terminated.load(Ordering::SeqCst) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use std::{
|
use std::{
|
||||||
process,
|
process,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex, atomic::{AtomicBool, AtomicUsize}},
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -19,9 +19,9 @@ pub use playlist::CurrentProgram;
|
|||||||
pub fn source_generator(
|
pub fn source_generator(
|
||||||
config: GlobalConfig,
|
config: GlobalConfig,
|
||||||
current_list: Arc<Mutex<Vec<Media>>>,
|
current_list: Arc<Mutex<Vec<Media>>>,
|
||||||
index: Arc<Mutex<usize>>,
|
index: Arc<AtomicUsize>,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<AtomicBool>,
|
||||||
) -> Box<dyn Iterator<Item = Media>> {
|
) -> Box<dyn Iterator<Item = Media>> {
|
||||||
let get_source = match config.processing.clone().mode.as_str() {
|
let get_source = match config.processing.clone().mode.as_str() {
|
||||||
"folder" => {
|
"folder" => {
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fs,
|
fs,
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{Arc, Mutex},
|
sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
@ -21,17 +21,17 @@ pub struct CurrentProgram {
|
|||||||
json_date: String,
|
json_date: String,
|
||||||
pub nodes: Arc<Mutex<Vec<Media>>>,
|
pub nodes: Arc<Mutex<Vec<Media>>>,
|
||||||
current_node: Media,
|
current_node: Media,
|
||||||
index: Arc<Mutex<usize>>,
|
index: Arc<AtomicUsize>,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<AtomicBool>,
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CurrentProgram {
|
impl CurrentProgram {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
playout_stat: PlayoutStatus,
|
playout_stat: PlayoutStatus,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<AtomicBool>,
|
||||||
current_list: Arc<Mutex<Vec<Media>>>,
|
current_list: Arc<Mutex<Vec<Media>>>,
|
||||||
global_index: Arc<Mutex<usize>>,
|
global_index: Arc<AtomicUsize>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let json = read_json(None, is_terminated.clone(), true, 0.0);
|
let json = read_json(None, is_terminated.clone(), true, 0.0);
|
||||||
@ -65,12 +65,7 @@ impl CurrentProgram {
|
|||||||
|
|
||||||
fn check_update(&mut self, seek: bool) {
|
fn check_update(&mut self, seek: bool) {
|
||||||
if self.json_path.is_none() {
|
if self.json_path.is_none() {
|
||||||
let json = read_json(
|
let json = read_json(None, self.is_terminated.clone(), seek, 0.0);
|
||||||
None,
|
|
||||||
self.is_terminated.clone(),
|
|
||||||
seek,
|
|
||||||
0.0,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.json_path = json.current_file;
|
self.json_path = json.current_file;
|
||||||
self.json_mod = json.modified;
|
self.json_mod = json.modified;
|
||||||
@ -100,7 +95,8 @@ impl CurrentProgram {
|
|||||||
*self.nodes.lock().unwrap() = json.program;
|
*self.nodes.lock().unwrap() = json.program;
|
||||||
|
|
||||||
self.get_current_clip();
|
self.get_current_clip();
|
||||||
*self.index.lock().unwrap() += 1;
|
let idx = self.index.load(Ordering::SeqCst);
|
||||||
|
self.index.store(idx + 1, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
error!(
|
error!(
|
||||||
@ -115,8 +111,8 @@ impl CurrentProgram {
|
|||||||
self.json_path = None;
|
self.json_path = None;
|
||||||
*self.nodes.lock().unwrap() = vec![media.clone()];
|
*self.nodes.lock().unwrap() = vec![media.clone()];
|
||||||
self.current_node = media;
|
self.current_node = media;
|
||||||
*self.playout_stat.list_init.lock().unwrap() = true;
|
self.playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||||
*self.index.lock().unwrap() = 0;
|
self.index.store(0, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,12 +133,7 @@ impl CurrentProgram {
|
|||||||
|| is_close(total_delta, 0.0, 2.0)
|
|| is_close(total_delta, 0.0, 2.0)
|
||||||
|| is_close(total_delta, target_length, 2.0)
|
|| is_close(total_delta, target_length, 2.0)
|
||||||
{
|
{
|
||||||
let json = read_json(
|
let json = read_json(None, self.is_terminated.clone(), false, next_start);
|
||||||
None,
|
|
||||||
self.is_terminated.clone(),
|
|
||||||
false,
|
|
||||||
next_start,
|
|
||||||
);
|
|
||||||
|
|
||||||
let data = json!({
|
let data = json!({
|
||||||
"time_shift": 0.0,
|
"time_shift": 0.0,
|
||||||
@ -160,27 +151,35 @@ impl CurrentProgram {
|
|||||||
self.json_mod = json.modified;
|
self.json_mod = json.modified;
|
||||||
self.json_date = json.date;
|
self.json_date = json.date;
|
||||||
*self.nodes.lock().unwrap() = json.program;
|
*self.nodes.lock().unwrap() = json.program;
|
||||||
*self.index.lock().unwrap() = 0;
|
self.index.store(0, Ordering::SeqCst);
|
||||||
|
|
||||||
if json.current_file.is_none() {
|
if json.current_file.is_none() {
|
||||||
*self.playout_stat.list_init.lock().unwrap() = true;
|
self.playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn last_next_ad(&mut self) {
|
fn last_next_ad(&mut self) {
|
||||||
let index = *self.index.lock().unwrap();
|
let index = self.index.load(Ordering::SeqCst);
|
||||||
let current_list = self.nodes.lock().unwrap();
|
let current_list = self.nodes.lock().unwrap();
|
||||||
|
|
||||||
if index + 1 < current_list.len()
|
if index + 1 < current_list.len()
|
||||||
&& ¤t_list[index + 1].category.clone().unwrap_or(String::new()) == "advertisement"
|
&& ¤t_list[index + 1]
|
||||||
|
.category
|
||||||
|
.clone()
|
||||||
|
.unwrap_or(String::new())
|
||||||
|
== "advertisement"
|
||||||
{
|
{
|
||||||
self.current_node.next_ad = Some(true);
|
self.current_node.next_ad = Some(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if index > 0
|
if index > 0
|
||||||
&& index < current_list.len()
|
&& index < current_list.len()
|
||||||
&& ¤t_list[index - 1].category.clone().unwrap_or(String::new()) == "advertisement"
|
&& ¤t_list[index - 1]
|
||||||
|
.category
|
||||||
|
.clone()
|
||||||
|
.unwrap_or(String::new())
|
||||||
|
== "advertisement"
|
||||||
{
|
{
|
||||||
self.current_node.last_ad = Some(true);
|
self.current_node.last_ad = Some(true);
|
||||||
}
|
}
|
||||||
@ -210,8 +209,8 @@ impl CurrentProgram {
|
|||||||
|
|
||||||
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
|
for (i, item) in self.nodes.lock().unwrap().iter_mut().enumerate() {
|
||||||
if item.begin.unwrap() + item.out - item.seek > time_sec {
|
if item.begin.unwrap() + item.out - item.seek > time_sec {
|
||||||
*self.playout_stat.list_init.lock().unwrap() = false;
|
self.playout_stat.list_init.store(false, Ordering::SeqCst);
|
||||||
*self.index.lock().unwrap() = i;
|
self.index.store(i, Ordering::SeqCst);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -221,13 +220,14 @@ impl CurrentProgram {
|
|||||||
fn init_clip(&mut self) {
|
fn init_clip(&mut self) {
|
||||||
self.get_current_clip();
|
self.get_current_clip();
|
||||||
|
|
||||||
if !*self.playout_stat.list_init.lock().unwrap() {
|
if !self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||||
let time_sec = self.get_current_time();
|
let time_sec = self.get_current_time();
|
||||||
let index = *self.index.lock().unwrap();
|
let index = self.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
// de-instance node to preserve original values in list
|
// de-instance node to preserve original values in list
|
||||||
let mut node_clone = self.nodes.lock().unwrap()[index].clone();
|
let mut node_clone = self.nodes.lock().unwrap()[index].clone();
|
||||||
*self.index.lock().unwrap() += 1;
|
let idx = self.index.load(Ordering::SeqCst);
|
||||||
|
self.index.store(idx + 1, Ordering::SeqCst);
|
||||||
|
|
||||||
node_clone.seek = time_sec - node_clone.begin.unwrap();
|
node_clone.seek = time_sec - node_clone.begin.unwrap();
|
||||||
self.current_node = handle_list_init(node_clone);
|
self.current_node = handle_list_init(node_clone);
|
||||||
@ -239,7 +239,7 @@ impl Iterator for CurrentProgram {
|
|||||||
type Item = Media;
|
type Item = Media;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
if *self.playout_stat.list_init.lock().unwrap() {
|
if self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||||
debug!("Playlist init");
|
debug!("Playlist init");
|
||||||
self.check_update(true);
|
self.check_update(true);
|
||||||
|
|
||||||
@ -247,7 +247,7 @@ impl Iterator for CurrentProgram {
|
|||||||
self.init_clip();
|
self.init_clip();
|
||||||
}
|
}
|
||||||
|
|
||||||
if *self.playout_stat.list_init.lock().unwrap() {
|
if self.playout_stat.list_init.load(Ordering::SeqCst) {
|
||||||
// on init load playlist, could be not long enough,
|
// on init load playlist, could be not long enough,
|
||||||
// so we check if we can take the next playlist already,
|
// so we check if we can take the next playlist already,
|
||||||
// or we fill the gap with a dummy.
|
// or we fill the gap with a dummy.
|
||||||
@ -270,7 +270,7 @@ impl Iterator for CurrentProgram {
|
|||||||
|
|
||||||
if DUMMY_LEN > total_delta {
|
if DUMMY_LEN > total_delta {
|
||||||
duration = total_delta;
|
duration = total_delta;
|
||||||
*self.playout_stat.list_init.lock().unwrap() = false;
|
self.playout_stat.list_init.store(false, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.config.playlist.start_sec.unwrap() > current_time {
|
if self.config.playlist.start_sec.unwrap() > current_time {
|
||||||
@ -283,7 +283,7 @@ impl Iterator for CurrentProgram {
|
|||||||
|
|
||||||
self.current_node = gen_source(media);
|
self.current_node = gen_source(media);
|
||||||
self.nodes.lock().unwrap().push(self.current_node.clone());
|
self.nodes.lock().unwrap().push(self.current_node.clone());
|
||||||
*self.index.lock().unwrap() = self.nodes.lock().unwrap().len();
|
self.index.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,10 +292,10 @@ impl Iterator for CurrentProgram {
|
|||||||
return Some(self.current_node.clone());
|
return Some(self.current_node.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
if *self.index.lock().unwrap() < self.nodes.lock().unwrap().len() {
|
if self.index.load(Ordering::SeqCst) < self.nodes.lock().unwrap().len() {
|
||||||
self.check_for_next_playlist();
|
self.check_for_next_playlist();
|
||||||
let mut is_last = false;
|
let mut is_last = false;
|
||||||
let index = *self.index.lock().unwrap();
|
let index = self.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
if index == self.nodes.lock().unwrap().len() - 1 {
|
if index == self.nodes.lock().unwrap().len() - 1 {
|
||||||
is_last = true
|
is_last = true
|
||||||
@ -308,7 +308,7 @@ impl Iterator for CurrentProgram {
|
|||||||
&self.playout_stat,
|
&self.playout_stat,
|
||||||
);
|
);
|
||||||
self.last_next_ad();
|
self.last_next_ad();
|
||||||
*self.index.lock().unwrap() += 1;
|
self.index.store(index + 1, Ordering::SeqCst);
|
||||||
|
|
||||||
// update playlist should happen after current clip,
|
// update playlist should happen after current clip,
|
||||||
// to prevent unknown behaviors.
|
// to prevent unknown behaviors.
|
||||||
@ -325,7 +325,7 @@ impl Iterator for CurrentProgram {
|
|||||||
{
|
{
|
||||||
// Test if playlist is to early finish,
|
// Test if playlist is to early finish,
|
||||||
// and if we have to fill it with a placeholder.
|
// and if we have to fill it with a placeholder.
|
||||||
let index = *self.index.lock().unwrap();
|
let index = self.index.load(Ordering::SeqCst);
|
||||||
self.current_node = Media::new(index, String::new(), false);
|
self.current_node = Media::new(index, String::new(), false);
|
||||||
self.current_node.begin = Some(get_sec());
|
self.current_node.begin = Some(get_sec());
|
||||||
let mut duration = total_delta.abs();
|
let mut duration = total_delta.abs();
|
||||||
@ -342,17 +342,17 @@ impl Iterator for CurrentProgram {
|
|||||||
self.current_node.last_ad = last_ad;
|
self.current_node.last_ad = last_ad;
|
||||||
self.current_node.add_filter();
|
self.current_node.add_filter();
|
||||||
|
|
||||||
*self.index.lock().unwrap() += 1;
|
self.index.store(index + 1, Ordering::SeqCst);
|
||||||
|
|
||||||
return Some(self.current_node.clone());
|
return Some(self.current_node.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
*self.index.lock().unwrap() = 0;
|
self.index.store(0, Ordering::SeqCst);
|
||||||
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
|
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
|
||||||
self.last_next_ad();
|
self.last_next_ad();
|
||||||
self.current_node.last_ad = last_ad;
|
self.current_node.last_ad = last_ad;
|
||||||
|
|
||||||
*self.index.lock().unwrap() = 1;
|
self.index.store(1, Ordering::SeqCst);
|
||||||
|
|
||||||
Some(self.current_node.clone())
|
Some(self.current_node.clone())
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,4 @@
|
|||||||
use std::{
|
use std::process::{self, Command, Stdio};
|
||||||
process,
|
|
||||||
process::{Command, Stdio},
|
|
||||||
};
|
|
||||||
|
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
|
|
||||||
@ -13,14 +10,7 @@ pub fn output(log_format: &str) -> process::Child {
|
|||||||
|
|
||||||
let mut enc_filter: Vec<String> = vec![];
|
let mut enc_filter: Vec<String> = vec![];
|
||||||
|
|
||||||
let mut enc_cmd = vec![
|
let mut enc_cmd = vec!["-hide_banner", "-nostats", "-v", log_format, "-i", "pipe:0"];
|
||||||
"-hide_banner",
|
|
||||||
"-nostats",
|
|
||||||
"-v",
|
|
||||||
log_format,
|
|
||||||
"-i",
|
|
||||||
"pipe:0",
|
|
||||||
];
|
|
||||||
|
|
||||||
if config.text.add_text && !config.text.over_pre {
|
if config.text.add_text && !config.text.over_pre {
|
||||||
info!(
|
info!(
|
||||||
@ -35,7 +25,10 @@ pub fn output(log_format: &str) -> process::Child {
|
|||||||
|
|
||||||
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
|
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
|
||||||
|
|
||||||
debug!("Encoder CMD: <bright-blue>\"ffplay {}\"</>", enc_cmd.join(" "));
|
debug!(
|
||||||
|
"Encoder CMD: <bright-blue>\"ffplay {}\"</>",
|
||||||
|
enc_cmd.join(" ")
|
||||||
|
);
|
||||||
|
|
||||||
let enc_proc = match Command::new("ffplay")
|
let enc_proc = match Command::new("ffplay")
|
||||||
.args(enc_cmd)
|
.args(enc_cmd)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use std::{
|
use std::{
|
||||||
io::{prelude::*, BufReader, BufWriter, Read},
|
io::{prelude::*, BufReader, BufWriter, Read},
|
||||||
process::{Command, Stdio},
|
process::{Command, Stdio},
|
||||||
|
sync::atomic::Ordering,
|
||||||
thread::{self, sleep},
|
thread::{self, sleep},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
@ -58,11 +59,7 @@ pub fn player(
|
|||||||
let proc_control_c = proc_control.clone();
|
let proc_control_c = proc_control.clone();
|
||||||
|
|
||||||
if config.ingest.enable {
|
if config.ingest.enable {
|
||||||
thread::spawn(move || ingest_server(
|
thread::spawn(move || ingest_server(ff_log_format_c, ingest_sender, proc_control_c));
|
||||||
ff_log_format_c,
|
|
||||||
ingest_sender,
|
|
||||||
proc_control_c,
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
'source_iter: for node in get_source {
|
'source_iter: for node in get_source {
|
||||||
@ -118,7 +115,7 @@ pub fn player(
|
|||||||
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if *proc_control.server_is_running.lock().unwrap() {
|
if proc_control.server_is_running.load(Ordering::SeqCst) {
|
||||||
if !live_on {
|
if !live_on {
|
||||||
info!("Switch from {} to live ingest", config.processing.mode);
|
info!("Switch from {} to live ingest", config.processing.mode);
|
||||||
|
|
||||||
@ -131,7 +128,7 @@ pub fn player(
|
|||||||
}
|
}
|
||||||
|
|
||||||
live_on = true;
|
live_on = true;
|
||||||
*playlist_init.lock().unwrap() = true;
|
playlist_init.store(true, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
|
||||||
for rx in ingest_receiver.try_iter() {
|
for rx in ingest_receiver.try_iter() {
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
use jsonrpc_http_server::{
|
use jsonrpc_http_server::{
|
||||||
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
|
hyper,
|
||||||
|
jsonrpc_core::{IoHandler, Params, Value},
|
||||||
|
AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
|
||||||
};
|
};
|
||||||
use serde_json::{json, Map};
|
use serde_json::{json, Map};
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
@ -59,7 +62,7 @@ pub fn json_rpc_server(
|
|||||||
let mut date = playout_stat.date.lock().unwrap();
|
let mut date = playout_stat.date.lock().unwrap();
|
||||||
|
|
||||||
if map.contains_key("control") && &map["control"] == "next" {
|
if map.contains_key("control") && &map["control"] == "next" {
|
||||||
let index = *play.index.lock().unwrap();
|
let index = play.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
if index < play.current_list.lock().unwrap().len() {
|
if index < play.current_list.lock().unwrap().len() {
|
||||||
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
||||||
@ -96,7 +99,7 @@ pub fn json_rpc_server(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if map.contains_key("control") && &map["control"] == "back" {
|
if map.contains_key("control") && &map["control"] == "back" {
|
||||||
let index = *play.index.lock().unwrap();
|
let index = play.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
|
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
|
||||||
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
|
||||||
@ -111,7 +114,7 @@ pub fn json_rpc_server(
|
|||||||
info!("Move to last clip");
|
info!("Move to last clip");
|
||||||
let mut data_map = Map::new();
|
let mut data_map = Map::new();
|
||||||
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
|
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
|
||||||
*play.index.lock().unwrap() = index - 2;
|
play.index.store(index - 2, Ordering::SeqCst);
|
||||||
media.add_probe();
|
media.add_probe();
|
||||||
|
|
||||||
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
|
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
|
||||||
@ -146,7 +149,7 @@ pub fn json_rpc_server(
|
|||||||
let mut data_map = Map::new();
|
let mut data_map = Map::new();
|
||||||
*time_shift = 0.0;
|
*time_shift = 0.0;
|
||||||
*date = current_date.clone();
|
*date = current_date.clone();
|
||||||
*playout_stat.list_init.lock().unwrap() = true;
|
playout_stat.list_init.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
write_status(¤t_date, 0.0);
|
write_status(¤t_date, 0.0);
|
||||||
|
|
||||||
@ -167,7 +170,7 @@ pub fn json_rpc_server(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if map.contains_key("media") && &map["media"] == "next" {
|
if map.contains_key("media") && &map["media"] == "next" {
|
||||||
let index = *play.index.lock().unwrap();
|
let index = play.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
if index < play.current_list.lock().unwrap().len() {
|
if index < play.current_list.lock().unwrap().len() {
|
||||||
let media = play.current_list.lock().unwrap()[index].clone();
|
let media = play.current_list.lock().unwrap()[index].clone();
|
||||||
@ -181,7 +184,7 @@ pub fn json_rpc_server(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if map.contains_key("media") && &map["media"] == "last" {
|
if map.contains_key("media") && &map["media"] == "last" {
|
||||||
let index = *play.index.lock().unwrap();
|
let index = play.index.load(Ordering::SeqCst);
|
||||||
|
|
||||||
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
|
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
|
||||||
let media = play.current_list.lock().unwrap()[index - 2].clone();
|
let media = play.current_list.lock().unwrap()[index - 2].clone();
|
||||||
|
@ -1,6 +1,3 @@
|
|||||||
use once_cell::sync::OnceCell;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_yaml::{self};
|
|
||||||
use std::{
|
use std::{
|
||||||
env,
|
env,
|
||||||
fs::File,
|
fs::File,
|
||||||
@ -8,6 +5,9 @@ use std::{
|
|||||||
process,
|
process,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_yaml::{self};
|
||||||
use shlex::split;
|
use shlex::split;
|
||||||
|
|
||||||
use crate::utils::{get_args, time_to_sec};
|
use crate::utils::{get_args, time_to_sec};
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt,
|
fmt,
|
||||||
process::Child,
|
process::Child,
|
||||||
sync::{Arc, Mutex, RwLock},
|
sync::{
|
||||||
|
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||||
|
Arc, Mutex,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use jsonrpc_http_server::CloseHandle;
|
use jsonrpc_http_server::CloseHandle;
|
||||||
@ -33,10 +35,10 @@ pub struct ProcessControl {
|
|||||||
pub decoder_term: Arc<Mutex<Option<Child>>>,
|
pub decoder_term: Arc<Mutex<Option<Child>>>,
|
||||||
pub encoder_term: Arc<Mutex<Option<Child>>>,
|
pub encoder_term: Arc<Mutex<Option<Child>>>,
|
||||||
pub server_term: Arc<Mutex<Option<Child>>>,
|
pub server_term: Arc<Mutex<Option<Child>>>,
|
||||||
pub server_is_running: Arc<Mutex<bool>>,
|
pub server_is_running: Arc<AtomicBool>,
|
||||||
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
|
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
|
||||||
pub is_terminated: Arc<Mutex<bool>>,
|
pub is_terminated: Arc<AtomicBool>,
|
||||||
pub is_alive: Arc<RwLock<bool>>,
|
pub is_alive: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProcessControl {
|
impl ProcessControl {
|
||||||
@ -45,10 +47,10 @@ impl ProcessControl {
|
|||||||
decoder_term: Arc::new(Mutex::new(None)),
|
decoder_term: Arc::new(Mutex::new(None)),
|
||||||
encoder_term: Arc::new(Mutex::new(None)),
|
encoder_term: Arc::new(Mutex::new(None)),
|
||||||
server_term: Arc::new(Mutex::new(None)),
|
server_term: Arc::new(Mutex::new(None)),
|
||||||
server_is_running: Arc::new(Mutex::new(false)),
|
server_is_running: Arc::new(AtomicBool::new(false)),
|
||||||
rpc_handle: Arc::new(Mutex::new(None)),
|
rpc_handle: Arc::new(Mutex::new(None)),
|
||||||
is_terminated: Arc::new(Mutex::new(false)),
|
is_terminated: Arc::new(AtomicBool::new(false)),
|
||||||
is_alive: Arc::new(RwLock::new(true)),
|
is_alive: Arc::new(AtomicBool::new(true)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,20 +117,16 @@ impl ProcessControl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn kill_all(&mut self) {
|
pub fn kill_all(&mut self) {
|
||||||
*self.is_terminated.lock().unwrap() = true;
|
self.is_terminated.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
if *self.is_alive.read().unwrap() {
|
if self.is_alive.load(Ordering::SeqCst) {
|
||||||
*self.is_alive.write().unwrap() = false;
|
self.is_alive.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
|
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
|
||||||
rpc.clone().close()
|
rpc.clone().close()
|
||||||
};
|
};
|
||||||
|
|
||||||
for unit in [
|
for unit in [Decoder, Encoder, Ingest] {
|
||||||
Decoder,
|
|
||||||
Encoder,
|
|
||||||
Ingest,
|
|
||||||
] {
|
|
||||||
if let Err(e) = self.kill(unit) {
|
if let Err(e) = self.kill(unit) {
|
||||||
error!("{e}")
|
error!("{e}")
|
||||||
}
|
}
|
||||||
@ -147,7 +145,7 @@ impl Drop for ProcessControl {
|
|||||||
pub struct PlayerControl {
|
pub struct PlayerControl {
|
||||||
pub current_media: Arc<Mutex<Option<Media>>>,
|
pub current_media: Arc<Mutex<Option<Media>>>,
|
||||||
pub current_list: Arc<Mutex<Vec<Media>>>,
|
pub current_list: Arc<Mutex<Vec<Media>>>,
|
||||||
pub index: Arc<Mutex<usize>>,
|
pub index: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PlayerControl {
|
impl PlayerControl {
|
||||||
@ -155,7 +153,7 @@ impl PlayerControl {
|
|||||||
Self {
|
Self {
|
||||||
current_media: Arc::new(Mutex::new(None)),
|
current_media: Arc::new(Mutex::new(None)),
|
||||||
current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])),
|
current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])),
|
||||||
index: Arc::new(Mutex::new(0)),
|
index: Arc::new(AtomicUsize::new(0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -165,7 +163,7 @@ pub struct PlayoutStatus {
|
|||||||
pub time_shift: Arc<Mutex<f64>>,
|
pub time_shift: Arc<Mutex<f64>>,
|
||||||
pub date: Arc<Mutex<String>>,
|
pub date: Arc<Mutex<String>>,
|
||||||
pub current_date: Arc<Mutex<String>>,
|
pub current_date: Arc<Mutex<String>>,
|
||||||
pub list_init: Arc<Mutex<bool>>,
|
pub list_init: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PlayoutStatus {
|
impl PlayoutStatus {
|
||||||
@ -174,8 +172,7 @@ impl PlayoutStatus {
|
|||||||
time_shift: Arc::new(Mutex::new(0.0)),
|
time_shift: Arc::new(Mutex::new(0.0)),
|
||||||
date: Arc::new(Mutex::new(String::new())),
|
date: Arc::new(Mutex::new(String::new())),
|
||||||
current_date: Arc::new(Mutex::new(String::new())),
|
current_date: Arc::new(Mutex::new(String::new())),
|
||||||
list_init: Arc::new(Mutex::new(true)),
|
list_init: Arc::new(AtomicBool::new(true)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ use std::{
|
|||||||
fs::{create_dir_all, write},
|
fs::{create_dir_all, write},
|
||||||
path::Path,
|
path::Path,
|
||||||
process::exit,
|
process::exit,
|
||||||
sync::{Arc, Mutex},
|
sync::{atomic::AtomicUsize, Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use chrono::{Duration, NaiveDate};
|
use chrono::{Duration, NaiveDate};
|
||||||
@ -50,7 +50,7 @@ pub fn generate_playlist(mut date_range: Vec<String>) {
|
|||||||
let config = GlobalConfig::global();
|
let config = GlobalConfig::global();
|
||||||
let total_length = config.playlist.length_sec.unwrap().clone();
|
let total_length = config.playlist.length_sec.unwrap().clone();
|
||||||
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
|
let current_list = Arc::new(Mutex::new(vec![Media::new(0, "".to_string(), false)]));
|
||||||
let index = Arc::new(Mutex::new(0));
|
let index = Arc::new(AtomicUsize::new(0));
|
||||||
let playlist_root = Path::new(&config.playlist.path);
|
let playlist_root = Path::new(&config.playlist.path);
|
||||||
|
|
||||||
if !playlist_root.is_dir() {
|
if !playlist_root.is_dir() {
|
||||||
|
@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
use std::{
|
use std::{
|
||||||
fs::File,
|
fs::File,
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{Arc, Mutex},
|
sync::{atomic::AtomicBool, Arc},
|
||||||
thread,
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -46,7 +46,7 @@ impl Playlist {
|
|||||||
|
|
||||||
pub fn read_json(
|
pub fn read_json(
|
||||||
path: Option<String>,
|
path: Option<String>,
|
||||||
is_terminated: Arc<Mutex<bool>>,
|
is_terminated: Arc<AtomicBool>,
|
||||||
seek: bool,
|
seek: bool,
|
||||||
next_start: f64,
|
next_start: f64,
|
||||||
) -> Playlist {
|
) -> Playlist {
|
||||||
@ -109,11 +109,7 @@ pub fn read_json(
|
|||||||
|
|
||||||
let list_clone = playlist.clone();
|
let list_clone = playlist.clone();
|
||||||
|
|
||||||
thread::spawn(move || validate_playlist(
|
thread::spawn(move || validate_playlist(list_clone, is_terminated, config.clone()));
|
||||||
list_clone,
|
|
||||||
is_terminated,
|
|
||||||
config.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
playlist
|
playlist
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use std::{path::Path, sync::{Arc, Mutex},};
|
use std::{path::Path, sync::{atomic::{AtomicBool, Ordering}, Arc}};
|
||||||
|
|
||||||
use simplelog::*;
|
use simplelog::*;
|
||||||
|
|
||||||
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
|
use crate::utils::{sec_to_time, GlobalConfig, MediaProbe, Playlist};
|
||||||
|
|
||||||
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, config: GlobalConfig) {
|
pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<AtomicBool>, config: GlobalConfig) {
|
||||||
let date = playlist.date;
|
let date = playlist.date;
|
||||||
let mut length = config.playlist.length_sec.unwrap();
|
let mut length = config.playlist.length_sec.unwrap();
|
||||||
let mut begin = config.playlist.start_sec.unwrap();
|
let mut begin = config.playlist.start_sec.unwrap();
|
||||||
@ -14,7 +14,7 @@ pub fn validate_playlist(playlist: Playlist, is_terminated: Arc<Mutex<bool>>, co
|
|||||||
debug!("validate playlist from: <yellow>{date}</>");
|
debug!("validate playlist from: <yellow>{date}</>");
|
||||||
|
|
||||||
for item in playlist.program.iter() {
|
for item in playlist.program.iter() {
|
||||||
if *is_terminated.lock().unwrap() {
|
if is_terminated.load(Ordering::SeqCst) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user