diff --git a/ffplayout/src/api/routes.rs b/ffplayout/src/api/routes.rs index 3c184fc2..7dd3bd4b 100644 --- a/ffplayout/src/api/routes.rs +++ b/ffplayout/src/api/routes.rs @@ -928,6 +928,7 @@ pub async fn process_control( manager.async_start().await; } ProcessCtl::Stop => { + manager.channel.lock().unwrap().active = false; manager.async_stop().await; } ProcessCtl::Restart => { diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index 1f494ba4..41faf79e 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -274,8 +274,9 @@ async fn main() -> std::io::Result<()> { } } - for channel in &channel_controllers.lock().unwrap().channels { - channel.stop_all(); + for channel_ctl in &channel_controllers.lock().unwrap().channels { + channel_ctl.channel.lock().unwrap().active = false; + channel_ctl.stop_all(); } Ok(()) diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs index a8507e43..b5567061 100644 --- a/ffplayout/src/player/controller.rs +++ b/ffplayout/src/player/controller.rs @@ -7,6 +7,7 @@ use std::{ Arc, Mutex, }, thread, + time::Duration, }; #[cfg(not(windows))] @@ -129,12 +130,31 @@ impl ChannelManager { }; thread::spawn(move || { - let run_count = self_clone.run_count.clone(); + let mut run_endless = true; - if let Err(e) = start_channel(self_clone) { - run_count.fetch_sub(1, Ordering::SeqCst); - error!("{e}"); - }; + while run_endless { + let run_count = self_clone.run_count.clone(); + + if let Err(e) = start_channel(self_clone.clone()) { + run_count.fetch_sub(1, Ordering::SeqCst); + error!("{e}"); + }; + + let active = self_clone.channel.lock().unwrap().active; + + if !active { + run_endless = false; + } else { + self_clone.run_count.fetch_add(1, Ordering::SeqCst); + self_clone.is_alive.store(true, Ordering::SeqCst); + self_clone.is_terminated.store(false, Ordering::SeqCst); + self_clone.list_init.store(true, Ordering::SeqCst); + + thread::sleep(Duration::from_millis(250)); + } + } + + trace!("Async start done"); }); } } @@ -179,8 +199,6 @@ impl ChannelManager { } pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> { - let mut channel = self.channel.lock()?; - match unit { Decoder => { if let Some(proc) = self.decoder.lock()?.as_mut() { @@ -207,8 +225,6 @@ impl ChannelManager { } } - channel.active = false; - self.wait(unit)?; Ok(()) diff --git a/ffplayout/src/player/input/ingest.rs b/ffplayout/src/player/input/ingest.rs index a574cab9..1008515c 100644 --- a/ffplayout/src/player/input/ingest.rs +++ b/ffplayout/src/player/input/ingest.rs @@ -47,6 +47,7 @@ fn server_monitor( .iter() .any(|i| line.contains(*i)) { + channel_mgr.channel.lock().unwrap().active = false; channel_mgr.stop_all(); } } @@ -91,6 +92,7 @@ pub fn ingest_server( if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if !test_tcp_port(id, url) { + channel_mgr.channel.lock().unwrap().active = false; channel_mgr.stop_all(); } diff --git a/ffplayout/src/player/output/hls.rs b/ffplayout/src/player/output/hls.rs index 3621b7f7..66761965 100644 --- a/ffplayout/src/player/output/hls.rs +++ b/ffplayout/src/player/output/hls.rs @@ -66,6 +66,7 @@ fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> { if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if !test_tcp_port(id, url) { + manager.channel.lock().unwrap().active = false; manager.stop_all(); } diff --git a/ffplayout/src/player/utils/mod.rs b/ffplayout/src/player/utils/mod.rs index 89ea3704..3766074e 100644 --- a/ffplayout/src/player/utils/mod.rs +++ b/ffplayout/src/player/utils/mod.rs @@ -811,6 +811,7 @@ pub fn stderr_reader( || (line.contains("No such file or directory") && !line.contains("failed to delete old segment")) { + manager.channel.lock().unwrap().active = false; manager.stop_all(); } } diff --git a/ffplayout/src/utils/args_parse.rs b/ffplayout/src/utils/args_parse.rs index 8f1bd9c6..f281b772 100644 --- a/ffplayout/src/utils/args_parse.rs +++ b/ffplayout/src/utils/args_parse.rs @@ -104,8 +104,8 @@ pub struct Args { )] pub generate: Option>, - #[clap(long, help = "Optional folder path list for playlist generations", num_args = 1..)] - pub gen_paths: Option>, + #[clap(long, help = "Optional path list for playlist generations", num_args = 1..)] + pub paths: Option>, #[clap(long, env, help = "Keep log file for given days")] pub log_backup_count: Option, diff --git a/ffplayout/src/utils/config.rs b/ffplayout/src/utils/config.rs index fe8b6027..1a3b07d5 100644 --- a/ffplayout/src/utils/config.rs +++ b/ffplayout/src/utils/config.rs @@ -843,7 +843,7 @@ pub async fn get_config(pool: &Pool, channel_id: i32) -> Result { + manager.channel.lock().unwrap().active = false; manager.stop_all(); let mut data_map = Map::new(); diff --git a/ffplayout/src/utils/logging.rs b/ffplayout/src/utils/logging.rs index 3ba38d6f..f58d3b7d 100644 --- a/ffplayout/src/utils/logging.rs +++ b/ffplayout/src/utils/logging.rs @@ -30,7 +30,11 @@ pub struct Target; impl Target { pub fn all() -> &'static str { - "{file,mail,_Default}" + if ARGS.log_to_console { + "{_Default}" + } else { + "{file,mail,_Default}" + } } pub fn console() -> &'static str { diff --git a/tests/src/engine_playlist.rs b/tests/src/engine_playlist.rs index b5ea36cf..3bd603e8 100644 --- a/tests/src/engine_playlist.rs +++ b/tests/src/engine_playlist.rs @@ -49,6 +49,7 @@ fn timed_stop(sec: u64, manager: ChannelManager) { println!("Timed stop of process"); + manager.channel.lock().unwrap().active = false; manager.stop_all(); }