diff --git a/src/main.rs b/src/main.rs index 249a288e..769ec4a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ mod utils; use simplelog::*; -use crate::output::desktop; +use crate::output::play; use crate::utils::{get_config, init_logging}; fn main() { @@ -16,8 +16,5 @@ fn main() { CombinedLogger::init(logging).unwrap(); - // warn!("this is a warning"); - // error!("this is a error"); - - desktop::play(config); + play(config); } diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 64f3d0f2..2c27d73e 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -1,76 +1,24 @@ -use notify::{watcher, RecursiveMode, Watcher}; use std::{ - io::{prelude::*, Read}, - path::Path, process, process::{Command, Stdio}, - sync::{mpsc::channel, Arc, Mutex}, - thread::sleep, - time::Duration, }; -use tokio::runtime::Builder; - use simplelog::*; -use crate::utils::{sec_to_time, watch_folder, Config, CurrentProgram, Media, Source}; +use crate::utils::Config; -pub fn play(config: Config) { - let dec_pid: Arc> = Arc::new(Mutex::new(0)); - let runtime = Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .build() - .unwrap(); +pub fn output(config: Config, log_format: String) -> process::Child { + let mut enc_filter: Vec = vec![]; - let get_source = match config.processing.mode.clone().as_str() { - "folder" => { - let path = config.storage.path.clone(); - if !Path::new(&path).exists() { - error!("Folder path not exists: '{path}'"); - process::exit(0x0100); - } - - info!("Playout in folder mode."); - - let folder_source = Source::new(config.clone()); - let (sender, receiver) = channel(); - let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); - - watcher - .watch(path.clone(), RecursiveMode::Recursive) - .unwrap(); - - debug!("Monitor folder: {}", path); - - runtime.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); - - Box::new(folder_source) as Box> - } - "playlist" => { - info!("Playout in playlist mode."); - Box::new(CurrentProgram::new(config.clone())) as Box> - } - _ => { - error!("Process Mode not exists!"); - process::exit(0x0100); - } - }; - // let get_source = CurrentProgram::new(config.clone()); - let dec_settings = config.processing.settings.unwrap(); - let ff_log_format = format!("level+{}", config.logging.ffmpeg_level); let mut enc_cmd = vec![ "-hide_banner", "-nostats", "-v", - ff_log_format.as_str(), + log_format.as_str(), "-i", "pipe:0", ]; - let mut enc_filter: Vec = vec![]; - let mut buffer: [u8; 65424] = [0; 65424]; - if config.text.add_text && !config.text.over_pre { let text_filter: String = format!( "null,zmq=b=tcp\\\\://'{}',drawtext=text='':fontfile='{}'", @@ -85,7 +33,7 @@ pub fn play(config: Config) { debug!("Encoder CMD: {:?}", enc_cmd); - let mut enc_proc = match Command::new("ffplay") + let enc_proc = match Command::new("ffplay") .args(enc_cmd) .stdin(Stdio::piped()) // .stderr(Stdio::piped()) @@ -98,82 +46,5 @@ pub fn play(config: Config) { Ok(proc) => proc, }; - for node in get_source { - let cmd = match node.cmd { - Some(cmd) => cmd, - None => break, - }; - - // this is only for testing... - sleep(Duration::from_secs(1)); - - if !node.process.unwrap() { - continue; - } - - info!( - "Play for {}: {}", - sec_to_time(node.out - node.seek), - node.source - ); - - let filter = node.filter.unwrap(); - - let mut dec_cmd = vec!["-v", ff_log_format.as_str(), "-hide_banner", "-nostats"]; - - dec_cmd.append(&mut cmd.iter().map(String::as_str).collect()); - - if filter.len() > 1 { - dec_cmd.append(&mut filter.iter().map(String::as_str).collect()); - } - - dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); - debug!("Decoder CMD: {:?}", dec_cmd); - - let mut dec_proc = match Command::new("ffmpeg") - .args(dec_cmd) - .stdout(Stdio::piped()) - // .stderr(Stdio::piped()) - .spawn() - { - Err(e) => { - error!("couldn't spawn decoder process: {}", e); - panic!("couldn't spawn decoder process: {}", e) - } - Ok(proc) => proc, - }; - - *dec_pid.lock().unwrap() = dec_proc.id(); - - let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); - let dec_reader = dec_proc.stdout.as_mut().unwrap(); - - debug!("Decoder PID: {}", dec_pid.lock().unwrap()); - - loop { - let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { - Ok(length) => length, - Err(e) => panic!("Reading error from decoder: {:?}", e), - }; - - if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { - panic!("Err: {:?}", e) - }; - - if dec_bytes_len == 0 { - break; - }; - } - - if let Err(e) = dec_proc.wait() { - panic!("Decoder error: {:?}", e) - }; - } - - sleep(Duration::from_secs(1)); - - match enc_proc.kill() { - Ok(_) => info!("Playout done..."), - Err(e) => panic!("Encoder error: {:?}", e), - } + enc_proc } diff --git a/src/output/mod.rs b/src/output/mod.rs index 5862a5be..53d79183 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,3 +1,150 @@ -pub mod desktop; +use notify::{watcher, RecursiveMode, Watcher}; +use std::{ + io::{prelude::*, Read}, + path::Path, + process, + process::{Command, Stdio}, + sync::{mpsc::channel, Arc, Mutex}, + thread::sleep, + time::Duration, +}; -pub use desktop::play; +use tokio::runtime::Builder; + +use simplelog::*; + +mod desktop; +mod stream; + +use crate::utils::{sec_to_time, watch_folder, Config, CurrentProgram, Media, Source}; + +pub fn play(config: Config) { + let dec_pid: Arc> = Arc::new(Mutex::new(0)); + let runtime = Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let get_source = match config.processing.mode.clone().as_str() { + "folder" => { + let path = config.storage.path.clone(); + if !Path::new(&path).exists() { + error!("Folder path not exists: '{path}'"); + process::exit(0x0100); + } + + info!("Playout in folder mode."); + + let folder_source = Source::new(config.clone()); + let (sender, receiver) = channel(); + let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap(); + + watcher + .watch(path.clone(), RecursiveMode::Recursive) + .unwrap(); + + debug!("Monitor folder: {}", path); + + runtime.spawn(watch_folder(receiver, Arc::clone(&folder_source.nodes))); + + Box::new(folder_source) as Box> + } + "playlist" => { + info!("Playout in playlist mode."); + Box::new(CurrentProgram::new(config.clone())) as Box> + } + _ => { + error!("Process Mode not exists!"); + process::exit(0x0100); + } + }; + + let config_clone = config.clone(); + let dec_settings = config.processing.settings.unwrap(); + let ff_log_format = format!("level+{}", config.logging.ffmpeg_level); + + let mut enc_proc = match config.out.mode.as_str() { + "desktop" => desktop::output(config_clone, ff_log_format.clone()), + "stream" => stream::output(config_clone, ff_log_format.clone()), + _ => panic!("Output mode doesn't exists!") + }; + + let mut buffer: [u8; 65424] = [0; 65424]; + + for node in get_source { + let cmd = match node.cmd { + Some(cmd) => cmd, + None => break, + }; + + if !node.process.unwrap() { + continue; + } + + info!( + "Play for {}: {}", + sec_to_time(node.out - node.seek), + node.source + ); + + let filter = node.filter.unwrap(); + + let mut dec_cmd = vec!["-v", ff_log_format.as_str(), "-hide_banner", "-nostats"]; + + dec_cmd.append(&mut cmd.iter().map(String::as_str).collect()); + + if filter.len() > 1 { + dec_cmd.append(&mut filter.iter().map(String::as_str).collect()); + } + + dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); + debug!("Decoder CMD: {:?}", dec_cmd); + + let mut dec_proc = match Command::new("ffmpeg") + .args(dec_cmd) + .stdout(Stdio::piped()) + // .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn decoder process: {}", e); + panic!("couldn't spawn decoder process: {}", e) + } + Ok(proc) => proc, + }; + + *dec_pid.lock().unwrap() = dec_proc.id(); + + let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); + let dec_reader = dec_proc.stdout.as_mut().unwrap(); + + // debug!("Decoder PID: {}", dec_pid.lock().unwrap()); + + loop { + let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { + Ok(length) => length, + Err(e) => panic!("Reading error from decoder: {:?}", e), + }; + + if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) { + panic!("Err: {:?}", e) + }; + + if dec_bytes_len == 0 { + break; + }; + } + + if let Err(e) = dec_proc.wait() { + panic!("Decoder error: {:?}", e) + }; + } + + sleep(Duration::from_secs(1)); + + match enc_proc.kill() { + Ok(_) => info!("Playout done..."), + Err(e) => panic!("Encoder error: {:?}", e), + } +} diff --git a/src/output/stream.rs b/src/output/stream.rs new file mode 100644 index 00000000..c8e9b03f --- /dev/null +++ b/src/output/stream.rs @@ -0,0 +1,52 @@ +use std::{ + process, + process::{Command, Stdio}, +}; + +use simplelog::*; + +use crate::utils::Config; + +pub fn output(config: Config, log_format: String) -> process::Child { + let mut enc_filter: Vec = vec![]; + + let mut enc_cmd = vec![ + "-hide_banner", + "-nostats", + "-v", + log_format.as_str(), + "-re", + "-i", + "pipe:0", + ]; + + if config.text.add_text && !config.text.over_pre { + let text_filter: String = format!( + "null,zmq=b=tcp\\\\://'{}',drawtext=text='':fontfile='{}'", + config.text.bind_address.replace(":", "\\:"), + config.text.fontfile + ); + + enc_filter = vec!["-vf".to_string(), text_filter]; + } + + enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect()); + enc_cmd.append(&mut config.out.stream_param.iter().map(String::as_str).collect()); + + debug!("Encoder CMD: {:?}", enc_cmd); + + let enc_proc = match Command::new("ffmpeg") + .args(enc_cmd) + .stdin(Stdio::piped()) + // .stderr(Stdio::piped()) + .spawn() + { + Err(e) => { + error!("couldn't spawn encoder process: {}", e); + panic!("couldn't spawn encoder process: {}", e) + } + Ok(proc) => proc, + }; + + enc_proc +}