very basic ingest rtmp auth

This commit is contained in:
jb-alvarado 2022-05-25 18:00:33 +02:00
parent 520c9d4331
commit 492085c9fd
4 changed files with 69 additions and 32 deletions

View File

@ -1,6 +1,6 @@
use std::{ use std::{
io::{BufReader, Error, Read}, io::{BufRead, BufReader, Error, Read},
process::{Command, Stdio}, process::{ChildStderr, Command, Stdio},
sync::atomic::Ordering, sync::atomic::Ordering,
thread, thread,
}; };
@ -9,20 +9,59 @@ use crossbeam_channel::Sender;
use simplelog::*; use simplelog::*;
use crate::filter::ingest_filter::filter_cmd; use crate::filter::ingest_filter::filter_cmd;
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl}; use crate::utils::{format_log_line, GlobalConfig, Ingest, ProcessControl};
use crate::vec_strings; use crate::vec_strings;
fn server_monitor(
level: &str,
buffer: BufReader<ChildStderr>,
mut proc_ctl: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
if line.contains("[info]") && level.to_lowercase() == "info" {
info!(
"<bright black>[Server]</> {}",
format_log_line(line.clone(), "info")
)
} else if line.contains("[warning]")
&& (level.to_lowercase() == "warning" || level.to_lowercase() == "info")
{
warn!(
"<bright black>[Server]</> {}",
format_log_line(line.clone(), "warning")
)
} else if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[Server]</> {}",
format_log_line(line.clone(), "error")
);
}
if line.contains("rtmp") && line.contains("Unexpected stream") {
if let Err(e) = proc_ctl.kill(Ingest) {
error!("{e}");
};
}
}
Ok(())
}
/// ffmpeg Ingest Server /// ffmpeg Ingest Server
/// ///
/// Start ffmpeg in listen mode, and wait for input. /// Start ffmpeg in listen mode, and wait for input.
pub fn ingest_server( pub fn ingest_server(
config: GlobalConfig, config: GlobalConfig,
log_format: String,
ingest_sender: Sender<(usize, [u8; 65088])>, ingest_sender: Sender<(usize, [u8; 65088])>,
mut proc_control: ProcessControl, mut proc_control: ProcessControl,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut buffer: [u8; 65088] = [0; 65088]; let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format]; let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap(); let stream_input = config.ingest.input_cmd.clone().unwrap();
server_cmd.append(&mut stream_input.clone()); server_cmd.append(&mut stream_input.clone());
@ -41,6 +80,8 @@ pub fn ingest_server(
); );
while !proc_control.is_terminated.load(Ordering::SeqCst) { while !proc_control.is_terminated.load(Ordering::SeqCst) {
let proc_ctl = proc_control.clone();
let level = config.logging.ffmpeg_level.clone();
let mut server_proc = match Command::new("ffmpeg") let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone()) .args(server_cmd.clone())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -55,7 +96,8 @@ pub fn ingest_server(
}; };
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap()); let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
let server_err = BufReader::new(server_proc.stderr.take().unwrap()); let server_err = BufReader::new(server_proc.stderr.take().unwrap());
let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server")); let error_reader_thread =
thread::spawn(move || server_monitor(&level, server_err, proc_ctl));
*proc_control.server_term.lock().unwrap() = Some(server_proc); *proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false; is_running = false;

View File

@ -30,15 +30,11 @@ use simplelog::*;
use crate::filter::ingest_filter::filter_cmd; use crate::filter::ingest_filter::filter_cmd;
use crate::input::source_generator; use crate::input::source_generator;
use crate::utils::{ use crate::utils::{
sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus, format_log_line, sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl,
ProcessControl, PlayoutStatus, ProcessControl,
}; };
use crate::vec_strings; use crate::vec_strings;
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
/// Ingest Server for HLS /// Ingest Server for HLS
fn ingest_to_hls_server( fn ingest_to_hls_server(
config: GlobalConfig, config: GlobalConfig,
@ -66,6 +62,7 @@ fn ingest_to_hls_server(
); );
loop { loop {
let mut proc_ctl = proc_control.clone();
let mut server_proc = match Command::new("ffmpeg") let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone()) .args(server_cmd.clone())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
@ -103,9 +100,15 @@ fn ingest_to_hls_server(
{ {
error!( error!(
"<bright black>[server]</> {}", "<bright black>[server]</> {}",
format_line(line.clone(), "error") format_log_line(line.clone(), "error")
); );
} }
if line.contains("rtmp") && line.contains("Unexpected stream") {
if let Err(e) = proc_ctl.kill(Ingest) {
error!("{e}");
};
}
} }
info!("Switch from live ingest to {}", config.processing.mode); info!("Switch from live ingest to {}", config.processing.mode);

View File

@ -66,7 +66,6 @@ pub fn player(
*proc_control.encoder_term.lock().unwrap() = Some(enc_proc); *proc_control.encoder_term.lock().unwrap() = Some(enc_proc);
let ff_log_format_c = ff_log_format.clone();
let proc_control_c = proc_control.clone(); let proc_control_c = proc_control.clone();
let mut ingest_receiver = None; let mut ingest_receiver = None;
@ -74,9 +73,7 @@ pub fn player(
if config.ingest.enable { if config.ingest.enable {
let (ingest_sender, rx) = bounded(96); let (ingest_sender, rx) = bounded(96);
ingest_receiver = Some(rx); ingest_receiver = Some(rx);
thread::spawn(move || { thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c));
ingest_server(config_clone, ff_log_format_c, ingest_sender, proc_control_c)
});
} }
'source_iter: for node in get_source { 'source_iter: for node in get_source {

View File

@ -361,31 +361,26 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
source_cmd source_cmd
} }
/// Read ffmpeg stderr decoder, encoder and server instance pub fn format_log_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
/// Read ffmpeg stderr decoder and encoder instance
/// and log the output. /// and log the output.
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> { pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
for line in buffer.lines() { for line in buffer.lines() {
let line = line?; let line = line?;
if line.contains("[info]") { if line.contains("[info]") {
info!("<bright black>[{suffix}]</> {}", format_line(line, "info")) info!(
"<bright black>[{suffix}]</> {}",
format_log_line(line, "info")
)
} else if line.contains("[warning]") { } else if line.contains("[warning]") {
warn!( warn!(
"<bright black>[{suffix}]</> {}", "<bright black>[{suffix}]</> {}",
format_line(line, "warning") format_log_line(line, "warning")
) )
} else if suffix != "server"
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error")
);
} }
} }