get correct error level from config, fix playlist tests

This commit is contained in:
jb-alvarado 2023-02-09 11:09:54 +01:00
parent 35b9bbcbfa
commit c0740fc830
13 changed files with 45230 additions and 70 deletions

26
Cargo.lock generated
View File

@ -2551,6 +2551,31 @@ dependencies = [
"yaml-rust",
]
[[package]]
name = "serial_test"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "538c30747ae860d6fb88330addbbd3e0ddbe46d662d032855596d8a8ca260611"
dependencies = [
"dashmap",
"futures",
"lazy_static",
"log",
"parking_lot 0.12.1",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "sha1"
version = "0.10.5"
@ -2823,6 +2848,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"serial_test",
"shlex",
"simplelog",
"time 0.3.17",

View File

@ -6,48 +6,16 @@ use std::{
};
use crossbeam_channel::Sender;
use regex::Regex;
use simplelog::*;
use ffplayout_lib::utils::{
controller::ProcessUnit::*, test_tcp_port, Media, PlayoutConfig, ProcessControl,
FFMPEG_IGNORE_ERRORS,
use crate::utils::{log_line, valid_stream};
use ffplayout_lib::{
utils::{
controller::ProcessUnit::*, test_tcp_port, Media, PlayoutConfig, ProcessControl,
FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS,
},
vec_strings,
};
use ffplayout_lib::vec_strings;
pub fn log_line(line: String, level: &str) {
if line.contains("[info]") && level.to_lowercase() == "info" {
info!("<bright black>[Server]</> {}", line.replace("[info] ", ""))
} else if line.contains("[warning]")
&& (level.to_lowercase() == "warning" || level.to_lowercase() == "info")
{
warn!(
"<bright black>[Server]</> {}",
line.replace("[warning] ", "")
)
} else if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!("<bright black>[Server]</> {}", line.replace("[error] ", ""));
} else if line.contains("[fatal]") {
error!("<bright black>[Server]</> {}", line.replace("[fatal] ", ""))
}
}
fn valid_stream(msg: &str) -> bool {
if let Some((unexpected, expected)) = msg.split_once(',') {
let re = Regex::new(r".*Unexpected stream|expecting|[\s]+|\?$").unwrap();
let unexpected = re.replace_all(unexpected, "");
let expected = re.replace_all(expected, "");
if unexpected == expected {
return true;
}
}
false
}
fn server_monitor(
level: &str,
@ -57,24 +25,22 @@ fn server_monitor(
for line in buffer.lines() {
let line = line?;
if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i)) {
log_line(&line, level);
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
error!("{e}");
};
warn!(
"<bright black>[Server]</> {}",
line.replace("[warning] ", "")
);
}
if line.contains("Address already in use") {
if FFMPEG_UNRECOVERABLE_ERRORS
.iter()
.any(|i| line.contains(*i))
{
proc_ctl.kill_all();
}
if !FFMPEG_IGNORE_ERRORS.iter().any(|i| line.contains(*i)) {
log_line(line, level);
}
}
Ok(())

View File

@ -27,8 +27,8 @@ use std::{
use simplelog::*;
use crate::input::{ingest::log_line, source_generator};
use crate::utils::prepare_output_cmd;
use crate::input::source_generator;
use crate::utils::{log_line, prepare_output_cmd, valid_stream};
use ffplayout_lib::{
utils::{
controller::ProcessUnit::*, sec_to_time, stderr_reader, test_tcp_port, Media,
@ -92,7 +92,7 @@ fn ingest_to_hls_server(
for line in server_err.lines() {
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") {
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
error!("{e}");
};
@ -110,7 +110,7 @@ fn ingest_to_hls_server(
}
}
log_line(line, &level);
log_line(&line, &level);
}
if proc_control.server_is_running.load(Ordering::SeqCst) {

View File

@ -4,6 +4,7 @@ use std::{
};
use regex::Regex;
use simplelog::*;
pub mod arg_parse;
@ -98,6 +99,42 @@ pub fn get_config(args: Args) -> PlayoutConfig {
config
}
/// Format ingest and HLS logging output
pub fn log_line(line: &str, level: &str) {
if line.contains("[info]") && level.to_lowercase() == "info" {
info!("<bright black>[Server]</> {}", line.replace("[info] ", ""))
} else if line.contains("[warning]")
&& (level.to_lowercase() == "warning" || level.to_lowercase() == "info")
{
warn!(
"<bright black>[Server]</> {}",
line.replace("[warning] ", "")
)
} else if line.contains("[error]")
&& !line.contains("Input/output error")
&& !line.contains("Broken pipe")
{
error!("<bright black>[Server]</> {}", line.replace("[error] ", ""));
} else if line.contains("[fatal]") {
error!("<bright black>[Server]</> {}", line.replace("[fatal] ", ""))
}
}
/// Compare incoming stream name with expecting name, but ignore question mark.
pub fn valid_stream(msg: &str) -> bool {
if let Some((unexpected, expected)) = msg.split_once(',') {
let re = Regex::new(r".*Unexpected stream|expecting|[\s]+|\?$").unwrap();
let unexpected = re.replace_all(unexpected, "");
let expected = re.replace_all(expected, "");
if unexpected == expected {
return true;
}
}
false
}
/// Prepare output parameters
///
/// Seek for multiple outputs and add mapping for it.

View File

@ -6,7 +6,8 @@ use std::{
str::FromStr,
};
use serde::{Deserialize, Serialize};
use log::LevelFilter;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use shlex::split;
use super::vec_strings;
@ -33,6 +34,13 @@ pub const FFMPEG_IGNORE_ERRORS: [&str; 11] = [
"frame size not set",
];
pub const FFMPEG_UNRECOVERABLE_ERRORS: [&str; 4] = [
"Address already in use",
"Invalid argument",
"Numerical result",
"Error initializing complex filters",
];
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum OutputMode {
@ -84,6 +92,37 @@ impl FromStr for ProcessMode {
}
}
pub fn string_to_log_level<'de, D>(deserializer: D) -> Result<LevelFilter, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
match s.to_lowercase().as_str() {
"debug" => Ok(LevelFilter::Debug),
"error" => Ok(LevelFilter::Error),
"info" => Ok(LevelFilter::Info),
"trace" => Ok(LevelFilter::Trace),
"warning" => Ok(LevelFilter::Warn),
"off" => Ok(LevelFilter::Off),
_ => Err(de::Error::custom("Error level not exists!")),
}
}
fn log_level_to_string<S>(l: &LevelFilter, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match l {
LevelFilter::Debug => s.serialize_str("DEBUG"),
LevelFilter::Error => s.serialize_str("ERROR"),
LevelFilter::Info => s.serialize_str("INFO"),
LevelFilter::Trace => s.serialize_str("TRACE"),
LevelFilter::Warn => s.serialize_str("WARNING"),
LevelFilter::Off => s.serialize_str("OFF"),
}
}
/// Global Config
///
/// This we init ones, when ffplayout is starting and use them globally in the hole program.
@ -148,7 +187,11 @@ pub struct Logging {
pub local_time: bool,
pub timestamp: bool,
pub log_path: String,
pub log_level: String,
#[serde(
serialize_with = "log_level_to_string",
deserialize_with = "string_to_log_level"
)]
pub log_level: LevelFilter,
pub ffmpeg_level: String,
pub ingest_level: Option<String>,
}

View File

@ -233,7 +233,11 @@ pub fn init_logging(
None,
);
app_logger.push(WriteLogger::new(LevelFilter::Debug, file_config, log_file));
app_logger.push(WriteLogger::new(
app_config.log_level,
file_config,
log_file,
));
} else {
let term_config = log_config
.clone()
@ -247,7 +251,7 @@ pub fn init_logging(
.build();
app_logger.push(TermLogger::new(
LevelFilter::Debug,
app_config.log_level,
term_config,
TerminalMode::Mixed,
ColorChoice::Auto,

View File

@ -39,7 +39,7 @@ pub use config::{
OutputMode::{self, *},
PlayoutConfig,
ProcessMode::{self, *},
DUMMY_LEN, FFMPEG_IGNORE_ERRORS, IMAGE_FORMAT,
DUMMY_LEN, FFMPEG_IGNORE_ERRORS, FFMPEG_UNRECOVERABLE_ERRORS, IMAGE_FORMAT,
};
pub use controller::{
PlayerControl, PlayoutStatus, ProcessControl,
@ -660,11 +660,11 @@ pub fn stderr_reader(
line.replace("[error] ", "").replace("[fatal] ", "")
);
if line.contains("Invalid argument")
|| line.contains("Numerical result")
if FFMPEG_UNRECOVERABLE_ERRORS
.iter()
.any(|i| line.contains(*i))
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
|| line.contains("Error initializing complex filters")
{
proc_control.kill_all();
exit(1);

View File

@ -26,6 +26,7 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.8"
serial_test = "1.0"
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -3,26 +3,39 @@ use std::{
time::Duration,
};
use serial_test::serial;
use simplelog::*;
use ffplayout::output::player;
use ffplayout_lib::utils::*;
use ffplayout_lib::{utils::*, vec_strings};
fn timed_kill(sec: u64, mut proc_ctl: ProcessControl) {
sleep(Duration::from_secs(sec));
info!("Timed kill of process");
proc_ctl.kill_all();
}
#[test]
#[serial]
#[ignore]
fn playlist_change_at_midnight() {
let mut config = PlayoutConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = Playlist;
config.ingest.enable = false;
config.text.add_text = false;
config.playlist.day_start = "00:00:00".into();
config.playlist.start_sec = Some(0.0);
config.playlist.length = "24:00:00".into();
config.playlist.length_sec = Some(86400.0);
config.playlist.path = "assets/playlists".into();
config.storage.filler_clip = "assets/with_audio.mp4".into();
config.logging.log_to_file = false;
config.logging.timestamp = false;
config.out.mode = Null;
config.out.output_cmd = Some(vec_strings!["-f", "null", "-"]);
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
@ -30,24 +43,38 @@ fn playlist_change_at_midnight() {
let proc_ctl = proc_control.clone();
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
CombinedLogger::init(logging).unwrap_or_default();
mock_time::set_mock_time("2022-05-09T23:59:45");
mock_time::set_mock_time("2023-02-08T23:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(28, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
player(&config, play_control, playout_stat.clone(), proc_control);
let playlist_date = &*playout_stat.current_date.lock().unwrap();
assert_eq!(playlist_date, "2023-02-09");
}
#[test]
#[serial]
#[ignore]
fn playlist_change_at_six() {
let mut config = PlayoutConfig::new(None);
config.mail.recipient = "".into();
config.processing.mode = Playlist;
config.ingest.enable = false;
config.text.add_text = false;
config.playlist.day_start = "06:00:00".into();
config.playlist.start_sec = Some(21600.0);
config.playlist.length = "24:00:00".into();
config.playlist.length_sec = Some(86400.0);
config.playlist.path = "assets/playlists".into();
config.storage.filler_clip = "assets/with_audio.mp4".into();
config.logging.log_to_file = false;
config.logging.timestamp = false;
config.out.mode = Null;
config.out.output_cmd = Some(vec_strings!["-f", "null", "-"]);
let play_control = PlayerControl::new();
let playout_stat = PlayoutStatus::new();
@ -55,11 +82,15 @@ fn playlist_change_at_six() {
let proc_ctl = proc_control.clone();
let logging = init_logging(&config, None, None);
CombinedLogger::init(logging).unwrap();
CombinedLogger::init(logging).unwrap_or_default();
mock_time::set_mock_time("2022-05-09T05:59:45");
mock_time::set_mock_time("2023-02-09T05:59:45");
thread::spawn(move || timed_kill(30, proc_ctl));
thread::spawn(move || timed_kill(28, proc_ctl));
player(&config, play_control, playout_stat, proc_control);
player(&config, play_control, playout_stat.clone(), proc_control);
let playlist_date = &*playout_stat.current_date.lock().unwrap();
assert_eq!(playlist_date, "2023-02-09");
}