add args for playlist generation etc., add channel to preview url,

This commit is contained in:
jb-alvarado 2024-06-30 22:05:22 +02:00
parent 70fcf41923
commit 8f528f9ec5
8 changed files with 226 additions and 44 deletions

View File

@ -12,7 +12,7 @@ use sqlx::{sqlite::SqliteRow, FromRow, Pool, Row, Sqlite};
use crate::db::handles;
use crate::utils::config::PlayoutConfig;
#[derive(Debug, Deserialize, Serialize, sqlx::FromRow)]
#[derive(Clone, Debug, Deserialize, Serialize, sqlx::FromRow)]
pub struct GlobalSettings {
pub id: i32,
pub secret: Option<String>,

View File

@ -1,8 +1,10 @@
use std::{
collections::HashSet,
env, io,
env,
fs::File,
io,
process::exit,
sync::{Arc, Mutex},
sync::{atomic::AtomicBool, Arc, Mutex},
thread,
};
@ -25,12 +27,16 @@ use ffplayout::{
db_pool, handles,
models::{init_globales, UserMeta},
},
player::controller::{ChannelController, ChannelManager},
player::{
controller::{ChannelController, ChannelManager},
utils::{get_date, is_remote, json_validate::validate_playlist, JsonPlaylist},
},
sse::{broadcast::Broadcaster, routes::*, SseAuthState},
utils::{
args_parse::run_args,
config::PlayoutConfig,
config::get_config,
logging::{init_logging, MailQueue},
playlist::generate_playlist,
},
ARGS,
};
@ -94,7 +100,7 @@ async fn main() -> std::io::Result<()> {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for channel in channels.iter() {
let config = PlayoutConfig::new(&pool, channel.id).await;
let config = get_config(&pool, channel.id).await?;
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
@ -227,26 +233,64 @@ async fn main() -> std::io::Result<()> {
.workers(thread_count)
.run()
.await?;
} else if let Some(channels) = &ARGS.channels {
for (index, channel_id) in channels.iter().enumerate() {
let channel = handles::select_channel(&pool, channel_id).await.unwrap();
let config = PlayoutConfig::new(&pool, *channel_id).await;
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(*channel_id, config.mail)));
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(manager.clone());
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
manager.foreground_start(index).await;
}
} else {
error!("Run ffplayout with parameters! Run ffplayout -h for more information.");
let channels = ARGS.channels.clone().unwrap_or_else(|| vec![1]);
for (index, channel_id) in channels.iter().enumerate() {
let config = get_config(&pool, *channel_id).await?;
let channel = handles::select_channel(&pool, channel_id).await.unwrap();
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
if ARGS.foreground {
let m_queue = Arc::new(Mutex::new(MailQueue::new(*channel_id, config.mail)));
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(manager.clone());
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
manager.foreground_start(index).await;
} else if ARGS.generate.is_some() {
// run a simple playlist generator and save them to disk
if let Err(e) = generate_playlist(manager) {
error!("{e}");
exit(1);
};
} else if ARGS.validate {
let mut playlist_path = config.global.playlist_path.clone();
let start_sec = config.playlist.start_sec.unwrap();
let date = get_date(false, start_sec, false);
if playlist_path.is_dir() || is_remote(&playlist_path.to_string_lossy()) {
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
}
let f = File::options()
.read(true)
.write(false)
.open(&playlist_path)?;
let playlist: JsonPlaylist = serde_json::from_reader(f)?;
validate_playlist(
config,
Arc::new(Mutex::new(Vec::new())),
playlist,
Arc::new(AtomicBool::new(false)),
);
} else {
error!("Run ffplayout with parameters! Run ffplayout -h for more information.");
}
}
}
for channel in &channel_controllers.lock().unwrap().channels {

View File

@ -357,10 +357,8 @@ pub fn start_channel(manager: ChannelManager) -> Result<(), ProcessError> {
}
}
pub fn drain_hls_path(path: &Path, params: &Vec<String>, channel_id: i32) -> io::Result<()> {
pub fn drain_hls_path(path: &Path, params: &[String], channel_id: i32) -> io::Result<()> {
let disks = Disks::new_with_refreshed_list();
// 1059061760
// 1000000000
for disk in &disks {
if disk.mount_point().to_string_lossy().len() > 1
@ -378,7 +376,7 @@ pub fn drain_hls_path(path: &Path, params: &Vec<String>, channel_id: i32) -> io:
fn delete_ts<P: AsRef<Path> + Clone + std::fmt::Debug>(
path: P,
params: &Vec<String>,
params: &[String],
) -> io::Result<()> {
let ts_file = params
.iter()
@ -400,7 +398,7 @@ fn delete_ts<P: AsRef<Path> + Clone + std::fmt::Debug>(
fn paths_match(patterns: &Vec<&String>, actual_path: &str) -> bool {
for pattern in patterns {
let pattern_escaped = regex::escape(&pattern);
let pattern_escaped = regex::escape(pattern);
let pattern_regex = pattern_escaped.replace(r"%d", r"\d+");
let re = Regex::new(&pattern_regex).unwrap();

View File

@ -12,7 +12,10 @@ use crate::db::{
handles::{self, insert_user},
models::{Channel, GlobalSettings, User},
};
use crate::utils::{advanced_config::AdvancedConfig, config::PlayoutConfig};
use crate::utils::{
advanced_config::AdvancedConfig,
config::{OutputMode, PlayoutConfig},
};
use crate::ARGS;
#[derive(Parser, Debug, Clone)]
@ -37,11 +40,14 @@ pub struct Args {
short,
long,
env,
help = "Run channels by ids immediately (works without webserver and frontend, no listening parameter is needed)",
help = "Channels by ids to process (for foreground, etc.)",
num_args = 1..,
)]
pub channels: Option<Vec<i32>>,
#[clap(long, env, help = "Run playout without webserver and frontend.")]
pub foreground: bool,
#[clap(
long,
help = "Dump advanced channel configuration to advanced_{channel}.toml"
@ -74,6 +80,21 @@ pub struct Args {
#[clap(short, env, long, help = "Listen on IP:PORT, like: 127.0.0.1:8787")]
pub listen: Option<String>,
#[clap(short, long, help = "Play folder content")]
pub folder: Option<PathBuf>,
#[clap(
short,
long,
help = "Generate playlist for dates, like: 2022-01-01 - 2022-01-10",
name = "YYYY-MM-DD",
num_args = 1..,
)]
pub generate: Option<Vec<String>>,
#[clap(long, help = "Optional folder path list for playlist generations", num_args = 1..)]
pub gen_paths: Option<Vec<PathBuf>>,
#[clap(long, env, help = "Keep log file for given days")]
pub log_backup_count: Option<usize>,
@ -102,9 +123,6 @@ pub struct Args {
#[clap(long, env, help = "Share storage across channels")]
pub shared_storage: bool,
#[clap(short, long, help = "domain name for initialization")]
pub domain: Option<String>,
#[clap(short, long, help = "Create admin user")]
pub username: Option<String>,
@ -113,6 +131,31 @@ pub struct Args {
#[clap(short, long, help = "Admin password")]
pub password: Option<String>,
#[clap(long, help = "Path to playlist, or playlist root folder.")]
pub playlist: Option<PathBuf>,
#[clap(
short,
long,
help = "Start time in 'hh:mm:ss', 'now' for start with first"
)]
pub start: Option<String>,
#[clap(short = 'T', long, help = "JSON Template file for generating playlist")]
pub template: Option<PathBuf>,
#[clap(short, long, help = "Set output mode: desktop, hls, null, stream")]
pub output: Option<OutputMode>,
#[clap(short, long, help = "Set audio volume")]
pub volume: Option<f64>,
#[clap(long, help = "Skip validation process")]
pub skip_validation: bool,
#[clap(long, help = "Only validate given playlist")]
pub validate: bool,
}
fn global_user(args: &mut Args) {
@ -243,11 +286,18 @@ pub async fn run_args(pool: &Pool<Sqlite>) -> Result<(), i32> {
global.shared_storage = shared_store.trim().to_lowercase().starts_with('y');
if let Err(e) = handles::update_global(pool, global).await {
if let Err(e) = handles::update_global(pool, global.clone()).await {
eprintln!("{e}");
return Err(1);
};
if !global.shared_storage {
let mut channel = handles::select_channel(pool, &1).await.unwrap();
channel.preview_url = "http://127.0.0.1:8787/1/stream.m3u8".to_string();
handles::update_channel(pool, 1, channel).await.unwrap();
};
println!("Set global settings...");
}

View File

@ -1,5 +1,6 @@
use std::{
io,
path::Path,
sync::{Arc, Mutex},
};
@ -27,15 +28,34 @@ async fn map_global_admins(conn: &Pool<Sqlite>) -> Result<(), ServiceError> {
Ok(())
}
fn preview_url(url: &str, id: i32) -> String {
let url_path = Path::new(url);
if let Some(parent) = url_path.parent() {
if let Some(filename) = url_path.file_name() {
let new_path = parent.join(id.to_string()).join(filename);
if let Some(new_url) = new_path.to_str() {
return new_url.to_string();
}
}
}
url.to_string()
}
pub async fn create_channel(
conn: &Pool<Sqlite>,
controllers: Arc<Mutex<ChannelController>>,
queue: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>,
target_channel: Channel,
) -> Result<Channel, ServiceError> {
let channel = handles::insert_channel(conn, target_channel).await?;
let mut channel = handles::insert_channel(conn, target_channel).await?;
let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename live/stream{0}-%d.ts live/stream{0}.m3u8", channel.id);
channel.preview_url = preview_url(&channel.preview_url, channel.id);
handles::update_channel(conn, channel.id, channel.clone()).await?;
let output_param = format!("-c:v libx264 -crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k -bufsize 2600k -preset faster -tune zerolatency -profile:v Main -level 3.1 -c:a aac -ar 44100 -b:a 128k -flags +cgop -f hls -hls_time 6 -hls_list_size 600 -hls_flags append_list+delete_segments+omit_endlist -hls_segment_filename {0}/stream-%d.ts {0}/stream.m3u8", channel.id);
handles::insert_advanced_configuration(conn, channel.id).await?;
handles::insert_configuration(conn, channel.id, output_param).await?;
@ -65,8 +85,6 @@ pub async fn delete_channel(
queue: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>,
) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?;
// TODO: Remove Channel controller
handles::delete_channel(conn, &channel.id).await?;
controllers

View File

@ -1,5 +1,5 @@
use std::{
fmt,
fmt, io,
path::{Path, PathBuf},
str::FromStr,
};
@ -9,12 +9,13 @@ use flexi_logger::Level;
use serde::{Deserialize, Serialize};
use shlex::split;
use sqlx::{Pool, Sqlite};
use tokio::io::AsyncReadExt;
use tokio::{fs, io::AsyncReadExt};
use crate::db::{handles, models};
use crate::utils::{files::norm_abs_path, free_tcp_socket, time_to_sec};
use crate::vec_strings;
use crate::AdvancedConfig;
use crate::ARGS;
use super::errors::ServiceError;
@ -682,7 +683,12 @@ impl PlayoutConfig {
for item in cmd.iter_mut() {
if item.ends_with(".ts") || (item.ends_with(".m3u8") && item != "master.m3u8") {
if let Ok((hls_path, _, _)) = norm_abs_path(&global.hls_path, item) {
let filename = Path::new(item)
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
if let Ok((hls_path, _, _)) = norm_abs_path(&global.hls_path, &filename) {
item.clone_from(&hls_path.to_string_lossy().to_string());
};
}
@ -795,3 +801,63 @@ fn pre_audio_codec(proc_filter: &str, ingest_filter: &str, channel_count: u8) ->
codec
}
/// Read command line arguments, and override the config with them.
pub async fn get_config(pool: &Pool<Sqlite>, channel_id: i32) -> Result<PlayoutConfig, io::Error> {
let mut config = PlayoutConfig::new(pool, channel_id).await;
let args = ARGS.clone();
config.general.generate = args.generate;
config.general.validate = args.validate;
config.general.skip_validation = args.skip_validation;
if let Some(template_file) = args.template {
let mut f = fs::File::options()
.read(true)
.write(false)
.open(template_file)
.await?;
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).await?;
let mut template: Template = serde_json::from_slice(&buffer)?;
template.sources.sort_by(|d1, d2| d1.start.cmp(&d2.start));
config.general.template = Some(template);
}
if let Some(paths) = args.gen_paths {
config.storage.paths = paths;
}
if let Some(playlist) = args.playlist {
config.global.playlist_path = playlist;
}
if let Some(folder) = args.folder {
config.global.storage_path = folder;
config.processing.mode = ProcessMode::Folder;
}
if let Some(start) = args.start {
config.playlist.day_start.clone_from(&start);
config.playlist.start_sec = Some(time_to_sec(&start));
}
if let Some(output) = args.output {
config.output.mode = output;
if config.output.mode == OutputMode::Null {
config.output.output_count = 1;
config.output.output_filter = None;
config.output.output_cmd = Some(vec_strings!["-f", "null", "-"]);
}
}
if let Some(volume) = args.volume {
config.processing.volume = volume;
}
Ok(config)
}

View File

@ -119,6 +119,12 @@ impl From<uuid::Error> for ServiceError {
}
}
impl From<serde_json::Error> for ServiceError {
fn from(err: serde_json::Error) -> ServiceError {
ServiceError::BadRequest(err.to_string())
}
}
#[derive(Debug, Display)]
pub enum ProcessError {
#[display(fmt = "Failed to spawn ffmpeg/ffprobe. {}", _0)]

@ -1 +1 @@
Subproject commit 372c759c8909b5bca75c5b637e645aa8f090ef05
Subproject commit 7f9eead8dd6cdce31a2f31cbd69c81d375f96072