fix restart, when out of sync, log only ones when console log is active

This commit is contained in:
jb-alvarado 2024-08-26 20:08:12 +02:00
parent c7871f21d6
commit 537efc6d97
11 changed files with 43 additions and 15 deletions

View File

@ -928,6 +928,7 @@ pub async fn process_control(
manager.async_start().await;
}
ProcessCtl::Stop => {
manager.channel.lock().unwrap().active = false;
manager.async_stop().await;
}
ProcessCtl::Restart => {

View File

@ -274,8 +274,9 @@ async fn main() -> std::io::Result<()> {
}
}
for channel in &channel_controllers.lock().unwrap().channels {
channel.stop_all();
for channel_ctl in &channel_controllers.lock().unwrap().channels {
channel_ctl.channel.lock().unwrap().active = false;
channel_ctl.stop_all();
}
Ok(())

View File

@ -7,6 +7,7 @@ use std::{
Arc, Mutex,
},
thread,
time::Duration,
};
#[cfg(not(windows))]
@ -129,12 +130,31 @@ impl ChannelManager {
};
thread::spawn(move || {
let mut run_endless = true;
while run_endless {
let run_count = self_clone.run_count.clone();
if let Err(e) = start_channel(self_clone) {
if let Err(e) = start_channel(self_clone.clone()) {
run_count.fetch_sub(1, Ordering::SeqCst);
error!("{e}");
};
let active = self_clone.channel.lock().unwrap().active;
if !active {
run_endless = false;
} else {
self_clone.run_count.fetch_add(1, Ordering::SeqCst);
self_clone.is_alive.store(true, Ordering::SeqCst);
self_clone.is_terminated.store(false, Ordering::SeqCst);
self_clone.list_init.store(true, Ordering::SeqCst);
thread::sleep(Duration::from_millis(250));
}
}
trace!("Async start done");
});
}
}
@ -179,8 +199,6 @@ impl ChannelManager {
}
pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> {
let mut channel = self.channel.lock()?;
match unit {
Decoder => {
if let Some(proc) = self.decoder.lock()?.as_mut() {
@ -207,8 +225,6 @@ impl ChannelManager {
}
}
channel.active = false;
self.wait(unit)?;
Ok(())

View File

@ -47,6 +47,7 @@ fn server_monitor(
.iter()
.any(|i| line.contains(*i))
{
channel_mgr.channel.lock().unwrap().active = false;
channel_mgr.stop_all();
}
}
@ -91,6 +92,7 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(id, url) {
channel_mgr.channel.lock().unwrap().active = false;
channel_mgr.stop_all();
}

View File

@ -66,6 +66,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> {
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(id, url) {
manager.channel.lock().unwrap().active = false;
manager.stop_all();
}

View File

@ -811,6 +811,7 @@ pub fn stderr_reader(
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
{
manager.channel.lock().unwrap().active = false;
manager.stop_all();
}
}

View File

@ -104,8 +104,8 @@ pub struct Args {
)]
pub generate: Option<Vec<String>>,
#[clap(long, help = "Optional folder path list for playlist generations", num_args = 1..)]
pub gen_paths: Option<Vec<PathBuf>>,
#[clap(long, help = "Optional path list for playlist generations", num_args = 1..)]
pub paths: Option<Vec<PathBuf>>,
#[clap(long, env, help = "Keep log file for given days")]
pub log_backup_count: Option<usize>,

View File

@ -843,7 +843,7 @@ pub async fn get_config(pool: &Pool<Sqlite>, channel_id: i32) -> Result<PlayoutC
config.general.template = Some(template);
}
if let Some(paths) = args.gen_paths {
if let Some(paths) = args.paths {
config.storage.paths = paths;
}

View File

@ -240,6 +240,7 @@ pub async fn control_state(
}
"stop_all" => {
manager.channel.lock().unwrap().active = false;
manager.stop_all();
let mut data_map = Map::new();

View File

@ -30,8 +30,12 @@ pub struct Target;
impl Target {
pub fn all() -> &'static str {
if ARGS.log_to_console {
"{_Default}"
} else {
"{file,mail,_Default}"
}
}
pub fn console() -> &'static str {
"{console}"

View File

@ -49,6 +49,7 @@ fn timed_stop(sec: u64, manager: ChannelManager) {
println!("Timed stop of process");
manager.channel.lock().unwrap().active = false;
manager.stop_all();
}