From 961ede69a1f45ace5253414d84169896b4703399 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Tue, 22 Mar 2022 11:45:24 +0100 Subject: [PATCH] higher sync_channel buffer size, put enc_writer out of loop, better cmd logging --- Cargo.lock | 2 +- Cargo.toml | 2 +- examples/string_to_arr.rs | 32 ----------------------- src/input/ingest.rs | 2 +- src/output/desktop.rs | 4 +-- src/output/mod.rs | 55 +++++++++++++++++++++------------------ src/output/stream.rs | 2 +- 7 files changed, 36 insertions(+), 63 deletions(-) delete mode 100644 examples/string_to_arr.rs diff --git a/Cargo.lock b/Cargo.lock index 3093b5e6..9147df5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -143,7 +143,7 @@ dependencies = [ [[package]] name = "ffplayout-rs" -version = "0.5.0" +version = "0.6.0" dependencies = [ "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 2b8a931a..df757c7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ffplayout-rs" -version = "0.5.0" +version = "0.6.0" edition = "2021" [dependencies] diff --git a/examples/string_to_arr.rs b/examples/string_to_arr.rs deleted file mode 100644 index 1e9eecd5..00000000 --- a/examples/string_to_arr.rs +++ /dev/null @@ -1,32 +0,0 @@ -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_yaml::{self}; - -use shlex::split; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Processing { - pub mode: String, - pub volume: f64, - pub settings: String, -} - -fn main() { - let s = r#" -mode: "playlist" -volume: 0.5 -settings: -i input.mp4 -c:v libx264 -metadata service_provider='ffplayout Inc.' -f mpegts out.mp4 -"#; - let config: Processing = - serde_yaml::from_str(s).expect("Could not read config"); - - let pattern = Regex::new(r#"[^\s"']+|"([^"]*)"|'([^']*)'"#).unwrap(); - - let matches: Vec = pattern - .find_iter(config.settings.as_str()) - .map(|m| m.as_str().to_string()) - .collect(); - - println!("{:#?}", matches); - println!("{:#?}", split(config.settings.as_str())); -} diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 5f1b1900..852128b3 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -98,7 +98,7 @@ pub async fn ingest_server( stream_input.last().unwrap() ); - debug!("Server CMD: {:?}", server_cmd); + debug!("Server CMD: \"ffmpeg {}\"", server_cmd.join(" ")); loop { if *is_terminated.lock().unwrap() { diff --git a/src/output/desktop.rs b/src/output/desktop.rs index 6ca46561..59c214bb 100644 --- a/src/output/desktop.rs +++ b/src/output/desktop.rs @@ -5,8 +5,8 @@ use std::{ use simplelog::*; -use crate::utils::{GlobalConfig, Media}; use crate::filter::v_drawtext; +use crate::utils::{GlobalConfig, Media}; pub fn output(log_format: String) -> process::Child { let config = GlobalConfig::global(); @@ -35,7 +35,7 @@ pub fn output(log_format: String) -> process::Child { enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect()); - debug!("Encoder CMD: {:?}", enc_cmd); + debug!("Encoder CMD: \"ffplay {}\"", enc_cmd.join(" ")); let enc_proc = match Command::new("ffplay") .args(enc_cmd) diff --git a/src/output/mod.rs b/src/output/mod.rs index 673d1a92..7d09e372 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,6 +1,6 @@ use notify::{watcher, RecursiveMode, Watcher}; use std::{ - io::{prelude::*, BufReader, Read}, + io::{prelude::*, BufReader, BufWriter, Read}, path::Path, process, process::{Command, Stdio}, @@ -77,6 +77,8 @@ pub fn play(rt_handle: &Handle) { _ => panic!("Output mode doesn't exists!"), }; + let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap()); + rt_handle.spawn(stderr_reader( enc_proc.stderr.take().unwrap(), "Encoder".to_string(), @@ -85,7 +87,7 @@ pub fn play(rt_handle: &Handle) { let (ingest_sender, ingest_receiver): ( SyncSender<(usize, [u8; 65088])>, Receiver<(usize, [u8; 65088])>, - ) = sync_channel(1); + ) = sync_channel(8); if config.ingest.enable { rt_handle.spawn(ingest_server( @@ -114,9 +116,9 @@ pub fn play(rt_handle: &Handle) { node.source ); + let mut kill_dec = true; let filter = node.filter.unwrap(); let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()]; - dec_cmd.append(&mut cmd.iter().map(String::as_str).collect()); if filter.len() > 1 { @@ -124,7 +126,8 @@ pub fn play(rt_handle: &Handle) { } dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect()); - debug!("Decoder CMD: {:?}", dec_cmd); + + debug!("Decoder CMD: \"ffmpeg {}\"", dec_cmd.join(" ")); let mut dec_proc = match Command::new("ffmpeg") .args(dec_cmd) @@ -139,7 +142,6 @@ pub fn play(rt_handle: &Handle) { Ok(proc) => proc, }; - let mut enc_writer = enc_proc.stdin.as_ref().unwrap(); let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap()); rt_handle.spawn(stderr_reader( @@ -147,23 +149,15 @@ pub fn play(rt_handle: &Handle) { "Decoder".to_string(), )); - let mut kill_dec = true; - loop { if *server_is_running.lock().unwrap() { - if let Ok(receive) = ingest_receiver.try_recv() { - if let Err(e) = enc_writer.write(&receive.1[..receive.0]) { - error!("Ingest receiver error: {:?}", e); - - break 'source_iter; - }; - } - - live_on = true; - if kill_dec { info!("Switch from {} to live ingest", config.processing.mode); + if let Err(e) = enc_writer.flush() { + error!("Encoder error: {e}") + } + if let Err(e) = dec_proc.kill() { error!("Decoder error: {e}") }; @@ -173,12 +167,31 @@ pub fn play(rt_handle: &Handle) { }; kill_dec = false; + live_on = true; if let Some(init) = &init_playlist { *init.lock().unwrap() = true; } } + + if let Ok(receive) = ingest_receiver.try_recv() { + if let Err(e) = enc_writer.write(&receive.1[..receive.0]) { + error!("Ingest receiver error: {:?}", e); + + break 'source_iter; + }; + } } else { + if live_on { + info!("Switch from live ingest to {}", config.processing.mode); + + if let Err(e) = enc_writer.flush() { + error!("Encoder error: {e}") + } + + live_on = false; + } + let dec_bytes_len = match dec_reader.read(&mut buffer[..]) { Ok(length) => length, Err(e) => { @@ -195,14 +208,6 @@ pub fn play(rt_handle: &Handle) { break 'source_iter; }; } else { - if live_on { - info!("Switch from live ingest to {}", config.processing.mode); - - live_on = false; - } - - enc_writer.flush().unwrap(); - break; } } diff --git a/src/output/stream.rs b/src/output/stream.rs index c7e0d034..39328788 100644 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -51,7 +51,7 @@ pub fn output(log_format: String) -> process::Child { enc_cmd.append(&mut preview); enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect()); - debug!("Encoder CMD: {:?}", enc_cmd); + debug!("Encoder CMD: \"ffmpeg {}\"", enc_cmd.join(" ")); let enc_proc = match Command::new("ffmpeg") .args(enc_cmd)