fix combination of folder mode and HLS output

- fix #180
- exclude files from hls output path
This commit is contained in:
jb-alvarado 2022-08-26 11:29:02 +02:00
parent 6dfddb84a8
commit 0a9441fd9b
10 changed files with 125 additions and 38 deletions

4
Cargo.lock generated
View File

@ -942,7 +942,7 @@ dependencies = [
[[package]] [[package]]
name = "ffplayout" name = "ffplayout"
version = "0.14.2" version = "0.14.3"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
@ -991,7 +991,7 @@ dependencies = [
[[package]] [[package]]
name = "ffplayout-lib" name = "ffplayout-lib"
version = "0.14.1" version = "0.14.3"
dependencies = [ dependencies = [
"chrono", "chrono",
"crossbeam-channel", "crossbeam-channel",

View File

@ -73,7 +73,7 @@ ingest:
until is done. There is only a very simple authentication mechanism, which check if the until is done. There is only a very simple authentication mechanism, which check if the
stream name is correct. stream name is correct.
enable: false enable: false
input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream input_param: -f live_flv -listen 1 -i rtmp://127.0.0.1:1936/live/stream
playlist: playlist:
help_text: > help_text: >

View File

@ -4,7 +4,7 @@ description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"] authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md" readme = "README.md"
version = "0.14.2" version = "0.14.3"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -15,7 +15,7 @@ use notify::{
}; };
use simplelog::*; use simplelog::*;
use ffplayout_lib::utils::{Media, PlayoutConfig}; use ffplayout_lib::utils::{include_file, Media, PlayoutConfig};
/// Create a watcher, which monitor file changes. /// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list. /// When a change is register, update the current file list.
@ -27,7 +27,7 @@ pub fn watchman(
) { ) {
let (tx, rx) = channel(); let (tx, rx) = channel();
let path = config.storage.path; let path = config.storage.path.clone();
if !Path::new(&path).exists() { if !Path::new(&path).exists() {
error!("Folder path not exists: '{path}'"); error!("Folder path not exists: '{path}'");
@ -44,16 +44,20 @@ pub fn watchman(
let index = sources.lock().unwrap().len(); let index = sources.lock().unwrap().len();
let media = Media::new(index, new_path.display().to_string(), false); let media = Media::new(index, new_path.display().to_string(), false);
if include_file(config.clone(), &new_path) {
sources.lock().unwrap().push(media); sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>"); info!("Create new file: <b><magenta>{new_path:?}</></b>");
} }
}
Remove(old_path) => { Remove(old_path) => {
if include_file(config.clone(), &old_path) {
sources sources
.lock() .lock()
.unwrap() .unwrap()
.retain(|x| x.source != old_path.display().to_string()); .retain(|x| x.source != old_path.display().to_string());
info!("Remove file: <b><magenta>{old_path:?}</></b>"); info!("Remove file: <b><magenta>{old_path:?}</></b>");
} }
}
Rename(old_path, new_path) => { Rename(old_path, new_path) => {
let index = sources let index = sources
.lock() .lock()

View File

@ -1,6 +1,6 @@
use std::{ use std::{
io::{BufRead, BufReader, Error, Read}, io::{BufRead, BufReader, Error, Read},
process::{ChildStderr, Command, Stdio}, process::{exit, ChildStderr, Command, Stdio},
sync::atomic::Ordering, sync::atomic::Ordering,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread, thread,
@ -10,7 +10,7 @@ use crossbeam_channel::Sender;
use simplelog::*; use simplelog::*;
use ffplayout_lib::filter::ingest_filter::filter_cmd; use ffplayout_lib::filter::ingest_filter::filter_cmd;
use ffplayout_lib::utils::{format_log_line, Ingest, PlayoutConfig, ProcessControl}; use ffplayout_lib::utils::{format_log_line, test_tcp_port, Ingest, PlayoutConfig, ProcessControl};
use ffplayout_lib::vec_strings; use ffplayout_lib::vec_strings;
pub fn log_line(line: String, level: &str) { pub fn log_line(line: String, level: &str) {
@ -90,6 +90,11 @@ pub fn ingest_server(
let mut is_running; let mut is_running;
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
exit(1);
}
info!("Start ingest server, listening on: <b><magenta>{url}</></b>",); info!("Start ingest server, listening on: <b><magenta>{url}</></b>",);
}; };

View File

@ -19,7 +19,7 @@ out:
use std::{ use std::{
io::{BufRead, BufReader, Error}, io::{BufRead, BufReader, Error},
process::{Command, Stdio}, process::{exit, Command, Stdio},
sync::atomic::Ordering, sync::atomic::Ordering,
thread::{self, sleep}, thread::{self, sleep},
time::Duration, time::Duration,
@ -30,8 +30,8 @@ use simplelog::*;
use crate::input::{ingest::log_line, source_generator}; use crate::input::{ingest::log_line, source_generator};
use ffplayout_lib::filter::ingest_filter::filter_cmd; use ffplayout_lib::filter::ingest_filter::filter_cmd;
use ffplayout_lib::utils::{ use ffplayout_lib::utils::{
prepare_output_cmd, sec_to_time, stderr_reader, Decoder, Ingest, PlayerControl, PlayoutConfig, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, Decoder, Ingest, PlayerControl,
PlayoutStatus, ProcessControl, PlayoutConfig, PlayoutStatus, ProcessControl,
}; };
use ffplayout_lib::vec_strings; use ffplayout_lib::vec_strings;
@ -45,8 +45,8 @@ fn ingest_to_hls_server(
let level = config.logging.ffmpeg_level.clone(); let level = config.logging.ffmpeg_level.clone();
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"]; let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let mut stream_input = config.ingest.input_cmd.clone().unwrap(); let stream_input = config.ingest.input_cmd.clone().unwrap();
server_prefix.append(&mut stream_input); server_prefix.append(&mut stream_input.clone());
let server_filter = filter_cmd(&config, &playout_stat.chain); let server_filter = filter_cmd(&config, &playout_stat.chain);
if server_filter.len() > 1 { if server_filter.len() > 1 {
@ -73,6 +73,11 @@ fn ingest_to_hls_server(
let mut is_running; let mut is_running;
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) { if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
exit(1);
}
info!("Start ingest server, listening on: <b><magenta>{url}</></b>"); info!("Start ingest server, listening on: <b><magenta>{url}</></b>");
}; };

View File

@ -4,7 +4,7 @@ description = "Library for ffplayout"
license = "GPL-3.0" license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"] authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md" readme = "README.md"
version = "0.14.1" version = "0.14.3"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -305,7 +305,9 @@ fn realtime_filter(
} }
let mut speed_filter = format!("{t}realtime=speed=1"); let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(config, &node.begin.unwrap());
if let Some(begin) = &node.begin {
let (delta, _) = get_delta(config, begin);
if delta < 0.0 && node.seek == 0.0 { if delta < 0.0 && node.seek == 0.0 {
let duration = node.out - node.seek; let duration = node.out - node.seek;
@ -315,6 +317,7 @@ fn realtime_filter(
speed_filter = format!("{t}realtime=speed={speed}"); speed_filter = format!("{t}realtime=speed={speed}");
} }
} }
}
chain.add_filter(&speed_filter, codec_type); chain.add_filter(&speed_filter, codec_type);
} }

View File

@ -11,7 +11,7 @@ use rand::{seq::SliceRandom, thread_rng};
use simplelog::*; use simplelog::*;
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::utils::{file_extension, get_sec, Media, PlayoutConfig}; use crate::utils::{get_sec, include_file, Media, PlayoutConfig};
/// Folder Sources /// Folder Sources
/// ///
@ -48,17 +48,19 @@ impl FolderSource {
.flat_map(|e| e.ok()) .flat_map(|e| e.ok())
.filter(|f| f.path().is_file()) .filter(|f| f.path().is_file())
{ {
if let Some(ext) = file_extension(entry.path()) { if include_file(config.clone(), entry.path()) {
if config
.storage
.extensions
.clone()
.contains(&ext.to_lowercase())
{
let media = Media::new(0, entry.path().display().to_string(), false); let media = Media::new(0, entry.path().display().to_string(), false);
media_list.push(media); media_list.push(media);
} }
} }
if media_list.is_empty() {
error!(
"no playable files found under: <b><magenta>{}</></b>",
config.storage.path
);
exit(1);
} }
if config.storage.shuffle { if config.storage.shuffle {

View File

@ -600,6 +600,52 @@ pub fn valid_source(source: &str) -> bool {
Path::new(&source).is_file() Path::new(&source).is_file()
} }
/// Check if file can include or has to exclude.
/// For example when a file is on given HLS output path, it should exclude.
/// Or when the file extension is set under storage config it can be include.
pub fn include_file(config: PlayoutConfig, file_path: &Path) -> bool {
let mut include = false;
if let Some(ext) = file_extension(file_path) {
if config.storage.extensions.contains(&ext.to_lowercase()) {
include = true;
}
}
if config.out.mode.to_lowercase() == "hls" {
if let Some(ts_path) = config
.out
.output_cmd
.clone()
.unwrap()
.iter()
.find(|s| s.contains(".ts"))
{
if let Some(p) = Path::new(ts_path).parent() {
if file_path.starts_with(p) {
include = false;
}
}
}
if let Some(m3u8_path) = config
.out
.output_cmd
.unwrap()
.iter()
.find(|s| s.contains(".m3u8") && !s.contains("master.m3u8"))
{
if let Some(p) = Path::new(m3u8_path).parent() {
if file_path.starts_with(p) {
include = false;
}
}
}
}
include
}
pub fn format_log_line(line: String, level: &str) -> String { pub fn format_log_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "") line.replace(&format!("[{level: >5}] "), "")
} }
@ -736,6 +782,28 @@ pub fn free_tcp_socket(exclude_socket: String) -> Option<String> {
None None
} }
/// check if tcp port is free
pub fn test_tcp_port(url: &str) -> bool {
let raw_addr = url.split('/').collect::<Vec<&str>>();
if raw_addr.len() > 1 {
if let Some(socket) = raw_addr[2].split_once(':') {
if TcpListener::bind((
socket.0,
socket.1.to_string().parse::<u16>().unwrap_or_default(),
))
.is_ok()
{
return true;
}
};
}
error!("Address <b><magenta>{url}</></b> already in use!");
false
}
pub fn home_dir() -> Option<PathBuf> { pub fn home_dir() -> Option<PathBuf> {
home_dir_inner() home_dir_inner()
} }