work on player

This commit is contained in:
jb-alvarado 2024-06-06 22:18:59 +02:00
parent 3c2801acab
commit 3671daf7c7
23 changed files with 4402 additions and 113 deletions

6
Cargo.lock generated
View File

@ -1314,18 +1314,24 @@ dependencies = [
"argon2", "argon2",
"chrono", "chrono",
"clap", "clap",
"crossbeam-channel",
"derive_more", "derive_more",
"faccess", "faccess",
"ffplayout-lib", "ffplayout-lib",
"ffprobe",
"flexi_logger", "flexi_logger",
"futures-util", "futures-util",
"home", "home",
"itertools",
"jsonwebtoken", "jsonwebtoken",
"lazy_static", "lazy_static",
"lettre", "lettre",
"lexical-sort", "lexical-sort",
"local-ip-address", "local-ip-address",
"log", "log",
"notify",
"notify-debouncer-full",
"num-traits",
"once_cell", "once_cell",
"paris", "paris",
"parking_lot", "parking_lot",

View File

@ -24,17 +24,23 @@ actix-web-static-files = "4.0"
argon2 = "0.5" argon2 = "0.5"
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
clap = { version = "4.3", features = ["derive"] } clap = { version = "4.3", features = ["derive"] }
crossbeam-channel = "0.5"
derive_more = "0.99" derive_more = "0.99"
faccess = "0.2" faccess = "0.2"
ffprobe = "0.4"
flexi_logger = { version = "0.28", features = ["kv", "colors"] } flexi_logger = { version = "0.28", features = ["kv", "colors"] }
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }
home = "0.5" home = "0.5"
itertools = "0.12"
jsonwebtoken = "9" jsonwebtoken = "9"
lazy_static = "1.4" lazy_static = "1.4"
lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1", "tokio1-rustls-tls"], default-features = false } lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1", "tokio1-rustls-tls"], default-features = false }
lexical-sort = "0.3" lexical-sort = "0.3"
local-ip-address = "0.6" local-ip-address = "0.6"
log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] } log = { version = "0.4", features = ["std", "serde", "kv", "kv_std", "kv_sval", "kv_serde"] }
notify = "6.0"
notify-debouncer-full = { version = "*", default-features = false }
num-traits = "0.2"
once_cell = "1.18" once_cell = "1.18"
paris = "1.5" paris = "1.5"
parking_lot = "0.12" parking_lot = "0.12"

View File

@ -23,7 +23,7 @@ use path_clean::PathClean;
use ffplayout::{ use ffplayout::{
api::{auth, routes::*}, api::{auth, routes::*},
db::{db_pool, handles, models::LoginUser}, db::{db_pool, handles, models::LoginUser},
player::controller::ChannelController, player::controller::{self, ChannelController, ChannelManager},
sse::{broadcast::Broadcaster, routes::*, AuthState}, sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{ utils::{
config::PlayoutConfig, config::PlayoutConfig,
@ -69,7 +69,7 @@ async fn main() -> std::io::Result<()> {
.await .await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let _channel_controller = ChannelController::new(); let channel_controller = Arc::new(Mutex::new(ChannelController::new()));
let channels = handles::select_all_channels(&pool) let channels = handles::select_all_channels(&pool)
.await .await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
@ -80,9 +80,6 @@ async fn main() -> std::io::Result<()> {
init_logging(mail_queues.clone())?; init_logging(mail_queues.clone())?;
for channel in channels.iter() { for channel in channels.iter() {
println!("channel: {channel:?}");
let _channel_clone = channel.clone();
let config_path = PathBuf::from(&channel.config_path); let config_path = PathBuf::from(&channel.config_path);
let config = match web::block(move || PlayoutConfig::new(Some(config_path), None)).await { let config = match web::block(move || PlayoutConfig::new(Some(config_path), None)).await {
Ok(config) => config, Ok(config) => config,
@ -92,26 +89,27 @@ async fn main() -> std::io::Result<()> {
} }
}; };
let channel_manager = Arc::new(Mutex::new(ChannelManager::new(
channel.clone(),
config.clone(),
)));
channel_controller
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(channel_manager);
let control_clone = channel_controller.clone();
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail))); let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
if let Ok(mut mqs) = mail_queues.lock() { if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone()); mqs.push(m_queue.clone());
} }
warn!("This logs to console");
if channel.active { if channel.active {
info!(target: Target::file(), channel = channel.id; "Start Playout");
thread::spawn(move || { thread::spawn(move || {
info!(target: Target::file(), channel = 1; "Start Playout"); controller::start(control_clone);
thread::sleep(std::time::Duration::from_secs(1));
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 2; "This logs to File and Mail, channel 2");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 3; "This logs to File and Mail, channel 3");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
error!(target: Target::file_mail(), channel = 1; "This logs to File and Mail, channel 1");
}); });
} }
} }

View File

@ -1,17 +1,21 @@
use std::{ use std::{
fmt, fmt,
process::Child, process::Child,
sync::{atomic::AtomicBool, Arc, Mutex}, sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
}; };
#[cfg(not(windows))] #[cfg(not(windows))]
use signal_child::Signalable; use signal_child::Signalable;
use log::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// use simplelog::*;
use crate::db::models::Channel; use crate::db::models::Channel;
use crate::utils::errors::ProcessError; use crate::player::output::{player, write_hls};
use crate::utils::{config::PlayoutConfig, errors::ProcessError};
/// Defined process units. /// Defined process units.
#[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)]
@ -36,6 +40,7 @@ use ProcessUnit::*;
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct ChannelManager { pub struct ChannelManager {
pub config: Arc<Mutex<PlayoutConfig>>,
pub channel: Arc<Mutex<Channel>>, pub channel: Arc<Mutex<Channel>>,
pub decoder: Arc<Mutex<Option<Child>>>, pub decoder: Arc<Mutex<Option<Child>>>,
pub encoder: Arc<Mutex<Option<Child>>>, pub encoder: Arc<Mutex<Option<Child>>>,
@ -46,13 +51,144 @@ pub struct ChannelManager {
} }
impl ChannelManager { impl ChannelManager {
pub fn new(channel: Channel) -> Self { pub fn new(channel: Channel, config: PlayoutConfig) -> Self {
Self { Self {
is_alive: Arc::new(AtomicBool::new(channel.active)),
channel: Arc::new(Mutex::new(channel)), channel: Arc::new(Mutex::new(channel)),
is_alive: Arc::new(AtomicBool::new(true)), config: Arc::new(Mutex::new(config)),
..Default::default() ..Default::default()
} }
} }
pub fn update_channel(self, other: &Channel) {
let mut channel = self.channel.lock().unwrap();
channel.name = other.name.clone();
channel.preview_url = other.preview_url.clone();
channel.config_path = other.config_path.clone();
channel.extra_extensions = other.extra_extensions.clone();
channel.active = other.active.clone();
channel.modified = other.modified.clone();
channel.time_shift = other.time_shift.clone();
channel.utc_offset = other.utc_offset.clone();
}
pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> {
let mut channel = self.channel.lock()?;
match unit {
Decoder => {
if let Some(proc) = self.decoder.lock()?.as_mut() {
#[cfg(not(windows))]
proc.term()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
#[cfg(windows)]
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = self.encoder.lock()?.as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = self.ingest.lock()?.as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
channel.active = false;
self.wait(unit)?;
Ok(())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(&self, unit: ProcessUnit) -> Result<(), ProcessError> {
match unit {
Decoder => {
if let Some(proc) = self.decoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = self.encoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = self.ingest.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
Ok(())
}
/// No matter what is running, terminate them all.
pub fn stop_all(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst);
self.ingest_is_running.store(false, Ordering::SeqCst);
if self.is_alive.load(Ordering::SeqCst) {
self.is_alive.store(false, Ordering::SeqCst);
trace!("Playout is alive and processes are terminated");
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.stop(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
}
if let Err(e) = self.wait(unit) {
if !e.to_string().contains("exited process") {
error!("{e}")
}
}
}
}
}
}
/// Global playout control, for move forward/backward clip, or resetting playlist/state.
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
pub chain: Option<Arc<Mutex<Vec<String>>>>,
pub current_date: Arc<Mutex<String>>,
pub date: Arc<Mutex<String>>,
pub list_init: Arc<AtomicBool>,
pub time_shift: Arc<Mutex<f64>>,
}
impl PlayoutStatus {
pub fn new() -> Self {
Self {
chain: None,
current_date: Arc::new(Mutex::new(String::new())),
date: Arc::new(Mutex::new(String::new())),
list_init: Arc::new(AtomicBool::new(true)),
time_shift: Arc::new(Mutex::new(0.0)),
}
}
}
impl Default for PlayoutStatus {
fn default() -> Self {
Self::new()
}
} }
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
@ -75,98 +211,17 @@ impl ChannelController {
channel.id != channel_id channel.id != channel_id
}); });
} }
pub fn update_from(&mut self, other: &Channel, channel_id: i32) {
self.channels.iter_mut().for_each(|c| {
let mut channel = c.channel.lock().unwrap();
if channel.id == channel_id {
channel.name = other.name.clone();
channel.preview_url = other.preview_url.clone();
channel.config_path = other.config_path.clone();
channel.extra_extensions = other.extra_extensions.clone();
channel.active = other.active.clone();
channel.utc_offset = other.utc_offset.clone();
}
})
}
pub fn stop(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> {
for manager in self.channels.iter_mut() {
let mut channel = manager.channel.lock().unwrap();
if channel.id == channel_id {
match unit {
Decoder => {
if let Some(proc) = manager.decoder.lock().unwrap().as_mut() {
#[cfg(not(windows))]
proc.term()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
#[cfg(windows)]
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = manager.encoder.lock().unwrap().as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = manager.ingest.lock().unwrap().as_mut() {
proc.kill()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
channel.active = false;
}
}
self.wait(channel_id, unit)?;
Ok(())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(mut self, channel_id: i32, unit: ProcessUnit) -> Result<(), ProcessError> {
for manager in self.channels.iter_mut() {
let channel = manager.channel.lock().unwrap();
if channel.id == channel_id {
match unit {
Decoder => {
if let Some(proc) = manager.decoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Decoder: {e}")))?;
}
}
Encoder => {
if let Some(proc) = manager.encoder.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Encoder: {e}")))?;
}
}
Ingest => {
if let Some(proc) = manager.ingest.lock().unwrap().as_mut() {
proc.wait()
.map_err(|e| ProcessError::Custom(format!("Ingest: {e}")))?;
}
}
}
}
}
Ok(())
}
} }
pub fn play(controller: &mut ChannelController, channel: Channel) { pub fn start(controller: Arc<Mutex<ChannelManager>>) -> Result<(), ProcessError> {
let manager = ChannelManager::new(channel); let config = controller.lock()?.config.lock()?.clone();
controller.add(manager); match config.out.mode {
// write files/playlist to HLS m3u8 playlist
HLS => write_hls(&config, play_control, playout_stat, proc_control),
// play on desktop or stream to a remote target
_ => player(&config, &play_control, playout_stat, proc_control),
};
Ok(())
} }

View File

@ -0,0 +1,39 @@
use regex::Regex;
use simplelog::*;
/// Apply custom filters
pub fn filter_node(filter: &str) -> (String, String) {
let re = Regex::new(r"^;?(\[[0-9]:[^\[]+\])?|\[[^\[]+\]$").unwrap(); // match start/end link
let mut video_filter = String::new();
let mut audio_filter = String::new();
// match chain with audio and video filter
if filter.contains("[c_v_out]") && filter.contains("[c_a_out]") {
let v_pos = filter.find("[c_v_out]").unwrap();
let a_pos = filter.find("[c_a_out]").unwrap();
let mut delimiter = "[c_v_out]";
// split delimiter should be first filter output link
if v_pos > a_pos {
delimiter = "[c_a_out]";
}
if let Some((f_1, f_2)) = filter.split_once(delimiter) {
if f_2.contains("[c_a_out]") {
video_filter = re.replace_all(f_1, "").to_string();
audio_filter = re.replace_all(f_2, "").to_string();
} else {
video_filter = re.replace_all(f_2, "").to_string();
audio_filter = re.replace_all(f_1, "").to_string();
}
}
} else if filter.contains("[c_v_out]") {
video_filter = re.replace_all(filter, "").to_string();
} else if filter.contains("[c_a_out]") {
audio_filter = re.replace_all(filter, "").to_string();
} else if !filter.is_empty() && filter != "~" {
error!("Custom filter is not well formatted, use correct out link names (\"[c_v_out]\" and/or \"[c_a_out]\"). Filter skipped!")
}
(video_filter, audio_filter)
}

View File

@ -0,0 +1,794 @@
use std::{
fmt,
path::Path,
sync::{Arc, Mutex},
};
use regex::Regex;
use simplelog::*;
mod custom;
pub mod v_drawtext;
use crate::player::{
controller::ProcessUnit::*,
utils::{custom_format, fps_calc, is_close, Media},
};
use crate::utils::config::{OutputMode::*, PlayoutConfig};
use crate::vec_strings;
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub enum FilterType {
Audio,
Video,
}
impl fmt::Display for FilterType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
FilterType::Audio => write!(f, "a"),
FilterType::Video => write!(f, "v"),
}
}
}
use FilterType::*;
#[derive(Debug, Clone)]
pub struct Filters {
pub audio_chain: String,
pub video_chain: String,
pub output_chain: Vec<String>,
pub audio_map: Vec<String>,
pub video_map: Vec<String>,
pub audio_out_link: Vec<String>,
pub video_out_link: Vec<String>,
pub output_map: Vec<String>,
config: PlayoutConfig,
audio_position: i32,
video_position: i32,
audio_last: i32,
video_last: i32,
}
impl Filters {
pub fn new(config: PlayoutConfig, audio_position: i32) -> Self {
Self {
audio_chain: String::new(),
video_chain: String::new(),
output_chain: vec![],
audio_map: vec![],
video_map: vec![],
audio_out_link: vec![],
video_out_link: vec![],
output_map: vec![],
config,
audio_position,
video_position: 0,
audio_last: -1,
video_last: -1,
}
}
pub fn add_filter(&mut self, filter: &str, track_nr: i32, filter_type: FilterType) {
let (map, chain, position, last) = match filter_type {
Audio => (
&mut self.audio_map,
&mut self.audio_chain,
self.audio_position,
&mut self.audio_last,
),
Video => (
&mut self.video_map,
&mut self.video_chain,
self.video_position,
&mut self.video_last,
),
};
if *last != track_nr {
// start new filter chain
let mut selector = String::new();
let mut sep = String::new();
if !chain.is_empty() {
selector = format!("[{filter_type}out{last}]");
sep = ";".to_string()
}
chain.push_str(&selector);
if filter.starts_with("aevalsrc") || filter.starts_with("movie") {
chain.push_str(&format!("{sep}{filter}"));
} else {
chain.push_str(&format!(
// build audio/video selector like [0:a:0]
"{sep}[{position}:{filter_type}:{track_nr}]{filter}",
));
}
let m = format!("[{filter_type}out{track_nr}]");
map.push(m.clone());
self.output_map.append(&mut vec_strings!["-map", m]);
*last = track_nr;
} else if filter.starts_with(';') || filter.starts_with('[') {
chain.push_str(filter);
} else {
chain.push_str(&format!(",{filter}"))
}
}
pub fn cmd(&mut self) -> Vec<String> {
if !self.output_chain.is_empty() {
return self.output_chain.clone();
}
let mut v_chain = self.video_chain.clone();
let mut a_chain = self.audio_chain.clone();
if self.video_last >= 0 && !v_chain.ends_with(']') {
v_chain.push_str(&format!("[vout{}]", self.video_last));
}
if self.audio_last >= 0 && !a_chain.ends_with(']') {
a_chain.push_str(&format!("[aout{}]", self.audio_last));
}
let mut f_chain = v_chain;
let mut cmd = vec![];
if !a_chain.is_empty() {
if !f_chain.is_empty() {
f_chain.push(';');
}
f_chain.push_str(&a_chain);
}
if !f_chain.is_empty() {
cmd.push("-filter_complex".to_string());
cmd.push(f_chain);
}
cmd
}
pub fn map(&mut self) -> Vec<String> {
let mut o_map = self.output_map.clone();
if self.video_last == -1 && !self.config.processing.audio_only {
let v_map = "0:v".to_string();
if !o_map.contains(&v_map) {
o_map.append(&mut vec_strings!["-map", v_map]);
};
}
if self.audio_last == -1 {
for i in 0..self.config.processing.audio_tracks {
let a_map = format!("{}:a:{i}", self.audio_position);
if !o_map.contains(&a_map) {
o_map.append(&mut vec_strings!["-map", a_map]);
};
}
}
o_map
}
}
impl Default for Filters {
fn default() -> Self {
Self::new(PlayoutConfig::new(None, None), 0)
}
}
fn deinterlace(field_order: &Option<String>, chain: &mut Filters, config: &PlayoutConfig) {
if let Some(order) = field_order {
if order != "progressive" {
let deinterlace = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.deinterlace.clone())
{
Some(deinterlace) => deinterlace,
None => "yadif=0:-1:0".to_string(),
};
chain.add_filter(&deinterlace, 0, Video);
}
}
}
fn pad(aspect: f64, chain: &mut Filters, v_stream: &ffprobe::Stream, config: &PlayoutConfig) {
if !is_close(aspect, config.processing.aspect, 0.03) {
let mut scale = String::new();
if let (Some(w), Some(h)) = (v_stream.width, v_stream.height) {
if w > config.processing.width && aspect > config.processing.aspect {
scale = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.pad_scale_w.clone())
{
Some(pad_scale_w) => {
custom_format(&format!("{pad_scale_w},"), &[&config.processing.width])
}
None => format!("scale={}:-1,", config.processing.width),
};
} else if h > config.processing.height && aspect < config.processing.aspect {
scale = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.pad_scale_h.clone())
{
Some(pad_scale_h) => {
custom_format(&format!("{pad_scale_h},"), &[&config.processing.width])
}
None => format!("scale=-1:{},", config.processing.height),
};
}
}
let pad = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.pad_video.clone())
{
Some(pad_video) => custom_format(
&format!("{scale}{pad_video}"),
&[
&config.processing.width.to_string(),
&config.processing.height.to_string(),
],
),
None => format!(
"{}pad=max(iw\\,ih*({1}/{2})):ow/({1}/{2}):(ow-iw)/2:(oh-ih)/2",
scale, config.processing.width, config.processing.height
),
};
chain.add_filter(&pad, 0, Video)
}
}
fn fps(fps: f64, chain: &mut Filters, config: &PlayoutConfig) {
if fps != config.processing.fps {
let fps_filter = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.fps.clone())
{
Some(fps) => custom_format(&fps, &[&config.processing.fps]),
None => format!("fps={}", config.processing.fps),
};
chain.add_filter(&fps_filter, 0, Video)
}
}
fn scale(
width: Option<i64>,
height: Option<i64>,
aspect: f64,
chain: &mut Filters,
config: &PlayoutConfig,
) {
// width: i64, height: i64
if let (Some(w), Some(h)) = (width, height) {
if w != config.processing.width || h != config.processing.height {
let scale = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.scale.clone())
{
Some(scale) => custom_format(
&scale,
&[&config.processing.width, &config.processing.height],
),
None => format!(
"scale={}:{}",
config.processing.width, config.processing.height
),
};
chain.add_filter(&scale, 0, Video);
} else {
chain.add_filter("null", 0, Video);
}
if !is_close(aspect, config.processing.aspect, 0.03) {
let dar = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.set_dar.clone())
{
Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]),
None => format!("setdar=dar={}", config.processing.aspect),
};
chain.add_filter(&dar, 0, Video);
}
} else {
let scale = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.scale.clone())
{
Some(scale) => custom_format(
&scale,
&[&config.processing.width, &config.processing.height],
),
None => format!(
"scale={}:{}",
config.processing.width, config.processing.height
),
};
chain.add_filter(&scale, 0, Video);
let dar = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.set_dar.clone())
{
Some(set_dar) => custom_format(&set_dar, &[&config.processing.aspect]),
None => format!("setdar=dar={}", config.processing.aspect),
};
chain.add_filter(&dar, 0, Video);
}
}
fn fade(
node: &mut Media,
chain: &mut Filters,
nr: i32,
filter_type: FilterType,
config: &PlayoutConfig,
) {
let mut t = "";
let mut fade_audio = false;
if filter_type == Audio {
t = "a";
if node.duration_audio > 0.0 && node.duration_audio != node.duration {
fade_audio = true;
}
}
if node.seek > 0.0 || node.unit == Ingest {
let mut fade_in = format!("{t}fade=in:st=0:d=0.5");
if t == "a" {
if let Some(fade) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.afade_in.clone())
{
fade_in = custom_format(&fade, &[t]);
}
} else if let Some(fade) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.fade_in.clone())
{
fade_in = custom_format(&fade, &[t]);
};
chain.add_filter(&fade_in, nr, filter_type);
}
if (node.out != node.duration && node.out - node.seek > 1.0) || fade_audio {
let mut fade_out = format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0));
if t == "a" {
if let Some(fade) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.afade_out.clone())
{
fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]);
}
} else if let Some(fade) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.fade_out.clone())
.clone()
{
fade_out = custom_format(&fade, &[node.out - node.seek - 1.0]);
};
chain.add_filter(&fade_out, nr, filter_type);
}
}
fn overlay(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
if config.processing.add_logo
&& Path::new(&config.processing.logo).is_file()
&& &node.category != "advertisement"
{
let mut logo_chain = format!(
"null[v];movie={}:loop=0,setpts=N/(FRAME_RATE*TB),format=rgba,colorchannelmixer=aa={}",
config
.processing
.logo
.replace('\\', "/")
.replace(':', "\\\\:"),
config.processing.logo_opacity,
);
if node.last_ad {
match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.overlay_logo_fade_in.clone())
{
Some(fade_in) => logo_chain.push_str(&format!(",{fade_in}")),
None => logo_chain.push_str(",fade=in:st=0:d=1.0:alpha=1"),
};
}
if node.next_ad {
let length = node.out - node.seek - 1.0;
match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.overlay_logo_fade_out.clone())
{
Some(fade_out) => {
logo_chain.push_str(&custom_format(&format!(",{fade_out}"), &[length]))
}
None => logo_chain.push_str(&format!(",fade=out:st={length}:d=1.0:alpha=1")),
}
}
if !config.processing.logo_scale.is_empty() {
match &config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.overlay_logo_scale.clone())
{
Some(logo_scale) => logo_chain.push_str(&custom_format(
&format!(",{logo_scale}"),
&[&config.processing.logo_scale],
)),
None => logo_chain.push_str(&format!(",scale={}", config.processing.logo_scale)),
}
}
match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.overlay_logo.clone())
{
Some(overlay) => {
if !overlay.starts_with(',') {
logo_chain.push(',');
}
logo_chain.push_str(&custom_format(
&overlay,
&[&config.processing.logo_position],
))
}
None => logo_chain.push_str(&format!(
"[l];[v][l]overlay={}:shortest=1",
config.processing.logo_position
)),
};
chain.add_filter(&logo_chain, 0, Video);
}
}
fn extend_video(node: &mut Media, chain: &mut Filters, config: &PlayoutConfig) {
if let Some(video_duration) = node
.probe
.as_ref()
.and_then(|p| p.video_streams.first())
.and_then(|v| v.duration.as_ref())
.and_then(|v| v.parse::<f64>().ok())
{
if node.out - node.seek > video_duration - node.seek + 0.1 && node.duration >= node.out {
let duration = (node.out - node.seek) - (video_duration - node.seek);
let tpad = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.tpad.clone())
{
Some(pad) => custom_format(&pad, &[duration]),
None => format!("tpad=stop_mode=add:stop_duration={duration}"),
};
chain.add_filter(&tpad, 0, Video)
}
}
}
/// add drawtext filter for lower thirds messages
fn add_text(
node: &mut Media,
chain: &mut Filters,
config: &PlayoutConfig,
filter_chain: &Option<Arc<Mutex<Vec<String>>>>,
) {
if config.text.add_text
&& (config.text.text_from_filename || config.out.mode == HLS || node.unit == Encoder)
{
let filter = v_drawtext::filter_node(config, Some(node), filter_chain);
chain.add_filter(&filter, 0, Video);
}
}
fn add_audio(node: &Media, chain: &mut Filters, nr: i32, config: &PlayoutConfig) {
let audio = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.aevalsrc.clone())
{
Some(aevalsrc) => custom_format(&aevalsrc, &[node.out - node.seek]),
None => format!(
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
node.out - node.seek
),
};
chain.add_filter(&audio, nr, Audio);
}
fn extend_audio(node: &mut Media, chain: &mut Filters, nr: i32, config: &PlayoutConfig) {
if !Path::new(&node.audio).is_file() {
if let Some(audio_duration) = node
.probe
.as_ref()
.and_then(|p| p.audio_streams.first())
.and_then(|a| a.duration.clone())
.and_then(|a| a.parse::<f64>().ok())
{
if node.out - node.seek > audio_duration - node.seek + 0.1 && node.duration >= node.out
{
let apad = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.apad.clone())
{
Some(apad) => custom_format(&apad, &[node.out - node.seek]),
None => format!("apad=whole_dur={}", node.out - node.seek),
};
chain.add_filter(&apad, nr, Audio)
}
}
}
}
fn audio_volume(chain: &mut Filters, config: &PlayoutConfig, nr: i32) {
if config.processing.volume != 1.0 {
let volume = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.volume.clone())
{
Some(volume) => custom_format(&volume, &[config.processing.volume]),
None => format!("volume={}", config.processing.volume),
};
chain.add_filter(&volume, nr, Audio)
}
}
fn aspect_calc(aspect_string: &Option<String>, config: &PlayoutConfig) -> f64 {
let mut source_aspect = config.processing.aspect;
if let Some(aspect) = aspect_string {
let aspect_vec: Vec<&str> = aspect.split(':').collect();
let w = aspect_vec[0].parse::<f64>().unwrap();
let h = aspect_vec[1].parse::<f64>().unwrap();
source_aspect = w / h;
}
source_aspect
}
pub fn split_filter(
chain: &mut Filters,
count: usize,
nr: i32,
filter_type: FilterType,
config: &PlayoutConfig,
) {
if count > 1 {
let out_link = match filter_type {
Audio => &mut chain.audio_out_link,
Video => &mut chain.video_out_link,
};
for i in 0..count {
let link = format!("[{filter_type}out_{nr}_{i}]");
if !out_link.contains(&link) {
out_link.push(link)
}
}
let split = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.split.clone())
{
Some(split) => custom_format(&split, &[count.to_string(), out_link.join("")]),
None => format!("split={count}{}", out_link.join("")),
};
chain.add_filter(&split, nr, filter_type);
}
}
/// Process output filter chain and add new filters to existing ones.
fn process_output_filters(config: &PlayoutConfig, chain: &mut Filters, custom_filter: &str) {
let filter =
if (config.text.add_text && !config.text.text_from_filename) || config.out.mode == HLS {
let re_v = Regex::new(r"\[[0:]+[v^\[]+([:0]+)?\]").unwrap(); // match video filter input link
let _re_a = Regex::new(r"\[[0:]+[a^\[]+([:0]+)?\]").unwrap(); // match video filter input link
let mut cf = custom_filter.to_string();
if !chain.video_chain.is_empty() {
cf = re_v
.replace(&cf, &format!("{},", chain.video_chain))
.to_string()
}
if !chain.audio_chain.is_empty() {
let audio_split = chain
.audio_chain
.split(';')
.enumerate()
.map(|(i, p)| p.replace(&format!("[aout{i}]"), ""))
.collect::<Vec<String>>();
for i in 0..config.processing.audio_tracks {
cf = cf.replace(
&format!("[0:a:{i}]"),
&format!("{},", &audio_split[i as usize]),
)
}
}
cf
} else {
custom_filter.to_string()
};
chain.output_chain = vec_strings!["-filter_complex", filter]
}
fn custom(filter: &str, chain: &mut Filters, nr: i32, filter_type: FilterType) {
if !filter.is_empty() {
chain.add_filter(filter, nr, filter_type);
}
}
pub fn filter_chains(
config: &PlayoutConfig,
node: &mut Media,
filter_chain: &Option<Arc<Mutex<Vec<String>>>>,
) -> Filters {
let mut filters = Filters::new(config.clone(), 0);
if node.unit == Encoder {
if !config.processing.audio_only {
add_text(node, &mut filters, config, filter_chain);
}
if let Some(f) = config.out.output_filter.clone() {
process_output_filters(config, &mut filters, &f)
} else if config.out.output_count > 1 && !config.processing.audio_only {
split_filter(&mut filters, config.out.output_count, 0, Video, config);
}
return filters;
}
if !config.processing.audio_only && !config.processing.copy_video {
if let Some(probe) = node.probe.as_ref() {
if Path::new(&node.audio).is_file() {
filters.audio_position = 1;
}
if let Some(v_stream) = &probe.video_streams.first() {
let aspect = aspect_calc(&v_stream.display_aspect_ratio, config);
let frame_per_sec = fps_calc(&v_stream.r_frame_rate, 1.0);
deinterlace(&v_stream.field_order, &mut filters, config);
pad(aspect, &mut filters, v_stream, config);
fps(frame_per_sec, &mut filters, config);
scale(
v_stream.width,
v_stream.height,
aspect,
&mut filters,
config,
);
}
extend_video(node, &mut filters, config);
} else {
fps(0.0, &mut filters, config);
scale(None, None, 1.0, &mut filters, config);
}
add_text(node, &mut filters, config, filter_chain);
fade(node, &mut filters, 0, Video, config);
overlay(node, &mut filters, config);
}
let (proc_vf, proc_af) = if node.unit == Ingest {
custom::filter_node(&config.ingest.custom_filter)
} else {
custom::filter_node(&config.processing.custom_filter)
};
let (list_vf, list_af) = custom::filter_node(&node.custom_filter);
if !config.processing.copy_video {
custom(&proc_vf, &mut filters, 0, Video);
custom(&list_vf, &mut filters, 0, Video);
}
let mut audio_indexes = vec![];
if config.processing.audio_track_index == -1 {
for i in 0..config.processing.audio_tracks {
audio_indexes.push(i)
}
} else {
audio_indexes.push(config.processing.audio_track_index)
}
if !config.processing.copy_audio {
for i in audio_indexes {
if node
.probe
.as_ref()
.and_then(|p| p.audio_streams.get(i as usize))
.is_some()
|| Path::new(&node.audio).is_file()
{
extend_audio(node, &mut filters, i, config);
} else if node.unit == Decoder {
if !node.source.contains("color=c=") {
warn!(
"Missing audio track (id {i}) from <b><magenta>{}</></b>",
node.source
);
}
add_audio(node, &mut filters, i, config);
}
// add at least anull filter, for correct filter construction,
// is important for split filter in HLS mode
filters.add_filter("anull", i, Audio);
fade(node, &mut filters, i, Audio, config);
audio_volume(&mut filters, config, i);
custom(&proc_af, &mut filters, i, Audio);
custom(&list_af, &mut filters, i, Audio);
}
} else if config.processing.audio_track_index > -1 {
error!("Setting 'audio_track_index' other than '-1' is not allowed in audio copy mode!")
}
if config.out.mode == HLS {
if let Some(f) = config.out.output_filter.clone() {
process_output_filters(config, &mut filters, &f)
}
}
filters
}

View File

@ -0,0 +1,82 @@
use std::{
ffi::OsStr,
path::Path,
sync::{Arc, Mutex},
};
use regex::Regex;
use crate::player::{
controller::ProcessUnit::*,
utils::{custom_format, Media},
};
use crate::utils::config::PlayoutConfig;
pub fn filter_node(
config: &PlayoutConfig,
node: Option<&Media>,
filter_chain: &Option<Arc<Mutex<Vec<String>>>>,
) -> String {
let mut filter = String::new();
let mut font = String::new();
if Path::new(&config.text.fontfile).is_file() {
font = format!(":fontfile='{}'", config.text.fontfile)
}
let zmq_socket = match node.map(|n| n.unit) {
Some(Ingest) => config.text.zmq_server_socket.clone(),
_ => config.text.zmq_stream_socket.clone(),
};
if config.text.text_from_filename && node.is_some() {
let source = node.unwrap_or(&Media::new(0, "", false)).source.clone();
let text = match Regex::new(&config.text.regex)
.ok()
.and_then(|r| r.captures(&source))
{
Some(t) => t[1].to_string(),
None => Path::new(&source)
.file_stem()
.unwrap_or_else(|| OsStr::new(&source))
.to_string_lossy()
.to_string(),
};
let escaped_text = text
.replace('\'', "'\\\\\\''")
.replace('%', "\\\\\\%")
.replace(':', "\\:");
filter = match &config
.advanced
.clone()
.and_then(|a| a.decoder.filters.drawtext_from_file)
{
Some(drawtext) => custom_format(drawtext, &[&escaped_text, &config.text.style, &font]),
None => format!("drawtext=text='{escaped_text}':{}{font}", config.text.style),
};
} else if let Some(socket) = zmq_socket {
let mut filter_cmd = format!("text=''{font}");
if let Some(chain) = filter_chain {
if let Some(link) = chain.lock().unwrap().iter().find(|&l| l.contains("text")) {
filter_cmd = link.to_string();
}
}
filter = match config
.advanced
.as_ref()
.and_then(|a| a.decoder.filters.drawtext_from_zmq.clone())
{
Some(drawtext) => custom_format(&drawtext, &[&socket.replace(':', "\\:"), &filter_cmd]),
None => format!(
"zmq=b=tcp\\\\://'{}',drawtext@dyntext={filter_cmd}",
socket.replace(':', "\\:")
),
};
}
filter
}

View File

@ -0,0 +1,102 @@
use std::{
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
{Arc, Mutex},
},
thread::sleep,
time::Duration,
};
use notify::{
event::{CreateKind, ModifyKind, RemoveKind, RenameMode},
EventKind::{Create, Modify, Remove},
RecursiveMode, Watcher,
};
use notify_debouncer_full::new_debouncer;
use simplelog::*;
use ffplayout_lib::utils::{include_file_extension, Media, PlayoutConfig};
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.
/// This makes it possible, to play infinitely and and always new files to it.
pub fn watchman(
config: PlayoutConfig,
is_terminated: Arc<AtomicBool>,
sources: Arc<Mutex<Vec<Media>>>,
) {
let path = Path::new(&config.storage.path);
if !path.exists() {
error!("Folder path not exists: '{path:?}'");
panic!("Folder path not exists: '{path:?}'");
}
// let (tx, rx) = channel();
let (tx, rx) = channel();
let mut debouncer = new_debouncer(Duration::from_secs(1), None, tx).unwrap();
debouncer
.watcher()
.watch(path, RecursiveMode::Recursive)
.unwrap();
debouncer.cache().add_root(path, RecursiveMode::Recursive);
while !is_terminated.load(Ordering::SeqCst) {
if let Ok(result) = rx.try_recv() {
match result {
Ok(events) => events.iter().for_each(|event| match event.kind {
Create(CreateKind::File) | Modify(ModifyKind::Name(RenameMode::To)) => {
let new_path = &event.paths[0];
if new_path.is_file() && include_file_extension(&config, new_path) {
let index = sources.lock().unwrap().len();
let media = Media::new(index, &new_path.to_string_lossy(), false);
sources.lock().unwrap().push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
}
Remove(RemoveKind::File) | Modify(ModifyKind::Name(RenameMode::From)) => {
let old_path = &event.paths[0];
if !old_path.is_file() && include_file_extension(&config, old_path) {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.to_string_lossy());
info!("Remove file: <b><magenta>{old_path:?}</></b>");
}
}
Modify(ModifyKind::Name(RenameMode::Both)) => {
let old_path = &event.paths[0];
let new_path = &event.paths[1];
let mut media_list = sources.lock().unwrap();
if let Some(index) = media_list
.iter()
.position(|x| *x.source == old_path.display().to_string()) {
let media = Media::new(index, &new_path.to_string_lossy(), false);
media_list[index] = media;
info!("Move file: <b><magenta>{old_path:?}</></b> to <b><magenta>{new_path:?}</></b>");
} else if include_file_extension(&config, new_path) {
let index = media_list.len();
let media = Media::new(index, &new_path.to_string_lossy(), false);
media_list.push(media);
info!("Create new file: <b><magenta>{new_path:?}</></b>");
}
}
_ => debug!("Not tracked file event: {event:?}")
}),
Err(errors) => errors.iter().for_each(|error| error!("{error:?}")),
}
}
sleep(Duration::from_secs(3));
}
}

View File

@ -0,0 +1,168 @@
use std::{
io::{BufRead, BufReader, Error, Read},
process::{exit, ChildStderr, Command, Stdio},
sync::atomic::Ordering,
thread,
};
use crossbeam_channel::Sender;
use simplelog::*;
use crate::player::utils::valid_stream;
use crate::utils::logging::log_line;
use ffplayout_lib::{
utils::{
controller::ProcessUnit::*, test_tcp_port, Media, PlayoutConfig, ProcessControl,
FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS,
},
vec_strings,
};
fn server_monitor(
level: &str,
ignore: Vec<String>,
buffer: BufReader<ChildStderr>,
proc_ctl: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i))
&& !ignore.iter().any(|i| line.contains(i))
{
log_line(&line, level);
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
if FFMPEG_UNRECOVERABLE_ERRORS
.iter()
.any(|i| line.contains(*i))
{
proc_ctl.stop_all();
}
}
Ok(())
}
/// ffmpeg Ingest Server
///
/// Start ffmpeg in listen mode, and wait for input.
pub fn ingest_server(
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
proc_control: ProcessControl,
) -> Result<(), Error> {
let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
dummy_media.add_filter(&config, &None);
if let Some(ingest_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.ingest.input_cmd.clone())
{
server_cmd.append(&mut ingest_input_cmd.clone());
}
server_cmd.append(&mut stream_input.clone());
if let Some(mut filter) = dummy_media.filter {
server_cmd.append(&mut filter.cmd());
server_cmd.append(&mut filter.map());
}
if let Some(mut cmd) = config.processing.cmd {
server_cmd.append(&mut cmd);
}
let mut is_running;
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.stop_all();
exit(1);
}
info!("Start ingest server, listening on: <b><magenta>{url}</></b>",);
};
debug!(
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
);
while !proc_control.is_terminated.load(Ordering::SeqCst) {
let proc_ctl = proc_control.clone();
let level = config.logging.ingest_level.clone().unwrap();
let ignore = config.logging.ignore_lines.clone();
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}")
}
Ok(proc) => proc,
};
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
let error_reader_thread =
thread::spawn(move || server_monitor(&level, ignore, server_err, proc_ctl));
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
loop {
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
debug!("Ingest server read {e:?}");
break;
}
};
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
is_running = true;
}
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {e:?}");
proc_control.is_terminated.store(true, Ordering::SeqCst);
break;
}
} else {
break;
}
}
drop(ingest_reader);
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")
}
if let Err(e) = error_reader_thread.join() {
error!("{e:?}");
};
}
Ok(())
}

View File

@ -0,0 +1,51 @@
use std::{
sync::{atomic::AtomicBool, Arc},
thread,
};
use simplelog::*;
pub mod folder;
pub mod ingest;
pub mod playlist;
pub use folder::watchman;
pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
use crate::utils::config::PlayoutConfig;
use ffplayout_lib::utils::{controller::PlayerControl, folder::FolderSource};
use ffplayout_lib::utils::{Media, PlayoutStatus, ProcessMode::*};
/// Create a source iterator from playlist, or from folder.
pub fn source_generator(
config: PlayoutConfig,
player_control: &PlayerControl,
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
) -> Box<dyn Iterator<Item = Media>> {
match config.processing.mode {
Folder => {
info!("Playout in folder mode");
debug!(
"Monitor folder: <b><magenta>{:?}</></b>",
config.storage.path
);
let config_clone = config.clone();
let folder_source = FolderSource::new(&config, playout_stat.chain, player_control);
let node_clone = folder_source.player_control.current_list.clone();
// Spawn a thread to monitor folder for file changes.
thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
Playlist => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(&config, playout_stat, is_terminated, player_control);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}
}
}

View File

@ -0,0 +1,848 @@
use std::{
fs,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use serde_json::json;
use simplelog::*;
use ffplayout_lib::utils::{
controller::PlayerControl,
gen_dummy, get_delta, is_close, is_remote,
json_serializer::{read_json, set_defaults},
loop_filler, loop_image, modified_time, seek_and_length, time_in_seconds, JsonPlaylist, Media,
MediaProbe, PlayoutConfig, PlayoutStatus, IMAGE_FORMAT,
};
/// Struct for current playlist.
///
/// Here we prepare the init clip and build a iterator where we pull our clips.
#[derive(Debug)]
pub struct CurrentProgram {
config: PlayoutConfig,
start_sec: f64,
end_sec: f64,
json_playlist: JsonPlaylist,
player_control: PlayerControl,
current_node: Media,
is_terminated: Arc<AtomicBool>,
playout_stat: PlayoutStatus,
last_json_path: Option<String>,
last_node_ad: bool,
}
/// Prepare a playlist iterator.
impl CurrentProgram {
pub fn new(
config: &PlayoutConfig,
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
player_control: &PlayerControl,
) -> Self {
Self {
config: config.clone(),
start_sec: config.playlist.start_sec.unwrap(),
end_sec: config.playlist.length_sec.unwrap(),
json_playlist: JsonPlaylist::new(
"1970-01-01".to_string(),
config.playlist.start_sec.unwrap(),
),
player_control: player_control.clone(),
current_node: Media::new(0, "", false),
is_terminated,
playout_stat,
last_json_path: None,
last_node_ad: false,
}
}
// Check if there is no current playlist or file got updated,
// and when is so load/reload it.
fn load_or_update_playlist(&mut self, seek: bool) {
let mut get_current = false;
let mut reload = false;
if let Some(path) = self.json_playlist.path.clone() {
if (Path::new(&path).is_file() || is_remote(&path))
&& self.json_playlist.modified != modified_time(&path)
{
info!("Reload playlist <b><magenta>{path}</></b>");
self.playout_stat.list_init.store(true, Ordering::SeqCst);
get_current = true;
reload = true;
}
} else {
get_current = true;
}
if get_current {
self.json_playlist = read_json(
&mut self.config,
&self.player_control,
self.json_playlist.path.clone(),
self.is_terminated.clone(),
seek,
false,
);
if !reload {
if let Some(file) = &self.json_playlist.path {
info!("Read playlist: <b><magenta>{file}</></b>");
}
if *self.playout_stat.date.lock().unwrap() != self.json_playlist.date {
self.set_status(self.json_playlist.date.clone());
}
self.playout_stat
.current_date
.lock()
.unwrap()
.clone_from(&self.json_playlist.date);
}
self.player_control
.current_list
.lock()
.unwrap()
.clone_from(&self.json_playlist.program);
if self.json_playlist.path.is_none() {
trace!("missing playlist");
self.current_node = Media::new(0, "", false);
self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.player_control.current_index.store(0, Ordering::SeqCst);
}
}
}
// Check if day is past and it is time for a new playlist.
fn check_for_playlist(&mut self, seek: bool) -> bool {
let (delta, total_delta) = get_delta(&self.config, &time_in_seconds());
let mut next = false;
let duration = if self.current_node.duration >= self.current_node.out {
self.current_node.duration
} else {
// maybe out is longer to be able to loop
self.current_node.out
};
let node_index = self.current_node.index.unwrap_or_default();
let mut next_start =
self.current_node.begin.unwrap_or_default() - self.start_sec + duration + delta;
if node_index > 0
&& node_index == self.player_control.current_list.lock().unwrap().len() - 1
{
next_start += self.config.general.stop_threshold;
}
trace!(
"delta: {delta} | total_delta: {total_delta}, index: {node_index} \nnext_start: {next_start} | end_sec: {} | source {}",
self.end_sec,
self.current_node.source
);
// Check if we over the target length or we are close to it, if so we load the next playlist.
if !self.config.playlist.infinit
&& (next_start >= self.end_sec
|| is_close(total_delta, 0.0, 2.0)
|| is_close(total_delta, self.end_sec, 2.0))
{
trace!("get next day");
next = true;
self.json_playlist = read_json(
&mut self.config,
&self.player_control,
None,
self.is_terminated.clone(),
false,
true,
);
if let Some(file) = &self.json_playlist.path {
info!("Read next playlist: <b><magenta>{file}</></b>");
}
self.playout_stat.list_init.store(false, Ordering::SeqCst);
self.set_status(self.json_playlist.date.clone());
self.player_control
.current_list
.lock()
.unwrap()
.clone_from(&self.json_playlist.program);
self.player_control.current_index.store(0, Ordering::SeqCst);
} else {
self.load_or_update_playlist(seek)
}
next
}
fn set_status(&mut self, date: String) {
if *self.playout_stat.date.lock().unwrap() != date
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
{
info!("Reset playout status");
}
self.playout_stat
.current_date
.lock()
.unwrap()
.clone_from(&date);
*self.playout_stat.time_shift.lock().unwrap() = 0.0;
if let Err(e) = fs::write(
&self.config.general.stat_file,
serde_json::to_string(&json!({
"time_shift": 0.0,
"date": date,
}))
.unwrap(),
) {
error!("Unable to write status file: {e}");
};
}
// Check if last and/or next clip is a advertisement.
fn last_next_ad(&mut self, node: &mut Media) {
let index = self.player_control.current_index.load(Ordering::SeqCst);
let current_list = self.player_control.current_list.lock().unwrap();
if index + 1 < current_list.len() && &current_list[index + 1].category == "advertisement" {
node.next_ad = true;
}
if index > 0
&& index < current_list.len()
&& &current_list[index - 1].category == "advertisement"
{
node.last_ad = true;
}
}
// Get current time and when we are before start time,
// we add full seconds of a day to it.
fn get_current_time(&mut self) -> f64 {
let mut time_sec = time_in_seconds();
if time_sec < self.start_sec {
time_sec += 86400.0 // self.config.playlist.length_sec.unwrap();
}
time_sec
}
// On init or reload we need to seek for the current clip.
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
let shift = *self.playout_stat.time_shift.lock().unwrap();
if shift != 0.0 {
info!("Shift playlist start for <yellow>{shift:.3}</> seconds");
time_sec += shift;
}
if self.config.playlist.infinit
&& self.json_playlist.length.unwrap() < 86400.0
&& time_sec > self.json_playlist.length.unwrap() + self.start_sec
{
self.recalculate_begin(true)
}
for (i, item) in self
.player_control
.current_list
.lock()
.unwrap()
.iter()
.enumerate()
{
if item.begin.unwrap() + item.out - item.seek > time_sec {
self.playout_stat.list_init.store(false, Ordering::SeqCst);
self.player_control.current_index.store(i, Ordering::SeqCst);
break;
}
}
}
// Prepare init clip.
fn init_clip(&mut self) -> bool {
trace!("init_clip");
self.get_current_clip();
let mut is_filler = false;
if !self.playout_stat.list_init.load(Ordering::SeqCst) {
let time_sec = self.get_current_time();
let index = self.player_control.current_index.load(Ordering::SeqCst);
let nodes = self.player_control.current_list.lock().unwrap();
let last_index = nodes.len() - 1;
// de-instance node to preserve original values in list
let mut node_clone = nodes[index].clone();
// Important! When no manual drop is happen here, lock is still active in handle_list_init
drop(nodes);
trace!("Clip from init: {}", node_clone.source);
node_clone.seek += time_sec
- (node_clone.begin.unwrap() - *self.playout_stat.time_shift.lock().unwrap());
self.last_next_ad(&mut node_clone);
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
self.current_node = handle_list_init(
&self.config,
node_clone,
&self.playout_stat,
&self.player_control,
last_index,
);
if self
.current_node
.source
.contains(&self.config.storage.path.to_string_lossy().to_string())
|| self.current_node.source.contains("color=c=#121212")
{
is_filler = true;
}
}
is_filler
}
fn fill_end(&mut self, total_delta: f64) {
// Fill end from playlist
let index = self.player_control.current_index.load(Ordering::SeqCst);
let mut media = Media::new(index, "", false);
media.begin = Some(time_in_seconds());
media.duration = total_delta;
media.out = total_delta;
self.last_next_ad(&mut media);
self.current_node = gen_source(
&self.config,
media,
&self.playout_stat,
&self.player_control,
0,
);
self.player_control
.current_list
.lock()
.unwrap()
.push(self.current_node.clone());
self.current_node.last_ad = self.last_node_ad;
self.current_node
.add_filter(&self.config, &self.playout_stat.chain);
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
}
fn recalculate_begin(&mut self, extend: bool) {
debug!("Infinit playlist reaches end, recalculate clip begins.");
let mut time_sec = time_in_seconds();
if extend {
time_sec = self.start_sec + self.json_playlist.length.unwrap();
}
self.json_playlist.start_sec = Some(time_sec);
set_defaults(&mut self.json_playlist);
self.player_control
.current_list
.lock()
.unwrap()
.clone_from(&self.json_playlist.program);
}
}
/// Build the playlist iterator
impl Iterator for CurrentProgram {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
self.last_json_path.clone_from(&self.json_playlist.path);
self.last_node_ad = self.current_node.last_ad;
self.check_for_playlist(self.playout_stat.list_init.load(Ordering::SeqCst));
if self.playout_stat.list_init.load(Ordering::SeqCst) {
trace!("Init playlist, from next iterator");
let mut init_clip_is_filler = false;
if self.json_playlist.path.is_some() {
init_clip_is_filler = self.init_clip();
}
if self.playout_stat.list_init.load(Ordering::SeqCst) && !init_clip_is_filler {
// On init load, playlist could be not long enough, or clips are not found
// so we fill the gap with a dummy.
trace!("Init clip is no filler");
let mut current_time = time_in_seconds();
let (_, total_delta) = get_delta(&self.config, &current_time);
if self.start_sec > current_time {
current_time += self.end_sec + 1.0;
}
let mut last_index = 0;
let length = self.player_control.current_list.lock().unwrap().len();
if length > 0 {
last_index = length - 1;
}
let mut media = Media::new(length, "", false);
media.begin = Some(current_time);
media.duration = total_delta;
media.out = total_delta;
self.last_next_ad(&mut media);
self.current_node = gen_source(
&self.config,
media,
&self.playout_stat,
&self.player_control,
last_index,
);
}
return Some(self.current_node.clone());
}
if self.player_control.current_index.load(Ordering::SeqCst)
< self.player_control.current_list.lock().unwrap().len()
{
// get next clip from current playlist
let mut is_last = false;
let index = self.player_control.current_index.load(Ordering::SeqCst);
let node_list = self.player_control.current_list.lock().unwrap();
let mut node = node_list[index].clone();
let last_index = node_list.len() - 1;
drop(node_list);
if index == last_index {
is_last = true
}
self.last_next_ad(&mut node);
self.current_node = timed_source(
node,
&self.config,
is_last,
&self.playout_stat,
&self.player_control,
last_index,
);
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
Some(self.current_node.clone())
} else {
let (_, total_delta) = get_delta(&self.config, &self.start_sec);
if !self.config.playlist.infinit
&& self.last_json_path == self.json_playlist.path
&& total_delta.abs() > 1.0
{
// Playlist is to early finish,
// and if we have to fill it with a placeholder.
trace!("Total delta on list end: {total_delta}");
self.fill_end(total_delta);
return Some(self.current_node.clone());
}
// Get first clip from next playlist.
let c_list = self.player_control.current_list.lock().unwrap();
let mut first_node = c_list[0].clone();
drop(c_list);
if self.config.playlist.infinit {
self.recalculate_begin(false)
}
self.player_control.current_index.store(0, Ordering::SeqCst);
self.last_next_ad(&mut first_node);
first_node.last_ad = self.last_node_ad;
self.current_node = gen_source(
&self.config,
first_node,
&self.playout_stat,
&self.player_control,
0,
);
self.player_control.current_index.store(1, Ordering::SeqCst);
Some(self.current_node.clone())
}
}
}
/// Prepare input clip:
///
/// - check begin and length from clip
/// - return clip only if we are in 24 hours time range
fn timed_source(
node: Media,
config: &PlayoutConfig,
last: bool,
playout_stat: &PlayoutStatus,
player_control: &PlayerControl,
last_index: usize,
) -> Media {
let (delta, total_delta) = get_delta(config, &node.begin.unwrap());
let mut shifted_delta = delta;
let mut new_node = node.clone();
new_node.process = Some(false);
trace!("Node begin: {}", node.begin.unwrap());
trace!("timed source is last: {last}");
if config.playlist.length.contains(':') {
let time_shift = playout_stat.time_shift.lock().unwrap();
if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap()
&& *time_shift != 0.0
{
shifted_delta = delta - *time_shift;
debug!("Delta: <yellow>{shifted_delta:.3}</>, shifted: <yellow>{delta:.3}</>");
} else {
debug!("Delta: <yellow>{shifted_delta:.3}</>");
}
if config.general.stop_threshold > 0.0
&& shifted_delta.abs() > config.general.stop_threshold
{
error!("Clip begin out of sync for <yellow>{delta:.3}</> seconds.");
new_node.cmd = None;
return new_node;
}
}
if (total_delta > node.out - node.seek && !last)
|| node.index.unwrap() < 2
|| !config.playlist.length.contains(':')
|| config.playlist.infinit
{
// when we are in the 24 hour range, get the clip
new_node.process = Some(true);
new_node = gen_source(config, node, playout_stat, player_control, last_index);
} else if total_delta <= 0.0 {
info!("Begin is over play time, skip: {}", node.source);
} else if total_delta < node.duration - node.seek || last {
new_node = handle_list_end(
config,
node,
total_delta,
playout_stat,
player_control,
last_index,
);
}
new_node
}
fn duplicate_for_seek_and_loop(node: &mut Media, player_control: &PlayerControl) {
warn!("Clip loops and has seek value: duplicate clip to separate loop and seek.");
let mut nodes = player_control.current_list.lock().unwrap();
let index = node.index.unwrap_or_default();
let mut node_duplicate = node.clone();
node_duplicate.seek = 0.0;
let orig_seek = node.seek;
node.out = node.duration;
if node.seek > node.duration {
node.seek %= node.duration;
node_duplicate.out = node_duplicate.out - orig_seek - (node.out - node.seek);
} else {
node_duplicate.out -= node_duplicate.duration;
}
if node.seek == node.out {
node.seek = node_duplicate.seek;
node.out = node_duplicate.out;
} else if node_duplicate.out - node_duplicate.seek > 1.2 {
node_duplicate.begin =
Some(node_duplicate.begin.unwrap_or_default() + (node.out - node.seek));
nodes.insert(index + 1, node_duplicate);
for (i, item) in nodes.iter_mut().enumerate() {
item.index = Some(i);
}
}
}
/// Generate the source CMD, or when clip not exist, get a dummy.
pub fn gen_source(
config: &PlayoutConfig,
mut node: Media,
playout_stat: &PlayoutStatus,
player_control: &PlayerControl,
last_index: usize,
) -> Media {
let node_index = node.index.unwrap_or_default();
let mut duration = node.out - node.seek;
if duration < 1.0 {
warn!("Clip is less then 1 second long (<yellow>{duration:.3}</>), adjust length.");
duration = 1.2;
if node.seek > 1.0 {
node.seek -= 1.2;
} else {
node.out = 1.2;
}
}
trace!("Clip new length: {duration}, duration: {}", node.duration);
if node.probe.is_none() && !node.source.is_empty() {
if let Err(e) = node.add_probe(true) {
trace!("{e:?}");
};
} else {
trace!("Node has a probe...")
}
// separate if condition, because of node.add_probe() in last condition
if node.probe.is_some() {
if node
.source
.rsplit_once('.')
.map(|(_, e)| e.to_lowercase())
.filter(|c| IMAGE_FORMAT.contains(&c.as_str()))
.is_some()
{
node.cmd = Some(loop_image(&node));
} else {
if node.seek > 0.0 && node.out > node.duration {
duplicate_for_seek_and_loop(&mut node, player_control);
}
node.cmd = Some(seek_and_length(&mut node));
}
} else {
trace!("clip index: {node_index} | last index: {last_index}");
// Last index is the index from the last item from the node list.
if node_index < last_index {
error!("Source not found: <b><magenta>{}</></b>", node.source);
}
let mut filler_list = vec![];
match player_control.filler_list.try_lock() {
Ok(list) => filler_list = list.to_vec(),
Err(e) => error!("Lock filler list error: {e}"),
}
// Set list_init to true, to stay in sync.
playout_stat.list_init.store(true, Ordering::SeqCst);
if config.storage.filler.is_dir() && !filler_list.is_empty() {
let filler_index = player_control.filler_index.fetch_add(1, Ordering::SeqCst);
let mut filler_media = filler_list[filler_index].clone();
trace!("take filler: {}", filler_media.source);
if filler_index == filler_list.len() - 1 {
// reset index for next round
player_control.filler_index.store(0, Ordering::SeqCst)
}
if filler_media.probe.is_none() {
if let Err(e) = filler_media.add_probe(false) {
error!("{e:?}");
};
}
if filler_media.duration > duration {
filler_media.out = duration;
}
node.source = filler_media.source;
node.seek = 0.0;
node.out = filler_media.out;
node.duration = filler_media.duration;
node.cmd = Some(loop_filler(&node));
node.probe = filler_media.probe;
} else {
match MediaProbe::new(&config.storage.filler.to_string_lossy()) {
Ok(probe) => {
if config
.storage
.filler
.to_string_lossy()
.to_string()
.rsplit_once('.')
.map(|(_, e)| e.to_lowercase())
.filter(|c| IMAGE_FORMAT.contains(&c.as_str()))
.is_some()
{
node.source = config.storage.filler.clone().to_string_lossy().to_string();
node.cmd = Some(loop_image(&node));
node.probe = Some(probe);
} else if let Some(filler_duration) = probe
.clone()
.format
.duration
.and_then(|d| d.parse::<f64>().ok())
{
// Create placeholder from config filler.
let mut filler_out = filler_duration;
if filler_duration > duration {
filler_out = duration;
}
node.source = config.storage.filler.clone().to_string_lossy().to_string();
node.seek = 0.0;
node.out = filler_out;
node.duration = filler_duration;
node.cmd = Some(loop_filler(&node));
node.probe = Some(probe);
} else {
// Create colored placeholder.
let (source, cmd) = gen_dummy(config, duration);
node.source = source;
node.cmd = Some(cmd);
}
}
Err(e) => {
// Create colored placeholder.
error!("Filler error: {e}");
let mut dummy_duration = 60.0;
if dummy_duration > duration {
dummy_duration = duration;
}
let (source, cmd) = gen_dummy(config, dummy_duration);
node.seek = 0.0;
node.out = dummy_duration;
node.duration = dummy_duration;
node.source = source;
node.cmd = Some(cmd);
}
}
}
warn!(
"Generate filler with <yellow>{:.2}</> seconds length!",
node.out
);
}
node.add_filter(config, &playout_stat.chain);
trace!(
"return gen_source: {}, seek: {}, out: {}",
node.source,
node.seek,
node.out,
);
node
}
/// Handle init clip, but this clip can be the last one in playlist,
/// this we have to figure out and calculate the right length.
fn handle_list_init(
config: &PlayoutConfig,
mut node: Media,
playout_stat: &PlayoutStatus,
player_control: &PlayerControl,
last_index: usize,
) -> Media {
debug!("Playlist init");
let (_, total_delta) = get_delta(config, &node.begin.unwrap());
if !config.playlist.infinit && node.out - node.seek > total_delta {
node.out = total_delta + node.seek;
}
gen_source(config, node, playout_stat, player_control, last_index)
}
/// when we come to last clip in playlist,
/// or when we reached total playtime,
/// we end up here
fn handle_list_end(
config: &PlayoutConfig,
mut node: Media,
total_delta: f64,
playout_stat: &PlayoutStatus,
player_control: &PlayerControl,
last_index: usize,
) -> Media {
debug!("Last clip from day");
let mut out = if node.seek > 0.0 {
node.seek + total_delta
} else {
if node.duration > total_delta {
warn!("Adjust clip duration to: <yellow>{total_delta:.2}</>");
}
total_delta
};
// out can't be longer then duration
if out > node.duration {
out = node.duration
}
if node.duration > total_delta && total_delta > 1.0 && node.duration - node.seek >= total_delta
{
node.out = out;
} else {
warn!("Playlist is not long enough: <yellow>{total_delta:.2}</> seconds needed");
}
node.process = Some(true);
gen_source(config, node, playout_stat, player_control, last_index)
}

View File

@ -1 +1,5 @@
pub mod controller; pub mod controller;
pub mod filter;
pub mod input;
pub mod output;
pub mod utils;

View File

@ -0,0 +1,88 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use ffplayout_lib::{filter::v_drawtext, utils::PlayoutConfig, vec_strings};
/// Desktop Output
///
/// Instead of streaming, we run a ffplay instance and play on desktop.
pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut enc_filter: Vec<String> = vec![];
let mut enc_cmd = vec_strings!["-hide_banner", "-nostats", "-v", log_format];
if let Some(encoder_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.encoder.input_cmd.clone())
{
enc_cmd.append(&mut encoder_input_cmd.clone());
}
enc_cmd.append(&mut vec_strings![
"-autoexit",
"-i",
"pipe:0",
"-window_title",
"ffplayout"
]);
if let Some(mut cmd) = config.out.output_cmd.clone() {
if !cmd.iter().any(|i| {
[
"-c",
"-c:v",
"-c:v:0",
"-b:v",
"-b:v:0",
"-vcodec",
"-c:a",
"-acodec",
"-crf",
"-map",
"-filter_complex",
]
.contains(&i.as_str())
}) {
enc_cmd.append(&mut cmd);
} else {
warn!("ffplay doesn't support given output parameters, they will be skipped!");
}
}
if config.text.add_text && !config.text.text_from_filename && !config.processing.audio_only {
if let Some(socket) = config.text.zmq_stream_socket.clone() {
debug!(
"Using drawtext filter, listening on address: <yellow>{}</>",
socket
);
let mut filter: String = "null,".to_string();
filter.push_str(v_drawtext::filter_node(config, None, &None).as_str());
enc_filter = vec!["-vf".to_string(), filter];
}
}
enc_cmd.append(&mut enc_filter);
debug!(
"Encoder CMD: <bright-blue>\"ffplay {}\"</>",
enc_cmd.join(" ")
);
let enc_proc = match Command::new("ffplay")
.args(enc_cmd)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};
enc_proc
}

View File

@ -0,0 +1,276 @@
/*
This module write the files compression directly to a hls (m3u8) playlist,
without pre- and post-processing.
Example config:
out:
output_param: >-
...
-flags +cgop
-f hls
-hls_time 6
-hls_list_size 600
-hls_flags append_list+delete_segments+omit_endlist+program_date_time
-hls_segment_filename /var/www/html/live/stream-%d.ts /var/www/html/live/stream.m3u8
*/
use std::{
io::{BufRead, BufReader, Error},
process::{exit, Command, Stdio},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
};
use simplelog::*;
use crate::player::{
controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*},
input::source_generator,
utils::{
get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream,
Media,
},
};
use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner};
use crate::vec_strings;
/// Ingest Server for HLS
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) -> Result<(), Error> {
let playlist_init = playout_stat.list_init;
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
if let Some(ingest_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.ingest.input_cmd.clone())
{
server_prefix.append(&mut ingest_input_cmd.clone());
}
server_prefix.append(&mut stream_input.clone());
let mut is_running;
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.stop_all();
exit(1);
}
info!("Start ingest server, listening on: <b><magenta>{url}</></b>");
};
loop {
dummy_media.add_filter(&config, &playout_stat.chain);
let server_cmd = prepare_output_cmd(&config, server_prefix.clone(), &dummy_media.filter);
debug!(
"Server CMD: <bright-blue>\"ffmpeg {}\"</>",
server_cmd.join(" ")
);
let proc_ctl = proc_control.clone();
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}");
}
Ok(proc) => proc,
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*proc_control.server_term.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
playlist_init.store(true, Ordering::SeqCst);
is_running = true;
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.stop(Decoder) {
error!("{e}");
}
}
log_line(&line, &config.logging.ffmpeg_level);
}
if proc_control.server_is_running.load(Ordering::SeqCst) {
info!("Switch from live ingest to {}", config.processing.mode);
}
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
error!("{e}")
}
if proc_control.is_terminated.load(Ordering::SeqCst) {
break;
}
}
Ok(())
}
/// HLS Writer
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
config: &PlayoutConfig,
player_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let play_stat2 = playout_stat.clone();
let proc_control_c = proc_control.clone();
let get_source = source_generator(
config.clone(),
&player_control,
playout_stat,
proc_control.is_terminated.clone(),
);
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, proc_control_c));
}
for node in get_source {
*player_control.current_media.lock().unwrap() = Some(node.clone());
let ignore = config.logging.ignore_lines.clone();
let mut cmd = match &node.cmd {
Some(cmd) => cmd.clone(),
None => break,
};
if !node.process.unwrap() {
continue;
}
info!(
"Play for <yellow>{}</>: <b><magenta>{}</></b>",
sec_to_time(node.out - node.seek),
node.source
);
if config.task.enable {
if config.task.path.is_file() {
let task_config = config.clone();
let task_node = node.clone();
let server_running = proc_control.server_is_running.load(Ordering::SeqCst);
let stat = play_stat2.clone();
thread::spawn(move || {
task_runner::run(task_config, task_node, stat, server_running)
});
} else {
error!(
"<bright-blue>{:?}</> executable not exists!",
config.task.path
);
}
}
let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
if let Some(encoder_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.encoder.input_cmd.clone())
{
enc_prefix.append(&mut encoder_input_cmd.clone());
}
let mut read_rate = 1.0;
if let Some(begin) = &node.begin {
let (delta, _) = get_delta(config, begin);
let duration = node.out - node.seek;
let speed = duration / (duration + delta);
if node.seek == 0.0
&& speed > 0.0
&& speed < 1.3
&& delta < config.general.stop_threshold
{
read_rate = speed;
}
}
enc_prefix.append(&mut vec_strings!["-readrate", read_rate]);
enc_prefix.append(&mut cmd);
let enc_cmd = prepare_output_cmd(config, enc_prefix, &node.filter);
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
enc_cmd.join(" ")
);
let mut dec_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.stderr(Stdio::piped())
.spawn()
{
Ok(proc) => proc,
Err(e) => {
error!("couldn't spawn ffmpeg process: {e}");
panic!("couldn't spawn ffmpeg process: {e}")
}
};
let enc_err = BufReader::new(dec_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, proc_control.clone()) {
error!("{e:?}")
};
if let Err(e) = proc_control.wait(Decoder) {
error!("{e}");
}
while proc_control.server_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
}
}
sleep(Duration::from_secs(1));
proc_control.stop_all();
}

View File

@ -0,0 +1,263 @@
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
};
use crossbeam_channel::bounded;
use simplelog::*;
mod desktop;
mod hls;
mod null;
mod stream;
pub use hls::write_hls;
use crate::player::input::{ingest_server, source_generator};
use crate::utils::{config::PlayoutConfig, task_runner};
use ffplayout_lib::utils::{
sec_to_time, stderr_reader, OutputMode::*, PlayerControl, PlayoutStatus, ProcessControl,
ProcessUnit::*,
};
use ffplayout_lib::vec_strings;
/// Player
///
/// Here we create the input file loop, from playlist, or folder source.
/// Then we read the stdout from the reader ffmpeg instance
/// and write it to the stdin from the streamer ffmpeg instance.
/// If it is configured we also fire up a ffmpeg ingest server instance,
/// for getting live feeds.
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
config: &PlayoutConfig,
play_control: &PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let ignore_enc = config.logging.ignore_lines.clone();
let mut buffer = [0; 65088];
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
let play_stat = playout_stat.clone();
// get source iterator
let node_sources = source_generator(
config.clone(),
play_control,
playout_stat,
proc_control.is_terminated.clone(),
);
// get ffmpeg output instance
let mut enc_proc = match config.out.mode {
Desktop => desktop::output(config, &ff_log_format),
Null => null::output(config, &ff_log_format),
Stream => stream::output(config, &ff_log_format),
_ => panic!("Output mode doesn't exists!"),
};
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
*proc_control.encoder_term.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = proc_control.clone();
// spawn a thread to log ffmpeg output error messages
let error_encoder_thread =
thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl));
let proc_control_c = proc_control.clone();
let mut ingest_receiver = None;
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
let (ingest_sender, rx) = bounded(96);
ingest_receiver = Some(rx);
thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c));
}
'source_iter: for node in node_sources {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let ignore_dec = config.logging.ignore_lines.clone();
if proc_control.is_terminated.load(Ordering::SeqCst) {
debug!("Playout is terminated, break out from source loop");
break;
}
trace!("Decoder CMD: {:?}", node.cmd);
let mut cmd = match &node.cmd {
Some(cmd) => cmd.clone(),
None => break,
};
if !node.process.unwrap() {
// process true/false differs from node.cmd = None in that way,
// that source is valid but to show for playing,
// so better skip it and jump to the next one.
continue;
}
let c_index = if cfg!(debug_assertions) {
format!(
" ({}/{})",
node.index.unwrap() + 1,
play_control.current_list.lock().unwrap().len()
)
} else {
String::new()
};
info!(
"Play for <yellow>{}</>{c_index}: <b><magenta>{} {}</></b>",
sec_to_time(node.out - node.seek),
node.source,
node.audio
);
if config.task.enable {
if config.task.path.is_file() {
let task_config = config.clone();
let task_node = node.clone();
let server_running = proc_control.server_is_running.load(Ordering::SeqCst);
let stat = play_stat.clone();
thread::spawn(move || {
task_runner::run(task_config, task_node, stat, server_running)
});
} else {
error!(
"<bright-blue>{:?}</> executable not exists!",
config.task.path
);
}
}
let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", &ff_log_format];
if let Some(decoder_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.input_cmd.clone())
{
dec_cmd.append(&mut decoder_input_cmd.clone());
}
dec_cmd.append(&mut cmd);
if let Some(mut filter) = node.filter {
dec_cmd.append(&mut filter.cmd());
dec_cmd.append(&mut filter.map());
}
if let Some(mut cmd) = config.processing.cmd.clone() {
dec_cmd.append(&mut cmd);
}
debug!(
"Decoder CMD: <bright-blue>\"ffmpeg {}\"</>",
dec_cmd.join(" ")
);
// create ffmpeg decoder instance, for reading the input files
let mut dec_proc = match Command::new("ffmpeg")
.args(dec_cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(proc) => proc,
Err(e) => {
error!("couldn't spawn decoder process: {e}");
panic!("couldn't spawn decoder process: {e}")
}
};
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
let dec_p_ctl = proc_control.clone();
let error_decoder_thread =
thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, dec_p_ctl));
loop {
// when server is running, read from it
if proc_control.server_is_running.load(Ordering::SeqCst) {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.stop(Decoder) {
error!("{e}")
}
live_on = true;
playlist_init.store(true, Ordering::SeqCst);
}
for rx in ingest_receiver.as_ref().unwrap().try_iter() {
if let Err(e) = enc_writer.write(&rx.1[..rx.0]) {
error!("Error from Ingest: {:?}", e);
break 'source_iter;
};
}
// read from decoder instance
} else {
if live_on {
info!("Switch from live ingest to {}", config.processing.mode);
live_on = false;
break;
}
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
error!("Reading error from decoder: {e:?}");
break 'source_iter;
}
};
if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
error!("Encoder write error: {}", e.kind());
break 'source_iter;
};
} else {
break;
}
}
}
if let Err(e) = proc_control.wait(Decoder) {
error!("{e}")
}
if let Err(e) = error_decoder_thread.join() {
error!("{e:?}");
};
}
trace!("Out of source loop");
sleep(Duration::from_secs(1));
proc_control.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");
};
}

View File

@ -0,0 +1,52 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use crate::player::utils::prepare_output_cmd;
use ffplayout_lib::{
utils::{Media, PlayoutConfig, ProcessUnit::*},
vec_strings,
};
/// Desktop Output
///
/// Instead of streaming, we run a ffplay instance and play on desktop.
pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut media = Media::new(0, "", false);
media.unit = Encoder;
media.add_filter(config, &None);
let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format];
if let Some(input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.encoder.input_cmd.clone())
{
enc_prefix.append(&mut input_cmd.clone());
}
enc_prefix.append(&mut vec_strings!["-re", "-i", "pipe:0"]);
let enc_cmd = prepare_output_cmd(config, enc_prefix, &media.filter);
debug!(
"Encoder CMD: <bright-blue>\"ffmpeg {}\"</>",
enc_cmd.join(" ")
);
let enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};
enc_proc
}

View File

@ -0,0 +1,52 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use crate::player::utils::prepare_output_cmd;
use ffplayout_lib::{
utils::{Media, PlayoutConfig, ProcessUnit::*},
vec_strings,
};
/// Streaming Output
///
/// Prepare the ffmpeg command for streaming output
pub fn output(config: &PlayoutConfig, log_format: &str) -> process::Child {
let mut media = Media::new(0, "", false);
media.unit = Encoder;
media.add_filter(config, &None);
let mut enc_prefix = vec_strings!["-hide_banner", "-nostats", "-v", log_format];
if let Some(input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.encoder.input_cmd.clone())
{
enc_prefix.append(&mut input_cmd.clone());
}
enc_prefix.append(&mut vec_strings!["-re", "-i", "pipe:0"]);
let enc_cmd = prepare_output_cmd(config, enc_prefix, &media.filter);
debug!(
"Encoder CMD: <bright-blue>\"ffmpeg {}\"</>",
enc_cmd.join(" ")
);
let enc_proc = match Command::new("ffmpeg")
.args(enc_cmd)
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};
enc_proc
}

View File

@ -0,0 +1,205 @@
use serde::{Deserialize, Serialize};
use std::{
fs::File,
path::Path,
sync::{atomic::AtomicBool, Arc},
thread,
};
use simplelog::*;
use crate::utils::{
get_date, is_remote, modified_time, time_from_header, validate_playlist, Media, PlayerControl,
PlayoutConfig, DUMMY_LEN,
};
/// This is our main playlist object, it holds all necessary information for the current day.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JsonPlaylist {
#[serde(default = "default_channel")]
pub channel: String,
pub date: String,
#[serde(skip_serializing, skip_deserializing)]
pub start_sec: Option<f64>,
#[serde(skip_serializing, skip_deserializing)]
pub length: Option<f64>,
#[serde(skip_serializing, skip_deserializing)]
pub path: Option<String>,
#[serde(skip_serializing, skip_deserializing)]
pub modified: Option<String>,
pub program: Vec<Media>,
}
impl JsonPlaylist {
pub fn new(date: String, start: f64) -> Self {
let mut media = Media::new(0, "", false);
media.begin = Some(start);
media.title = None;
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
Self {
channel: "Channel 1".into(),
date,
start_sec: Some(start),
length: Some(86400.0),
path: None,
modified: None,
program: vec![media],
}
}
}
impl PartialEq for JsonPlaylist {
fn eq(&self, other: &Self) -> bool {
self.channel == other.channel && self.date == other.date && self.program == other.program
}
}
impl Eq for JsonPlaylist {}
fn default_channel() -> String {
"Channel 1".to_string()
}
pub fn set_defaults(playlist: &mut JsonPlaylist) {
let mut start_sec = playlist.start_sec.unwrap();
let mut length = 0.0;
// Add extra values to every media clip
for (i, item) in playlist.program.iter_mut().enumerate() {
item.begin = Some(start_sec);
item.index = Some(i);
item.last_ad = false;
item.next_ad = false;
item.process = Some(true);
item.filter = None;
let dur = item.out - item.seek;
start_sec += dur;
length += dur;
}
playlist.length = Some(length)
}
/// Read json playlist file, fills JsonPlaylist struct and set some extra values,
/// which we need to process.
pub fn read_json(
config: &mut PlayoutConfig,
player_control: &PlayerControl,
path: Option<String>,
is_terminated: Arc<AtomicBool>,
seek: bool,
get_next: bool,
) -> JsonPlaylist {
let config_clone = config.clone();
let control_clone = player_control.clone();
let mut playlist_path = config.playlist.path.clone();
let start_sec = config.playlist.start_sec.unwrap();
let date = get_date(seek, start_sec, get_next);
if playlist_path.is_dir() || is_remote(&config.playlist.path.to_string_lossy()) {
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
}
let mut current_file = playlist_path.as_path().display().to_string();
if let Some(p) = path {
Path::new(&p).clone_into(&mut playlist_path);
current_file = p
}
if is_remote(&current_file) {
let response = reqwest::blocking::Client::new().get(&current_file).send();
if let Ok(resp) = response {
if resp.status().is_success() {
let headers = resp.headers().clone();
if let Ok(body) = resp.text() {
let mut playlist: JsonPlaylist = match serde_json::from_str(&body) {
Ok(p) => p,
Err(e) => {
error!("Could't read remote json playlist. {e:?}");
JsonPlaylist::new(date.clone(), start_sec)
}
};
playlist.path = Some(current_file);
playlist.start_sec = Some(start_sec);
if let Some(time) = time_from_header(&headers) {
playlist.modified = Some(time.to_string());
}
let list_clone = playlist.clone();
if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(
config_clone,
control_clone,
list_clone,
is_terminated,
)
});
}
set_defaults(&mut playlist);
return playlist;
}
}
}
} else if playlist_path.is_file() {
let modified = modified_time(&current_file);
let f = File::options()
.read(true)
.write(false)
.open(&current_file)
.expect("Could not open json playlist file.");
let mut playlist: JsonPlaylist = match serde_json::from_reader(f) {
Ok(p) => p,
Err(e) => {
error!("Playlist file not readable! {e}");
JsonPlaylist::new(date.clone(), start_sec)
}
};
// catch empty program list
if playlist.program.is_empty() {
playlist = JsonPlaylist::new(date, start_sec)
}
playlist.path = Some(current_file);
playlist.start_sec = Some(start_sec);
playlist.modified = modified;
let list_clone = playlist.clone();
if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(config_clone, control_clone, list_clone, is_terminated)
});
}
set_defaults(&mut playlist);
return playlist;
}
error!("Playlist <b><magenta>{current_file}</></b> not exist!");
JsonPlaylist::new(date, start_sec)
}

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@ use std::io;
use actix_web::{error::ResponseError, Error, HttpResponse}; use actix_web::{error::ResponseError, Error, HttpResponse};
use derive_more::Display; use derive_more::Display;
use ffprobe::FfProbeError;
#[derive(Debug, Display)] #[derive(Debug, Display)]
pub enum ServiceError { pub enum ServiceError {
@ -113,6 +114,8 @@ pub enum ProcessError {
#[display(fmt = "IO error: {}", _0)] #[display(fmt = "IO error: {}", _0)]
IO(io::Error), IO(io::Error),
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
Ffprobe(FfProbeError),
#[display(fmt = "{}", _0)]
Custom(String), Custom(String),
} }
@ -122,6 +125,12 @@ impl From<std::io::Error> for ProcessError {
} }
} }
impl From<FfProbeError> for ProcessError {
fn from(err: FfProbeError) -> Self {
Self::Ffprobe(err)
}
}
impl From<lettre::address::AddressError> for ProcessError { impl From<lettre::address::AddressError> for ProcessError {
fn from(err: lettre::address::AddressError) -> ProcessError { fn from(err: lettre::address::AddressError) -> ProcessError {
ProcessError::Custom(err.to_string()) ProcessError::Custom(err.to_string())
@ -139,3 +148,9 @@ impl From<lettre::error::Error> for ProcessError {
ProcessError::Custom(err.to_string()) ProcessError::Custom(err.to_string())
} }
} }
impl<T> From<std::sync::PoisonError<T>> for ProcessError {
fn from(err: std::sync::PoisonError<T>) -> ProcessError {
ProcessError::Custom(err.to_string())
}
}

View File

@ -407,3 +407,24 @@ pub fn init_logging(mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>) -> io::
Ok(()) Ok(())
} }
/// Format ingest and HLS logging output
pub fn log_line(line: &str, level: &str) {
if line.contains("[info]") && level.to_lowercase() == "info" {
info!("<bright black>[Server]</> {}", line.replace("[info] ", ""))
} else if line.contains("[warning]")
&& (level.to_lowercase() == "warning" || level.to_lowercase() == "info")
{
warn!(
"<bright black>[Server]</> {}",
line.replace("[warning] ", "")
)
} else if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!("<bright black>[Server]</> {}", line.replace("[error] ", ""));
} else if line.contains("[fatal]") {
error!("<bright black>[Server]</> {}", line.replace("[fatal] ", ""))
}
}

View File

@ -31,6 +31,7 @@ pub mod files;
pub mod logging; pub mod logging;
pub mod playlist; pub mod playlist;
pub mod system; pub mod system;
pub mod task_runner;
use crate::db::{ use crate::db::{
db_pool, db_pool,

View File

@ -0,0 +1,26 @@
use std::process::Command;
use log::*;
use crate::player::utils::get_data_map;
use ffplayout_lib::utils::{config::PlayoutConfig, Media, PlayoutStatus};
pub fn run(config: PlayoutConfig, node: Media, playout_stat: PlayoutStatus, server_running: bool) {
let obj =
serde_json::to_string(&get_data_map(&config, node, &playout_stat, server_running)).unwrap();
trace!("Run task: {obj}");
match Command::new(config.task.path).arg(obj).spawn() {
Ok(mut c) => {
let status = c.wait().expect("Error in waiting for the task process!");
if !status.success() {
error!("Process stops with error.");
}
}
Err(e) => {
error!("Couldn't spawn task runner: {e}")
}
}
}