Merge pull request #129 from jb-alvarado/master
fix ingest, add basic rtmp ingest auth
This commit is contained in:
commit
0a4d8d8ede
20
Cargo.lock
generated
20
Cargo.lock
generated
@ -195,7 +195,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ffplayout-engine"
|
||||
version = "0.9.6"
|
||||
version = "0.9.7"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
@ -251,13 +251,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.23"
|
||||
version = "1.0.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b39522e96686d38f4bc984b9198e3a0613264abaebaff2c5c918bfa6b6da09af"
|
||||
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"crc32fast",
|
||||
"libc",
|
||||
"miniz_oxide",
|
||||
]
|
||||
|
||||
@ -498,9 +496,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.18"
|
||||
version = "0.14.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
|
||||
checksum = "42dc3c131584288d375f2d07f822b0cb012d8c6fb899a5b9fdb3cb7eb9b6004f"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
@ -532,9 +530,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.8.1"
|
||||
version = "1.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee"
|
||||
checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"hashbrown",
|
||||
@ -959,9 +957,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "os_str_bytes"
|
||||
version = "6.0.1"
|
||||
version = "6.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "029d8d0b2f198229de29dca79676f2738ff952edf3fde542eb8bf94d8c21b435"
|
||||
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
|
||||
|
||||
[[package]]
|
||||
name = "paris"
|
||||
|
@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg"
|
||||
license = "GPL-3.0"
|
||||
authors = ["Jonathan Baecker jonbae77@gmail.com"]
|
||||
readme = "README.md"
|
||||
version = "0.9.6"
|
||||
version = "0.9.7"
|
||||
edition = "2021"
|
||||
default-run = "ffplayout"
|
||||
|
||||
|
@ -14,8 +14,8 @@ When it notice a incoming stream, it will stop the current playing and continue
|
||||
|
||||
In rare cases it can happen, that for a short moment after switching the image freezes, but then it will continue. Also a short frame flickering can happen.
|
||||
|
||||
You need to know, that **ffmpeg in current version has no authentication mechanism and it just listen to the protocol and port (no path or app name).**
|
||||
You need to know, that **ffmpeg in current version has no authentication mechanism and it just listen to the protocol and port (no app and stream name).**
|
||||
|
||||
For security you should not expose the ingest to the world. You localhost only, with an relay/reverse proxy where you can make your authentication. You could also use a [patch](https://gist.github.com/jb-alvarado/f8ee1e7a3cf5e482e818338f2b62c95f) for ffmpeg, but there is no guarantee if this really works.
|
||||
ffplayout catches this problem with monitoring the output from ffmpeg. When the input is **rtmp** and the app or stream name differs to the config it stops the ingest process. So in a way we have a bit control, which stream we let come in and which not.
|
||||
|
||||
In theory you can use every [protocol](https://ffmpeg.org/ffmpeg-protocols.html) from ffmpeg which support a **listen** mode.
|
||||
|
@ -1,6 +1,6 @@
|
||||
use std::{
|
||||
io::{BufReader, Error, Read},
|
||||
process::{Command, Stdio},
|
||||
io::{BufRead, BufReader, Error, Read},
|
||||
process::{ChildStderr, Command, Stdio},
|
||||
sync::atomic::Ordering,
|
||||
thread,
|
||||
};
|
||||
@ -9,20 +9,68 @@ use crossbeam_channel::Sender;
|
||||
use simplelog::*;
|
||||
|
||||
use crate::filter::ingest_filter::filter_cmd;
|
||||
use crate::utils::{stderr_reader, GlobalConfig, Ingest, ProcessControl};
|
||||
use crate::utils::{format_log_line, GlobalConfig, Ingest, ProcessControl};
|
||||
use crate::vec_strings;
|
||||
|
||||
pub fn log_line(line: String, level: &str) {
|
||||
if line.contains("[info]") && level.to_lowercase() == "info" {
|
||||
info!(
|
||||
"<bright black>[Server]</> {}",
|
||||
format_log_line(line, "info")
|
||||
)
|
||||
} else if line.contains("[warning]")
|
||||
&& (level.to_lowercase() == "warning" || level.to_lowercase() == "info")
|
||||
{
|
||||
warn!(
|
||||
"<bright black>[Server]</> {}",
|
||||
format_log_line(line, "warning")
|
||||
)
|
||||
} else if line.contains("[error]")
|
||||
&& !line.contains("Input/output error")
|
||||
&& !line.contains("Broken pipe")
|
||||
{
|
||||
error!(
|
||||
"<bright black>[Server]</> {}",
|
||||
format_log_line(line, "error")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn server_monitor(
|
||||
level: &str,
|
||||
buffer: BufReader<ChildStderr>,
|
||||
mut proc_ctl: ProcessControl,
|
||||
) -> Result<(), Error> {
|
||||
for line in buffer.lines() {
|
||||
let line = line?;
|
||||
|
||||
if line.contains("rtmp") && line.contains("Unexpected stream") {
|
||||
if let Err(e) = proc_ctl.kill(Ingest) {
|
||||
error!("{e}");
|
||||
};
|
||||
|
||||
warn!(
|
||||
"<bright black>[Server]</> {}",
|
||||
format_log_line(line.clone(), "error")
|
||||
);
|
||||
}
|
||||
|
||||
log_line(line, level);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// ffmpeg Ingest Server
|
||||
///
|
||||
/// Start ffmpeg in listen mode, and wait for input.
|
||||
pub fn ingest_server(
|
||||
config: GlobalConfig,
|
||||
log_format: String,
|
||||
ingest_sender: Sender<(usize, [u8; 65088])>,
|
||||
mut proc_control: ProcessControl,
|
||||
) -> Result<(), Error> {
|
||||
let mut buffer: [u8; 65088] = [0; 65088];
|
||||
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format];
|
||||
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
|
||||
let stream_input = config.ingest.input_cmd.clone().unwrap();
|
||||
|
||||
server_cmd.append(&mut stream_input.clone());
|
||||
@ -41,6 +89,8 @@ pub fn ingest_server(
|
||||
);
|
||||
|
||||
while !proc_control.is_terminated.load(Ordering::SeqCst) {
|
||||
let proc_ctl = proc_control.clone();
|
||||
let level = config.logging.ffmpeg_level.clone();
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stdout(Stdio::piped())
|
||||
@ -55,7 +105,8 @@ pub fn ingest_server(
|
||||
};
|
||||
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
|
||||
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
|
||||
let error_reader_thread = thread::spawn(move || stderr_reader(server_err, "Server"));
|
||||
let error_reader_thread =
|
||||
thread::spawn(move || server_monitor(&level, server_err, proc_ctl));
|
||||
|
||||
*proc_control.server_term.lock().unwrap() = Some(server_proc);
|
||||
is_running = false;
|
||||
|
@ -28,17 +28,13 @@ use std::{
|
||||
use simplelog::*;
|
||||
|
||||
use crate::filter::ingest_filter::filter_cmd;
|
||||
use crate::input::source_generator;
|
||||
use crate::input::{ingest::log_line, source_generator};
|
||||
use crate::utils::{
|
||||
sec_to_time, stderr_reader, Decoder, GlobalConfig, Ingest, PlayerControl, PlayoutStatus,
|
||||
ProcessControl,
|
||||
};
|
||||
use crate::vec_strings;
|
||||
|
||||
fn format_line(line: String, level: &str) -> String {
|
||||
line.replace(&format!("[{level: >5}] "), "")
|
||||
}
|
||||
|
||||
/// Ingest Server for HLS
|
||||
fn ingest_to_hls_server(
|
||||
config: GlobalConfig,
|
||||
@ -46,6 +42,7 @@ fn ingest_to_hls_server(
|
||||
mut proc_control: ProcessControl,
|
||||
) -> Result<(), Error> {
|
||||
let playlist_init = playout_stat.list_init;
|
||||
let level = config.logging.ffmpeg_level.clone();
|
||||
|
||||
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
|
||||
let stream_input = config.ingest.input_cmd.clone().unwrap();
|
||||
@ -66,6 +63,7 @@ fn ingest_to_hls_server(
|
||||
);
|
||||
|
||||
loop {
|
||||
let mut proc_ctl = proc_control.clone();
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stderr(Stdio::piped())
|
||||
@ -73,7 +71,7 @@ fn ingest_to_hls_server(
|
||||
{
|
||||
Err(e) => {
|
||||
error!("couldn't spawn ingest server: {e}");
|
||||
panic!("couldn't spawn ingest server: {e}")
|
||||
panic!("couldn't spawn ingest server: {e}");
|
||||
}
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
@ -85,6 +83,12 @@ fn ingest_to_hls_server(
|
||||
for line in server_err.lines() {
|
||||
let line = line?;
|
||||
|
||||
if line.contains("rtmp") && line.contains("Unexpected stream") {
|
||||
if let Err(e) = proc_ctl.kill(Ingest) {
|
||||
error!("{e}");
|
||||
};
|
||||
}
|
||||
|
||||
if !is_running {
|
||||
proc_control.server_is_running.store(true, Ordering::SeqCst);
|
||||
playlist_init.store(true, Ordering::SeqCst);
|
||||
@ -97,15 +101,7 @@ fn ingest_to_hls_server(
|
||||
}
|
||||
}
|
||||
|
||||
if line.contains("[error]")
|
||||
&& !line.contains("Input/output error")
|
||||
&& !line.contains("Broken pipe")
|
||||
{
|
||||
error!(
|
||||
"<bright black>[server]</> {}",
|
||||
format_line(line.clone(), "error")
|
||||
);
|
||||
}
|
||||
log_line(line, &level);
|
||||
}
|
||||
|
||||
info!("Switch from live ingest to {}", config.processing.mode);
|
||||
|
@ -66,7 +66,6 @@ pub fn player(
|
||||
|
||||
*proc_control.encoder_term.lock().unwrap() = Some(enc_proc);
|
||||
|
||||
let ff_log_format_c = ff_log_format.clone();
|
||||
let proc_control_c = proc_control.clone();
|
||||
let mut ingest_receiver = None;
|
||||
|
||||
@ -74,9 +73,7 @@ pub fn player(
|
||||
if config.ingest.enable {
|
||||
let (ingest_sender, rx) = bounded(96);
|
||||
ingest_receiver = Some(rx);
|
||||
thread::spawn(move || {
|
||||
ingest_server(config_clone, ff_log_format_c, ingest_sender, proc_control_c)
|
||||
});
|
||||
thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c));
|
||||
}
|
||||
|
||||
'source_iter: for node in get_source {
|
||||
|
@ -151,12 +151,6 @@ impl ProcessControl {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ProcessControl {
|
||||
fn drop(&mut self) {
|
||||
self.kill_all()
|
||||
}
|
||||
}
|
||||
|
||||
/// Global player control, to get infos about current clip etc.
|
||||
#[derive(Clone)]
|
||||
pub struct PlayerControl {
|
||||
|
@ -361,31 +361,26 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
|
||||
source_cmd
|
||||
}
|
||||
|
||||
/// Read ffmpeg stderr decoder, encoder and server instance
|
||||
pub fn format_log_line(line: String, level: &str) -> String {
|
||||
line.replace(&format!("[{level: >5}] "), "")
|
||||
}
|
||||
|
||||
/// Read ffmpeg stderr decoder and encoder instance
|
||||
/// and log the output.
|
||||
pub fn stderr_reader(buffer: BufReader<ChildStderr>, suffix: &str) -> Result<(), Error> {
|
||||
fn format_line(line: String, level: &str) -> String {
|
||||
line.replace(&format!("[{level: >5}] "), "")
|
||||
}
|
||||
|
||||
for line in buffer.lines() {
|
||||
let line = line?;
|
||||
|
||||
if line.contains("[info]") {
|
||||
info!("<bright black>[{suffix}]</> {}", format_line(line, "info"))
|
||||
info!(
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_log_line(line, "info")
|
||||
)
|
||||
} else if line.contains("[warning]") {
|
||||
warn!(
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_line(line, "warning")
|
||||
format_log_line(line, "warning")
|
||||
)
|
||||
} else if suffix != "server"
|
||||
&& !line.contains("Input/output error")
|
||||
&& !line.contains("Broken pipe")
|
||||
{
|
||||
error!(
|
||||
"<bright black>[{suffix}]</> {}",
|
||||
format_line(line.clone(), "error")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user