higher sync_channel buffer size, put enc_writer out of loop, better cmd logging

This commit is contained in:
jb-alvarado 2022-03-22 11:45:24 +01:00
parent 22e4757059
commit 961ede69a1
7 changed files with 36 additions and 63 deletions

2
Cargo.lock generated
View File

@ -143,7 +143,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.5.0"
version = "0.6.0"
dependencies = [
"chrono",
"clap",

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
[dependencies]

View File

@ -1,32 +0,0 @@
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_yaml::{self};
use shlex::split;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Processing {
pub mode: String,
pub volume: f64,
pub settings: String,
}
fn main() {
let s = r#"
mode: "playlist"
volume: 0.5
settings: -i input.mp4 -c:v libx264 -metadata service_provider='ffplayout Inc.' -f mpegts out.mp4
"#;
let config: Processing =
serde_yaml::from_str(s).expect("Could not read config");
let pattern = Regex::new(r#"[^\s"']+|"([^"]*)"|'([^']*)'"#).unwrap();
let matches: Vec<String> = pattern
.find_iter(config.settings.as_str())
.map(|m| m.as_str().to_string())
.collect();
println!("{:#?}", matches);
println!("{:#?}", split(config.settings.as_str()));
}

View File

@ -98,7 +98,7 @@ pub async fn ingest_server(
stream_input.last().unwrap()
);
debug!("Server CMD: <bright-blue>{:?}</>", server_cmd);
debug!("Server CMD: <bright-blue>\"ffmpeg {}\"</>", server_cmd.join(" "));
loop {
if *is_terminated.lock().unwrap() {

View File

@ -5,8 +5,8 @@ use std::{
use simplelog::*;
use crate::utils::{GlobalConfig, Media};
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
pub fn output(log_format: String) -> process::Child {
let config = GlobalConfig::global();
@ -35,7 +35,7 @@ pub fn output(log_format: String) -> process::Child {
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
debug!("Encoder CMD: <bright-blue>{:?}</>", enc_cmd);
debug!("Encoder CMD: <bright-blue>\"ffplay {}\"</>", enc_cmd.join(" "));
let enc_proc = match Command::new("ffplay")
.args(enc_cmd)

View File

@ -1,6 +1,6 @@
use notify::{watcher, RecursiveMode, Watcher};
use std::{
io::{prelude::*, BufReader, Read},
io::{prelude::*, BufReader, BufWriter, Read},
path::Path,
process,
process::{Command, Stdio},
@ -77,6 +77,8 @@ pub fn play(rt_handle: &Handle) {
_ => panic!("Output mode doesn't exists!"),
};
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(),
"Encoder".to_string(),
@ -85,7 +87,7 @@ pub fn play(rt_handle: &Handle) {
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(1);
) = sync_channel(8);
if config.ingest.enable {
rt_handle.spawn(ingest_server(
@ -114,9 +116,9 @@ pub fn play(rt_handle: &Handle) {
node.source
);
let mut kill_dec = true;
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
if filter.len() > 1 {
@ -124,7 +126,8 @@ pub fn play(rt_handle: &Handle) {
}
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
debug!("Decoder CMD: <bright-blue>{:?}</>", dec_cmd);
debug!("Decoder CMD: <bright-blue>\"ffmpeg {}\"</>", dec_cmd.join(" "));
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
@ -139,7 +142,6 @@ pub fn play(rt_handle: &Handle) {
Ok(proc) => proc,
};
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
rt_handle.spawn(stderr_reader(
@ -147,23 +149,15 @@ pub fn play(rt_handle: &Handle) {
"Decoder".to_string(),
));
let mut kill_dec = true;
loop {
if *server_is_running.lock().unwrap() {
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
error!("Ingest receiver error: {:?}", e);
break 'source_iter;
};
}
live_on = true;
if kill_dec {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = enc_writer.flush() {
error!("Encoder error: {e}")
}
if let Err(e) = dec_proc.kill() {
error!("Decoder error: {e}")
};
@ -173,12 +167,31 @@ pub fn play(rt_handle: &Handle) {
};
kill_dec = false;
live_on = true;
if let Some(init) = &init_playlist {
*init.lock().unwrap() = true;
}
}
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
error!("Ingest receiver error: {:?}", e);
break 'source_iter;
};
}
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);
if let Err(e) = enc_writer.flush() {
error!("Encoder error: {e}")
}
live_on = false;
}
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
@ -195,14 +208,6 @@ pub fn play(rt_handle: &Handle) {
break 'source_iter;
};
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);
live_on = false;
}
enc_writer.flush().unwrap();
break;
}
}

View File

@ -51,7 +51,7 @@ pub fn output(log_format: String) -> process::Child {
enc_cmd.append(&mut preview);
enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect());
debug!("Encoder CMD: <bright-blue>{:?}</>", enc_cmd);
debug!("Encoder CMD: <bright-blue>\"ffmpeg {}\"</>", enc_cmd.join(" "));
let enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)