add hls mode

This commit is contained in:
jb-alvarado 2022-03-31 17:36:10 +02:00
parent b5c18ac8ea
commit 53bedd409f
7 changed files with 170 additions and 40 deletions

2
Cargo.lock generated
View File

@ -143,7 +143,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.7.0"
version = "0.8.0"
dependencies = [
"chrono",
"clap",

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.7.0"
version = "0.8.0"
edition = "2021"
[dependencies]

View File

@ -111,7 +111,7 @@ text:
out:
helptext: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'live_switch', 'stream'. Self made
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'preview' works only in streaming output and creates a separate preview stream.
mode: 'stream'

View File

@ -4,7 +4,7 @@ use simplelog::*;
pub mod v_drawtext;
use crate::utils::{is_close, GlobalConfig, Media};
use crate::utils::{get_delta, is_close, GlobalConfig, Media};
#[derive(Debug, Clone)]
struct Filters {
@ -285,6 +285,37 @@ fn fps_calc(r_frame_rate: String) -> f64 {
fps
}
fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &GlobalConfig,
codec_type: String,
) {
//this realtime filter is important for HLS output to stay in sync
let mut t = "".to_string();
if codec_type == "audio".to_string() {
t = "a".to_string()
}
if config.out.mode.to_lowercase() == "hls".to_string() {
let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(&node.begin.unwrap());
let duration = node.out - node.seek;
if delta < 0.0 {
let speed = duration / (duration + delta);
if speed > 0.0 && speed < 1.1 && delta < config.general.stop_threshold {
speed_filter = format!("{t}realtime=speed={speed}");
}
}
chain.add_filter(speed_filter, codec_type);
}
}
pub fn filter_chains(node: &mut Media) -> Vec<String> {
let config = GlobalConfig::global();
@ -323,10 +354,12 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
let mut filter_cmd = vec![];
let mut filter_str: String = "".to_string();

View File

@ -11,16 +11,14 @@ mod utils;
use simplelog::*;
use tokio::runtime::Builder;
use crate::output::play;
use crate::utils::{init_config, init_logging, validate_ffmpeg};
use crate::output::{player, write_hls};
use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig};
fn main() {
init_config();
let config = GlobalConfig::global();
let runtime = Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
@ -29,5 +27,9 @@ fn main() {
validate_ffmpeg();
play(rt_handle, is_terminated);
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, is_terminated);
} else {
player(rt_handle, is_terminated);
}
}

96
src/output/hls.rs Normal file
View File

@ -0,0 +1,96 @@
use std::{
process::{Command, Stdio},
sync::{
Arc, Mutex,
},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig};
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
Example config:
out:
output_param: >-
...
-flags +cgop
-f hls
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
*/
pub fn write_hls(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let (get_source, _) = source_generator(
rt_handle,
config.clone(),
is_terminated.clone(),
);
for node in get_source {
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
};
if !node.process.unwrap() {
continue;
}
info!(
"Play for <yellow>{}</>: <b><magenta>{}</></b>",
sec_to_time(node.out - node.seek),
node.source
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
if filter.len() > 1 {
dec_cmd.append(&mut filter.iter().map(String::as_str).collect());
}
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn decoder process: {}", e);
panic!("couldn't spawn decoder process: {}", e)
}
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Writer".to_string(),
));
if let Err(e) = dec_proc.wait() {
error!("Writer: {e}")
};
}
}

View File

@ -17,8 +17,11 @@ use simplelog::*;
use tokio::runtime::Handle;
mod desktop;
mod hls;
mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
@ -77,17 +80,12 @@ impl Drop for ProcessCleanup {
}
}
pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false;
let mut buffer: [u8; 65088] = [0; 65088];
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
is_terminated: Arc<Mutex<bool>>,
) -> (Box<dyn Iterator<Item = Media>>, Arc<Mutex<bool>>) {
let mut init_playlist: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
@ -116,7 +114,8 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone());
init_playlist = Some(program.init.clone());
init_playlist = program.init.clone();
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
@ -125,6 +124,21 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
}
};
(get_source, init_playlist)
}
pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let (get_source, init_playlist) =
source_generator(rt_handle, config.clone(), is_terminated.clone());
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
"stream" => stream::output(ff_log_format.clone()),
@ -173,7 +187,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
node.source
);
let mut kill_dec = true;
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
@ -211,7 +224,7 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
loop {
if *server_is_running.lock().unwrap() {
if kill_dec {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = enc_writer.flush() {
@ -226,12 +239,9 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
error!("Decoder error: {e}")
};
kill_dec = false;
live_on = true;
if let Some(init) = &init_playlist {
*init.lock().unwrap() = true;
}
*init_playlist.lock().unwrap() = true;
}
if let Ok(receive) = ingest_receiver.try_recv() {
@ -249,7 +259,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
error!("Encoder error: {e}")
}
kill_dec = true;
live_on = false;
}
@ -279,16 +288,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
};
}
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server done");
}
}
};
sleep(Duration::from_secs(1));
proc_cleanup.kill();