diff --git a/Cargo.lock b/Cargo.lock index ac4216f3..bd3e5adc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,7 +196,7 @@ dependencies = [ [[package]] name = "ffplayout-engine" -version = "0.9.4" +version = "0.9.5" dependencies = [ "chrono", "clap", @@ -466,9 +466,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes", "fnv", @@ -488,9 +488,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" +checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" [[package]] name = "httpdate" @@ -997,9 +997,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -1342,15 +1342,16 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.17.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" dependencies = [ "bytes", "libc", "memchr", "mio 0.8.2", "num_cpus", + "once_cell", "pin-project-lite", "socket2", "winapi 0.3.9", @@ -1424,9 +1425,9 @@ dependencies = [ [[package]] name = "unicode-bidi" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-normalization" diff --git a/Cargo.toml b/Cargo.toml index 6fb0b364..b377a6f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" authors = ["Jonathan Baecker jonbae77@gmail.com"] readme = "README.md" -version = "0.9.4" +version = "0.9.5" edition = "2021" [dependencies] diff --git a/src/input/folder.rs b/src/input/folder.rs index ee0be6c3..29dfe3da 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -21,15 +21,18 @@ use walkdir::WalkDir; use crate::utils::{get_sec, GlobalConfig, Media}; +/// Folder Sources +/// +/// Like playlist source, we create here a folder list for iterate over it. #[derive(Debug, Clone)] -pub struct Source { +pub struct FolderSource { config: GlobalConfig, pub nodes: Arc>>, current_node: Media, index: Arc, } -impl Source { +impl FolderSource { pub fn new(current_list: Arc>>, global_index: Arc) -> Self { let config = GlobalConfig::global(); let mut media_list = vec![]; @@ -114,7 +117,8 @@ impl Source { } } -impl Iterator for Source { +/// Create iterator for folder source +impl Iterator for FolderSource { type Item = Media; fn next(&mut self) -> Option { @@ -159,6 +163,9 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } +/// Create a watcher, which monitor file changes. +/// When a change is register, update the current file list. +/// This makes it possible, to play infinitely and and always new files to it. pub fn watchman(sources: Arc>>) { let config = GlobalConfig::global(); let (tx, rx) = channel(); diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 66ecc59d..682e436b 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -11,6 +11,9 @@ use simplelog::*; use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; +/// Overlay Filter +/// +/// When a logo is set, we create here the filter for the server. fn overlay(config: &GlobalConfig) -> String { let mut logo_chain = String::new(); @@ -29,6 +32,9 @@ fn overlay(config: &GlobalConfig) -> String { logo_chain } +/// Audio Filter +/// +/// If needed we add audio filters to the server instance. fn audio_filter(config: &GlobalConfig) -> String { let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string(); @@ -51,6 +57,9 @@ fn audio_filter(config: &GlobalConfig) -> String { audio_chain } +/// ffmpeg Ingest Server +/// +/// Start ffmpeg in listen mode, and wait for input. pub fn ingest_server( log_format: String, ingest_sender: Sender<(usize, [u8; 65088])>, @@ -116,7 +125,6 @@ pub fn ingest_server( let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); *proc_control.server_term.lock().unwrap() = Some(server_proc); - is_running = false; loop { diff --git a/src/input/mod.rs b/src/input/mod.rs index f4112487..d610ec03 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -12,10 +12,11 @@ pub mod folder; pub mod ingest; pub mod playlist; -pub use folder::{watchman, Source}; +pub use folder::{watchman, FolderSource}; pub use ingest::ingest_server; pub use playlist::CurrentProgram; +/// Create a source iterator from playlist, or from folder. pub fn source_generator( config: GlobalConfig, current_list: Arc>>, @@ -28,8 +29,10 @@ pub fn source_generator( info!("Playout in folder mode"); debug!("Monitor folder: {}", &config.storage.path); - let folder_source = Source::new(current_list, index); + let folder_source = FolderSource::new(current_list, index); let node_clone = folder_source.nodes.clone(); + + // Spawn a thread to monitor folder for file changes. thread::spawn(move || watchman(node_clone)); Box::new(folder_source) as Box> @@ -47,7 +50,7 @@ pub fn source_generator( } _ => { error!("Process Mode not exists!"); - process::exit(0x0100); + process::exit(1); } }; diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 4a488485..68380daf 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -1,7 +1,10 @@ use std::{ fs, path::Path, - sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use serde_json::json; @@ -12,6 +15,9 @@ use crate::utils::{ seek_and_length, GlobalConfig, Media, PlayoutStatus, DUMMY_LEN, }; +/// Struct for current playlist. +/// +/// Here we prepare the init clip and build a iterator where we pull our clips. #[derive(Debug)] pub struct CurrentProgram { config: GlobalConfig, @@ -63,6 +69,7 @@ impl CurrentProgram { } } + // Check if playlist file got updated, and when yes we reload it and setup everything in place. fn check_update(&mut self, seek: bool) { if self.json_path.is_none() { let json = read_json(None, self.is_terminated.clone(), seek, 0.0); @@ -115,6 +122,7 @@ impl CurrentProgram { } } + // Check if day is past and it is time for a new playlist. fn check_for_next_playlist(&mut self) { let current_time = get_sec(); let start_sec = self.config.playlist.start_sec.unwrap(); @@ -158,6 +166,7 @@ impl CurrentProgram { } } + // Check if last and/or next clip is a advertisement. fn last_next_ad(&mut self) { let index = self.index.load(Ordering::SeqCst); let current_list = self.nodes.lock().unwrap(); @@ -184,6 +193,8 @@ impl CurrentProgram { } } + // Get current time and when we are before start time, + // we add full seconds of a day to it. fn get_current_time(&mut self) -> f64 { let mut time_sec = get_sec(); @@ -194,6 +205,7 @@ impl CurrentProgram { time_sec } + // On init or reload we need to seek for the current clip. fn get_current_clip(&mut self) { let mut time_sec = self.get_current_time(); @@ -216,6 +228,7 @@ impl CurrentProgram { } } + // Prepare init clip. fn init_clip(&mut self) { self.get_current_clip(); @@ -232,6 +245,7 @@ impl CurrentProgram { } } +/// Build the playlist iterator impl Iterator for CurrentProgram { type Item = Media; @@ -245,7 +259,7 @@ impl Iterator for CurrentProgram { } if self.playout_stat.list_init.load(Ordering::SeqCst) { - // on init load playlist, could be not long enough, + // On init load, playlist could be not long enough, // so we check if we can take the next playlist already, // or we fill the gap with a dummy. let list_length = self.nodes.lock().unwrap().len(); @@ -357,16 +371,16 @@ impl Iterator for CurrentProgram { } } +/// Prepare input clip: +/// +/// - check begin and length from clip +/// - return clip only if we are in 24 hours time range fn timed_source( node: Media, config: &GlobalConfig, last: bool, playout_stat: &PlayoutStatus, ) -> Media { - // prepare input clip - // check begin and length from clip - // return clip only if we are in 24 hours time range - let (delta, total_delta) = get_delta(&node.begin.unwrap()); let mut shifted_delta = delta; let mut new_node = node.clone(); @@ -412,6 +426,7 @@ fn timed_source( new_node } +/// Generate the source CMD, or when clip not exist, get a dummy. fn gen_source(mut node: Media) -> Media { if Path::new(&node.source).is_file() { node.add_probe(); @@ -440,10 +455,9 @@ fn gen_source(mut node: Media) -> Media { node } +/// Handle init clip, but this clip can be the last one in playlist, +/// this we have to figure out and calculate the right length. fn handle_list_init(mut node: Media) -> Media { - // handle init clip, but this clip can be the last one in playlist, - // this we have to figure out and calculate the right length - let (_, total_delta) = get_delta(&node.begin.unwrap()); let mut out = node.out; @@ -457,11 +471,10 @@ fn handle_list_init(mut node: Media) -> Media { new_node } +/// when we come to last clip in playlist, +/// or when we reached total playtime, +/// we end up here fn handle_list_end(mut node: Media, total_delta: f64) -> Media { - // when we come to last clip in playlist, - // or when we reached total playtime, - // we end up here - debug!("Playlist end"); let mut out = if node.seek > 0.0 { diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 3afd04b1..a94fa2f5 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -5,6 +5,9 @@ use simplelog::*; use crate::filter::v_drawtext; use crate::utils::{GlobalConfig, Media}; +/// Desktop Output +/// +/// Instead of streaming, we run a ffplay instance and play on desktop. pub fn output(log_format: &str) -> process::Child { let config = GlobalConfig::global(); diff --git a/src/output/hls.rs b/src/output/hls.rs index 51d5869b..1dfb92cf 100644 --- a/src/output/hls.rs +++ b/src/output/hls.rs @@ -30,6 +30,9 @@ use crate::utils::{ sec_to_time, stderr_reader, GlobalConfig, PlayerControl, PlayoutStatus, ProcessControl, }; +/// HLS Writer +/// +/// Write with single ffmpeg instance directly to a HLS playlist. pub fn write_hls( play_control: PlayerControl, playout_stat: PlayoutStatus, diff --git a/src/output/mod.rs b/src/output/mod.rs index cd21a38b..fd2ece80 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -21,6 +21,15 @@ use crate::utils::{ ProcessControl, }; +/// Player +/// +/// Here we create the input file loop, from playlist, or folder source. +/// Then we read the stdout from the reader ffmpeg instance +/// and write it to the stdin from the streamer ffmpeg instance. +/// If it is configured we also fire up a ffmpeg ingest server instance, +/// for getting live feeds. +/// When a live ingest arrive, it stops the current playing and switch to the live source. +/// When ingest stops, it switch back to playlist/folder mode. pub fn player( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -33,6 +42,7 @@ pub fn player( let mut live_on = false; let playlist_init = playout_stat.list_init.clone(); + // get source iterator let get_source = source_generator( config.clone(), play_control.current_list.clone(), @@ -41,6 +51,7 @@ pub fn player( proc_control.is_terminated.clone(), ); + // get ffmpeg output instance let mut enc_proc = match config.out.mode.as_str() { "desktop" => desktop::output(&ff_log_format), "stream" => stream::output(&ff_log_format), @@ -49,16 +60,17 @@ pub fn player( let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); let enc_err = BufReader::new(enc_proc.stderr.take().unwrap()); + + // spawn a thread to log ffmpeg output error messages let error_encoder_thread = thread::spawn(move || stderr_reader(enc_err, "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); - - let ff_log_format_c = ff_log_format.clone(); let proc_control_c = proc_control.clone(); let mut ingest_receiver = None; + // spawn a thread for ffmpeg ingest server and create a channel for package sending if config.ingest.enable { let (ingest_sender, rx) = bounded(96); ingest_receiver = Some(rx); @@ -98,6 +110,7 @@ pub fn player( dec_cmd.join(" ") ); + // create ffmpeg decoder instance, for reading the input files let mut dec_proc = match Command::new("ffmpeg") .args(dec_cmd) .stdout(Stdio::piped()) @@ -118,6 +131,7 @@ pub fn player( *proc_control.decoder_term.lock().unwrap() = Some(dec_proc); loop { + // when server is running, read from channel if proc_control.server_is_running.load(Ordering::SeqCst) { if !live_on { info!("Switch from {} to live ingest", config.processing.mode); @@ -141,6 +155,7 @@ pub fn player( break 'source_iter; }; } + // read from decoder instance } else { if live_on { info!("Switch from live ingest to {}", config.processing.mode); diff --git a/src/output/stream.rs b/src/output/stream.rs index 8dc7b276..52abff98 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -8,6 +8,9 @@ use simplelog::*; use crate::filter::v_drawtext; use crate::utils::{GlobalConfig, Media}; +/// Streaming Output +/// +/// Prepare the ffmpeg command for streaming output pub fn output(log_format: &str) -> process::Child { let config = GlobalConfig::global(); let mut enc_filter: Vec = vec![]; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f15ecc20..f57d6415 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -13,6 +13,7 @@ use crate::utils::{ PlayoutStatus, ProcessControl, }; +/// map media struct to json object fn get_media_map(media: Media) -> Value { json!({ "seek": media.seek, @@ -23,6 +24,7 @@ fn get_media_map(media: Media) -> Value { }) } +/// prepare json object for response fn get_data_map(config: &GlobalConfig, media: Media) -> Map { let mut data_map = Map::new(); let begin = media.begin.unwrap_or(0.0); @@ -45,6 +47,14 @@ fn get_data_map(config: &GlobalConfig, media: Media) -> Map { data_map } +/// JSON RPC Server +/// +/// A simple rpc server for getting status information and controlling player: +/// +/// - current clip information +/// - jump to next clip +/// - get last clip +/// - reset player state to original clip pub fn json_rpc_server( play_control: PlayerControl, playout_stat: PlayoutStatus, @@ -61,6 +71,7 @@ pub fn json_rpc_server( let current_date = playout_stat.current_date.lock().unwrap().clone(); let mut date = playout_stat.date.lock().unwrap(); + // get next clip if map.contains_key("control") && &map["control"] == "next" { let index = play.index.load(Ordering::SeqCst); @@ -98,6 +109,7 @@ pub fn json_rpc_server( return Ok(Value::String("Last clip can not be skipped".to_string())); } + // get last clip if map.contains_key("control") && &map["control"] == "back" { let index = play.index.load(Ordering::SeqCst); @@ -135,6 +147,7 @@ pub fn json_rpc_server( return Ok(Value::String("Clip index out of range".to_string())); } + // reset player state if map.contains_key("control") && &map["control"] == "reset" { if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() { if let Err(e) = proc.kill() { @@ -161,6 +174,7 @@ pub fn json_rpc_server( return Ok(Value::String("Reset playout state failed".to_string())); } + // get infos about current clip if map.contains_key("media") && &map["media"] == "current" { if let Some(media) = play.current_media.lock().unwrap().clone() { let data_map = get_data_map(config, media); @@ -169,6 +183,7 @@ pub fn json_rpc_server( }; } + // get infos about next clip if map.contains_key("media") && &map["media"] == "next" { let index = play.index.load(Ordering::SeqCst); @@ -183,6 +198,7 @@ pub fn json_rpc_server( return Ok(Value::String("There is no next clip".to_string())); } + // get infos about last clip if map.contains_key("media") && &map["media"] == "last" { let index = play.index.load(Ordering::SeqCst); @@ -201,10 +217,12 @@ pub fn json_rpc_server( Ok(Value::String("No, or wrong parameters set!".to_string())) }); + // build rpc server let server = ServerBuilder::new(io) .cors(DomainsValidation::AllowOnly(vec![ AccessControlAllowOrigin::Null, ])) + // add middleware, for authentication .request_middleware(|request: hyper::Request| { if request.headers().contains_key("authorization") && request.headers()["authorization"] == config.rpc_server.authorization diff --git a/src/utils/generator.rs b/src/utils/generator.rs index 8736f72e..43747afe 100644 --- a/src/utils/generator.rs +++ b/src/utils/generator.rs @@ -17,7 +17,7 @@ use std::{ use chrono::{Duration, NaiveDate}; use simplelog::*; -use crate::input::Source; +use crate::input::FolderSource; use crate::utils::{json_serializer::Playlist, GlobalConfig, Media}; @@ -78,7 +78,7 @@ pub fn generate_playlist(mut date_range: Vec) { date_range = get_date_range(&date_range) } - let media_list = Source::new(current_list, index); + let media_list = FolderSource::new(current_list, index); let list_length = media_list.nodes.lock().unwrap().len(); for date in date_range {