run channels with argument

This commit is contained in:
jb-alvarado 2024-06-12 17:35:20 +02:00
parent 26ec945921
commit 1daea32eb9
5 changed files with 109 additions and 40 deletions

View File

@ -532,15 +532,17 @@ async fn update_playout_config(
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let mut config = manager.config.lock().unwrap();
let id = config.general.id;
let channel_id = config.general.channel_id;
let general_config = manager.config.lock().unwrap().general.clone();
let id = general_config.id;
let channel_id = general_config.channel_id;
let db_config = Configuration::from(id, channel_id, data.into_inner());
if let Err(e) = handles::update_configuration(&pool.into_inner(), db_config.clone()).await {
return Err(ServiceError::Conflict(format!("{e}")));
}
let mut config = manager.config.lock().unwrap();
config.general.stop_threshold = db_config.stop_threshold;
config.mail.subject = db_config.subject;
config.mail.smtp_server = db_config.smtp_server;
@ -555,7 +557,7 @@ async fn update_playout_config(
config.logging.detect_silence = db_config.detect_silence;
config.logging.ignore_lines = db_config
.ignore_lines
.split(";")
.split(';')
.map(|l| l.to_string())
.collect();
config.processing.mode = string_to_processing_mode(db_config.processing_mode);
@ -587,7 +589,7 @@ async fn update_playout_config(
config.storage.filler = PathBuf::from(db_config.filler);
config.storage.extensions = db_config
.extensions
.split(";")
.split(';')
.map(|l| l.to_string())
.collect();
config.storage.shuffle = db_config.shuffle;

View File

@ -26,7 +26,7 @@ use ffplayout::{
sse::{broadcast::Broadcaster, routes::*, AuthState},
utils::{
config::PlayoutConfig,
db_path, init_globales,
init_globales,
logging::{init_logging, MailQueue},
run_args,
},
@ -79,41 +79,35 @@ async fn main() -> std::io::Result<()> {
exit(c);
}
let channel_controllers = Arc::new(Mutex::new(ChannelController::new()));
let channels = handles::select_all_channels(&pool)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let mail_queues = Arc::new(Mutex::new(vec![]));
init_globales(&pool).await;
init_logging(mail_queues.clone())?;
for channel in channels.iter() {
let config = PlayoutConfig::new(&pool, channel.id).await;
let channel_manager =
ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(channel_manager.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
if channel.active {
channel_manager.async_start().await;
}
}
let channel_controllers = Arc::new(Mutex::new(ChannelController::new()));
if let Some(conn) = &ARGS.listen {
if db_path().is_err() {
error!("Database is not initialized! Init DB first and add admin user.");
exit(1);
let channels = handles::select_all_channels(&pool)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
for channel in channels.iter() {
let config = PlayoutConfig::new(&pool, channel.id).await;
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(channel.id, config.mail)));
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(manager.clone());
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
if channel.active {
manager.async_start().await;
}
}
let ip_port = conn.split(':').collect::<Vec<&str>>();
@ -229,6 +223,35 @@ async fn main() -> std::io::Result<()> {
.workers(thread_count)
.run()
.await
} else if ARGS.list_channels {
let channels = handles::select_all_channels(&pool)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let ids = channels.iter().map(|c| c.id).collect::<Vec<i32>>();
info!("Available channels: {ids:?}");
Ok(())
} else if let Some(channels) = &ARGS.channels {
for (index, channel_id) in channels.iter().enumerate() {
let channel = handles::select_channel(&pool, &channel_id).await.unwrap();
let config = PlayoutConfig::new(&pool, *channel_id).await;
let manager = ChannelManager::new(Some(pool.clone()), channel.clone(), config.clone());
let m_queue = Arc::new(Mutex::new(MailQueue::new(*channel_id, config.mail)));
channel_controllers
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.add(manager.clone());
if let Ok(mut mqs) = mail_queues.lock() {
mqs.push(m_queue.clone());
}
manager.foreground_start(index).await;
}
Ok(())
} else {
error!("Run ffplayout with listen parameter!");

View File

@ -24,6 +24,7 @@ use crate::utils::{
config::{OutputMode::*, PlayoutConfig},
errors::ProcessError,
};
use crate::ARGS;
/// Defined process units.
#[derive(Clone, Debug, Default, Copy, Eq, Serialize, Deserialize, PartialEq)]
@ -122,6 +123,44 @@ impl ChannelManager {
}
}
pub async fn foreground_start(&self, index: usize) {
if !self.is_alive.load(Ordering::SeqCst) {
self.run_count.fetch_add(1, Ordering::SeqCst);
self.is_alive.store(true, Ordering::SeqCst);
self.is_terminated.store(false, Ordering::SeqCst);
let pool_clone = self.db_pool.clone().unwrap();
let self_clone = self.clone();
let channel_id = self.channel.lock().unwrap().id;
if let Err(e) = handles::update_player(&pool_clone, channel_id, true).await {
error!("Unable write to player status: {e}");
};
if index + 1 == ARGS.channels.clone().unwrap_or_default().len() {
let run_count = self_clone.run_count.clone();
tokio::task::spawn_blocking(move || {
if let Err(e) = start_channel(self_clone) {
run_count.fetch_sub(1, Ordering::SeqCst);
error!("{e}");
}
})
.await
.unwrap();
} else {
thread::spawn(move || {
let run_count = self_clone.run_count.clone();
if let Err(e) = start_channel(self_clone) {
run_count.fetch_sub(1, Ordering::SeqCst);
error!("{e}");
};
});
}
}
}
pub fn stop(&self, unit: ProcessUnit) -> Result<(), ProcessError> {
let mut channel = self.channel.lock()?;

View File

@ -13,6 +13,17 @@ pub struct Args {
#[clap(long, help = "path to database file")]
pub db: Option<PathBuf>,
#[clap(
short,
long,
help = "Run channels by ids immediately (works without webserver and frontend, no listening parameter is needed)",
num_args = 1..,
)]
pub channels: Option<Vec<i32>>,
#[clap(long, help = "List available channel ids")]
pub list_channels: bool,
#[clap(long, help = "path to public files")]
pub public: Option<PathBuf>,

View File

@ -317,12 +317,6 @@ pub fn public_path() -> PathBuf {
pub async fn run_args() -> Result<(), i32> {
let mut args = ARGS.clone();
if args.listen.is_none() && !args.ask && args.username.is_none() {
eprintln!("Wrong number of arguments! Run ffplayout --help for more information.");
return Err(0);
}
if args.ask {
let mut user = String::new();
print!("Username: ");