Merge pull request #7 from jb-alvarado/main

v0.8.0
This commit is contained in:
jb-alvarado 2022-03-31 21:43:47 +02:00 committed by GitHub
commit 83f1b83139
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 220 additions and 108 deletions

3
.gitignore vendored
View File

@ -13,4 +13,7 @@
*.log
/logs/
*.zip
*tar.gz
.vscode/

2
Cargo.lock generated
View File

@ -143,7 +143,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.7.0"
version = "0.8.0"
dependencies = [
"chrono",
"clap",

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.7.0"
version = "0.8.0"
edition = "2021"
[dependencies]

View File

@ -111,7 +111,7 @@ text:
out:
helptext: The final playout compression. Set the settings to your needs.
'mode' has the standard options 'desktop', 'hls', 'live_switch', 'stream'. Self made
'mode' has the standard options 'desktop', 'hls', 'stream'. Self made
outputs can be define, by adding script in output folder with an 'output' function
inside. 'preview' works only in streaming output and creates a separate preview stream.
mode: 'stream'

41
cross_compile_all.sh Executable file
View File

@ -0,0 +1,41 @@
#!/usr/bin/bash
targets=("x86_64-unknown-linux-musl" "x86_64-pc-windows-gnu" "x86_64-apple-darwin" "aarch64-apple-darwin")
IFS="= "
while read -r name value; do
if [[ $name == "version" ]]; then
version=${value//\"/}
fi
done < Cargo.toml
echo "Compile ffplayout-rs version is: \"$version\""
echo ""
for target in "${targets[@]}"; do
echo "compile static for $target"
echo ""
cargo build --release --target=$target
if [[ $target == "x86_64-pc-windows-gnu" ]]; then
if [[ -f "ffplayout-rs-v${version}_${target}.zip" ]]; then
rm -f "ffplayout-rs-v${version}_${target}.zip"
fi
cp ./target/${target}/release/ffplayout.exe .
zip -r "ffplayout-rs-v${version}_${target}.zip" assets docs LICENSE README.md ffplayout.exe
rm -f ffplayout.exe
else
if [[ -f "ffplayout-rs-v${version}_${target}.tar.gz" ]]; then
rm -f "ffplayout-rs-v${version}_${target}.tar.gz"
fi
cp ./target/${target}/release/ffplayout .
tar -czvf "ffplayout-rs-v${version}_${target}.tar.gz" assets docs LICENSE README.md ffplayout
rm -f ffplayout
fi
echo ""
done

View File

@ -1,59 +0,0 @@
use chrono::prelude::*;
use std::{
thread::sleep};
fn get_timestamp() -> i64 {
let local: DateTime<Local> = Local::now();
local.timestamp_millis() as i64
}
struct Timer {
init: bool,
timestamp: i64,
limit: i64,
messages: Vec<String>,
}
impl Timer {
fn new() -> Self {
Self {
init: true,
timestamp: get_timestamp(),
limit: 10 * 1000,
messages: vec![],
}
}
fn reset(&mut self) {
self.messages.clear();
self.timestamp = get_timestamp();
}
fn send(&mut self, msg: String) {
let now = get_timestamp();
self.messages.push(msg);
if self.init {
self.reset();
self.init = false;
}
if now >= self.timestamp + self.limit {
println!("Send messages: {:?}", self.messages);
self.reset();
}
}
}
fn main() {
let mut timer = Timer::new();
for i in 0..40 {
println!("{:?}", i);
timer.send(format!("{:?}", i));
sleep(std::time::Duration::from_secs(1));
}
}

View File

@ -4,7 +4,7 @@ use simplelog::*;
pub mod v_drawtext;
use crate::utils::{is_close, GlobalConfig, Media};
use crate::utils::{get_delta, is_close, GlobalConfig, Media};
#[derive(Debug, Clone)]
struct Filters {
@ -285,6 +285,37 @@ fn fps_calc(r_frame_rate: String) -> f64 {
fps
}
fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &GlobalConfig,
codec_type: String,
) {
//this realtime filter is important for HLS output to stay in sync
let mut t = "".to_string();
if codec_type == "audio".to_string() {
t = "a".to_string()
}
if config.out.mode.to_lowercase() == "hls".to_string() {
let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(&node.begin.unwrap());
let duration = node.out - node.seek;
if delta < 0.0 {
let speed = duration / (duration + delta);
if speed > 0.0 && speed < 1.1 && delta < config.general.stop_threshold {
speed_filter = format!("{t}realtime=speed={speed}");
}
}
chain.add_filter(speed_filter, codec_type);
}
}
pub fn filter_chains(node: &mut Media) -> Vec<String> {
let config = GlobalConfig::global();
@ -323,10 +354,12 @@ pub fn filter_chains(node: &mut Media) -> Vec<String> {
add_text(node, &mut filters, &config);
fade(node, &mut filters, "video".into());
overlay(node, &mut filters, &config);
realtime_filter(node, &mut filters, &config, "video".into());
add_loudnorm(node, &mut filters, &config);
fade(node, &mut filters, "audio".into());
audio_volume(&mut filters, &config);
realtime_filter(node, &mut filters, &config, "audio".into());
let mut filter_cmd = vec![];
let mut filter_str: String = "".to_string();

View File

@ -32,7 +32,7 @@ fn overlay(config: &GlobalConfig) -> String {
}
fn audio_filter(config: &GlobalConfig) -> String {
let mut audio_chain = ";[0:a]anull".to_string();
let mut audio_chain = ";[0:a]afade=in:st=0:d=0.5".to_string();
if config.processing.add_loudnorm {
audio_chain.push_str(
@ -64,7 +64,7 @@ pub async fn ingest_server(
let config = GlobalConfig::global();
let mut buffer: [u8; 65088] = [0; 65088];
let mut filter = format!(
"[0:v]fps={},scale={}:{},setdar=dar={}",
"[0:v]fps={},scale={}:{},setdar=dar={},fade=in:st=0:d=0.5",
config.processing.fps,
config.processing.width,
config.processing.height,

View File

@ -7,7 +7,7 @@ use simplelog::*;
use tokio::runtime::Handle;
use crate::utils::{
check_sync, gen_dummy, get_date, get_delta, get_sec, is_close, json_reader::read_json,
check_sync, gen_dummy, get_delta, get_sec, is_close, json_reader::read_json,
modified_time, seek_and_length, GlobalConfig, Media, DUMMY_LEN,
};
@ -114,12 +114,10 @@ impl CurrentProgram {
}
let next_start = self.current_node.begin.unwrap() - start_sec + duration + delta;
let date = get_date(false, start_sec, next_start);
if (next_start >= target_length
if next_start >= target_length
|| is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, target_length, 2.0))
&& date != self.json_date
|| is_close(total_delta, target_length, 2.0)
{
let json = read_json(
None,
@ -243,6 +241,7 @@ impl Iterator for CurrentProgram {
}
if self.index < self.nodes.len() {
self.check_for_next_playlist();
let mut is_last = false;
if self.index == self.nodes.len() - 1 {
@ -257,10 +256,8 @@ impl Iterator for CurrentProgram {
// update playlist should happen after current clip,
// to prevent unknown behaviors.
self.check_update(false);
self.check_for_next_playlist();
Some(self.current_node.clone())
} else {
println!("last: {:?}", self.json_path);
let last_playlist = self.json_path.clone();
self.check_for_next_playlist();
let (_, total_delta) = get_delta(&self.config.playlist.start_sec.unwrap());

View File

@ -11,16 +11,14 @@ mod utils;
use simplelog::*;
use tokio::runtime::Builder;
use crate::output::play;
use crate::utils::{init_config, init_logging, validate_ffmpeg};
use crate::output::{player, write_hls};
use crate::utils::{init_config, init_logging, validate_ffmpeg, GlobalConfig};
fn main() {
init_config();
let config = GlobalConfig::global();
let runtime = Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
let rt_handle = runtime.handle();
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
@ -29,5 +27,9 @@ fn main() {
validate_ffmpeg();
play(rt_handle, is_terminated);
if config.out.mode.to_lowercase() == "hls".to_string() {
write_hls(rt_handle, is_terminated);
} else {
player(rt_handle, is_terminated);
}
}

96
src/output/hls.rs Normal file
View File

@ -0,0 +1,96 @@
use std::{
process::{Command, Stdio},
sync::{
Arc, Mutex,
},
};
use simplelog::*;
use tokio::runtime::Handle;
use crate::output::source_generator;
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig};
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
Example config:
out:
output_param: >-
...
-flags +cgop
-f hls
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
*/
pub fn write_hls(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.out.clone().output_cmd.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let (get_source, _) = source_generator(
rt_handle,
config.clone(),
is_terminated.clone(),
);
for node in get_source {
let cmd = match node.cmd {
Some(cmd) => cmd,
None => break,
};
if !node.process.unwrap() {
continue;
}
info!(
"Play for <yellow>{}</>: <b><magenta>{}</></b>",
sec_to_time(node.out - node.seek),
node.source
);
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
if filter.len() > 1 {
dec_cmd.append(&mut filter.iter().map(String::as_str).collect());
}
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn decoder process: {}", e);
panic!("couldn't spawn decoder process: {}", e)
}
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Writer".to_string(),
));
if let Err(e) = dec_proc.wait() {
error!("Writer: {e}")
};
}
}

View File

@ -17,8 +17,11 @@ use simplelog::*;
use tokio::runtime::Handle;
mod desktop;
mod hls;
mod stream;
pub use hls::write_hls;
use crate::input::{ingest_server, watch_folder, CurrentProgram, Source};
use crate::utils::{sec_to_time, stderr_reader, GlobalConfig, Media};
@ -77,17 +80,12 @@ impl Drop for ProcessCleanup {
}
}
pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut init_playlist: Option<Arc<Mutex<bool>>> = None;
let mut live_on = false;
let mut buffer: [u8; 65088] = [0; 65088];
pub fn source_generator(
rt_handle: &Handle,
config: GlobalConfig,
is_terminated: Arc<Mutex<bool>>,
) -> (Box<dyn Iterator<Item = Media>>, Arc<Mutex<bool>>) {
let mut init_playlist: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let get_source = match config.processing.clone().mode.as_str() {
"folder" => {
@ -116,7 +114,8 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
"playlist" => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(rt_handle.clone(), is_terminated.clone());
init_playlist = Some(program.init.clone());
init_playlist = program.init.clone();
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
_ => {
@ -125,6 +124,21 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
}
};
(get_source, init_playlist)
}
pub fn player(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
let mut buffer: [u8; 65088] = [0; 65088];
let mut live_on = false;
let (get_source, init_playlist) =
source_generator(rt_handle, config.clone(), is_terminated.clone());
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
"stream" => stream::output(ff_log_format.clone()),
@ -173,7 +187,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
node.source
);
let mut kill_dec = true;
let filter = node.filter.unwrap();
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
@ -211,7 +224,7 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
loop {
if *server_is_running.lock().unwrap() {
if kill_dec {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = enc_writer.flush() {
@ -226,12 +239,9 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
error!("Decoder error: {e}")
};
kill_dec = false;
live_on = true;
if let Some(init) = &init_playlist {
*init.lock().unwrap() = true;
}
*init_playlist.lock().unwrap() = true;
}
if let Ok(receive) = ingest_receiver.try_recv() {
@ -249,7 +259,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
error!("Encoder error: {e}")
}
kill_dec = true;
live_on = false;
}
@ -279,16 +288,6 @@ pub fn play(rt_handle: &Handle, is_terminated: Arc<Mutex<bool>>) {
};
}
*is_terminated.lock().unwrap() = true;
if let Some(server) = &*server_term.lock().unwrap() {
unsafe {
if let Ok(_) = server.terminate() {
info!("Terminate ingest server done");
}
}
};
sleep(Duration::from_secs(1));
proc_cleanup.kill();