From c33ceda4eb1e848cf1cd6be24929fbd7a27e9888 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 14 Apr 2022 21:23:51 +0200 Subject: [PATCH 1/8] default of rpc --- assets/ffplayout.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/ffplayout.yml b/assets/ffplayout.yml index f8253787..44697edb 100644 --- a/assets/ffplayout.yml +++ b/assets/ffplayout.yml @@ -10,7 +10,7 @@ general: rpc_server: help_text: Run a JSON RPC server, for getting infos about current playing, and control for some functions. - enable: true + enable: false address: 127.0.0.1:7070 authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs From 2eb8f51139a1e29c294550dc61b234b29ce9a3ba Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 14 Apr 2022 21:24:02 +0200 Subject: [PATCH 2/8] check if storage path exists --- src/input/folder.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/input/folder.rs b/src/input/folder.rs index 3f02bad9..3d9fe8b6 100644 --- a/src/input/folder.rs +++ b/src/input/folder.rs @@ -1,13 +1,7 @@ -use notify::{ - DebouncedEvent::{Create, Remove, Rename}, - {watcher, RecursiveMode, Watcher}, -}; - -use rand::{seq::SliceRandom, thread_rng}; -use simplelog::*; use std::{ ffi::OsStr, path::Path, + process::exit, sync::{ mpsc::channel, {Arc, Mutex}, @@ -16,6 +10,12 @@ use std::{ time::Duration, }; +use notify::{ + DebouncedEvent::{Create, Remove, Rename}, + {watcher, RecursiveMode, Watcher}, +}; +use rand::{seq::SliceRandom, thread_rng}; +use simplelog::*; use walkdir::WalkDir; use crate::utils::{get_sec, GlobalConfig, Media}; @@ -34,6 +34,14 @@ impl Source { let mut media_list = vec![]; let mut index: usize = 0; + if !Path::new(&config.storage.path).is_dir() { + error!( + "Path not exists: {}", + config.storage.path + ); + exit(1); + } + for entry in WalkDir::new(config.storage.path.clone()) .into_iter() .filter_map(|e| e.ok()) @@ -150,10 +158,7 @@ fn file_extension(filename: &Path) -> Option<&str> { filename.extension().and_then(OsStr::to_str) } -pub async fn watchman( - sources: Arc>>, - is_terminated: Arc>, -) { +pub async fn watchman(sources: Arc>>, is_terminated: Arc>) { let config = GlobalConfig::global(); let (tx, rx) = channel(); @@ -169,7 +174,7 @@ pub async fn watchman( loop { if *is_terminated.lock().unwrap() { - break + break; } if let Ok(res) = rx.try_recv() { From 68564256ffe6e5be9d17cded9dd998d24e2cf088 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 14 Apr 2022 21:24:42 +0200 Subject: [PATCH 3/8] change channel size and add info about it too small value for sync_channel size increases CPU load, large values leave packets in queue which creates artifacts --- src/output/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/output/mod.rs b/src/output/mod.rs index ea922923..2ff59b93 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -53,10 +53,12 @@ pub fn player( rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); + // too small value for sync_channel size increases CPU load, + // large values leave packets in queue which creates artifacts let (ingest_sender, ingest_receiver): ( SyncSender<(usize, [u8; 65088])>, Receiver<(usize, [u8; 65088])>, - ) = sync_channel(8); + ) = sync_channel(16); if config.ingest.enable { rt_handle.spawn(ingest_server( From 423c8f996f26da9f5a6c96fcda50aa956e3f7b89 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 14 Apr 2022 21:41:59 +0200 Subject: [PATCH 4/8] from extension --- src/utils/arg_parse.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/arg_parse.rs b/src/utils/arg_parse.rs index 193c8356..cfa5146e 100644 --- a/src/utils/arg_parse.rs +++ b/src/utils/arg_parse.rs @@ -5,7 +5,7 @@ use clap::Parser; about = "ffplayout, Rust based 24/7 playout solution.\n\nRun without any command to use config file only, or with commands to override parameters.", long_about = None)] pub struct Args { - #[clap(short, long, help = "File path to ffplayout.conf")] + #[clap(short, long, help = "File path to ffplayout.yml")] pub config: Option, #[clap(short, long, help = "File path for logging")] From 790e22a50d7e354ba5921626eec1fc148ab19012 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 18 Apr 2022 13:31:43 +0200 Subject: [PATCH 5/8] change sync_channel to crossbeam-channel flush channel after switching --- Cargo.lock | 21 +++++++++++++++++++++ Cargo.toml | 1 + src/input/ingest.rs | 4 ++-- src/output/mod.rs | 17 ++++++++--------- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5a46d11..5e65c567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,6 +147,26 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if 1.0.0", + "lazy_static", +] + [[package]] name = "email-encoding" version = "0.1.0" @@ -171,6 +191,7 @@ version = "0.9.3" dependencies = [ "chrono", "clap", + "crossbeam-channel", "ffprobe", "file-rotate", "jsonrpc-http-server", diff --git a/Cargo.toml b/Cargo.toml index de6f1b5c..ca634de8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" [dependencies] chrono = "0.4" clap = { version = "3.1", features = ["derive"] } +crossbeam-channel = "0.5" ffprobe = "0.3" file-rotate = "0.6" jsonrpc-http-server = "18.0" diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 8dbbedb2..23348462 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,11 +2,11 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::mpsc::SyncSender, thread::sleep, time::Duration, }; +use crossbeam_channel::Sender; use simplelog::*; use tokio::runtime::Handle; @@ -54,7 +54,7 @@ fn audio_filter(config: &GlobalConfig) -> String { pub async fn ingest_server( log_format: String, - ingest_sender: SyncSender<(usize, [u8; 65088])>, + ingest_sender: Sender<(usize, [u8; 65088])>, rt_handle: Handle, mut proc_control: ProcessControl, ) -> Result<(), Error> { diff --git a/src/output/mod.rs b/src/output/mod.rs index 2ff59b93..19b309be 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,11 +1,11 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, process::{Command, Stdio}, - sync::mpsc::{sync_channel, Receiver, SyncSender}, thread::sleep, time::Duration, }; +use crossbeam_channel::bounded; use simplelog::*; use tokio::runtime::Handle; @@ -30,7 +30,7 @@ pub fn player( let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let mut buffer: [u8; 65088] = [0; 65088]; + let mut buffer = [0; 65088]; let mut live_on = false; let playlist_init = playout_stat.list_init.clone(); @@ -53,17 +53,13 @@ pub fn player( rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); - // too small value for sync_channel size increases CPU load, - // large values leave packets in queue which creates artifacts - let (ingest_sender, ingest_receiver): ( - SyncSender<(usize, [u8; 65088])>, - Receiver<(usize, [u8; 65088])>, - ) = sync_channel(16); + // too small value for channel size increases CPU load, + let (ingest_sender, ingest_receiver) = bounded(56); if config.ingest.enable { rt_handle.spawn(ingest_server( ff_log_format.clone(), - ingest_sender, + ingest_sender.clone(), rt_handle.clone(), proc_control.clone(), )); @@ -152,6 +148,9 @@ pub fn player( error!("Encoder error: {e}") } + let trashcan: Vec<_> = ingest_receiver.try_iter().collect(); + drop(trashcan); + live_on = false; } From f3eb5da16f26d2195d905c0695cf8ce2afefdf11 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 18 Apr 2022 14:36:54 +0200 Subject: [PATCH 6/8] better write out, then drop --- src/output/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/output/mod.rs b/src/output/mod.rs index 19b309be..c4c59c21 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -129,7 +129,6 @@ pub fn player( } live_on = true; - *playlist_init.lock().unwrap() = true; } @@ -148,8 +147,15 @@ pub fn player( error!("Encoder error: {e}") } - let trashcan: Vec<_> = ingest_receiver.try_iter().collect(); - drop(trashcan); + let rest_from_receiver: Vec<_> = ingest_receiver.try_iter().collect(); + + for rest in rest_from_receiver { + if let Err(e) = enc_writer.write(&rest.1[..rest.0]) { + error!("Encoder write error: {:?}", e); + + break 'source_iter; + }; + } live_on = false; } From 0c2ee7726bf6c461c8fb3e8e7598a3ad7d4ab934 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 18 Apr 2022 20:47:17 +0200 Subject: [PATCH 7/8] switch from try_recv to try_iter --- src/output/mod.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/src/output/mod.rs b/src/output/mod.rs index c4c59c21..38d64174 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -53,8 +53,7 @@ pub fn player( rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); - // too small value for channel size increases CPU load, - let (ingest_sender, ingest_receiver) = bounded(56); + let (ingest_sender, ingest_receiver) = bounded(96); if config.ingest.enable { rt_handle.spawn(ingest_server( @@ -132,9 +131,9 @@ pub fn player( *playlist_init.lock().unwrap() = true; } - if let Ok(receive) = ingest_receiver.try_recv() { - if let Err(e) = enc_writer.write(&receive.1[..receive.0]) { - error!("Ingest receiver error: {:?}", e); + for rx in ingest_receiver.try_iter() { + if let Err(e) = enc_writer.write(&rx.1[..rx.0]) { + error!("Encoder write error: {:?}", e); break 'source_iter; }; @@ -147,16 +146,6 @@ pub fn player( error!("Encoder error: {e}") } - let rest_from_receiver: Vec<_> = ingest_receiver.try_iter().collect(); - - for rest in rest_from_receiver { - if let Err(e) = enc_writer.write(&rest.1[..rest.0]) { - error!("Encoder write error: {:?}", e); - - break 'source_iter; - }; - } - live_on = false; } From 40e49b442d41b85eecc3b32de743aa2fd82ca0e2 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 19 Apr 2022 17:44:14 +0200 Subject: [PATCH 8/8] change usage line, update --- Cargo.lock | 36 +++++++++++++++++++++--------------- Cargo.toml | 2 +- src/utils/arg_parse.rs | 3 ++- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e65c567..e077c98c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,16 +94,16 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.8" +version = "3.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71c47df61d9e16dc010b55dba1952a57d8c215dbb533fd13cdd13369aac73b1c" +checksum = "6aad2534fad53df1cc12519c5cda696dd3e20e6118a027e24054aea14a0bdcbe" dependencies = [ "atty", "bitflags", "clap_derive", + "clap_lex", "indexmap", "lazy_static", - "os_str_bytes", "strsim", "termcolor", "textwrap", @@ -122,6 +122,15 @@ dependencies = [ "syn", ] +[[package]] +name = "clap_lex" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -187,7 +196,7 @@ dependencies = [ [[package]] name = "ffplayout-engine" -version = "0.9.3" +version = "0.9.4" dependencies = [ "chrono", "clap", @@ -233,9 +242,9 @@ dependencies = [ [[package]] name = "filetime" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98" +checksum = "c0408e2626025178a6a7f7ffc05a25bc47103229f19c113755de7bf63816290c" dependencies = [ "cfg-if 1.0.0", "libc", @@ -672,9 +681,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.123" +version = "0.2.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd" +checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" [[package]] name = "linked-hash-map" @@ -955,9 +964,6 @@ name = "os_str_bytes" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" -dependencies = [ - "memchr", -] [[package]] name = "paris" @@ -1384,9 +1390,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.33" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -1395,9 +1401,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.25" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dfce9f3241b150f36e8e54bb561a742d5daa1a47b5dd9a5ce369fd4a4db2210" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", ] diff --git a/Cargo.toml b/Cargo.toml index ca634de8..76027829 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg" license = "GPL-3.0" authors = ["Jonathan Baecker jonbae77@gmail.com"] readme = "README.md" -version = "0.9.3" +version = "0.9.4" edition = "2021" [dependencies] diff --git a/src/utils/arg_parse.rs b/src/utils/arg_parse.rs index cfa5146e..ff612d34 100644 --- a/src/utils/arg_parse.rs +++ b/src/utils/arg_parse.rs @@ -2,7 +2,8 @@ use clap::Parser; #[derive(Parser, Debug)] #[clap(version, - about = "ffplayout, Rust based 24/7 playout solution.\n\nRun without any command to use config file only, or with commands to override parameters.", + about = "ffplayout, Rust based 24/7 playout solution.", + override_usage = "Run without any command to use config file only, or with commands to override parameters:\n\n ffplayout [OPTIONS]", long_about = None)] pub struct Args { #[clap(short, long, help = "File path to ffplayout.yml")]