first channel can run

This commit is contained in:
jb-alvarado 2024-06-07 10:44:20 +02:00
parent 687bb2e1b9
commit 6a3c34a925
11 changed files with 275 additions and 319 deletions

380
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,7 +16,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "std"
clap = { version = "4.3", features = ["derive"] }
crossbeam-channel = "0.5"
futures = "0.3"
itertools = "0.12"
itertools = "0.13"
notify = "6.0"
notify-debouncer-full = { version = "*", default-features = false }
rand = "0.8"
@ -26,7 +26,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
simplelog = { version = "0.12", features = ["paris"] }
tiny_http = { version = "0.12", default-features = false }
zeromq = { version = "0.3", default-features = false, features = [
zeromq = { version = "0.4", default-features = false, features = [
"async-std-runtime",
"tcp-transport",
] }

View File

@ -31,7 +31,7 @@ ffprobe = "0.4"
flexi_logger = { version = "0.28", features = ["kv", "colors"] }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
home = "0.5"
itertools = "0.12"
itertools = "0.13"
jsonwebtoken = "9"
lazy_static = "1.4"
lettre = { version = "0.11", features = ["builder", "rustls-tls", "smtp-transport", "tokio1", "tokio1-rustls-tls"], default-features = false }

View File

@ -1,56 +0,0 @@
use std::fmt;
use flexi_logger::{
filter::{LogLineFilter, LogLineWriter},
DeferredNow, FlexiLoggerError, Logger,
};
use log::info;
use log::kv::Key;
#[derive(Debug)]
enum Target {
Terminal,
File,
}
impl Target {
fn as_str(&self) -> &'static str {
match *self {
Target::Terminal => "terminal",
Target::File => "file",
}
}
}
impl fmt::Display for Target {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Target::Terminal => write!(f, "terminal"),
Target::File => write!(f, "file"),
}
}
}
pub struct Console;
impl LogLineFilter for Console {
fn write(
&self,
now: &mut DeferredNow,
record: &log::Record,
log_line_writer: &dyn LogLineWriter,
) -> std::io::Result<()> {
println!("{:?}", record.key_values().get(Key::from_str("target")));
log_line_writer.write(now, record)?;
Ok(())
}
}
fn main() -> Result<(), FlexiLoggerError> {
Logger::try_with_str("debug")?
.filter(Box::new(Console))
.start()?;
info!(target: Target::Terminal.as_str(), "info logging");
Ok(())
}

View File

@ -2,7 +2,7 @@ use std::{
collections::HashSet,
env, io,
path::PathBuf,
process::exit,
process::{self, exit},
sync::{Arc, Mutex},
thread,
};
@ -27,9 +27,8 @@ use ffplayout::{
sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{
config::PlayoutConfig,
control::ProcessControl,
db_path, init_globales,
logging::{init_logging, MailQueue, Target},
logging::{init_logging, MailQueue},
run_args,
},
ARGS,
@ -41,6 +40,14 @@ use ffplayout::utils::public_path;
#[cfg(all(not(debug_assertions), feature = "embed_frontend"))]
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
fn thread_counter() -> usize {
let available_threads = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
(available_threads / 2).max(2)
}
async fn validator(
req: ServiceRequest,
credentials: BearerAuth,
@ -69,7 +76,7 @@ async fn main() -> std::io::Result<()> {
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let channel_controller = Arc::new(Mutex::new(ChannelController::new()));
let channel_controllers = Arc::new(Mutex::new(ChannelController::new()));
let channels = handles::select_all_channels(&pool)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
@ -89,16 +96,13 @@ async fn main() -> std::io::Result<()> {
}
};
let channel_manager = Arc::new(Mutex::new(ChannelManager::new(
channel.clone(),
config.clone(),
)));
let channel_manager = ChannelManager::new(channel.clone(), config.clone());
channel_controller
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(channel_manager);
let control_clone = channel_controller.clone();
.add(channel_manager.clone());
let controllers = channel_controllers.clone();
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
if let Ok(mut mqs) = mail_queues.lock() {
@ -106,10 +110,14 @@ async fn main() -> std::io::Result<()> {
}
if channel.active {
info!(target: Target::file(), channel = channel.id; "Start Playout");
thread::spawn(move || {
controller::start(control_clone);
if let Err(e) = controller::start(channel_manager) {
error!("{e}");
};
if controllers.lock().unwrap().run_count() == 0 {
process::exit(0)
};
});
}
}
@ -123,13 +131,15 @@ async fn main() -> std::io::Result<()> {
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
let engine_process = web::Data::new(ProcessControl::new());
let controllers = web::Data::from(channel_controllers);
let auth_state = web::Data::new(AuthState {
uuids: tokio::sync::Mutex::new(HashSet::new()),
});
let broadcast_data = Broadcaster::create();
let thread_count = thread_counter();
info!("running ffplayout API, listen on http://{conn}");
info!("Running ffplayout API, listen on http://{conn}");
debug!("Use {thread_count} threads for the webserver");
// no 'allow origin' here, give it to the reverse proxy
HttpServer::new(move || {
@ -144,7 +154,7 @@ async fn main() -> std::io::Result<()> {
let mut web_app = App::new()
.app_data(db_pool)
.app_data(web::Data::from(queues))
.app_data(engine_process.clone())
.app_data(controllers.clone())
.app_data(auth_state.clone())
.app_data(web::Data::from(Arc::clone(&broadcast_data)))
.wrap(logger)
@ -231,6 +241,7 @@ async fn main() -> std::io::Result<()> {
web_app
})
.bind((addr, port))?
.workers(thread_count)
.run()
.await
} else {

View File

@ -5,6 +5,7 @@ use std::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
thread,
};
#[cfg(not(windows))]
@ -16,7 +17,7 @@ use serde::{Deserialize, Serialize};
use crate::db::models::Channel;
use crate::player::{
output::{player, write_hls},
utils::Media,
utils::{folder::fill_filler_list, Media},
};
use crate::utils::{
config::{OutputMode::*, PlayoutConfig},
@ -245,13 +246,27 @@ impl ChannelController {
channel.id != channel_id
});
}
pub fn run_count(&self) -> usize {
self.channels
.iter()
.filter(|manager| manager.is_alive.load(Ordering::SeqCst))
.count()
}
}
pub fn start(channel: Arc<Mutex<ChannelManager>>) -> Result<(), ProcessError> {
let mode = channel.lock()?.config.lock()?.out.mode.clone();
pub fn start(channel: ChannelManager) -> Result<(), ProcessError> {
let config = channel.config.lock()?.clone();
let mode = config.out.mode.clone();
let play_control = PlayerControl::new();
let play_control_clone = play_control.clone();
let play_status = PlayoutStatus::new();
// Fill filler list, can also be a single file.
thread::spawn(move || {
fill_filler_list(&config, Some(play_control_clone));
});
match mode {
// write files/playlist to HLS m3u8 playlist
HLS => write_hls(channel, play_control, play_status),

View File

@ -1,7 +1,7 @@
use std::{
io::{BufRead, BufReader, Read},
process::{exit, ChildStderr, Command, Stdio},
sync::{atomic::Ordering, Arc, Mutex},
sync::atomic::Ordering,
thread,
};
@ -25,7 +25,7 @@ fn server_monitor(
level: &str,
ignore: Vec<String>,
buffer: BufReader<ChildStderr>,
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
) -> Result<(), ProcessError> {
for line in buffer.lines() {
let line = line?;
@ -37,7 +37,7 @@ fn server_monitor(
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = channel_mgr.lock()?.stop(Ingest) {
if let Err(e) = channel_mgr.stop(Ingest) {
error!("{e}");
};
}
@ -46,7 +46,7 @@ fn server_monitor(
.iter()
.any(|i| line.contains(*i))
{
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
}
}
@ -59,7 +59,7 @@ fn server_monitor(
pub fn ingest_server(
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
) -> Result<(), ProcessError> {
let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
@ -67,8 +67,8 @@ pub fn ingest_server(
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
dummy_media.add_filter(&config, &None);
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
if let Some(ingest_input_cmd) = config
.advanced
@ -93,7 +93,7 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
exit(1);
}
@ -126,7 +126,7 @@ pub fn ingest_server(
let error_reader_thread =
thread::spawn(move || server_monitor(&level, ignore, server_err, proc_ctl));
*channel_mgr.lock()?.ingest.lock().unwrap() = Some(server_proc);
*channel_mgr.ingest.lock().unwrap() = Some(server_proc);
is_running = false;
loop {
@ -147,10 +147,7 @@ pub fn ingest_server(
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {e:?}");
channel_mgr
.lock()?
.is_terminated
.store(true, Ordering::SeqCst);
is_terminated.store(true, Ordering::SeqCst);
break;
}
} else {
@ -161,7 +158,7 @@ pub fn ingest_server(
drop(ingest_reader);
ingest_is_running.store(false, Ordering::SeqCst);
if let Err(e) = channel_mgr.lock()?.wait(Ingest) {
if let Err(e) = channel_mgr.wait(Ingest) {
error!("{e}")
}

View File

@ -20,7 +20,7 @@ out:
use std::{
io::{BufRead, BufReader},
process::{exit, Command, Stdio},
sync::{atomic::Ordering, Arc, Mutex},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
};
@ -45,7 +45,7 @@ use crate::{
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
) -> Result<(), ProcessError> {
let playlist_init = playout_stat.list_init;
@ -54,8 +54,8 @@ fn ingest_to_hls_server(
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
if let Some(ingest_input_cmd) = config
.advanced
@ -71,7 +71,7 @@ fn ingest_to_hls_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
exit(1);
}
@ -101,14 +101,14 @@ fn ingest_to_hls_server(
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*channel_mgr.lock()?.ingest.lock().unwrap() = Some(server_proc);
*channel_mgr.ingest.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.lock()?.stop(Ingest) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
@ -120,7 +120,7 @@ fn ingest_to_hls_server(
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = channel_mgr.lock()?.stop(Decoder) {
if let Err(e) = channel_mgr.stop(Decoder) {
error!("{e}");
}
}
@ -134,7 +134,7 @@ fn ingest_to_hls_server(
ingest_is_running.store(false, Ordering::SeqCst);
if let Err(e) = channel_mgr.lock()?.wait(Ingest) {
if let Err(e) = channel_mgr.wait(Ingest) {
error!("{e}")
}
@ -150,18 +150,18 @@ fn ingest_to_hls_server(
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
player_control: PlayerControl,
playout_stat: PlayoutStatus,
) -> Result<(), ProcessError> {
let config = channel_mgr.lock()?.config.lock()?.clone();
let config = channel_mgr.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let play_stat2 = playout_stat.clone();
let channel_mgr_c = channel_mgr.clone();
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
let get_source = source_generator(
config.clone(),
@ -261,13 +261,13 @@ pub fn write_hls(
};
let enc_err = BufReader::new(dec_proc.stderr.take().unwrap());
*channel_mgr.lock()?.decoder.lock().unwrap() = Some(dec_proc);
*channel_mgr.decoder.lock().unwrap() = Some(dec_proc);
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, channel_mgr.clone()) {
error!("{e:?}")
};
if let Err(e) = channel_mgr.lock()?.wait(Decoder) {
if let Err(e) = channel_mgr.wait(Decoder) {
error!("{e}");
}
@ -278,7 +278,7 @@ pub fn write_hls(
sleep(Duration::from_secs(1));
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
Ok(())
}

View File

@ -1,7 +1,7 @@
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio},
sync::{atomic::Ordering, Arc, Mutex},
sync::atomic::Ordering,
thread::{self, sleep},
time::Duration,
};
@ -34,11 +34,11 @@ use crate::vec_strings;
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
play_control: &PlayerControl,
playout_stat: PlayoutStatus,
) -> Result<(), ProcessError> {
let config = channel_mgr.lock()?.config.lock()?.clone();
let config = channel_mgr.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let ignore_enc = config.logging.ignore_lines.clone();
@ -47,16 +47,15 @@ pub fn player(
let playlist_init = playout_stat.list_init.clone();
let play_stat = playout_stat.clone();
let channel = channel_mgr.lock()?;
let is_terminated = channel_mgr.lock()?.is_terminated.clone();
let ingest_is_running = channel_mgr.lock()?.ingest_is_running.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
// get source iterator
let node_sources = source_generator(
config.clone(),
play_control,
playout_stat,
channel.is_terminated.clone(),
is_terminated.clone(),
);
// get ffmpeg output instance
@ -70,7 +69,7 @@ pub fn player(
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
*channel.encoder.lock().unwrap() = Some(enc_proc);
*channel_mgr.encoder.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = channel_mgr.clone();
// spawn a thread to log ffmpeg output error messages
@ -188,7 +187,7 @@ pub fn player(
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*channel_mgr.lock()?.decoder.lock().unwrap() = Some(dec_proc);
*channel_mgr.clone().decoder.lock().unwrap() = Some(dec_proc);
let channel_mgr_c = channel_mgr.clone();
let error_decoder_thread =
@ -200,7 +199,7 @@ pub fn player(
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = channel_mgr.lock()?.stop(Decoder) {
if let Err(e) = channel_mgr.stop(Decoder) {
error!("{e}")
}
@ -245,7 +244,7 @@ pub fn player(
}
}
if let Err(e) = channel_mgr.lock()?.wait(Decoder) {
if let Err(e) = channel_mgr.wait(Decoder) {
error!("{e}")
}
@ -258,7 +257,7 @@ pub fn player(
sleep(Duration::from_secs(1));
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");

View File

@ -799,7 +799,7 @@ pub fn stderr_reader(
buffer: BufReader<ChildStderr>,
ignore: Vec<String>,
suffix: ProcessUnit,
channel_mgr: Arc<Mutex<ChannelManager>>,
channel_mgr: ChannelManager,
) -> Result<(), ProcessError> {
for line in buffer.lines() {
let line = line?;
@ -832,7 +832,7 @@ pub fn stderr_reader(
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
{
channel_mgr.lock()?.stop_all();
channel_mgr.stop_all();
exit(1);
}
}

View File

@ -393,6 +393,8 @@ pub fn init_logging(mail_queues: Arc<Mutex<Vec<Arc<Mutex<MailQueue>>>>>) -> io::
let mut builder = LogSpecification::builder();
builder
.default(log_level)
.module("actix_files", LevelFilter::Error)
.module("actix_web", LevelFilter::Error)
.module("hyper", LevelFilter::Error)
.module("libc", LevelFilter::Error)
.module("neli", LevelFilter::Error)