fix imports

This commit is contained in:
jb-alvarado 2024-06-07 09:31:58 +02:00
parent 3671daf7c7
commit 687bb2e1b9
30 changed files with 1191 additions and 440 deletions

1
Cargo.lock generated
View File

@ -1355,6 +1355,7 @@ dependencies = [
"tokio-stream",
"toml_edit",
"uuid",
"walkdir",
]
[[package]]

View File

@ -63,6 +63,7 @@ tokio = { version = "1.29", features = ["full"] }
tokio-stream = "0.1"
toml_edit = {version ="0.22", features = ["serde"]}
uuid = "1.8"
walkdir = "2"
[target.'cfg(not(target_arch = "windows"))'.dependencies]
signal-child = "1"

View File

@ -38,9 +38,13 @@ use crate::db::{
handles,
models::{Channel, LoginUser, TextPreset, User},
};
use crate::player::utils::{
get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist,
};
use crate::utils::{
channels::{create_channel, delete_channel},
control::{control_service, control_state, media_info, send_message, ControlParams, Process},
config::{PlayoutConfig, Template},
control::{control_state, media_info, send_message, ControlParams, Process},
errors::ServiceError,
files::{
browser, create_directory, norm_abs_path, remove_file_or_folder, rename_file, upload,
@ -50,17 +54,11 @@ use crate::utils::{
playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist},
playout_config, public_path, read_log_file, read_playout_config, system, Role,
};
use crate::vec_strings;
use crate::{
api::auth::{create_jwt, Claims},
utils::control::ProcessControl,
};
use ffplayout_lib::{
utils::{
get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, PlayoutConfig,
Template,
},
vec_strings,
};
#[derive(Serialize)]
struct UserObj<T> {
@ -766,19 +764,12 @@ pub async fn media_last(
pub async fn process_control(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
proc: web::Json<Process>,
engine_process: web::Data<ProcessControl>,
_proc: web::Json<Process>,
_engine_process: web::Data<ProcessControl>,
) -> Result<impl Responder, ServiceError> {
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let (_config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
control_service(
&pool.into_inner(),
&config,
*id,
&proc.command,
Some(engine_process),
)
.await
Ok(web::Json("no implemented"))
}
/// #### ffplayout Playlist Operations
@ -901,11 +892,10 @@ pub async fn del_playlist(
#[get("/log/{id}")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn get_log(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
log: web::Query<DateObj>,
) -> Result<impl Responder, ServiceError> {
read_log_file(&pool.into_inner(), &id, &log.date).await
read_log_file(&id, &log.date).await
}
/// ### File Operations

View File

@ -2,7 +2,7 @@ use std::{
fmt,
process::Child,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
};
@ -14,8 +14,14 @@ use log::*;
use serde::{Deserialize, Serialize};
use crate::db::models::Channel;
use crate::player::output::{player, write_hls};
use crate::utils::{config::PlayoutConfig, errors::ProcessError};
use crate::player::{
output::{player, write_hls},
utils::Media,
};
use crate::utils::{
config::{OutputMode::*, PlayoutConfig},
errors::ProcessError,
};
/// Defined process units.
#[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)]
@ -163,6 +169,34 @@ impl ChannelManager {
}
}
/// Global player control, to get infos about current clip etc.
#[derive(Clone, Debug)]
pub struct PlayerControl {
pub current_media: Arc<Mutex<Option<Media>>>,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub filler_list: Arc<Mutex<Vec<Media>>>,
pub current_index: Arc<AtomicUsize>,
pub filler_index: Arc<AtomicUsize>,
}
impl PlayerControl {
pub fn new() -> Self {
Self {
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec![Media::new(0, "", false)])),
filler_list: Arc::new(Mutex::new(vec![])),
current_index: Arc::new(AtomicUsize::new(0)),
filler_index: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Default for PlayerControl {
fn default() -> Self {
Self::new()
}
}
/// Global playout control, for move forward/backward clip, or resetting playlist/state.
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
@ -213,15 +247,15 @@ impl ChannelController {
}
}
pub fn start(controller: Arc<Mutex<ChannelManager>>) -> Result<(), ProcessError> {
let config = controller.lock()?.config.lock()?.clone();
pub fn start(channel: Arc<Mutex<ChannelManager>>) -> Result<(), ProcessError> {
let mode = channel.lock()?.config.lock()?.out.mode.clone();
let play_control = PlayerControl::new();
let play_status = PlayoutStatus::new();
match config.out.mode {
match mode {
// write files/playlist to HLS m3u8 playlist
HLS => write_hls(&config, play_control, playout_stat, proc_control),
HLS => write_hls(channel, play_control, play_status),
// play on desktop or stream to a remote target
_ => player(&config, &play_control, playout_stat, proc_control),
};
Ok(())
_ => player(channel, &play_control, play_status),
}
}

View File

@ -1,5 +1,5 @@
use log::*;
use regex::Regex;
use simplelog::*;
/// Apply custom filters
pub fn filter_node(filter: &str) -> (String, String) {

View File

@ -15,7 +15,6 @@ use crate::player::{
utils::{custom_format, fps_calc, is_close, Media},
};
use crate::utils::config::{OutputMode::*, PlayoutConfig};
use crate::vec_strings;
#[derive(Clone, Debug, Copy, Eq, PartialEq)]

View File

@ -17,7 +17,8 @@ use notify::{
use notify_debouncer_full::new_debouncer;
use simplelog::*;
use ffplayout_lib::utils::{include_file_extension, Media, PlayoutConfig};
use crate::player::utils::{include_file_extension, Media};
use crate::utils::config::PlayoutConfig;
/// Create a watcher, which monitor file changes.
/// When a change is register, update the current file list.

View File

@ -1,29 +1,32 @@
use std::{
io::{BufRead, BufReader, Error, Read},
io::{BufRead, BufReader, Read},
process::{exit, ChildStderr, Command, Stdio},
sync::atomic::Ordering,
sync::{atomic::Ordering, Arc, Mutex},
thread,
};
use crossbeam_channel::Sender;
use simplelog::*;
use crate::player::utils::valid_stream;
use crate::utils::logging::log_line;
use ffplayout_lib::{
utils::{
controller::ProcessUnit::*, test_tcp_port, Media, PlayoutConfig, ProcessControl,
FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS,
use crate::utils::{
config::{PlayoutConfig, FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS},
logging::log_line,
};
use crate::vec_strings;
use crate::{
player::{
controller::{ChannelManager, ProcessUnit::*},
utils::{test_tcp_port, valid_stream, Media},
},
vec_strings,
utils::errors::ProcessError,
};
fn server_monitor(
level: &str,
ignore: Vec<String>,
buffer: BufReader<ChildStderr>,
proc_ctl: ProcessControl,
) -> Result<(), Error> {
channel_mgr: Arc<Mutex<ChannelManager>>,
) -> Result<(), ProcessError> {
for line in buffer.lines() {
let line = line?;
@ -34,7 +37,7 @@ fn server_monitor(
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.stop(Ingest) {
if let Err(e) = channel_mgr.lock()?.stop(Ingest) {
error!("{e}");
};
}
@ -43,7 +46,7 @@ fn server_monitor(
.iter()
.any(|i| line.contains(*i))
{
proc_ctl.stop_all();
channel_mgr.lock()?.stop_all();
}
}
@ -56,14 +59,16 @@ fn server_monitor(
pub fn ingest_server(
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
proc_control: ProcessControl,
) -> Result<(), Error> {
channel_mgr: Arc<Mutex<ChannelManager>>,
) -> Result<(), ProcessError> {
let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
dummy_media.add_filter(&config, &None);
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
if let Some(ingest_input_cmd) = config
.advanced
@ -88,7 +93,7 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.stop_all();
channel_mgr.lock()?.stop_all();
exit(1);
}
@ -100,8 +105,8 @@ pub fn ingest_server(
server_cmd.join(" ")
);
while !proc_control.is_terminated.load(Ordering::SeqCst) {
let proc_ctl = proc_control.clone();
while !is_terminated.load(Ordering::SeqCst) {
let proc_ctl = channel_mgr.clone();
let level = config.logging.ingest_level.clone().unwrap();
let ignore = config.logging.ignore_lines.clone();
let mut server_proc = match Command::new("ffmpeg")
@ -121,7 +126,7 @@ pub fn ingest_server(
let error_reader_thread =
thread::spawn(move || server_monitor(&level, ignore, server_err, proc_ctl));
*proc_control.server_term.lock().unwrap() = Some(server_proc);
*channel_mgr.lock()?.ingest.lock().unwrap() = Some(server_proc);
is_running = false;
loop {
@ -134,7 +139,7 @@ pub fn ingest_server(
};
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
ingest_is_running.store(true, Ordering::SeqCst);
is_running = true;
}
@ -142,7 +147,10 @@ pub fn ingest_server(
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {e:?}");
proc_control.is_terminated.store(true, Ordering::SeqCst);
channel_mgr
.lock()?
.is_terminated
.store(true, Ordering::SeqCst);
break;
}
} else {
@ -151,11 +159,9 @@ pub fn ingest_server(
}
drop(ingest_reader);
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
ingest_is_running.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
if let Err(e) = channel_mgr.lock()?.wait(Ingest) {
error!("{e}")
}

View File

@ -13,9 +13,11 @@ pub use folder::watchman;
pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
use crate::utils::config::PlayoutConfig;
use ffplayout_lib::utils::{controller::PlayerControl, folder::FolderSource};
use ffplayout_lib::utils::{Media, PlayoutStatus, ProcessMode::*};
use crate::player::{
controller::{PlayerControl, PlayoutStatus},
utils::{folder::FolderSource, Media},
};
use crate::utils::config::{PlayoutConfig, ProcessMode::*};
/// Create a source iterator from playlist, or from folder.
pub fn source_generator(

View File

@ -10,13 +10,16 @@ use std::{
use serde_json::json;
use simplelog::*;
use ffplayout_lib::utils::{
controller::PlayerControl,
gen_dummy, get_delta, is_close, is_remote,
json_serializer::{read_json, set_defaults},
loop_filler, loop_image, modified_time, seek_and_length, time_in_seconds, JsonPlaylist, Media,
MediaProbe, PlayoutConfig, PlayoutStatus, IMAGE_FORMAT,
use crate::player::{
controller::{PlayerControl, PlayoutStatus},
utils::{
gen_dummy, get_delta, is_close, is_remote,
json_serializer::{read_json, set_defaults},
loop_filler, loop_image, modified_time, seek_and_length, time_in_seconds, JsonPlaylist,
Media, MediaProbe,
},
};
use crate::utils::config::{PlayoutConfig, IMAGE_FORMAT};
/// Struct for current playlist.
///

View File

@ -1,8 +1,10 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use log::*;
use ffplayout_lib::{filter::v_drawtext, utils::PlayoutConfig, vec_strings};
use crate::player::filter::v_drawtext;
use crate::utils::config::PlayoutConfig;
use crate::vec_strings;
/// Desktop Output
///

View File

@ -18,32 +18,35 @@ out:
*/
use std::{
io::{BufRead, BufReader, Error},
io::{BufRead, BufReader},
process::{exit, Command, Stdio},
sync::atomic::Ordering,
sync::{atomic::Ordering, Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
use simplelog::*;
use log::*;
use crate::player::{
controller::{PlayerControl, PlayoutStatus, ProcessControl, ProcessUnit::*},
input::source_generator,
utils::{
get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream,
Media,
},
};
use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner};
use crate::vec_strings;
use crate::{
player::{
controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*},
input::source_generator,
utils::{
get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream,
Media,
},
},
utils::errors::ProcessError,
};
/// Ingest Server for HLS
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) -> Result<(), Error> {
channel_mgr: Arc<Mutex<ChannelManager>>,
) -> Result<(), ProcessError> {
let playlist_init = playout_stat.list_init;
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
@ -51,6 +54,9 @@ fn ingest_to_hls_server(
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
if let Some(ingest_input_cmd) = config
.advanced
.as_ref()
@ -65,7 +71,7 @@ fn ingest_to_hls_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.stop_all();
channel_mgr.lock()?.stop_all();
exit(1);
}
@ -81,7 +87,7 @@ fn ingest_to_hls_server(
server_cmd.join(" ")
);
let proc_ctl = proc_control.clone();
let proc_ctl = channel_mgr.clone();
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stderr(Stdio::piped())
@ -95,26 +101,26 @@ fn ingest_to_hls_server(
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*proc_control.server_term.lock().unwrap() = Some(server_proc);
*channel_mgr.lock()?.ingest.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.stop(Ingest) {
if let Err(e) = proc_ctl.lock()?.stop(Ingest) {
error!("{e}");
};
}
if !is_running {
proc_control.server_is_running.store(true, Ordering::SeqCst);
ingest_is_running.store(true, Ordering::SeqCst);
playlist_init.store(true, Ordering::SeqCst);
is_running = true;
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.stop(Decoder) {
if let Err(e) = channel_mgr.lock()?.stop(Decoder) {
error!("{e}");
}
}
@ -122,19 +128,17 @@ fn ingest_to_hls_server(
log_line(&line, &config.logging.ffmpeg_level);
}
if proc_control.server_is_running.load(Ordering::SeqCst) {
if ingest_is_running.load(Ordering::SeqCst) {
info!("Switch from live ingest to {}", config.processing.mode);
}
proc_control
.server_is_running
.store(false, Ordering::SeqCst);
ingest_is_running.store(false, Ordering::SeqCst);
if let Err(e) = proc_control.wait(Ingest) {
if let Err(e) = channel_mgr.lock()?.wait(Ingest) {
error!("{e}")
}
if proc_control.is_terminated.load(Ordering::SeqCst) {
if is_terminated.load(Ordering::SeqCst) {
break;
}
}
@ -146,27 +150,29 @@ fn ingest_to_hls_server(
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
config: &PlayoutConfig,
channel_mgr: Arc<Mutex<ChannelManager>>,
player_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
) -> Result<(), ProcessError> {
let config = channel_mgr.lock()?.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let play_stat2 = playout_stat.clone();
let proc_control_c = proc_control.clone();
let channel_mgr_c = channel_mgr.clone();
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
let get_source = source_generator(
config.clone(),
&player_control,
playout_stat,
proc_control.is_terminated.clone(),
is_terminated.clone(),
);
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, proc_control_c));
thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, channel_mgr_c));
}
for node in get_source {
@ -192,7 +198,7 @@ pub fn write_hls(
if config.task.path.is_file() {
let task_config = config.clone();
let task_node = node.clone();
let server_running = proc_control.server_is_running.load(Ordering::SeqCst);
let server_running = ingest_is_running.load(Ordering::SeqCst);
let stat = play_stat2.clone();
thread::spawn(move || {
@ -219,7 +225,7 @@ pub fn write_hls(
let mut read_rate = 1.0;
if let Some(begin) = &node.begin {
let (delta, _) = get_delta(config, begin);
let (delta, _) = get_delta(&config, begin);
let duration = node.out - node.seek;
let speed = duration / (duration + delta);
@ -235,7 +241,7 @@ pub fn write_hls(
enc_prefix.append(&mut vec_strings!["-readrate", read_rate]);
enc_prefix.append(&mut cmd);
let enc_cmd = prepare_output_cmd(config, enc_prefix, &node.filter);
let enc_cmd = prepare_output_cmd(&config, enc_prefix, &node.filter);
debug!(
"HLS writer CMD: <bright-blue>\"ffmpeg {}\"</>",
@ -255,22 +261,24 @@ pub fn write_hls(
};
let enc_err = BufReader::new(dec_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
*channel_mgr.lock()?.decoder.lock().unwrap() = Some(dec_proc);
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, proc_control.clone()) {
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, channel_mgr.clone()) {
error!("{e:?}")
};
if let Err(e) = proc_control.wait(Decoder) {
if let Err(e) = channel_mgr.lock()?.wait(Decoder) {
error!("{e}");
}
while proc_control.server_is_running.load(Ordering::SeqCst) {
while ingest_is_running.load(Ordering::SeqCst) {
sleep(Duration::from_secs(1));
}
}
sleep(Duration::from_secs(1));
proc_control.stop_all();
channel_mgr.lock()?.stop_all();
Ok(())
}

View File

@ -1,7 +1,7 @@
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio},
sync::atomic::Ordering,
sync::{atomic::Ordering, Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
@ -16,14 +16,13 @@ mod stream;
pub use hls::write_hls;
use crate::player::input::{ingest_server, source_generator};
use crate::utils::{config::PlayoutConfig, task_runner};
use ffplayout_lib::utils::{
sec_to_time, stderr_reader, OutputMode::*, PlayerControl, PlayoutStatus, ProcessControl,
ProcessUnit::*,
use crate::player::{
controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*},
input::{ingest_server, source_generator},
utils::{sec_to_time, stderr_reader},
};
use ffplayout_lib::vec_strings;
use crate::utils::{config::OutputMode::*, errors::ProcessError, task_runner};
use crate::vec_strings;
/// Player
///
@ -35,11 +34,11 @@ use ffplayout_lib::vec_strings;
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
config: &PlayoutConfig,
channel_mgr: Arc<Mutex<ChannelManager>>,
play_control: &PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
) -> Result<(), ProcessError> {
let config = channel_mgr.lock()?.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let ignore_enc = config.logging.ignore_lines.clone();
@ -48,47 +47,51 @@ pub fn player(
let playlist_init = playout_stat.list_init.clone();
let play_stat = playout_stat.clone();
let channel = channel_mgr.lock()?;
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
// get source iterator
let node_sources = source_generator(
config.clone(),
play_control,
playout_stat,
proc_control.is_terminated.clone(),
channel.is_terminated.clone(),
);
// get ffmpeg output instance
let mut enc_proc = match config.out.mode {
Desktop => desktop::output(config, &ff_log_format),
Null => null::output(config, &ff_log_format),
Stream => stream::output(config, &ff_log_format),
Desktop => desktop::output(&config, &ff_log_format),
Null => null::output(&config, &ff_log_format),
Stream => stream::output(&config, &ff_log_format),
_ => panic!("Output mode doesn't exists!"),
};
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
*proc_control.encoder_term.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = proc_control.clone();
*channel.encoder.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = channel_mgr.clone();
// spawn a thread to log ffmpeg output error messages
let error_encoder_thread =
thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl));
let proc_control_c = proc_control.clone();
let channel_mgr_c = channel_mgr.clone();
let mut ingest_receiver = None;
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
let (ingest_sender, rx) = bounded(96);
ingest_receiver = Some(rx);
thread::spawn(move || ingest_server(config_clone, ingest_sender, proc_control_c));
thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_c));
}
'source_iter: for node in node_sources {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let ignore_dec = config.logging.ignore_lines.clone();
if proc_control.is_terminated.load(Ordering::SeqCst) {
if is_terminated.load(Ordering::SeqCst) {
debug!("Playout is terminated, break out from source loop");
break;
}
@ -128,7 +131,7 @@ pub fn player(
if config.task.path.is_file() {
let task_config = config.clone();
let task_node = node.clone();
let server_running = proc_control.server_is_running.load(Ordering::SeqCst);
let server_running = ingest_is_running.load(Ordering::SeqCst);
let stat = play_stat.clone();
thread::spawn(move || {
@ -185,19 +188,19 @@ pub fn player(
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*proc_control.decoder_term.lock().unwrap() = Some(dec_proc);
let dec_p_ctl = proc_control.clone();
*channel_mgr.lock()?.decoder.lock().unwrap() = Some(dec_proc);
let channel_mgr_c = channel_mgr.clone();
let error_decoder_thread =
thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, dec_p_ctl));
thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, channel_mgr_c));
loop {
// when server is running, read from it
if proc_control.server_is_running.load(Ordering::SeqCst) {
if ingest_is_running.load(Ordering::SeqCst) {
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.stop(Decoder) {
if let Err(e) = channel_mgr.lock()?.stop(Decoder) {
error!("{e}")
}
@ -242,7 +245,7 @@ pub fn player(
}
}
if let Err(e) = proc_control.wait(Decoder) {
if let Err(e) = channel_mgr.lock()?.wait(Decoder) {
error!("{e}")
}
@ -255,9 +258,11 @@ pub fn player(
sleep(Duration::from_secs(1));
proc_control.stop_all();
channel_mgr.lock()?.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");
};
Ok(())
}

View File

@ -1,12 +1,13 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use log::*;
use crate::player::utils::prepare_output_cmd;
use ffplayout_lib::{
utils::{Media, PlayoutConfig, ProcessUnit::*},
vec_strings,
use crate::player::{
controller::ProcessUnit::*,
utils::{prepare_output_cmd, Media},
};
use crate::utils::config::PlayoutConfig;
use crate::vec_strings;
/// Desktop Output
///

View File

@ -1,12 +1,13 @@
use std::process::{self, Command, Stdio};
use simplelog::*;
use log::*;
use crate::player::utils::prepare_output_cmd;
use ffplayout_lib::{
utils::{Media, PlayoutConfig, ProcessUnit::*},
vec_strings,
use crate::player::{
controller::ProcessUnit::*,
utils::{prepare_output_cmd, Media},
};
use crate::utils::config::PlayoutConfig;
use crate::vec_strings;
/// Streaming Output
///

View File

@ -0,0 +1,234 @@
use std::sync::{
atomic::Ordering,
{Arc, Mutex},
};
use lexical_sort::natural_lexical_cmp;
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::player::controller::PlayerControl;
use crate::player::utils::{include_file_extension, time_in_seconds, Media, PlayoutConfig};
/// Folder Sources
///
/// Like playlist source, we create here a folder list for iterate over it.
#[derive(Debug, Clone)]
pub struct FolderSource {
config: PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
pub player_control: PlayerControl,
current_node: Media,
}
impl FolderSource {
pub fn new(
config: &PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
player_control: &PlayerControl,
) -> Self {
let mut path_list = vec![];
let mut media_list = vec![];
let mut index: usize = 0;
if config.general.generate.is_some() && !config.storage.paths.is_empty() {
for path in &config.storage.paths {
path_list.push(path)
}
} else {
path_list.push(&config.storage.path)
}
for path in &path_list {
if !path.is_dir() {
error!("Path not exists: <b><magenta>{path:?}</></b>");
}
for entry in WalkDir::new(path)
.into_iter()
.flat_map(|e| e.ok())
.filter(|f| f.path().is_file())
.filter(|f| include_file_extension(config, f.path()))
{
let media = Media::new(0, &entry.path().to_string_lossy(), false);
media_list.push(media);
}
}
if media_list.is_empty() {
error!(
"no playable files found under: <b><magenta>{:?}</></b>",
path_list
);
}
if config.storage.shuffle {
info!("Shuffle files");
let mut rng = thread_rng();
media_list.shuffle(&mut rng);
} else {
media_list.sort_by(|d1, d2| d1.source.cmp(&d2.source));
}
for item in media_list.iter_mut() {
item.index = Some(index);
index += 1;
}
*player_control.current_list.lock().unwrap() = media_list;
Self {
config: config.clone(),
filter_chain,
player_control: player_control.clone(),
current_node: Media::new(0, "", false),
}
}
pub fn from_list(
config: &PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
player_control: &PlayerControl,
list: Vec<Media>,
) -> Self {
*player_control.current_list.lock().unwrap() = list;
Self {
config: config.clone(),
filter_chain,
player_control: player_control.clone(),
current_node: Media::new(0, "", false),
}
}
fn shuffle(&mut self) {
let mut rng = thread_rng();
let mut nodes = self.player_control.current_list.lock().unwrap();
nodes.shuffle(&mut rng);
for (index, item) in nodes.iter_mut().enumerate() {
item.index = Some(index);
}
}
fn sort(&mut self) {
let mut nodes = self.player_control.current_list.lock().unwrap();
nodes.sort_by(|d1, d2| d1.source.cmp(&d2.source));
for (index, item) in nodes.iter_mut().enumerate() {
item.index = Some(index);
}
}
}
/// Create iterator for folder source
impl Iterator for FolderSource {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.player_control.current_index.load(Ordering::SeqCst)
< self.player_control.current_list.lock().unwrap().len()
{
let i = self.player_control.current_index.load(Ordering::SeqCst);
self.current_node = self.player_control.current_list.lock().unwrap()[i].clone();
let _ = self.current_node.add_probe(false).ok();
self.current_node
.add_filter(&self.config, &self.filter_chain);
self.current_node.begin = Some(time_in_seconds());
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
Some(self.current_node.clone())
} else {
if self.config.storage.shuffle {
if self.config.general.generate.is_none() {
info!("Shuffle files");
}
self.shuffle();
} else {
if self.config.general.generate.is_none() {
info!("Sort files");
}
self.sort();
}
self.current_node = self.player_control.current_list.lock().unwrap()[0].clone();
let _ = self.current_node.add_probe(false).ok();
self.current_node
.add_filter(&self.config, &self.filter_chain);
self.current_node.begin = Some(time_in_seconds());
self.player_control.current_index.store(1, Ordering::SeqCst);
Some(self.current_node.clone())
}
}
}
pub fn fill_filler_list(
config: &PlayoutConfig,
player_control: Option<PlayerControl>,
) -> Vec<Media> {
let mut filler_list = vec![];
let filler_path = &config.storage.filler;
if filler_path.is_dir() {
for (index, entry) in WalkDir::new(&config.storage.filler)
.into_iter()
.flat_map(|e| e.ok())
.filter(|f| f.path().is_file())
.filter(|f| include_file_extension(config, f.path()))
.enumerate()
{
let mut media = Media::new(index, &entry.path().to_string_lossy(), false);
if player_control.is_none() {
if let Err(e) = media.add_probe(false) {
error!("{e:?}");
};
}
filler_list.push(media);
}
if config.storage.shuffle {
let mut rng = thread_rng();
filler_list.shuffle(&mut rng);
} else {
filler_list.sort_by(|d1, d2| natural_lexical_cmp(&d1.source, &d2.source));
}
for (index, item) in filler_list.iter_mut().enumerate() {
item.index = Some(index);
}
if let Some(control) = player_control.as_ref() {
control.filler_list.lock().unwrap().clone_from(&filler_list);
}
} else if filler_path.is_file() {
let mut media = Media::new(0, &config.storage.filler.to_string_lossy(), false);
if player_control.is_none() {
if let Err(e) = media.add_probe(false) {
error!("{e:?}");
};
}
filler_list.push(media);
if let Some(control) = player_control.as_ref() {
control.filler_list.lock().unwrap().clone_from(&filler_list);
}
}
filler_list
}

View File

@ -0,0 +1,82 @@
/// Import text/m3u file and create a playlist out of it
use std::{
//error::Error,
fs::{create_dir_all, File},
io::{BufRead, BufReader, Error, ErrorKind},
path::Path,
};
use crate::player::utils::{
json_reader, json_serializer::JsonPlaylist, json_writer, Media, PlayoutConfig,
};
pub fn import_file(
config: &PlayoutConfig,
date: &str,
channel_name: Option<String>,
path: &Path,
) -> Result<String, Error> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut playlist = JsonPlaylist {
channel: channel_name.unwrap_or_else(|| "Channel 1".to_string()),
date: date.to_string(),
path: None,
start_sec: None,
length: None,
modified: None,
program: vec![],
};
let playlist_root = &config.playlist.path;
if !playlist_root.is_dir() {
return Err(Error::new(
ErrorKind::Other,
format!(
"Playlist folder <b><magenta>{:?}</></b> not exists!",
config.playlist.path,
),
));
}
let d: Vec<&str> = date.split('-').collect();
let year = d[0];
let month = d[1];
let playlist_path = playlist_root.join(year).join(month);
let playlist_file = &playlist_path.join(format!("{date}.json"));
create_dir_all(playlist_path)?;
for line in reader.lines() {
let line = line?;
if !line.starts_with('#') {
let item = Media::new(0, &line, true);
if item.duration > 0.0 {
playlist.program.push(item);
}
}
}
let mut file_exists = false;
if playlist_file.is_file() {
file_exists = true;
let mut existing_data = json_reader(playlist_file)?;
existing_data.program.append(&mut playlist.program);
playlist.program = existing_data.program;
};
let mut msg = format!("Write playlist from {date} success!");
if file_exists {
msg = format!("Update playlist from {date} success!");
}
match json_writer(playlist_file, playlist) {
Ok(_) => Ok(msg),
Err(e) => Err(Error::new(ErrorKind::Other, e)),
}
}

View File

@ -8,10 +8,14 @@ use std::{
use simplelog::*;
use crate::utils::{
get_date, is_remote, modified_time, time_from_header, validate_playlist, Media, PlayerControl,
PlayoutConfig, DUMMY_LEN,
use crate::player::{
controller::PlayerControl,
utils::{
get_date, is_remote, json_validate::validate_playlist, modified_time, time_from_header,
Media, PlayoutConfig,
},
};
use crate::utils::config::DUMMY_LEN;
/// This is our main playlist object, it holds all necessary information for the current day.
#[derive(Debug, Serialize, Deserialize, Clone)]

View File

@ -0,0 +1,264 @@
use std::{
io::{BufRead, BufReader},
process::{Command, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Instant,
};
use log::*;
use regex::Regex;
use crate::player::filter::FilterType::Audio;
use crate::player::{
controller::PlayerControl,
utils::{is_close, is_remote, loop_image, sec_to_time, seek_and_length, JsonPlaylist, Media},
};
use crate::utils::{
config::{OutputMode::Null, PlayoutConfig, FFMPEG_IGNORE_ERRORS, IMAGE_FORMAT},
errors::ProcessError,
};
use crate::vec_strings;
/// Validate a single media file.
///
/// - Check if file exists
/// - Check if ffmpeg can read the file
/// - Check if Metadata exists
/// - Check if the file is not silent
fn check_media(
mut node: Media,
pos: usize,
begin: f64,
config: &PlayoutConfig,
) -> Result<(), ProcessError> {
let mut dec_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let mut error_list = vec![];
let mut config = config.clone();
config.out.mode = Null;
let mut process_length = 0.1;
if let Some(decoder_input_cmd) = config
.advanced
.as_ref()
.and_then(|a| a.decoder.input_cmd.clone())
{
dec_cmd.append(&mut decoder_input_cmd.clone());
}
if config.logging.detect_silence {
process_length = 15.0;
let seek = node.duration / 4.0;
// Seek in file, to prevent false silence detection on intros without sound.
dec_cmd.append(&mut vec_strings!["-ss", seek]);
}
// Take care, that no seek and length command is added.
node.seek = 0.0;
node.out = node.duration;
if node
.source
.rsplit_once('.')
.map(|(_, e)| e.to_lowercase())
.filter(|c| IMAGE_FORMAT.contains(&c.as_str()))
.is_some()
{
node.cmd = Some(loop_image(&node));
} else {
node.cmd = Some(seek_and_length(&mut node));
}
node.add_filter(&config, &None);
let mut filter = node.filter.unwrap_or_default();
if filter.cmd().len() > 1 {
let re_clean = Regex::new(r"volume=[0-9.]+")?;
filter.audio_chain = re_clean
.replace_all(&filter.audio_chain, "anull")
.to_string();
}
filter.add_filter("silencedetect=n=-30dB", 0, Audio);
dec_cmd.append(&mut node.cmd.unwrap_or_default());
dec_cmd.append(&mut filter.cmd());
dec_cmd.append(&mut filter.map());
dec_cmd.append(&mut vec_strings!["-t", process_length, "-f", "null", "-"]);
let mut enc_proc = Command::new("ffmpeg")
.args(dec_cmd)
.stderr(Stdio::piped())
.spawn()?;
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
let mut silence_start = 0.0;
let mut silence_end = 0.0;
let re_start = Regex::new(r"silence_start: ([0-9]+:)?([0-9.]+)")?;
let re_end = Regex::new(r"silence_end: ([0-9]+:)?([0-9.]+)")?;
for line in enc_err.lines() {
let line = line?;
if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i))
&& !config.logging.ignore_lines.iter().any(|i| line.contains(i))
&& (line.contains("[error]") || line.contains("[fatal]"))
{
let log_line = line.replace("[error] ", "").replace("[fatal] ", "");
if !error_list.contains(&log_line) {
error_list.push(log_line);
}
}
if config.logging.detect_silence {
if let Some(start) = re_start.captures(&line).and_then(|c| c.get(2)) {
silence_start = start.as_str().parse::<f32>().unwrap_or_default();
}
if let Some(end) = re_end.captures(&line).and_then(|c| c.get(2)) {
silence_end = end.as_str().parse::<f32>().unwrap_or_default() + 0.5;
}
}
}
if silence_end - silence_start > process_length {
error_list.push("Audio is totally silent!".to_string());
}
if !error_list.is_empty() {
error!(
"<bright black>[Validator]</> ffmpeg error on position <yellow>{pos}</> - {}: <b><magenta>{}</></b>: {}",
sec_to_time(begin),
node.source,
error_list.join("\n")
)
}
error_list.clear();
if let Err(e) = enc_proc.wait() {
error!("Validation process: {e:?}");
}
Ok(())
}
/// Validate a given playlist, to check if:
///
/// - the source files are existing
/// - file can be read by ffprobe and metadata exists
/// - total playtime fits target length from config
///
/// This function we run in a thread, to don't block the main function.
pub fn validate_playlist(
mut config: PlayoutConfig,
player_control: PlayerControl,
mut playlist: JsonPlaylist,
is_terminated: Arc<AtomicBool>,
) {
let date = playlist.date;
if config.text.add_text && !config.text.text_from_filename {
// Turn of drawtext filter with zmq, because its port is needed by the decoder instance.
config.text.add_text = false;
}
let mut length = config.playlist.length_sec.unwrap();
let mut begin = config.playlist.start_sec.unwrap();
length += begin;
debug!("Validate playlist from: <yellow>{date}</>");
let timer = Instant::now();
for (index, item) in playlist.program.iter_mut().enumerate() {
if is_terminated.load(Ordering::SeqCst) {
return;
}
let pos = index + 1;
if !is_remote(&item.source) {
if item.audio.is_empty() {
if let Err(e) = item.add_probe(false) {
error!(
"[Validation] Error on position <yellow>{pos:0>3}</> <yellow>{}</>: {e}",
sec_to_time(begin)
);
}
} else if let Err(e) = item.add_probe(true) {
error!(
"[Validation] Error on position <yellow>{pos:0>3}</> <yellow>{}</>: {e}",
sec_to_time(begin)
);
}
}
if item.probe.is_some() {
if let Err(e) = check_media(item.clone(), pos, begin, &config) {
error!("{e}");
} else if config.general.validate {
debug!(
"[Validation] Source at <yellow>{}</>, seems fine: <b><magenta>{}</></b>",
sec_to_time(begin),
item.source
)
} else if let Ok(mut list) = player_control.current_list.try_lock() {
// Filter out same item in current playlist, then add the probe to it.
// Check also if duration differs with playlist value, log error if so and adjust that value.
list.iter_mut().filter(|list_item| list_item.source == item.source).for_each(|o| {
o.probe.clone_from(&item.probe);
if let Some(dur) =
item.probe.as_ref().and_then(|f| f.format.duration.clone())
{
let probe_duration = dur.parse().unwrap_or_default();
if !is_close(o.duration, probe_duration, 1.2) {
error!(
"[Validation] File duration (at: <yellow>{}</>) differs from playlist value. File duration: <yellow>{}</>, playlist value: <yellow>{}</>, source <b><magenta>{}</></b>",
sec_to_time(o.begin.unwrap_or_default()), sec_to_time(probe_duration), sec_to_time(o.duration), o.source
);
o.duration = probe_duration;
}
}
if o.audio == item.audio && item.probe_audio.is_some() {
o.probe_audio.clone_from(&item.probe_audio);
o.duration_audio = item.duration_audio;
}
});
}
}
begin += item.out - item.seek;
}
if !config.playlist.infinit && length > begin + 1.2 {
error!(
"[Validation] Playlist from <yellow>{date}</> not long enough, <yellow>{}</> needed!",
sec_to_time(length - begin),
);
}
if config.general.validate {
info!(
"[Validation] Playlist length: <yellow>{}</>",
sec_to_time(begin - config.playlist.start_sec.unwrap())
);
}
debug!(
"Validation done, in <yellow>{:.3?}</>, playlist length: <yellow>{}</> ...",
timer.elapsed(),
sec_to_time(begin - config.playlist.start_sec.unwrap())
);
}

View File

@ -19,19 +19,20 @@ use serde::{de::Deserializer, Deserialize, Serialize};
use serde_json::{json, Map, Value};
use simplelog::*;
pub mod folder;
pub mod import;
pub mod json_serializer;
pub mod json_validate;
use crate::db::models::Channel;
use crate::player::{
controller::{
PlayoutStatus,
ChannelManager, PlayoutStatus,
ProcessUnit::{self, *},
},
filter::{filter_chains, Filters},
};
use crate::utils::{
config::{OutputMode::HLS, PlayoutConfig, FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS},
control::ProcessControl,
config::{OutputMode::*, PlayoutConfig, FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS},
errors::ProcessError,
};
pub use json_serializer::{read_json, JsonPlaylist};
@ -144,7 +145,6 @@ pub fn get_media_map(media: Media) -> Value {
/// prepare json object for response
pub fn get_data_map(
config: &PlayoutConfig,
channel: Channel,
media: Media,
playout_stat: &PlayoutStatus,
server_is_running: bool,
@ -799,8 +799,8 @@ pub fn stderr_reader(
buffer: BufReader<ChildStderr>,
ignore: Vec<String>,
suffix: ProcessUnit,
proc_control: ProcessControl,
) -> Result<(), Error> {
channel_mgr: Arc<Mutex<ChannelManager>>,
) -> Result<(), ProcessError> {
for line in buffer.lines() {
let line = line?;
@ -832,7 +832,7 @@ pub fn stderr_reader(
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
{
proc_control.stop_all();
channel_mgr.lock()?.stop_all();
exit(1);
}
}

View File

@ -6,12 +6,11 @@ use actix_web_lab::{
util::InfallibleStream,
};
use ffplayout_lib::utils::PlayoutConfig;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::utils::{control::media_info, system};
use crate::utils::{config::PlayoutConfig, control::media_info, system};
#[derive(Debug, Clone)]
struct Client {

View File

@ -4,15 +4,8 @@ use rand::prelude::*;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{
control::{control_service, ServiceCmd},
errors::ServiceError,
};
use ffplayout_lib::utils::PlayoutConfig;
use crate::db::{handles, models::Channel};
use crate::utils::playout_config;
use crate::utils::{config::PlayoutConfig, errors::ServiceError, playout_config};
pub async fn create_channel(
conn: &Pool<Sqlite>,
@ -34,7 +27,6 @@ pub async fn create_channel(
);
config.general.stat_file = format!(".ffp_{channel_name}",);
config.logging.path = config.logging.path.join(&channel_name);
config.rpc_server.address = format!("127.0.0.1:70{:7>2}", channel_num);
config.playlist.path = config.playlist.path.join(channel_name);
@ -48,17 +40,16 @@ pub async fn create_channel(
fs::write(&target_channel.config_path, toml_string)?;
let new_channel = handles::insert_channel(conn, target_channel).await?;
control_service(conn, &config, new_channel.id, &ServiceCmd::Enable, None).await?;
// TODO: Create Channel controller
Ok(new_channel)
}
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?;
let (config, _) = playout_config(conn, &id).await?;
let (_config, _) = playout_config(conn, &id).await?;
control_service(conn, &config, channel.id, &ServiceCmd::Stop, None).await?;
control_service(conn, &config, channel.id, &ServiceCmd::Disable, None).await?;
// TODO: Remove Channel controller
if let Err(e) = fs::remove_file(channel.config_path) {
error!("{e}");

View File

@ -1,22 +1,10 @@
use std::{
collections::HashMap,
env, fmt,
str::FromStr,
sync::atomic::{AtomicBool, Ordering},
};
use std::{collections::HashMap, fmt, str::FromStr, sync::atomic::AtomicBool};
use actix_web::web;
use reqwest::{header::AUTHORIZATION, Client, Response};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::{
process::{Child, Command},
sync::Mutex,
};
use tokio::{process::Child, sync::Mutex};
use crate::db::handles::select_channel;
use crate::utils::errors::ServiceError;
use ffplayout_lib::{utils::PlayoutConfig, vec_strings};
use crate::utils::{config::PlayoutConfig, errors::ServiceError};
#[derive(Debug, Deserialize, Serialize, Clone)]
struct TextParams {
@ -44,89 +32,6 @@ pub struct ProcessControl {
pub piggyback: AtomicBool,
}
impl ProcessControl {
pub fn new() -> Self {
let piggyback = if env::consts::OS != "linux" || env::var("PIGGYBACK_MODE").is_ok() {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
};
Self {
engine_child: Mutex::new(None),
is_running: AtomicBool::new(false),
piggyback,
}
}
}
impl ProcessControl {
pub async fn start(&self) -> Result<String, ServiceError> {
#[cfg(not(debug_assertions))]
let engine_path = "ffplayout";
#[cfg(debug_assertions)]
let engine_path = "./target/debug/ffplayout";
match Command::new(engine_path).kill_on_drop(true).spawn() {
Ok(proc) => *self.engine_child.lock().await = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
pub async fn stop(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.kill().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
self.wait().await?;
self.is_running.store(false, Ordering::SeqCst);
Ok("Success".to_string())
}
pub async fn restart(&self) -> Result<String, ServiceError> {
self.stop().await?;
self.start().await?;
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub async fn wait(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.wait().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
Ok("Success".to_string())
}
pub fn status(&self) -> Result<String, ServiceError> {
if self.is_running.load(Ordering::SeqCst) {
Ok("active".to_string())
} else {
Ok("not running".to_string())
}
}
}
impl Default for ProcessControl {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServiceCmd {
@ -172,75 +77,6 @@ pub struct Process {
pub command: ServiceCmd,
}
struct SystemD {
service: String,
cmd: Vec<String>,
}
impl SystemD {
async fn new(conn: &Pool<Sqlite>, id: i32) -> Result<Self, ServiceError> {
let _channel = select_channel(conn, &id).await?;
Ok(Self {
service: String::new(), // TODO: ...
cmd: vec_strings!["/usr/bin/systemctl"],
})
}
fn enable(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["enable".to_string(), self.service]);
Command::new("sudo").args(self.cmd).spawn()?;
Ok("Success".to_string())
}
fn disable(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["disable".to_string(), self.service]);
Command::new("sudo").args(self.cmd).spawn()?;
Ok("Success".to_string())
}
fn start(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["start".to_string(), self.service]);
Command::new("sudo").args(self.cmd).spawn()?;
Ok("Success".to_string())
}
fn stop(mut self) -> Result<String, ServiceError> {
self.cmd.append(&mut vec!["stop".to_string(), self.service]);
Command::new("sudo").args(self.cmd).spawn()?;
Ok("Success".to_string())
}
fn restart(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["restart".to_string(), self.service]);
Command::new("sudo").args(self.cmd).spawn()?;
Ok("Success".to_string())
}
async fn status(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["is-active".to_string(), self.service]);
let output = Command::new("sudo").args(self.cmd).output().await?;
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
}
async fn post_request<T>(config: &PlayoutConfig, obj: T) -> Result<Response, ServiceError>
where
T: Serialize,
@ -288,58 +124,3 @@ pub async fn media_info(config: &PlayoutConfig, command: String) -> Result<Respo
post_request(config, json_obj).await
}
pub async fn control_service(
conn: &Pool<Sqlite>,
config: &PlayoutConfig,
id: i32,
command: &ServiceCmd,
engine: Option<web::Data<ProcessControl>>,
) -> Result<String, ServiceError> {
if let Some(en) = engine {
if en.piggyback.load(Ordering::SeqCst) {
match command {
ServiceCmd::Start => en.start().await,
ServiceCmd::Stop => {
if control_state(config, "stop_all").await.is_ok() {
en.stop().await
} else {
Err(ServiceError::NoContent("Nothing to stop".to_string()))
}
}
ServiceCmd::Restart => {
if control_state(config, "stop_all").await.is_ok() {
en.restart().await
} else {
Err(ServiceError::NoContent("Nothing to restart".to_string()))
}
}
ServiceCmd::Status => en.status(),
_ => Err(ServiceError::Conflict(
"Engine runs in piggyback mode, in this mode this command is not allowed."
.to_string(),
)),
}
} else {
execute_systemd(conn, id, command).await
}
} else {
execute_systemd(conn, id, command).await
}
}
async fn execute_systemd(
conn: &Pool<Sqlite>,
id: i32,
command: &ServiceCmd,
) -> Result<String, ServiceError> {
let system_d = SystemD::new(conn, id).await?;
match command {
ServiceCmd::Enable => system_d.enable(),
ServiceCmd::Disable => system_d.disable(),
ServiceCmd::Start => system_d.start(),
ServiceCmd::Stop => system_d.stop(),
ServiceCmd::Restart => system_d.restart(),
ServiceCmd::Status => system_d.status().await,
}
}

View File

@ -111,12 +111,16 @@ impl From<uuid::Error> for ServiceError {
pub enum ProcessError {
#[display(fmt = "Failed to spawn ffmpeg/ffprobe. {}", _0)]
CommandSpawn(io::Error),
#[display(fmt = "{}", _0)]
Custom(String),
#[display(fmt = "IO error: {}", _0)]
IO(io::Error),
#[display(fmt = "{}", _0)]
Ffprobe(FfProbeError),
#[display(fmt = "{}", _0)]
Custom(String),
#[display(fmt = "Regex compile error {}", _0)]
Regex(String),
#[display(fmt = "Thread error {}", _0)]
Thread(String),
}
impl From<std::io::Error> for ProcessError {
@ -154,3 +158,21 @@ impl<T> From<std::sync::PoisonError<T>> for ProcessError {
ProcessError::Custom(err.to_string())
}
}
impl From<regex::Error> for ProcessError {
fn from(err: regex::Error) -> Self {
Self::Regex(err.to_string())
}
}
impl From<serde_json::Error> for ProcessError {
fn from(err: serde_json::Error) -> Self {
Self::Custom(err.to_string())
}
}
impl From<Box<dyn std::any::Any + std::marker::Send>> for ProcessError {
fn from(err: Box<dyn std::any::Any + std::marker::Send>) -> Self {
Self::Thread(format!("{err:?}"))
}
}

View File

@ -0,0 +1,325 @@
/// Simple Playlist Generator
///
/// You can call ffplayout[.exe] -g YYYY-mm-dd - YYYY-mm-dd to generate JSON playlists.
///
/// The generator takes the files from storage, which are set in config.
/// It also respect the shuffle/sort mode.
use std::{
fs::{create_dir_all, write},
io::Error,
process::exit,
};
use chrono::Timelike;
use lexical_sort::{natural_lexical_cmp, StringSort};
use rand::{seq::SliceRandom, thread_rng, Rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::player::{
controller::PlayerControl,
utils::{
folder::{fill_filler_list, FolderSource},
gen_dummy, get_date_range, include_file_extension,
json_serializer::JsonPlaylist,
sum_durations, Media,
},
};
use crate::utils::{
config::{PlayoutConfig, Template},
time_to_sec,
};
pub fn random_list(clip_list: Vec<Media>, total_length: f64) -> Vec<Media> {
let mut max_attempts = 10000;
let mut randomized_clip_list: Vec<Media> = vec![];
let mut target_duration = 0.0;
let clip_list_length = clip_list.len();
let usage_limit = (total_length / sum_durations(&clip_list)).floor() + 1.0;
let mut last_clip = Media::new(0, "", false);
while target_duration < total_length && max_attempts > 0 {
let index = rand::thread_rng().gen_range(0..clip_list_length);
let selected_clip = clip_list[index].clone();
let selected_clip_count = randomized_clip_list
.iter()
.filter(|&n| *n == selected_clip)
.count() as f64;
if selected_clip_count == usage_limit
|| last_clip == selected_clip
|| target_duration + selected_clip.duration > total_length
{
max_attempts -= 1;
continue;
}
target_duration += selected_clip.duration;
randomized_clip_list.push(selected_clip.clone());
max_attempts -= 1;
last_clip = selected_clip;
}
randomized_clip_list
}
pub fn ordered_list(clip_list: Vec<Media>, total_length: f64) -> Vec<Media> {
let mut index = 0;
let mut skip_count = 0;
let mut ordered_clip_list: Vec<Media> = vec![];
let mut target_duration = 0.0;
let clip_list_length = clip_list.len();
while target_duration < total_length && skip_count < clip_list_length {
if index == clip_list_length {
index = 0;
}
let selected_clip = clip_list[index].clone();
if sum_durations(&ordered_clip_list) + selected_clip.duration > total_length
|| (!ordered_clip_list.is_empty()
&& selected_clip == ordered_clip_list[ordered_clip_list.len() - 1])
{
skip_count += 1;
index += 1;
continue;
}
target_duration += selected_clip.duration;
ordered_clip_list.push(selected_clip);
index += 1;
}
ordered_clip_list
}
pub fn filler_list(config: &PlayoutConfig, total_length: f64) -> Vec<Media> {
let filler_list = fill_filler_list(config, None);
let mut index = 0;
let mut filler_clip_list: Vec<Media> = vec![];
let mut target_duration = 0.0;
let clip_list_length = filler_list.len();
if clip_list_length > 0 {
while target_duration < total_length {
if index == clip_list_length {
index = 0;
}
let selected_clip = filler_list[index].clone();
target_duration += selected_clip.duration;
filler_clip_list.push(selected_clip);
index += 1;
}
let over_length = target_duration - total_length;
let last_index = filler_clip_list.len() - 1;
filler_clip_list[last_index].out = filler_clip_list[last_index].duration - over_length;
} else {
let mut dummy = Media::new(0, "", false);
let (source, cmd) = gen_dummy(config, total_length);
dummy.source = source;
dummy.cmd = Some(cmd);
dummy.duration = total_length;
dummy.out = total_length;
filler_clip_list.push(dummy);
}
filler_clip_list
}
pub fn generate_from_template(
config: &PlayoutConfig,
player_control: &PlayerControl,
template: Template,
) -> FolderSource {
let mut media_list = vec![];
let mut rng = thread_rng();
let mut index: usize = 0;
for source in template.sources {
let mut source_list = vec![];
let duration = (source.duration.hour() as f64 * 3600.0)
+ (source.duration.minute() as f64 * 60.0)
+ source.duration.second() as f64;
debug!("Generating playlist block with <yellow>{duration:.2}</> seconds length");
for path in source.paths {
debug!("Search files in <b><magenta>{path:?}</></b>");
let mut file_list = WalkDir::new(path.clone())
.into_iter()
.flat_map(|e| e.ok())
.filter(|f| f.path().is_file())
.filter(|f| include_file_extension(config, f.path()))
.map(|p| p.path().to_string_lossy().to_string())
.collect::<Vec<String>>();
if !source.shuffle {
file_list.string_sort_unstable(natural_lexical_cmp);
}
for entry in file_list {
let media = Media::new(0, &entry, true);
source_list.push(media);
}
}
let mut timed_list = if source.shuffle {
source_list.shuffle(&mut rng);
random_list(source_list, duration)
} else {
ordered_list(source_list, duration)
};
let total_length = sum_durations(&timed_list);
if duration > total_length {
let mut filler = filler_list(config, duration - total_length);
timed_list.append(&mut filler);
}
media_list.append(&mut timed_list);
}
for item in media_list.iter_mut() {
item.index = Some(index);
index += 1;
}
FolderSource::from_list(config, None, player_control, media_list)
}
/// Generate playlists
pub fn generate_playlist(
config: &PlayoutConfig,
channel_name: Option<String>,
) -> Result<Vec<JsonPlaylist>, Error> {
let total_length = match config.playlist.length_sec {
Some(length) => length,
None => {
if config.playlist.length.contains(':') {
time_to_sec(&config.playlist.length)
} else {
86400.0
}
}
};
let player_control = PlayerControl::new();
let playlist_root = &config.playlist.path;
let mut playlists = vec![];
let mut date_range = vec![];
let mut from_template = false;
let channel = match channel_name {
Some(name) => name,
None => "Channel 1".to_string(),
};
if !playlist_root.is_dir() {
error!(
"Playlist folder <b><magenta>{:?}</></b> not exists!",
config.playlist.path
);
exit(1);
}
if let Some(range) = config.general.generate.clone() {
date_range = range;
}
if date_range.contains(&"-".to_string()) && date_range.len() == 3 {
date_range = get_date_range(&date_range)
}
// gives an iterator with infinit length
let folder_iter = if let Some(template) = &config.general.template {
from_template = true;
generate_from_template(config, &player_control, template.clone())
} else {
FolderSource::new(config, None, &player_control)
};
let list_length = player_control.current_list.lock().unwrap().len();
for date in date_range {
let d: Vec<&str> = date.split('-').collect();
let year = d[0];
let month = d[1];
let playlist_path = playlist_root.join(year).join(month);
let playlist_file = &playlist_path.join(format!("{date}.json"));
let mut length = 0.0;
let mut round = 0;
create_dir_all(playlist_path)?;
if playlist_file.is_file() {
warn!(
"Playlist exists, skip: <b><magenta>{}</></b>",
playlist_file.display()
);
continue;
}
info!(
"Generate playlist: <b><magenta>{}</></b>",
playlist_file.display()
);
let mut playlist = JsonPlaylist {
channel: channel.clone(),
date,
path: None,
start_sec: None,
length: None,
modified: None,
program: vec![],
};
if from_template {
let media_list = player_control.current_list.lock().unwrap();
playlist.program = media_list.to_vec();
} else {
for item in folder_iter.clone() {
let duration = item.duration;
if total_length >= length + duration {
playlist.program.push(item);
length += duration;
} else if round == list_length - 1 {
break;
} else {
round += 1;
}
}
let list_duration = sum_durations(&playlist.program);
if config.playlist.length_sec.unwrap() > list_duration {
let time_left = config.playlist.length_sec.unwrap() - list_duration;
let mut fillers = filler_list(config, time_left);
playlist.program.append(&mut fillers);
}
}
let json: String = serde_json::to_string_pretty(&playlist)?;
write(playlist_file, json)?;
playlists.push(playlist);
}
Ok(playlists)
}

View File

@ -253,7 +253,7 @@ fn file_formatter(
)
}
fn file_logger() -> Box<dyn LogWriter> {
pub fn log_file_path() -> PathBuf {
let mut log_path = ARGS
.log_path
.clone()
@ -263,10 +263,14 @@ fn file_logger() -> Box<dyn LogWriter> {
log_path = env::current_dir().unwrap();
}
log_path
}
fn file_logger() -> Box<dyn LogWriter> {
if ARGS.log_to_console {
Box::new(LogConsole)
} else {
Box::new(MultiFileLogger::new(log_path))
Box::new(MultiFileLogger::new(log_file_path()))
}
}

View File

@ -28,6 +28,7 @@ pub mod config;
pub mod control;
pub mod errors;
pub mod files;
pub mod generator;
pub mod logging;
pub mod playlist;
pub mod system;
@ -38,8 +39,8 @@ use crate::db::{
handles::{db_init, insert_user, select_channel, select_global},
models::{Channel, User},
};
use crate::utils::errors::ServiceError;
use ffplayout_lib::utils::{time_to_sec, PlayoutConfig};
use crate::player::utils::time_to_sec;
use crate::utils::{config::PlayoutConfig, errors::ServiceError, logging::log_file_path};
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
@ -308,42 +309,30 @@ pub async fn playout_config(
))
}
pub async fn read_log_file(
conn: &Pool<Sqlite>,
channel_id: &i32,
date: &str,
) -> Result<String, ServiceError> {
if let Ok(channel) = select_channel(conn, channel_id).await {
let mut date_str = "".to_string();
pub async fn read_log_file(channel_id: &i32, date: &str) -> Result<String, ServiceError> {
let mut date_str = "".to_string();
if !date.is_empty() {
date_str.push('.');
date_str.push_str(date);
}
if let Ok(config) = read_playout_config(&channel.config_path) {
let mut log_path = Path::new(&config.logging.path)
.join("ffplayout.log")
.display()
.to_string();
log_path.push_str(&date_str);
let file_size = metadata(&log_path)?.len() as f64;
let file_content = if file_size > 5000000.0 {
error!("Log file to big: {}", sizeof_fmt(file_size));
format!("The log file is larger ({}) than the hard limit of 5MB, the probability is very high that something is wrong with the playout. Check this on the server with `less {log_path}`.", sizeof_fmt(file_size))
} else {
fs::read_to_string(log_path)?
};
return Ok(file_content);
}
if !date.is_empty() {
date_str.push('.');
date_str.push_str(date);
}
Err(ServiceError::NoContent(
"Requested log file not exists, or not readable.".to_string(),
))
let mut log_path = log_file_path()
.join(format!("ffplayout_{channel_id}.log"))
.display()
.to_string();
log_path.push_str(&date_str);
let file_size = metadata(&log_path)?.len() as f64;
let file_content = if file_size > 5000000.0 {
error!("Log file to big: {}", sizeof_fmt(file_size));
format!("The log file is larger ({}) than the hard limit of 5MB, the probability is very high that something is wrong with the playout. Check this on the server with `less {log_path}`.", sizeof_fmt(file_size))
} else {
fs::read_to_string(log_path)?
};
return Ok(file_content);
}
/// get human readable file size

View File

@ -3,9 +3,10 @@ use std::{fs, path::PathBuf};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{errors::ServiceError, files::norm_abs_path, playout_config};
use ffplayout_lib::utils::{
generate_playlist as playlist_generator, json_reader, json_writer, JsonPlaylist, PlayoutConfig,
use crate::player::utils::{json_reader, json_writer, JsonPlaylist};
use crate::utils::{
config::PlayoutConfig, errors::ServiceError, files::norm_abs_path,
generator::generate_playlist as playlist_generator, playout_config,
};
pub async fn read_playlist(

View File

@ -4,8 +4,8 @@ use local_ip_address::list_afinet_netifas;
use serde::Serialize;
use sysinfo::System;
use crate::utils::config::PlayoutConfig;
use crate::{DISKS, NETWORKS, SYS};
use ffplayout_lib::utils::PlayoutConfig;
const IGNORE_INTERFACES: [&str; 7] = ["docker", "lxdbr", "tab", "tun", "virbr", "veth", "vnet"];

View File

@ -4,7 +4,8 @@ use log::*;
use crate::player::utils::get_data_map;
use ffplayout_lib::utils::{config::PlayoutConfig, Media, PlayoutStatus};
use crate::player::{controller::PlayoutStatus, utils::Media};
use crate::utils::config::PlayoutConfig;
pub fn run(config: PlayoutConfig, node: Media, playout_stat: PlayoutStatus, server_running: bool) {
let obj =