better error handling on channel create/delete, always try to terminate sub processes, update packages

This commit is contained in:
jb-alvarado 2024-09-05 12:06:53 +02:00
parent d54745a860
commit d2027b7920
11 changed files with 87 additions and 87 deletions

56
Cargo.lock generated
View File

@ -741,9 +741,9 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.1.15" version = "1.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57b6a275aa2903740dc87da01c62040406b8812552e97129a63ea8850a17c6e6" checksum = "e9d013ecb737093c0e86b151a7b837993cf9ec6c502946cfb44bedc392421e0b"
dependencies = [ dependencies = [
"jobserver", "jobserver",
"libc", "libc",
@ -799,9 +799,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.16" version = "4.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed6719fffa43d0d87e5fd8caeab59be1554fb028cd30edc88fc4369b17971019" checksum = "3e5a21b8495e732f1b3c364c9949b201ca7bae518c502c80256c96ad79eaf6ac"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -809,9 +809,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.5.15" version = "4.5.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "216aec2b177652e3846684cbfe25c9964d18ec45234f0f5da5157b207ed1aab6" checksum = "8cf2dd12af7a047ad9d6da2b6b249759a22a7abc0f474c1dae1777afa4b21a73"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
@ -1685,9 +1685,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper-rustls" name = "hyper-rustls"
version = "0.27.2" version = "0.27.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http 1.1.0",
@ -2632,9 +2632,9 @@ dependencies = [
[[package]] [[package]]
name = "psm" name = "psm"
version = "0.1.22" version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b1f9bf148c15500d44581654fb9260bc9d82970f3ef777a79a40534f6aa784f" checksum = "aa37f80ca58604976033fae9515a8a2989fc13797d953f7c04fb8fa36a11f205"
dependencies = [ dependencies = [
"cc", "cc",
] ]
@ -3073,9 +3073,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.127" version = "1.0.128"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8"
dependencies = [ dependencies = [
"itoa", "itoa",
"memchr", "memchr",
@ -3302,9 +3302,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx" name = "sqlx"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcfa89bea9500db4a0d038513d7a060566bfc51d46d1c014847049a45cce85e8" checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e"
dependencies = [ dependencies = [
"sqlx-core", "sqlx-core",
"sqlx-macros", "sqlx-macros",
@ -3315,9 +3315,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-core" name = "sqlx-core"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d06e2f2bd861719b1f3f0c7dbe1d80c30bf59e76cf019f07d9014ed7eefb8e08" checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e"
dependencies = [ dependencies = [
"atoi", "atoi",
"byteorder", "byteorder",
@ -3354,9 +3354,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-macros" name = "sqlx-macros"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f998a9defdbd48ed005a89362bd40dd2117502f15294f61c8d47034107dbbdc" checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3367,9 +3367,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-macros-core" name = "sqlx-macros-core"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d100558134176a2629d46cec0c8891ba0be8910f7896abfdb75ef4ab6f4e7ce" checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5"
dependencies = [ dependencies = [
"dotenvy", "dotenvy",
"either", "either",
@ -3393,9 +3393,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-mysql" name = "sqlx-mysql"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cac0ab331b14cb3921c62156d913e4c15b74fb6ec0f3146bd4ef6e4fb3c12" checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64 0.22.1", "base64 0.22.1",
@ -3435,9 +3435,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-postgres" name = "sqlx-postgres"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9734dbce698c67ecf67c442f768a5e90a49b2a4d61a9f1d59f73874bd4cf0710" checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64 0.22.1", "base64 0.22.1",
@ -3473,9 +3473,9 @@ dependencies = [
[[package]] [[package]]
name = "sqlx-sqlite" name = "sqlx-sqlite"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75b419c3c1b1697833dd927bdc4c6545a620bc1bbafabd44e1efbe9afcd337e" checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680"
dependencies = [ dependencies = [
"atoi", "atoi",
"flume", "flume",
@ -3853,9 +3853,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.11" version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",

View File

@ -570,7 +570,13 @@ async fn get_advanced_config(
role: AuthDetails<Role>, role: AuthDetails<Role>,
user: web::ReqData<UserMeta>, user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap(); let manager = controllers
.lock()
.unwrap()
.get(*id)
.ok_or(ServiceError::BadRequest(format!(
"Channel ({id}) not exists!"
)))?;
let config = manager.config.lock().unwrap().advanced.clone(); let config = manager.config.lock().unwrap().advanced.clone();
Ok(web::Json(config)) Ok(web::Json(config))
@ -626,7 +632,13 @@ async fn get_playout_config(
role: AuthDetails<Role>, role: AuthDetails<Role>,
user: web::ReqData<UserMeta>, user: web::ReqData<UserMeta>,
) -> Result<impl Responder, ServiceError> { ) -> Result<impl Responder, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap(); let manager = controllers
.lock()
.unwrap()
.get(*id)
.ok_or(ServiceError::BadRequest(format!(
"Channel ({id}) not exists!"
)))?;
let config = manager.config.lock().unwrap().clone(); let config = manager.config.lock().unwrap().clone();
Ok(web::Json(config)) Ok(web::Json(config))

View File

@ -73,7 +73,9 @@ async fn main() -> std::io::Result<()> {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for channel in channels.iter() { for channel in channels.iter() {
let config = get_config(&pool, channel.id).await?; let config = get_config(&pool, channel.id)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone()); let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail))); let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
@ -210,7 +212,9 @@ async fn main() -> std::io::Result<()> {
let channels = ARGS.channels.clone().unwrap_or_else(|| vec![1]); let channels = ARGS.channels.clone().unwrap_or_else(|| vec![1]);
for (index, channel_id) in channels.iter().enumerate() { for (index, channel_id) in channels.iter().enumerate() {
let config = get_config(&pool, *channel_id).await?; let config = get_config(&pool, *channel_id)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let channel = handles::select_channel(&pool, channel_id).await.unwrap(); let channel = handles::select_channel(&pool, channel_id).await.unwrap();
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone()); let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());

View File

@ -126,7 +126,7 @@ impl ChannelManager {
let channel_id = self.channel.lock().unwrap().id; let channel_id = self.channel.lock().unwrap().id;
if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await { if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await {
error!("Unable write to player status: {e}"); error!(target: Target::all(), channel = channel_id; "Unable write to player status: {e}");
}; };
thread::spawn(move || { thread::spawn(move || {
@ -171,7 +171,7 @@ impl ChannelManager {
let channel_id = self.channel.lock().unwrap().id; let channel_id = self.channel.lock().unwrap().id;
if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await { if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await {
error!("Unable write to player status: {e}"); error!(target: Target::all(), channel = channel_id; "Unable write to player status: {e}");
}; };
if index + 1 == ARGS.channels.clone().unwrap_or_default().len() { if index + 1 == ARGS.channels.clone().unwrap_or_default().len() {
@ -258,16 +258,16 @@ impl ChannelManager {
} }
pub async fn async_stop(&self) { pub async fn async_stop(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst); self.is_terminated.store(true, Ordering::SeqCst);
self.is_alive.store(false, Ordering::SeqCst); self.is_alive.store(false, Ordering::SeqCst);
self.ingest_is_running.store(false, Ordering::SeqCst); self.ingest_is_running.store(false, Ordering::SeqCst);
self.run_count.fetch_sub(1, Ordering::SeqCst); self.run_count.fetch_sub(1, Ordering::SeqCst);
let pool = self.db_pool.clone().unwrap(); let pool = self.db_pool.clone().unwrap();
let channel_id = self.channel.lock().unwrap().id; let channel_id = self.channel.lock().unwrap().id;
debug!(target: Target::all(), channel = channel_id; "Stop all child processes from channel {channel_id}");
if let Err(e) = handles::update_player(&pool, channel_id, false).await { if let Err(e) = handles::update_player(&pool, channel_id, false).await {
error!("Unable write to player status: {e}"); error!(target: Target::all(), channel = channel_id; "Unable write to player status: {e}");
}; };
for unit in [Decoder, Encoder, Ingest] { for unit in [Decoder, Encoder, Ingest] {
@ -281,26 +281,22 @@ impl ChannelManager {
/// No matter what is running, terminate them all. /// No matter what is running, terminate them all.
pub fn stop_all(&self) { pub fn stop_all(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst); self.is_terminated.store(true, Ordering::SeqCst);
self.is_alive.store(false, Ordering::SeqCst);
self.ingest_is_running.store(false, Ordering::SeqCst); self.ingest_is_running.store(false, Ordering::SeqCst);
self.run_count.fetch_sub(1, Ordering::SeqCst); self.run_count.fetch_sub(1, Ordering::SeqCst);
let channel_id = self.channel.lock().unwrap().id;
debug!(target: Target::all(), channel = channel_id; "Stop all child processes from channel {channel_id}");
if self.is_alive.load(Ordering::SeqCst) { for unit in [Decoder, Encoder, Ingest] {
self.is_alive.store(false, Ordering::SeqCst); if let Err(e) = self.stop(unit) {
if !e.to_string().contains("exited process") {
trace!("Playout is alive and processes are terminated"); error!(target: Target::all(), channel = channel_id; "{e}")
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.stop(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
} }
if let Err(e) = self.wait(unit) { }
if !e.to_string().contains("exited process") { if let Err(e) = self.wait(unit) {
error!("{e}") if !e.to_string().contains("exited process") {
} error!(target: Target::all(), channel = channel_id; "{e}")
} }
} }
} }

View File

@ -1,5 +1,5 @@
use std::{ use std::{
fmt, io, fmt,
path::{Path, PathBuf}, path::{Path, PathBuf},
str::FromStr, str::FromStr,
}; };
@ -552,19 +552,13 @@ fn default_track_index() -> i32 {
// } // }
impl PlayoutConfig { impl PlayoutConfig {
pub async fn new(pool: &Pool<Sqlite>, channel_id: i32) -> Self { pub async fn new(pool: &Pool<Sqlite>, channel_id: i32) -> Result<Self, ServiceError> {
let global = handles::select_global(pool) let global = handles::select_global(pool)
.await .await
.expect("Can't read globals"); .expect("Can't read globals");
let channel = handles::select_channel(pool, &channel_id) let channel = handles::select_channel(pool, &channel_id).await?;
.await let config = handles::select_configuration(pool, channel_id).await?;
.expect("Can't read channel"); let adv_config = handles::select_advanced_configuration(pool, channel_id).await?;
let config = handles::select_configuration(pool, channel_id)
.await
.expect("Can't read config");
let adv_config = handles::select_advanced_configuration(pool, channel_id)
.await
.expect("Can't read advanced config");
let channel = Channel::new(&global, channel); let channel = Channel::new(&global, channel);
let advanced = AdvancedConfig::new(adv_config); let advanced = AdvancedConfig::new(adv_config);
@ -590,23 +584,14 @@ impl PlayoutConfig {
Storage::new(&config, channel.storage_path.clone(), global.shared_storage); Storage::new(&config, channel.storage_path.clone(), global.shared_storage);
if !channel.playlist_path.is_dir() { if !channel.playlist_path.is_dir() {
tokio::fs::create_dir_all(&channel.playlist_path) tokio::fs::create_dir_all(&channel.playlist_path).await?;
.await
.unwrap_or_else(|_| {
panic!("Can't create playlist folder: {:#?}", channel.playlist_path)
});
} }
if !channel.logging_path.is_dir() { if !channel.logging_path.is_dir() {
tokio::fs::create_dir_all(&channel.logging_path) tokio::fs::create_dir_all(&channel.logging_path).await?;
.await
.unwrap_or_else(|_| {
panic!("Can't create logging folder: {:#?}", channel.logging_path)
});
} }
let (filler_path, _, _) = norm_abs_path(&channel.storage_path, &config.storage_filler) let (filler_path, _, _) = norm_abs_path(&channel.storage_path, &config.storage_filler)?;
.expect("Can't get filler path");
storage.filler = filler_path; storage.filler = filler_path;
@ -700,10 +685,10 @@ impl PlayoutConfig {
for item in cmd.iter_mut() { for item in cmd.iter_mut() {
if item.ends_with(".ts") || (item.ends_with(".m3u8") && item != "master.m3u8") { if item.ends_with(".ts") || (item.ends_with(".m3u8") && item != "master.m3u8") {
if let Ok((hls_path, _, _)) = norm_abs_path(&channel.hls_path, item) { if let Ok((hls_path, _, _)) = norm_abs_path(&channel.hls_path, item) {
let parent = hls_path.parent().expect("HLS parent path"); let parent = hls_path.parent().ok_or("HLS parent path")?;
if !parent.is_dir() { if !parent.is_dir() {
fs::create_dir_all(parent).await.expect("Create HLS path"); fs::create_dir_all(parent).await?;
} }
item.clone_from(&hls_path.to_string_lossy().to_string()); item.clone_from(&hls_path.to_string_lossy().to_string());
}; };
@ -726,7 +711,7 @@ impl PlayoutConfig {
text.node_pos = None; text.node_pos = None;
} }
Self { Ok(Self {
channel, channel,
advanced, advanced,
general, general,
@ -739,11 +724,11 @@ impl PlayoutConfig {
text, text,
task, task,
output, output,
} })
} }
pub async fn dump(pool: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> { pub async fn dump(pool: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let mut config = Self::new(pool, id).await; let mut config = Self::new(pool, id).await?;
config.storage.filler.clone_from( config.storage.filler.clone_from(
&config &config
.storage .storage
@ -819,8 +804,11 @@ fn pre_audio_codec(proc_filter: &str, ingest_filter: &str, channel_count: u8) ->
} }
/// Read command line arguments, and override the config with them. /// Read command line arguments, and override the config with them.
pub async fn get_config(pool: &Pool<Sqlite>, channel_id: i32) -> Result<PlayoutConfig, io::Error> { pub async fn get_config(
let mut config = PlayoutConfig::new(pool, channel_id).await; pool: &Pool<Sqlite>,
channel_id: i32,
) -> Result<PlayoutConfig, ServiceError> {
let mut config = PlayoutConfig::new(pool, channel_id).await?;
let args = ARGS.clone(); let args = ARGS.clone();
config.general.generate = args.generate; config.general.generate = args.generate;

@ -1 +1 @@
Subproject commit 1c7f6892fd0809c5fb5b0508753cac97697c28ca Subproject commit 48f123bf6ad136968495e9e5e22249b8ca5ef192

View File

@ -42,7 +42,7 @@ async fn prepare_config() -> (PlayoutConfig, ChannelManager, Pool<Sqlite>) {
handles::insert_user(&pool, user.clone()).await.unwrap(); handles::insert_user(&pool, user.clone()).await.unwrap();
let config = PlayoutConfig::new(&pool, 1).await; let config = PlayoutConfig::new(&pool, 1).await.unwrap();
let channel = handles::select_channel(&pool, &1).await.unwrap(); let channel = handles::select_channel(&pool, &1).await.unwrap();
let manager = ChannelManager::new(Some(pool.clone()), channel, config.clone()); let manager = ChannelManager::new(Some(pool.clone()), channel, config.clone());

View File

@ -31,7 +31,7 @@ async fn prepare_config() -> (PlayoutConfig, ChannelManager) {
.await .await
.unwrap(); .unwrap();
let config = PlayoutConfig::new(&pool, 1).await; let config = PlayoutConfig::new(&pool, 1).await.unwrap();
let channel = handles::select_channel(&pool, &1).await.unwrap(); let channel = handles::select_channel(&pool, &1).await.unwrap();
let manager = ChannelManager::new(Some(pool), channel, config.clone()); let manager = ChannelManager::new(Some(pool), channel, config.clone());

View File

@ -34,7 +34,7 @@ async fn prepare_config() -> (PlayoutConfig, ChannelManager) {
.await .await
.unwrap(); .unwrap();
let config = PlayoutConfig::new(&pool, 1).await; let config = PlayoutConfig::new(&pool, 1).await.unwrap();
let channel = handles::select_channel(&pool, &1).await.unwrap(); let channel = handles::select_channel(&pool, &1).await.unwrap();
let manager = ChannelManager::new(Some(pool), channel, config.clone()); let manager = ChannelManager::new(Some(pool), channel, config.clone());

View File

@ -33,7 +33,7 @@ async fn prepare_config() -> (PlayoutConfig, ChannelManager) {
.await .await
.unwrap(); .unwrap();
let config = PlayoutConfig::new(&pool, 1).await; let config = PlayoutConfig::new(&pool, 1).await.unwrap();
let channel = handles::select_channel(&pool, &1).await.unwrap(); let channel = handles::select_channel(&pool, &1).await.unwrap();
let manager = ChannelManager::new(Some(pool), channel, config.clone()); let manager = ChannelManager::new(Some(pool), channel, config.clone());

View File

@ -27,7 +27,7 @@ async fn prepare_config() -> (PlayoutConfig, ChannelManager) {
.await .await
.unwrap(); .unwrap();
let config = PlayoutConfig::new(&pool, 1).await; let config = PlayoutConfig::new(&pool, 1).await.unwrap();
let channel = handles::select_channel(&pool, &1).await.unwrap(); let channel = handles::select_channel(&pool, &1).await.unwrap();
let manager = ChannelManager::new(Some(pool), channel, config.clone()); let manager = ChannelManager::new(Some(pool), channel, config.clone());