implement controller

This commit is contained in:
jb-alvarado 2024-05-28 22:20:47 +02:00
parent 7ffb44263b
commit 167b434e52
9 changed files with 449 additions and 22 deletions

139
Cargo.lock generated
View File

@ -1195,6 +1195,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "erased-serde"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d"
dependencies = [
"serde",
"typeid",
]
[[package]]
name = "errno"
version = "0.3.9"
@ -1313,6 +1323,7 @@ dependencies = [
"lazy_static",
"lexical-sort",
"local-ip-address",
"log",
"once_cell",
"parking_lot",
"path-clean",
@ -1324,10 +1335,12 @@ dependencies = [
"sanitize-filename",
"serde",
"serde_json",
"signal-child",
"simplelog",
"sqlx",
"static-files",
"sysinfo",
"time",
"tokio",
"tokio-stream",
"toml_edit",
@ -2220,6 +2233,9 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
dependencies = [
"serde",
"sval",
"sval_ref",
"value-bag",
]
@ -3094,6 +3110,15 @@ dependencies = [
"syn 2.0.63",
]
[[package]]
name = "serde_fmt"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1d4ddca14104cd60529e8c7f7ba71a2c8acd8f7f5cfcdc2faf97eeb7c3010a4"
dependencies = [
"serde",
]
[[package]]
name = "serde_html_form"
version = "0.2.6"
@ -3567,6 +3592,84 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "sval"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53eb957fbc79a55306d5d25d87daf3627bc3800681491cda0709eef36c748bfe"
[[package]]
name = "sval_buffer"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96e860aef60e9cbf37888d4953a13445abf523c534640d1f6174d310917c410d"
dependencies = [
"sval",
"sval_ref",
]
[[package]]
name = "sval_dynamic"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea3f2b07929a1127d204ed7cb3905049381708245727680e9139dac317ed556f"
dependencies = [
"sval",
]
[[package]]
name = "sval_fmt"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4e188677497de274a1367c4bda15bd2296de4070d91729aac8f0a09c1abf64d"
dependencies = [
"itoa",
"ryu",
"sval",
]
[[package]]
name = "sval_json"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32f456c07dae652744781f2245d5e3b78e6a9ebad70790ac11eb15dbdbce5282"
dependencies = [
"itoa",
"ryu",
"sval",
]
[[package]]
name = "sval_nested"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "886feb24709f0476baaebbf9ac10671a50163caa7e439d7a7beb7f6d81d0a6fb"
dependencies = [
"sval",
"sval_buffer",
"sval_ref",
]
[[package]]
name = "sval_ref"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be2e7fc517d778f44f8cb64140afa36010999565528d48985f55e64d45f369ce"
dependencies = [
"sval",
]
[[package]]
name = "sval_serde"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79bf66549a997ff35cd2114a27ac4b0c2843280f2cfa84b240d169ecaa0add46"
dependencies = [
"serde",
"sval",
"sval_nested",
]
[[package]]
name = "syn"
version = "1.0.109"
@ -3889,6 +3992,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "typeid"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "059d83cc991e7a42fc37bd50941885db0888e34209f8cfd9aab07ddec03bc9cf"
[[package]]
name = "typenum"
version = "1.17.0"
@ -3986,6 +4095,36 @@ name = "value-bag"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101"
dependencies = [
"value-bag-serde1",
"value-bag-sval2",
]
[[package]]
name = "value-bag-serde1"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccacf50c5cb077a9abb723c5bcb5e0754c1a433f1e1de89edc328e2760b6328b"
dependencies = [
"erased-serde",
"serde",
"serde_fmt",
]
[[package]]
name = "value-bag-sval2"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1785bae486022dfb9703915d42287dcb284c1ee37bd1080eeba78cc04721285b"
dependencies = [
"sval",
"sval_buffer",
"sval_dynamic",
"sval_fmt",
"sval_json",
"sval_ref",
"sval_serde",
]
[[package]]
name = "vcpkg"

View File

@ -32,6 +32,7 @@ jsonwebtoken = "9"
lazy_static = "1.4"
lexical-sort = "0.3"
local-ip-address = "0.6"
log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] }
once_cell = "1.18"
parking_lot = "0.12"
path-clean = "1.0"
@ -47,11 +48,15 @@ simplelog = { version = "0.12", features = ["paris"] }
static-files = "0.2"
sysinfo ={ version = "0.30", features = ["linux-netdevs"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
time = { version = "0.3", features = ["formatting", "macros"] }
tokio = { version = "1.29", features = ["full"] }
tokio-stream = "0.1"
toml_edit = {version ="0.22", features = ["serde"]}
uuid = "1.8"
[target.'cfg(not(target_arch = "windows"))'.dependencies]
signal-child = "1"
[build-dependencies]
static-files = "0.2"

View File

@ -0,0 +1,101 @@
use log::{LevelFilter, Log, Metadata, Record};
use simplelog::*;
use std::fs::File;
pub struct LogMailer {
level: LevelFilter,
pub config: Config,
}
impl LogMailer {
pub fn new(log_level: LevelFilter, config: Config) -> Box<LogMailer> {
Box::new(LogMailer {
level: log_level,
config,
})
}
}
impl Log for LogMailer {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
metadata.level() <= self.level
}
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
let _rec = record.args().to_string();
println!("{record:?}");
println!("target: {:?}", record.target());
}
}
fn flush(&self) {}
}
impl SharedLogger for LogMailer {
fn level(&self) -> LevelFilter {
self.level
}
fn config(&self) -> Option<&Config> {
Some(&self.config)
}
fn as_log(self: Box<Self>) -> Box<dyn Log> {
Box::new(*self)
}
}
struct Log2 {
logger: Box<WriteLogger<File>>,
}
impl Log2 {
fn new() -> Self {
let log_file = File::create("log_file.log").expect("Failed to create log file");
let config = ConfigBuilder::new()
.set_time_format_custom(format_description!(
"[[[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:5]]"
))
.build();
let logger = WriteLogger::new(LevelFilter::Debug, config, log_file);
Log2 { logger }
}
fn debug(&self, message: &str) {
self.logger.log(
&Record::builder()
.args(format_args!("{}", message))
.level(Level::Debug)
.build(),
);
}
}
fn main() {
let log2 = Log2::new();
log2.debug("Debug-Message in Logger 2");
// std::thread::spawn(move || {
// log2.debug("Error-Message in Logger 2");
// });
CombinedLogger::init(vec![
TermLogger::new(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
),
LogMailer::new(LevelFilter::Info, Config::default()),
])
.unwrap();
info!("Info in Logger 1");
warn!("Warning in Logger 1");
}

View File

@ -1,4 +1,3 @@
use ffplayout_lib::utils::ProcessControl;
use regex::Regex;
use serde::{
de::{self, Visitor},
@ -103,7 +102,7 @@ where
deserializer.deserialize_any(StringOrNumberVisitor)
}
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, sqlx::FromRow)]
pub struct Channel {
#[serde(skip_deserializing)]
pub id: i32,
@ -112,12 +111,10 @@ pub struct Channel {
pub config_path: String,
pub extra_extensions: String,
pub active: bool,
pub modified: Option<String>,
pub time_shift: f64,
#[sqlx(default)]
#[serde(default)]
pub utc_offset: i32,
#[serde(skip_serializing, skip_deserializing)]
#[sqlx(skip)]
pub control: ProcessControl,
}

View File

@ -6,6 +6,7 @@ use sysinfo::{Disks, Networks, System};
pub mod api;
pub mod db;
pub mod player;
pub mod sse;
pub mod utils;

View File

@ -1,10 +1,4 @@
use std::{
collections::HashSet,
env,
process::exit,
sync::{atomic::Ordering, Arc},
thread,
};
use std::{collections::HashSet, env, process::exit, sync::Arc, thread};
use actix_files::Files;
use actix_web::{
@ -23,6 +17,7 @@ use tokio::sync::Mutex;
use ffplayout::{
api::{auth, routes::*},
db::{db_pool, handles, models::LoginUser},
player::controller::ChannelController,
sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{control::ProcessControl, db_path, init_config, run_args},
ARGS,
@ -76,22 +71,20 @@ async fn main() -> std::io::Result<()> {
}
};
let channel_controller = ChannelController::new();
let mut channels = handles::select_all_channels(&pool)
.await
.unwrap_or_default();
for channel in channels.iter_mut() {
// TODO: maybe run here the player
channel
.control
.is_alive
.store(channel.active, Ordering::SeqCst)
let channel_clone = channel.clone();
if channel.active {
thread::spawn(move || {
println!("{channel_clone:?}");
});
}
}
thread::spawn(move || {
println!("{channels:?}");
});
if let Some(conn) = &ARGS.listen {
if db_path().is_err() {
error!("Database is not initialized! Init DB first and add admin user.");

View File

@ -0,0 +1,172 @@
use std::{
fmt,
process::Child,
sync::{atomic::AtomicBool, Arc, Mutex},
};
#[cfg(not(windows))]
use signal_child::Signalable;
use serde::{Deserialize, Serialize};
// use simplelog::*;
use crate::db::models::Channel;
use crate::utils::errors::ProcessError;
/// Defined process units.
#[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)]
pub enum ProcessUnit {
#[default]
Decoder,
Encoder,
Ingest,
}
impl fmt::Display for ProcessUnit {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ProcessUnit::Decoder => write!(f, "Decoder"),
ProcessUnit::Encoder => write!(f, "Encoder"),
ProcessUnit::Ingest => write!(f, "Ingest"),
}
}
}
use ProcessUnit::*;
#[derive(Clone, Debug, Default)]
pub struct ChannelManager {
pub channel: Arc<Mutex<Channel>>,
pub decoder: Arc<Mutex<Option<Child>>>,
pub encoder: Arc<Mutex<Option<Child>>>,
pub ingest: Arc<Mutex<Option<Child>>>,
pub ingest_is_running: Arc<AtomicBool>,
pub is_terminated: Arc<AtomicBool>,
pub is_alive: Arc<AtomicBool>,
}
impl ChannelManager {
pub fn new(channel: Channel) -> Self {
Self {
channel: Arc::new(Mutex::new(channel)),
is_alive: Arc::new(AtomicBool::new(true)),
..Default::default()
}
}
}
#[derive(Clone, Debug, Default)]
pub struct ChannelController {
pub channels: Vec<ChannelManager>,
}
impl ChannelController {
pub fn new() -> Self {
Self { channels: vec![] }
}
pub fn add(&mut self, manager: ChannelManager) {
self.channels.push(manager);
}
pub fn remove(&mut self, channel_id: i32) {
self.channels.retain(|manager| {
let channel = manager.channel.lock().unwrap();
channel.id != channel_id
});
}
pub fn update_from(&mut self, other: &Channel, channel_id: i32) {
self.channels.iter_mut().for_each(|c| {
let mut channel = c.channel.lock().unwrap();
if channel.id == channel_id {
channel.name = other.name.clone();
channel.preview_url = other.preview_url.clone();
channel.config_path = other.config_path.clone();
channel.extra_extensions = other.extra_extensions.clone();
channel.active = other.active.clone();
channel.utc_offset = other.utc_offset.clone();
}
})
}
pub fn stop(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> {
for manager in self.channels.iter_mut() {
let mut channel = manager.channel.lock().unwrap();
if channel.id == channel_id {
match unit {
Decoder => {
if let Some(proc) = manager.decoder.lock().unwrap().as_mut() {
#[cfg(not(windows))]
proc.term()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
#[cfg(windows)]
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = manager.encoder.lock().unwrap().as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = manager.ingest.lock().unwrap().as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
channel.active = false;
}
}
self.wait(channel_id, unit)?;
Ok(())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> {
for manager in self.channels.iter_mut() {
let channel = manager.channel.lock().unwrap();
if channel.id == channel_id {
match unit {
Decoder => {
if let Some(proc) = manager.decoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = manager.encoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = manager.ingest.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
}
}
Ok(())
}
}
pub fn play(controller: &mut ChannelController, channel: Channel) {
let manager = ChannelManager::new(channel);
controller.add(manager);
}

View File

@ -0,0 +1 @@
pub mod controller;

View File

@ -1,3 +1,5 @@
use std::io;
use actix_web::{error::ResponseError, Error, HttpResponse};
use derive_more::Display;
@ -103,3 +105,19 @@ impl From<uuid::Error> for ServiceError {
ServiceError::BadRequest(err.to_string())
}
}
#[derive(Debug, Display)]
pub enum ProcessError {
#[display(fmt = "Failed to spawn ffmpeg/ffprobe. {}", _0)]
CommandSpawn(io::Error),
#[display(fmt = "IO error: {}", _0)]
IO(io::Error),
#[display(fmt = "{}", _0)]
Custom(String),
}
impl From<std::io::Error> for ProcessError {
fn from(err: std::io::Error) -> ProcessError {
ProcessError::IO(err)
}
}