diff --git a/Cargo.lock b/Cargo.lock index beeaa855..250b7828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,7 +167,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.9.1" +version = "0.9.2" dependencies = [ "chrono", "clap", @@ -179,7 +179,6 @@ dependencies = [ "notify", "once_cell", "openssl", - "process_control", "rand", "regex", "serde", @@ -1028,21 +1027,11 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "process_control" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d4fa9c62a51815c9588b09a94f713c1e9a87d74142537d7c7d5ee972b8479f" -dependencies = [ - "libc", - "windows-sys", -] - [[package]] name = "quote" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" dependencies = [ "proc-macro2", ] @@ -1375,9 +1364,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f" +checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -1517,49 +1506,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "windows-sys" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43dbb096663629518eb1dfa72d80243ca5a6aca764cae62a2df70af760a9be75" -dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd761fd3eb9ab8cc1ed81e56e567f02dd82c4c837e48ac3b2181b9ffc5060807" - -[[package]] -name = "windows_i686_gnu" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cab0cf703a96bab2dc0c02c0fa748491294bf9b7feb27e1f4f96340f208ada0e" - -[[package]] -name = "windows_i686_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cfdbe89cc9ad7ce618ba34abc34bbb6c36d99e96cae2245b7943cd75ee773d0" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4dd9b0c0e9ece7bb22e84d70d01b71c6d6248b81a3c60d11869451b4cb24784" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff1e4aa646495048ec7f3ffddc411e1d829c026a2ec62b39da15c1055e406eaa" - [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index c23d5a08..50fb70fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,10 @@ [package] name = "ffplayout-rs" -version = "0.9.1" +description = "24/7 playout based on rust and ffmpeg" +license = "GPL-3.0" +authors = ["Jonathan Baecker jonbae77@gmail.com"] +readme = "README.md" +version = "0.9.2" edition = "2021" [dependencies] @@ -13,7 +17,6 @@ lettre = "0.10.0-rc.5" log = "0.4" notify = "4.0" once_cell = "1.10" -process_control = "3.3" rand = "0.8" regex = "1" serde = { version = "1.0", features = ["derive"] } @@ -35,3 +38,35 @@ path = "src/main.rs" opt-level = 3 strip = true lto = true + +# DEBIAN DEB PACKAGE +[package.metadata.deb] +name = "ffplayout-engine" +priority = "optional" +section = "net" +license-file = ["LICENSE", "3"] +depends = "" +suggests = "ffmpeg" +maintainer-scripts = "package/debian/" +copyright = "Copyright (c) 2022, Jonathan Baecker. All rights reserved." +assets = [ + ["target/x86_64-unknown-linux-musl/release/ffplayout", "/usr/bin/ffplayout", "755"], + ["assets/ffplayout.yml", "/etc/ffplayout/ffplayout.yml", "644"], + ["assets/logo.png", "/usr/share/ffplayout/logo.png", "644"], + ["README.md", "/usr/share/doc/ffplayout-engine/README", "644"], + ["package/common/ffplayout-engine.service.preset", "/lib/systemd/system-preset/50-ffplayout-engine.preset", "644"], +] +systemd-units = { unit-name = "ffplayout-engine", unit-scripts = "package/common", enable = false } + +# REHL RPM PACKAGE +[package.metadata.generate-rpm] +name = "ffplayout-engine" +license = "GPL-3.0" +assets = [ + { source = "target/x86_64-unknown-linux-musl/release/ffplayout", dest = "/usr/bin/ffplayout", mode = "755" }, + { source = "assets/ffplayout.yml", dest = "/etc/ffplayout/ffplayout.yml", mode = "644" }, + { source = "package/common/ffplayout-engine.service", dest = "/lib/systemd/system/ffplayout-engine.service", mode = "644" }, + { source = "README.md", dest = "/usr/share/doc/ffplayout-engine/README", mode = "644", doc = true }, + { source = "LICENSE", dest = "/usr/share/doc/ffplayout-engine/LICENSE", mode = "644" }, + { source = "assets/logo.png", dest = "/usr/share/ffplayout/logo.png", mode = "644" }, +] diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index 028eefba..d04bc270 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -36,7 +36,7 @@ logging: log_to_file: false backup_count: 7 local_time: true - timestamp: true + timestamp: false log_path: /var/log/ffplayout/ log_level: DEBUG ffmpeg_level: error diff --git a/cross_compile_all.sh b/cross_compile_all.sh index 17900ecd..d359124a 100755 --- a/cross_compile_all.sh +++ b/cross_compile_all.sh @@ -39,3 +39,17 @@ for target in "${targets[@]}"; do echo "" done + +echo "Create debian package" +echo "" + +cargo deb --target=x86_64-unknown-linux-musl + +mv ./target/x86_64-unknown-linux-musl/debian/ffplayout-engine_${version}_amd64.deb . + +echo "Create rhel package" +echo "" + +cargo generate-rpm --target=x86_64-unknown-linux-musl + +mv ./target/x86_64-unknown-linux-musl/generate-rpm/ffplayout-engine-${version}-1.x86_64.rpm . diff --git a/docs/developer.md b/docs/developer.md index 749a3e95..30d99aa2 100644 --- a/docs/developer.md +++ b/docs/developer.md @@ -66,3 +66,19 @@ CC="aarch64-apple-darwin20.4-clang -arch arm64e" cargo build --release --target= # for x86_64 CC="o64-clang" cargo build --release --target=x86_64-apple-darwin ``` + +### Create debian DEB and RHEL RPM packages + +install: +- `cargo install cargo-deb` +- `cargo install cargo-generate-rpm` + +And run with: + +```Bash +# for debian based systems: +cargo deb --target=x86_64-unknown-linux-musl + +# for rhel based systems: +cargo generate-rpm --target=x86_64-unknown-linux-musl +``` diff --git a/assets/ffplayout_engine.service b/package/common/ffplayout-engine.service similarity index 67% rename from assets/ffplayout_engine.service rename to package/common/ffplayout-engine.service index a4445d45..0471626a 100644 --- a/assets/ffplayout_engine.service +++ b/package/common/ffplayout-engine.service @@ -1,9 +1,9 @@ [Unit] -Description=python and ffmpeg based playout +Description=Rust based 24/7 playout solution After=network.target [Service] -ExecStart= /usr/local/bin/ffplayout +ExecStart= /usr/bin/ffplayout ExecReload=/bin/kill -1 $MAINPID Restart=always RestartSec=1 diff --git a/package/common/ffplayout-engine.service.preset b/package/common/ffplayout-engine.service.preset new file mode 100644 index 00000000..e922c231 --- /dev/null +++ b/package/common/ffplayout-engine.service.preset @@ -0,0 +1 @@ +disable ffplayout-engine.service diff --git a/src/filter/mod.rs b/src/filter/mod.rs index 7ff433bf..03edeb9c 100644 --- a/src/filter/mod.rs +++ b/src/filter/mod.rs @@ -142,7 +142,7 @@ fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) { fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) { if config.processing.add_logo && Path::new(&config.processing.logo).is_file() - && node.category != "advertisement".to_string() + && &node.category.clone().unwrap_or(String::new()) != "advertisement" { let opacity = format!( "format=rgba,colorchannelmixer=aa={}", diff --git a/src/input/folder.rs b/src/input/folder.rs index 8c2773cd..f0ab30cf 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -1,13 +1,19 @@ -use notify::DebouncedEvent::{Create, Remove, Rename}; +use notify::{ + DebouncedEvent::{Create, Remove, Rename}, + {watcher, RecursiveMode, Watcher}, +}; + use rand::{seq::SliceRandom, thread_rng}; use simplelog::*; use std::{ ffi::OsStr, path::Path, sync::{ - mpsc::Receiver, + mpsc::channel, {Arc, Mutex}, }, + thread::sleep, + time::Duration, }; use walkdir::WalkDir; @@ -23,10 +29,7 @@ pub struct Source { } impl Source { - pub fn new( - current_list: Arc>>, - global_index: Arc>, - ) -> Self { + pub fn new(current_list: Arc>>, global_index: Arc>) -> Self { let config = GlobalConfig::global(); let mut media_list = vec![]; let mut index: usize = 0; @@ -141,40 +144,61 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn file_worker( - receiver: Receiver, +pub async fn watchman( sources: Arc>>, + is_terminated: Arc>, ) { - while let Ok(res) = receiver.recv() { - match res { - Create(new_path) => { - let index = sources.lock().unwrap().len(); - let media = Media::new(index, new_path.display().to_string(), false); + let config = GlobalConfig::global(); + let (tx, rx) = channel(); - sources.lock().unwrap().push(media); - info!("Create new file: {new_path:?}"); - } - Remove(old_path) => { - sources - .lock() - .unwrap() - .retain(|x| x.source != old_path.display().to_string()); - info!("Remove file: {old_path:?}"); - } - Rename(old_path, new_path) => { - let index = sources - .lock() - .unwrap() - .iter() - .position(|x| *x.source == old_path.display().to_string()) - .unwrap(); + let path = config.storage.path.clone(); - let media = Media::new(index, new_path.display().to_string(), false); - sources.lock().unwrap()[index] = media; + if !Path::new(&path).exists() { + error!("Folder path not exists: '{path}'"); + panic!("Folder path not exists: '{path}'"); + } - info!("Rename file: {old_path:?} to {new_path:?}"); - } - _ => (), + let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); + watcher.watch(path, RecursiveMode::Recursive).unwrap(); + + loop { + if *is_terminated.lock().unwrap() { + break } + + if let Ok(res) = rx.try_recv() { + match res { + Create(new_path) => { + let index = sources.lock().unwrap().len(); + let media = Media::new(index, new_path.display().to_string(), false); + + sources.lock().unwrap().push(media); + info!("Create new file: {new_path:?}"); + } + Remove(old_path) => { + sources + .lock() + .unwrap() + .retain(|x| x.source != old_path.display().to_string()); + info!("Remove file: {old_path:?}"); + } + Rename(old_path, new_path) => { + let index = sources + .lock() + .unwrap() + .iter() + .position(|x| *x.source == old_path.display().to_string()) + .unwrap(); + + let media = Media::new(index, new_path.display().to_string(), false); + sources.lock().unwrap()[index] = media; + + info!("Rename file: {old_path:?} to {new_path:?}"); + } + _ => (), + } + } + + sleep(Duration::from_secs(4)); } } diff --git a/src/input/ingest.rs b/src/input/ingest.rs index dcb3adf7..8dbbedb2 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,16 +2,15 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::{mpsc::SyncSender}, + sync::mpsc::SyncSender, thread::sleep, time::Duration, }; -use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; -use crate::utils::{stderr_reader, GlobalConfig, ProcessControl}; +use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; fn overlay(config: &GlobalConfig) -> String { let mut logo_chain = String::new(); @@ -57,7 +56,7 @@ pub async fn ingest_server( log_format: String, ingest_sender: SyncSender<(usize, [u8; 65088])>, rt_handle: Handle, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) -> Result<(), Error> { let config = GlobalConfig::global(); let mut buffer: [u8; 65088] = [0; 65088]; @@ -96,7 +95,10 @@ pub async fn ingest_server( stream_input.last().unwrap() ); - debug!("Server CMD: \"ffmpeg {}\"", server_cmd.join(" ")); + debug!( + "Server CMD: \"ffmpeg {}\"", + server_cmd.join(" ") + ); loop { if *proc_control.is_terminated.lock().unwrap() { @@ -115,15 +117,11 @@ pub async fn ingest_server( Ok(proc) => proc, }; - let serv_terminator = server_proc.terminator()?; - *proc_control.server_term.lock().unwrap() = Some(serv_terminator); - - rt_handle.spawn(stderr_reader( - server_proc.stderr.take().unwrap(), - "Server", - )); + rt_handle.spawn(stderr_reader(server_proc.stderr.take().unwrap(), "Server")); let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); + *proc_control.server_term.lock().unwrap() = Some(server_proc); + is_running = false; loop { @@ -131,7 +129,6 @@ pub async fn ingest_server( Ok(length) => length, Err(e) => { debug!("Ingest server read {e:?}"); - break; } }; @@ -153,17 +150,15 @@ pub async fn ingest_server( } } + drop(ingest_reader); + *proc_control.server_is_running.lock().unwrap() = false; sleep(Duration::from_secs(1)); - if let Err(e) = server_proc.kill() { - error!("Ingest server {e:?}") - }; - - if let Err(e) = server_proc.wait() { - error!("Ingest server {e:?}") - }; + if let Err(e) = proc_control.wait(Ingest) { + error!("{e}") + } } Ok(()) diff --git a/src/input/mod.rs b/src/input/mod.rs index c5273137..3782db94 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -1,7 +1,56 @@ +use std::{ + process, + sync::{Arc, Mutex}, +}; + +use simplelog::*; +use tokio::runtime::Handle; + +use crate::utils::{GlobalConfig, Media, PlayoutStatus}; + pub mod folder; pub mod ingest; pub mod playlist; +pub use folder::{watchman, Source}; pub use ingest::ingest_server; -pub use folder::{file_worker, Source}; pub use playlist::CurrentProgram; + +pub fn source_generator( + rt_handle: &Handle, + config: GlobalConfig, + current_list: Arc>>, + index: Arc>, + playout_stat: PlayoutStatus, + is_terminated: Arc>, +) -> Box> { + let get_source = match config.processing.clone().mode.as_str() { + "folder" => { + info!("Playout in folder mode"); + debug!("Monitor folder: {}", &config.storage.path); + + let folder_source = Source::new(current_list, index); + rt_handle.spawn(watchman(folder_source.nodes.clone(), is_terminated.clone())); + + Box::new(folder_source) as Box> + } + "playlist" => { + info!("Playout in playlist mode"); + let program = CurrentProgram::new( + rt_handle.clone(), + playout_stat, + is_terminated.clone(), + current_list, + index, + ); + + Box::new(program) as Box> + } + _ => { + error!("Process Mode not exists!"); + process::exit(0x0100); + } + }; + + get_source +} diff --git a/src/input/playlist.rs b/src/input/playlist.rs index a4b5cccf..b63fda94 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -179,13 +179,15 @@ impl CurrentProgram { let index = *self.index.lock().unwrap(); let current_list = self.nodes.lock().unwrap(); - if index + 1 < current_list.len() && ¤t_list[index + 1].category == "advertisement" { + if index + 1 < current_list.len() + && ¤t_list[index + 1].category.clone().unwrap_or(String::new()) == "advertisement" + { self.current_node.next_ad = Some(true); } if index > 0 && index < current_list.len() - && ¤t_list[index - 1].category == "advertisement" + && ¤t_list[index - 1].category.clone().unwrap_or(String::new()) == "advertisement" { self.current_node.last_ad = Some(true); } diff --git a/src/main.rs b/src/main.rs index 1fe5ee48..e57d5147 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ +extern crate log; +extern crate simplelog; + use std::{ path::PathBuf, {fs, fs::File}, }; -extern crate log; -extern crate simplelog; - use serde::{Deserialize, Serialize}; use serde_json::json; use simplelog::*; diff --git a/src/output/hls.rs b/src/output/hls.rs index b92fb03d..79079dbe 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -17,12 +17,14 @@ out: */ -use std::process::{Command, Stdio}; +use std::{ + process::{Command, Stdio}, +}; use simplelog::*; use tokio::runtime::Handle; -use crate::output::source_generator; +use crate::input::source_generator; use crate::utils::{ sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; diff --git a/src/output/mod.rs b/src/output/mod.rs index 32e26b3a..ea922923 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,18 +1,11 @@ -use notify::{watcher, RecursiveMode, Watcher}; use std::{ io::{prelude::*, BufReader, BufWriter, Read}, - path::Path, - process, process::{Command, Stdio}, - sync::{ - mpsc::{channel, sync_channel, Receiver, SyncSender}, - Arc, Mutex, - }, + sync::mpsc::{sync_channel, Receiver, SyncSender}, thread::sleep, time::Duration, }; -use process_control::ChildExt; use simplelog::*; use tokio::runtime::Handle; @@ -22,69 +15,17 @@ mod stream; pub use hls::write_hls; -use crate::input::{file_worker, ingest_server, CurrentProgram, Source}; +use crate::input::{ingest_server, source_generator}; use crate::utils::{ - sec_to_time, stderr_reader, GlobalConfig, Media, PlayerControl, PlayoutStatus, ProcessControl, + sec_to_time, stderr_reader, Decoder, Encoder, GlobalConfig, PlayerControl, PlayoutStatus, + ProcessControl, }; -pub fn source_generator( - rt_handle: &Handle, - config: GlobalConfig, - current_list: Arc>>, - index: Arc>, - playout_stat: PlayoutStatus, - is_terminated: Arc>, -) -> Box> { - let get_source = match config.processing.clone().mode.as_str() { - "folder" => { - let path = config.storage.path.clone(); - if !Path::new(&path).exists() { - error!("Folder path not exists: '{path}'"); - process::exit(0x0100); - } - - info!("Playout in folder mode."); - - let folder_source = Source::new(current_list, index); - - let (sender, receiver) = channel(); - let mut watchman = watcher(sender, Duration::from_secs(2)).unwrap(); - watchman - .watch(path.clone(), RecursiveMode::Recursive) - .unwrap(); - - debug!("Monitor folder: {}", path); - - rt_handle.spawn(file_worker(receiver, folder_source.nodes.clone())); - - Box::new(folder_source) as Box> - } - "playlist" => { - info!("Playout in playlist mode"); - let program = CurrentProgram::new( - rt_handle.clone(), - playout_stat, - is_terminated.clone(), - current_list, - index, - ); - - Box::new(program) as Box> - } - _ => { - error!("Process Mode not exists!"); - process::exit(0x0100); - } - }; - - get_source -} - pub fn player( rt_handle: &Handle, play_control: PlayerControl, playout_stat: PlayoutStatus, - proc_control: ProcessControl, + mut proc_control: ProcessControl, ) { let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); @@ -109,11 +50,8 @@ pub fn player( }; let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); - - rt_handle.spawn(stderr_reader( - enc_proc.stderr.take().unwrap(), - "Encoder", - )); + rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); + *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); let (ingest_sender, ingest_receiver): ( SyncSender<(usize, [u8; 65088])>, @@ -176,15 +114,8 @@ pub fn player( }; let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); - - rt_handle.spawn(stderr_reader( - dec_proc.stderr.take().unwrap(), - "Decoder", - )); - - if let Ok(dec_terminator) = dec_proc.terminator() { - *proc_control.decoder_term.lock().unwrap() = Some(dec_terminator); - }; + rt_handle.spawn(stderr_reader(dec_proc.stderr.take().unwrap(), "Decoder")); + *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { if *proc_control.server_is_running.lock().unwrap() { @@ -195,13 +126,9 @@ pub fn player( error!("Encoder error: {e}") } - if let Err(e) = dec_proc.kill() { - error!("Decoder error: {e}") - }; - - if let Err(e) = dec_proc.wait() { - error!("Decoder error: {e}") - }; + if let Err(e) = proc_control.kill(Decoder) { + error!("{e}") + } live_on = true; @@ -247,18 +174,14 @@ pub fn player( } } - if let Err(e) = dec_proc.wait() { - panic!("Decoder error: {e:?}") - }; + if let Err(e) = proc_control.wait(Decoder) { + error!("{e}") + } } sleep(Duration::from_secs(1)); - if let Err(e) = enc_proc.kill() { - panic!("Encoder error: {e:?}") - }; - - if let Err(e) = enc_proc.wait() { - panic!("Encoder error: {e:?}") - }; + if let Err(e) = proc_control.kill(Encoder) { + error!("{e}") + } } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 295a3753..7957a648 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,10 +1,7 @@ -use std::sync::{Arc, Mutex}; - use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value}; use jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder, }; -use process_control::Terminator; use serde_json::{json, Map}; use simplelog::*; @@ -45,19 +42,6 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } -fn kill_decoder(terminator: Arc>>) -> Result<(), String> { - match &*terminator.lock().unwrap() { - Some(decoder) => unsafe { - if let Err(e) = decoder.terminate() { - return Err(format!("Terminate decoder: {e}")); - } - }, - None => return Err("No decoder terminator found".to_string()), - } - - Ok(()) -} - pub async fn json_rpc_server( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -78,7 +62,15 @@ pub async fn json_rpc_server( let index = *play.index.lock().unwrap(); if index < play.current_list.lock().unwrap().len() { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Move to next clip"); let mut data_map = Map::new(); @@ -107,7 +99,15 @@ pub async fn json_rpc_server( let index = *play.index.lock().unwrap(); if index > 1 && play.current_list.lock().unwrap().len() > 1 { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Move to last clip"); let mut data_map = Map::new(); let mut media = play.current_list.lock().unwrap()[index - 2].clone(); @@ -133,7 +133,15 @@ pub async fn json_rpc_server( } if map.contains_key("control") && &map["control"] == "reset" { - if let Ok(_) = kill_decoder(proc.decoder_term.clone()) { + if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + error!("Decoder {e:?}") + }; + + if let Err(e) = proc.wait() { + error!("Decoder {e:?}") + }; + info!("Reset playout to original state"); let mut data_map = Map::new(); *time_shift = 0.0; diff --git a/src/utils/arg_parse.rs b/src/utils/arg_parse.rs index 426de023..11f8ed69 100644 --- a/src/utils/arg_parse.rs +++ b/src/utils/arg_parse.rs @@ -2,7 +2,7 @@ use clap::Parser; #[derive(Parser, Debug)] #[clap(version, - about = "ffplayout, the rust playout solution", + about = "ffplayout, Rust based 24/7 playout solution", long_about = None)] pub struct Args { #[clap(short, long, help = "file path to ffplayout.conf")] diff --git a/src/utils/controller.rs b/src/utils/controller.rs new file mode 100644 index 00000000..e251eebf --- /dev/null +++ b/src/utils/controller.rs @@ -0,0 +1,181 @@ +use std::{ + fmt, + process::Child, + sync::{Arc, Mutex, RwLock}, + +}; + +use jsonrpc_http_server::CloseHandle; +use simplelog::*; + +use crate::utils::Media; + +pub enum ProcessUnit { + Decoder, + Encoder, + Ingest, +} + +impl fmt::Display for ProcessUnit { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + ProcessUnit::Decoder => write!(f, "Decoder"), + ProcessUnit::Encoder => write!(f, "Encoder"), + ProcessUnit::Ingest => write!(f, "Ingest"), + } + } +} + +use ProcessUnit::*; + +#[derive(Clone)] +pub struct ProcessControl { + pub decoder_term: Arc>>, + pub encoder_term: Arc>>, + pub server_term: Arc>>, + pub server_is_running: Arc>, + pub rpc_handle: Arc>>, + pub is_terminated: Arc>, + pub is_alive: Arc>, +} + +impl ProcessControl { + pub fn new() -> Self { + Self { + decoder_term: Arc::new(Mutex::new(None)), + encoder_term: Arc::new(Mutex::new(None)), + server_term: Arc::new(Mutex::new(None)), + server_is_running: Arc::new(Mutex::new(false)), + rpc_handle: Arc::new(Mutex::new(None)), + is_terminated: Arc::new(Mutex::new(false)), + is_alive: Arc::new(RwLock::new(true)), + } + } +} + +impl ProcessControl { + pub fn kill(&mut self, proc: ProcessUnit) -> Result<(), String> { + match proc { + Decoder => { + if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Decoder {e:?}")); + }; + } + } + Encoder => { + if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Encoder {e:?}")); + }; + } + } + Ingest => { + if let Some(proc) = self.server_term.lock().unwrap().as_mut() { + if let Err(e) = proc.kill() { + return Err(format!("Ingest server {e:?}")); + }; + } + } + } + + if let Err(e) = self.wait(proc) { + return Err(e); + }; + + Ok(()) + } + + pub fn wait(&mut self, proc: ProcessUnit) -> Result<(), String> { + match proc { + Decoder => { + if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Decoder {e:?}")); + }; + } + } + Encoder => { + if let Some(proc) = self.encoder_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Encoder {e:?}")); + }; + } + } + Ingest => { + if let Some(proc) = self.server_term.lock().unwrap().as_mut() { + if let Err(e) = proc.wait() { + return Err(format!("Ingest server {e:?}")); + }; + } + } + } + + Ok(()) + } + + pub fn kill_all(&mut self) { + *self.is_terminated.lock().unwrap() = true; + + if *self.is_alive.read().unwrap() { + *self.is_alive.write().unwrap() = false; + + if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { + rpc.clone().close() + }; + + for unit in [ + Decoder, + Encoder, + Ingest, + ] { + if let Err(e) = self.kill(unit) { + error!("{e}") + } + } + } + } +} + +impl Drop for ProcessControl { + fn drop(&mut self) { + self.kill_all() + } +} + +#[derive(Clone)] +pub struct PlayerControl { + pub current_media: Arc>>, + pub current_list: Arc>>, + pub index: Arc>, +} + +impl PlayerControl { + pub fn new() -> Self { + Self { + current_media: Arc::new(Mutex::new(None)), + current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), + index: Arc::new(Mutex::new(0)), + } + } +} + +#[derive(Clone, Debug)] +pub struct PlayoutStatus { + pub time_shift: Arc>, + pub date: Arc>, + pub current_date: Arc>, + pub list_init: Arc>, +} + +impl PlayoutStatus { + pub fn new() -> Self { + Self { + time_shift: Arc::new(Mutex::new(0.0)), + date: Arc::new(Mutex::new(String::new())), + current_date: Arc::new(Mutex::new(String::new())), + list_init: Arc::new(Mutex::new(true)), + } + } +} + diff --git a/src/utils/logging.rs b/src/utils/logging.rs index c097a4e1..7c0a1c17 100644 --- a/src/utils/logging.rs +++ b/src/utils/logging.rs @@ -208,7 +208,7 @@ pub fn init_logging( )); } - if config.mail.recipient.len() > 3 { + if config.mail.recipient.contains("@") && config.mail.recipient.contains(".") { let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); let interval = config.mail.interval.clone(); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c0532312..e1f450be 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,13 +8,10 @@ use std::{ path::Path, process::exit, process::{ChildStderr, Command, Stdio}, - sync::{Arc, Mutex, RwLock}, time, time::UNIX_EPOCH, }; -use jsonrpc_http_server::CloseHandle; -use process_control::Terminator; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -22,123 +19,20 @@ use simplelog::*; mod arg_parse; mod config; +pub mod controller; pub mod json_reader; mod json_validate; mod logging; pub use arg_parse::get_args; pub use config::{init_config, GlobalConfig}; +pub use controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*}; pub use json_reader::{read_json, Playlist, DUMMY_LEN}; pub use json_validate::validate_playlist; pub use logging::init_logging; use crate::filter::filter_chains; -#[derive(Clone)] -pub struct ProcessControl { - pub decoder_term: Arc>>, - pub encoder_term: Arc>>, - pub server_term: Arc>>, - pub server_is_running: Arc>, - pub rpc_handle: Arc>>, - pub is_terminated: Arc>, - pub is_alive: Arc>, -} - -impl ProcessControl { - pub fn new() -> Self { - Self { - decoder_term: Arc::new(Mutex::new(None)), - encoder_term: Arc::new(Mutex::new(None)), - server_term: Arc::new(Mutex::new(None)), - server_is_running: Arc::new(Mutex::new(false)), - rpc_handle: Arc::new(Mutex::new(None)), - is_terminated: Arc::new(Mutex::new(false)), - is_alive: Arc::new(RwLock::new(true)), - } - } -} - -impl ProcessControl { - pub fn kill_all(&mut self) { - *self.is_terminated.lock().unwrap() = true; - - if *self.is_alive.read().unwrap() { - *self.is_alive.write().unwrap() = false; - - if let Some(rpc) = &*self.rpc_handle.lock().unwrap() { - rpc.clone().close() - }; - - if let Some(server) = &*self.server_term.lock().unwrap() { - unsafe { - if let Err(e) = server.terminate() { - error!("Ingest server: {e:?}"); - } - } - }; - - if let Some(decoder) = &*self.decoder_term.lock().unwrap() { - unsafe { - if let Err(e) = decoder.terminate() { - error!("Decoder: {e:?}"); - } - } - }; - - if let Some(encoder) = &*self.encoder_term.lock().unwrap() { - unsafe { - if let Err(e) = encoder.terminate() { - error!("Encoder: {e:?}"); - } - } - }; - } - } -} - -impl Drop for ProcessControl { - fn drop(&mut self) { - self.kill_all() - } -} - -#[derive(Clone, Debug)] -pub struct PlayoutStatus { - pub time_shift: Arc>, - pub date: Arc>, - pub current_date: Arc>, - pub list_init: Arc>, -} - -impl PlayoutStatus { - pub fn new() -> Self { - Self { - time_shift: Arc::new(Mutex::new(0.0)), - date: Arc::new(Mutex::new(String::new())), - current_date: Arc::new(Mutex::new(String::new())), - list_init: Arc::new(Mutex::new(true)), - } - } -} - -#[derive(Clone)] -pub struct PlayerControl { - pub current_media: Arc>>, - pub current_list: Arc>>, - pub index: Arc>, -} - -impl PlayerControl { - pub fn new() -> Self { - Self { - current_media: Arc::new(Mutex::new(None)), - current_list: Arc::new(Mutex::new(vec![Media::new(0, String::new(), false)])), - index: Arc::new(Mutex::new(0)), - } - } -} - #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Media { pub begin: Option, @@ -147,7 +41,7 @@ pub struct Media { pub seek: f64, pub out: f64, pub duration: f64, - pub category: String, + pub category: Option, pub source: String, pub cmd: Option>, pub filter: Option>, @@ -177,7 +71,7 @@ impl Media { seek: 0.0, out: duration, duration: duration, - category: String::new(), + category: None, source: src.clone(), cmd: Some(vec!["-i".to_string(), src]), filter: Some(vec![]),