diff --git a/Cargo.lock b/Cargo.lock index 31e660e0..ed78944b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "erased-serde" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.9" @@ -1313,6 +1323,7 @@ dependencies = [ "lazy_static", "lexical-sort", "local-ip-address", + "log", "once_cell", "parking_lot", "path-clean", @@ -1324,10 +1335,12 @@ dependencies = [ "sanitize-filename", "serde", "serde_json", + "signal-child", "simplelog", "sqlx", "static-files", "sysinfo", + "time", "tokio", "tokio-stream", "toml_edit", @@ -2220,6 +2233,9 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ + "serde", + "sval", + "sval_ref", "value-bag", ] @@ -3094,6 +3110,15 @@ dependencies = [ "syn 2.0.63", ] +[[package]] +name = "serde_fmt" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4" +dependencies = [ + "serde", +] + [[package]] name = "serde_html_form" version = "0.2.6" @@ -3567,6 +3592,84 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "sval" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53eb957fbc79a55306d5d25d87daf3627bc3800681491cda0709eef36c748bfe" + +[[package]] +name = "sval_buffer" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96e860aef60e9cbf37888d4953a13445abf523c534640d1f6174d310917c410d" +dependencies = [ + "sval", + "sval_ref", +] + +[[package]] +name = "sval_dynamic" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea3f2b07929a1127d204ed7cb3905049381708245727680e9139dac317ed556f" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_fmt" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4e188677497de274a1367c4bda15bd2296de4070d91729aac8f0a09c1abf64d" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_json" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f456c07dae652744781f2245d5e3b78e6a9ebad70790ac11eb15dbdbce5282" +dependencies = [ + "itoa", + "ryu", + "sval", +] + +[[package]] +name = "sval_nested" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "886feb24709f0476baaebbf9ac10671a50163caa7e439d7a7beb7f6d81d0a6fb" +dependencies = [ + "sval", + "sval_buffer", + "sval_ref", +] + +[[package]] +name = "sval_ref" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2e7fc517d778f44f8cb64140afa36010999565528d48985f55e64d45f369ce" +dependencies = [ + "sval", +] + +[[package]] +name = "sval_serde" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79bf66549a997ff35cd2114a27ac4b0c2843280f2cfa84b240d169ecaa0add46" +dependencies = [ + "serde", + "sval", + "sval_nested", +] + [[package]] name = "syn" version = "1.0.109" @@ -3889,6 +3992,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typeid" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "059d83cc991e7a42fc37bd50941885db0888e34209f8cfd9aab07ddec03bc9cf" + [[package]] name = "typenum" version = "1.17.0" @@ -3986,6 +4095,36 @@ name = "value-bag" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" +dependencies = [ + "value-bag-serde1", + "value-bag-sval2", +] + +[[package]] +name = "value-bag-serde1" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccacf50c5cb077a9abb723c5bcb5e0754c1a433f1e1de89edc328e2760b6328b" +dependencies = [ + "erased-serde", + "serde", + "serde_fmt", +] + +[[package]] +name = "value-bag-sval2" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1785bae486022dfb9703915d42287dcb284c1ee37bd1080eeba78cc04721285b" +dependencies = [ + "sval", + "sval_buffer", + "sval_dynamic", + "sval_fmt", + "sval_json", + "sval_ref", + "sval_serde", +] [[package]] name = "vcpkg" diff --git a/ffplayout/Cargo.toml b/ffplayout/Cargo.toml index e48b1ec5..f5492d7a 100644 --- a/ffplayout/Cargo.toml +++ b/ffplayout/Cargo.toml @@ -32,6 +32,7 @@ jsonwebtoken = "9" lazy_static = "1.4" lexical-sort = "0.3" local-ip-address = "0.6" +log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] } once_cell = "1.18" parking_lot = "0.12" path-clean = "1.0" @@ -47,11 +48,15 @@ simplelog = { version = "0.12", features = ["paris"] } static-files = "0.2" sysinfo ={ version = "0.30", features = ["linux-netdevs"] } sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] } +time = { version = "0.3", features = ["formatting", "macros"] } tokio = { version = "1.29", features = ["full"] } tokio-stream = "0.1" toml_edit = {version ="0.22", features = ["serde"]} uuid = "1.8" +[target.'cfg(not(target_arch = "windows"))'.dependencies] +signal-child = "1" + [build-dependencies] static-files = "0.2" diff --git a/ffplayout/examples/multi_log.rs b/ffplayout/examples/multi_log.rs new file mode 100644 index 00000000..d43373b7 --- /dev/null +++ b/ffplayout/examples/multi_log.rs @@ -0,0 +1,101 @@ +use log::{LevelFilter, Log, Metadata, Record}; +use simplelog::*; +use std::fs::File; + +pub struct LogMailer { + level: LevelFilter, + pub config: Config, +} + +impl LogMailer { + pub fn new(log_level: LevelFilter, config: Config) -> Box { + Box::new(LogMailer { + level: log_level, + config, + }) + } +} + +impl Log for LogMailer { + fn enabled(&self, metadata: &Metadata<'_>) -> bool { + metadata.level() <= self.level + } + + fn log(&self, record: &Record<'_>) { + if self.enabled(record.metadata()) { + let _rec = record.args().to_string(); + + println!("{record:?}"); + println!("target: {:?}", record.target()); + } + } + + fn flush(&self) {} +} + +impl SharedLogger for LogMailer { + fn level(&self) -> LevelFilter { + self.level + } + + fn config(&self) -> Option<&Config> { + Some(&self.config) + } + + fn as_log(self: Box) -> Box { + Box::new(*self) + } +} + +struct Log2 { + logger: Box>, +} + +impl Log2 { + fn new() -> Self { + let log_file = File::create("log_file.log").expect("Failed to create log file"); + + let config = ConfigBuilder::new() + .set_time_format_custom(format_description!( + "[[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:5]]" + )) + .build(); + + let logger = WriteLogger::new(LevelFilter::Debug, config, log_file); + + Log2 { logger } + } + + fn debug(&self, message: &str) { + self.logger.log( + &Record::builder() + .args(format_args!("{}", message)) + .level(Level::Debug) + .build(), + ); + } +} + +fn main() { + let log2 = Log2::new(); + + log2.debug("Debug-Message in Logger 2"); + + // std::thread::spawn(move || { + // log2.debug("Error-Message in Logger 2"); + // }); + + CombinedLogger::init(vec![ + TermLogger::new( + LevelFilter::Debug, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ), + LogMailer::new(LevelFilter::Info, Config::default()), + ]) + .unwrap(); + + info!("Info in Logger 1"); + warn!("Warning in Logger 1"); +} diff --git a/ffplayout/src/db/models.rs b/ffplayout/src/db/models.rs index da000bbb..a65aa674 100644 --- a/ffplayout/src/db/models.rs +++ b/ffplayout/src/db/models.rs @@ -1,4 +1,3 @@ -use ffplayout_lib::utils::ProcessControl; use regex::Regex; use serde::{ de::{self, Visitor}, @@ -103,7 +102,7 @@ where deserializer.deserialize_any(StringOrNumberVisitor) } -#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)] +#[derive(Clone, Debug, Default, Deserialize, Serialize, sqlx::FromRow)] pub struct Channel { #[serde(skip_deserializing)] pub id: i32, @@ -112,12 +111,10 @@ pub struct Channel { pub config_path: String, pub extra_extensions: String, pub active: bool, + pub modified: Option, + pub time_shift: f64, #[sqlx(default)] #[serde(default)] pub utc_offset: i32, - - #[serde(skip_serializing, skip_deserializing)] - #[sqlx(skip)] - pub control: ProcessControl, } diff --git a/ffplayout/src/lib.rs b/ffplayout/src/lib.rs index 3809055f..9c9c6e75 100644 --- a/ffplayout/src/lib.rs +++ b/ffplayout/src/lib.rs @@ -6,6 +6,7 @@ use sysinfo::{Disks, Networks, System}; pub mod api; pub mod db; +pub mod player; pub mod sse; pub mod utils; diff --git a/ffplayout/src/main.rs b/ffplayout/src/main.rs index b35a1152..ce9de1ea 100644 --- a/ffplayout/src/main.rs +++ b/ffplayout/src/main.rs @@ -1,10 +1,4 @@ -use std::{ - collections::HashSet, - env, - process::exit, - sync::{atomic::Ordering, Arc}, - thread, -}; +use std::{collections::HashSet, env, process::exit, sync::Arc, thread}; use actix_files::Files; use actix_web::{ @@ -23,6 +17,7 @@ use tokio::sync::Mutex; use ffplayout::{ api::{auth, routes::*}, db::{db_pool, handles, models::LoginUser}, + player::controller::ChannelController, sse::{broadcast::Broadcaster, routes::*, AuthState}, utils::{control::ProcessControl, db_path, init_config, run_args}, ARGS, @@ -76,22 +71,20 @@ async fn main() -> std::io::Result<()> { } }; + let channel_controller = ChannelController::new(); let mut channels = handles::select_all_channels(&pool) .await .unwrap_or_default(); for channel in channels.iter_mut() { - // TODO: maybe run here the player - channel - .control - .is_alive - .store(channel.active, Ordering::SeqCst) + let channel_clone = channel.clone(); + if channel.active { + thread::spawn(move || { + println!("{channel_clone:?}"); + }); + } } - thread::spawn(move || { - println!("{channels:?}"); - }); - if let Some(conn) = &ARGS.listen { if db_path().is_err() { error!("Database is not initialized! Init DB first and add admin user."); diff --git a/ffplayout/src/player/controller.rs b/ffplayout/src/player/controller.rs new file mode 100644 index 00000000..01948bd4 --- /dev/null +++ b/ffplayout/src/player/controller.rs @@ -0,0 +1,172 @@ +use std::{ + fmt, + process::Child, + sync::{atomic::AtomicBool, Arc, Mutex}, +}; + +#[cfg(not(windows))] +use signal_child::Signalable; + +use serde::{Deserialize, Serialize}; +// use simplelog::*; + +use crate::db::models::Channel; +use crate::utils::errors::ProcessError; + +/// Defined process units. +#[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)] +pub enum ProcessUnit { + #[default] + Decoder, + Encoder, + Ingest, +} + +impl fmt::Display for ProcessUnit { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + ProcessUnit::Decoder => write!(f, "Decoder"), + ProcessUnit::Encoder => write!(f, "Encoder"), + ProcessUnit::Ingest => write!(f, "Ingest"), + } + } +} + +use ProcessUnit::*; + +#[derive(Clone, Debug, Default)] +pub struct ChannelManager { + pub channel: Arc>, + pub decoder: Arc>>, + pub encoder: Arc>>, + pub ingest: Arc>>, + pub ingest_is_running: Arc, + pub is_terminated: Arc, + pub is_alive: Arc, +} + +impl ChannelManager { + pub fn new(channel: Channel) -> Self { + Self { + channel: Arc::new(Mutex::new(channel)), + is_alive: Arc::new(AtomicBool::new(true)), + ..Default::default() + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct ChannelController { + pub channels: Vec, +} + +impl ChannelController { + pub fn new() -> Self { + Self { channels: vec![] } + } + + pub fn add(&mut self, manager: ChannelManager) { + self.channels.push(manager); + } + + pub fn remove(&mut self, channel_id: i32) { + self.channels.retain(|manager| { + let channel = manager.channel.lock().unwrap(); + channel.id != channel_id + }); + } + + pub fn update_from(&mut self, other: &Channel, channel_id: i32) { + self.channels.iter_mut().for_each(|c| { + let mut channel = c.channel.lock().unwrap(); + + if channel.id == channel_id { + channel.name = other.name.clone(); + channel.preview_url = other.preview_url.clone(); + channel.config_path = other.config_path.clone(); + channel.extra_extensions = other.extra_extensions.clone(); + channel.active = other.active.clone(); + channel.utc_offset = other.utc_offset.clone(); + } + }) + } + + pub fn stop(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> { + for manager in self.channels.iter_mut() { + let mut channel = manager.channel.lock().unwrap(); + + if channel.id == channel_id { + match unit { + Decoder => { + if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { + #[cfg(not(windows))] + proc.term() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + + #[cfg(windows)] + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + } + } + Encoder => { + if let Some(proc) = manager.encoder.lock().unwrap().as_mut() { + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; + } + } + Ingest => { + if let Some(proc) = manager.ingest.lock().unwrap().as_mut() { + proc.kill() + .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; + } + } + } + + channel.active = false; + } + } + + self.wait(channel_id, unit)?; + + Ok(()) + } + + /// Wait for process to proper close. + /// This prevents orphaned/zombi processes in system + pub fn wait(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> { + for manager in self.channels.iter_mut() { + let channel = manager.channel.lock().unwrap(); + + if channel.id == channel_id { + match unit { + Decoder => { + if let Some(proc) = manager.decoder.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?; + } + } + Encoder => { + if let Some(proc) = manager.encoder.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?; + } + } + Ingest => { + if let Some(proc) = manager.ingest.lock().unwrap().as_mut() { + proc.wait() + .map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?; + } + } + } + } + } + + Ok(()) + } +} + +pub fn play(controller: &mut ChannelController, channel: Channel) { + let manager = ChannelManager::new(channel); + + controller.add(manager); +} diff --git a/ffplayout/src/player/mod.rs b/ffplayout/src/player/mod.rs index e69de29b..cb9e0ac5 100644 --- a/ffplayout/src/player/mod.rs +++ b/ffplayout/src/player/mod.rs @@ -0,0 +1 @@ +pub mod controller; diff --git a/ffplayout/src/utils/errors.rs b/ffplayout/src/utils/errors.rs index dc5d418d..18845f06 100644 --- a/ffplayout/src/utils/errors.rs +++ b/ffplayout/src/utils/errors.rs @@ -1,3 +1,5 @@ +use std::io; + use actix_web::{error::ResponseError, Error, HttpResponse}; use derive_more::Display; @@ -103,3 +105,19 @@ impl From for ServiceError { ServiceError::BadRequest(err.to_string()) } } + +#[derive(Debug, Display)] +pub enum ProcessError { + #[display(fmt = "Failed to spawn ffmpeg/ffprobe. {}", _0)] + CommandSpawn(io::Error), + #[display(fmt = "IO error: {}", _0)] + IO(io::Error), + #[display(fmt = "{}", _0)] + Custom(String), +} + +impl From for ProcessError { + fn from(err: std::io::Error) -> ProcessError { + ProcessError::IO(err) + } +}