work on PlayoutControl and PlayoutStatus

This commit is contained in:
jb-alvarado 2024-06-10 21:57:48 +02:00
parent f908e29fc5
commit 17e728ead1
17 changed files with 276 additions and 392 deletions

View File

@ -493,8 +493,10 @@ async fn get_playout_config(
id: web::Path<i32>,
_details: AuthDetails<Role>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
if let Ok(_channel) = handles::select_channel(&pool.into_inner(), &id).await {
// TODO: get config
return Ok("Update playout config success.");
};
Err(ServiceError::InternalServerError)
@ -511,9 +513,9 @@ async fn get_playout_config(
async fn update_playout_config(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PlayoutConfig>,
_data: web::Json<PlayoutConfig>,
) -> Result<impl Responder, ServiceError> {
if let Ok(channel) = handles::select_channel(&pool.into_inner(), &id).await {
if let Ok(_channel) = handles::select_channel(&pool.into_inner(), &id).await {
// TODO: update config
return Ok("Update playout config success.");
@ -722,13 +724,10 @@ pub async fn media_current(
#[post("/control/{id}/process/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn process_control(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
_id: web::Path<i32>,
_proc: web::Json<Process>,
_engine_process: web::Data<ProcessControl>,
) -> Result<impl Responder, ServiceError> {
let (_config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
Ok(web::Json("no implemented"))
}
@ -743,11 +742,14 @@ pub async fn process_control(
#[get("/playlist/{id}")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn get_playlist(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
obj: web::Query<DateObj>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match read_playlist(&pool.into_inner(), *id, obj.date.clone()).await {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
match read_playlist(&config, obj.date.clone()).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
@ -763,11 +765,14 @@ pub async fn get_playlist(
#[post("/playlist/{id}/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn save_playlist(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<JsonPlaylist>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match write_playlist(&pool.into_inner(), *id, data.into_inner()).await {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
match write_playlist(&config, data.into_inner()).await {
Ok(res) => Ok(web::Json(res)),
Err(e) => Err(e),
}
@ -794,11 +799,13 @@ pub async fn save_playlist(
#[post("/playlist/{id}/generate/{date}")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn gen_playlist(
pool: web::Data<Pool<Sqlite>>,
params: web::Path<(i32, String)>,
data: Option<web::Json<PathsObj>>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
let (mut config, channel) = playout_config(&pool.into_inner(), &params.0).await?;
let manager = controllers.lock().unwrap().get(params.0).unwrap();
let channel_name = manager.channel.lock().unwrap().name.clone();
let mut config = manager.config.lock().unwrap();
config.general.generate = Some(vec![params.1.clone()]);
if let Some(obj) = data {
@ -817,7 +824,7 @@ pub async fn gen_playlist(
config.general.template.clone_from(&obj.template);
}
match generate_playlist(config.to_owned(), channel.name).await {
match generate_playlist(config.clone(), channel_name).await {
Ok(playlist) => Ok(web::Json(playlist)),
Err(e) => Err(e),
}
@ -832,10 +839,13 @@ pub async fn gen_playlist(
#[delete("/playlist/{id}/{date}")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn del_playlist(
pool: web::Data<Pool<Sqlite>>,
params: web::Path<(i32, String)>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match delete_playlist(&pool.into_inner(), params.0, &params.1).await {
let manager = controllers.lock().unwrap().get(params.0).unwrap();
let config = manager.config.lock().unwrap().clone();
match delete_playlist(&config, &params.1).await {
Ok(m) => Ok(web::Json(m)),
Err(e) => Err(e),
}
@ -869,11 +879,15 @@ pub async fn get_log(
#[post("/file/{id}/browse/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn file_browser(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match browser(&pool.into_inner(), *id, &data.into_inner()).await {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let channel = manager.channel.lock().unwrap().clone();
let config = manager.config.lock().unwrap().clone();
match browser(&config, &channel, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -888,11 +902,14 @@ pub async fn file_browser(
#[post("/file/{id}/create-folder/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn add_dir(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<HttpResponse, ServiceError> {
create_directory(&pool.into_inner(), *id, &data.into_inner()).await
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
create_directory(&config, &data.into_inner()).await
}
/// **Rename File**
@ -904,11 +921,14 @@ pub async fn add_dir(
#[post("/file/{id}/rename/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn move_rename(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<MoveObject>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match rename_file(&pool.into_inner(), *id, &data.into_inner()).await {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
match rename_file(&config, &data.into_inner()).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -923,11 +943,14 @@ pub async fn move_rename(
#[post("/file/{id}/remove/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn remove(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
data: web::Json<PathObject>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
match remove_file_or_folder(&pool.into_inner(), *id, &data.into_inner().source).await {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
match remove_file_or_folder(&config, &data.into_inner().source).await {
Ok(obj) => Ok(web::Json(obj)),
Err(e) => Err(e),
}
@ -942,12 +965,15 @@ pub async fn remove(
#[put("/file/{id}/upload/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
async fn save_file(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
req: HttpRequest,
payload: Multipart,
obj: web::Query<FileObj>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<HttpResponse, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
let size: u64 = req
.headers()
.get("content-length")
@ -955,7 +981,7 @@ async fn save_file(
.and_then(|cls| cls.parse().ok())
.unwrap_or(0);
upload(&pool.into_inner(), *id, size, payload, &obj.path, false).await
upload(&config, size, payload, &obj.path, false).await
}
/// **Get File**
@ -967,12 +993,13 @@ async fn save_file(
/// ```
#[get("/file/{id}/{filename:.*}")]
async fn get_file(
pool: web::Data<Pool<Sqlite>>,
req: HttpRequest,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<actix_files::NamedFile, ServiceError> {
let id: i32 = req.match_info().query("id").parse()?;
let (config, _) = playout_config(&pool.into_inner(), &id).await?;
let storage_path = config.storage.path;
let manager = controllers.lock().unwrap().get(id).unwrap();
let config = manager.config.lock().unwrap();
let storage_path = config.storage.path.clone();
let file_path = req.match_info().query("filename");
let (path, _, _) = norm_abs_path(&storage_path, file_path)?;
let file = actix_files::NamedFile::open(path)?;
@ -1026,17 +1053,18 @@ async fn get_public(public: web::Path<String>) -> Result<actix_files::NamedFile,
#[put("/file/{id}/import/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
async fn import_playlist(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
req: HttpRequest,
payload: Multipart,
obj: web::Query<ImportObj>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<HttpResponse, ServiceError> {
let manager = controllers.lock().unwrap().get(*id).unwrap();
let channel_name = manager.channel.lock().unwrap().name.clone();
let config = manager.config.lock().unwrap().clone();
let file = obj.file.file_name().unwrap_or_default();
let path = env::temp_dir().join(file);
let path_clone = path.clone();
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let channel = handles::select_channel(&pool.clone().into_inner(), &id).await?;
let size: u64 = req
.headers()
.get("content-length")
@ -1044,10 +1072,10 @@ async fn import_playlist(
.and_then(|cls| cls.parse().ok())
.unwrap_or(0);
upload(&pool.into_inner(), *id, size, payload, &path, true).await?;
upload(&config, size, payload, &path, true).await?;
let response = task::spawn_blocking(move || {
import_file(&config, &obj.date, Some(channel.name), &path_clone)
import_file(&config, &obj.date, Some(channel_name), &path_clone)
})
.await??;
@ -1081,11 +1109,12 @@ async fn import_playlist(
#[get("/program/{id}/")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
async fn get_program(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
obj: web::Query<ProgramObj>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
let start_sec = config.playlist.start_sec.unwrap();
let mut days = 0;
let mut program = vec![];
@ -1110,14 +1139,13 @@ async fn get_program(
]);
for date in date_range {
let conn = pool.clone().into_inner();
let mut naive = NaiveDateTime::parse_from_str(
&format!("{date} {}", sec_to_time(start_sec)),
"%Y-%m-%d %H:%M:%S%.3f",
)
.unwrap();
let playlist = match read_playlist(&conn, *id, date.clone()).await {
let playlist = match read_playlist(&config, date.clone()).await {
Ok(p) => p,
Err(e) => {
error!("Error in Playlist from {date}: {e}");
@ -1169,10 +1197,11 @@ async fn get_program(
#[get("/system/{id}")]
#[protect(any("Role::Admin", "Role::User"), ty = "Role")]
pub async fn get_system_stat(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
controllers: web::Data<Mutex<ChannelController>>,
) -> Result<impl Responder, ServiceError> {
let (config, _) = playout_config(&pool.clone().into_inner(), &id).await?;
let manager = controllers.lock().unwrap().get(*id).unwrap();
let config = manager.config.lock().unwrap().clone();
let stat = web::block(move || system::stat(config)).await?;

View File

@ -108,7 +108,6 @@ pub struct Channel {
pub id: i32,
pub name: String,
pub preview_url: String,
pub config_path: String,
pub extra_extensions: String,
pub active: bool,
pub current_date: Option<String>,

View File

@ -72,6 +72,11 @@ impl ChannelManager {
is_alive: Arc::new(AtomicBool::new(channel.active)),
channel: Arc::new(Mutex::new(channel)),
config: Arc::new(Mutex::new(config)),
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec![Media::new(0, "", false)])),
filler_list: Arc::new(Mutex::new(vec![])),
current_index: Arc::new(AtomicUsize::new(0)),
filler_index: Arc::new(AtomicUsize::new(0)),
..Default::default()
}
}
@ -178,62 +183,6 @@ impl ChannelManager {
}
}
/// Global player control, to get infos about current clip etc.
#[derive(Clone, Debug)]
pub struct PlayerControl {
pub current_media: Arc<Mutex<Option<Media>>>,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub filler_list: Arc<Mutex<Vec<Media>>>,
pub current_index: Arc<AtomicUsize>,
pub filler_index: Arc<AtomicUsize>,
}
impl PlayerControl {
pub fn new() -> Self {
Self {
current_media: Arc::new(Mutex::new(None)),
current_list: Arc::new(Mutex::new(vec![Media::new(0, "", false)])),
filler_list: Arc::new(Mutex::new(vec![])),
current_index: Arc::new(AtomicUsize::new(0)),
filler_index: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Default for PlayerControl {
fn default() -> Self {
Self::new()
}
}
/// Global playout control, for move forward/backward clip, or resetting playlist/state.
#[derive(Clone, Debug)]
pub struct PlayoutStatus {
pub chain: Option<Arc<Mutex<Vec<String>>>>,
pub current_date: Arc<Mutex<String>>,
pub date: Arc<Mutex<String>>,
pub list_init: Arc<AtomicBool>,
pub time_shift: Arc<Mutex<f64>>,
}
impl PlayoutStatus {
pub fn new() -> Self {
Self {
chain: None,
current_date: Arc::new(Mutex::new(String::new())),
date: Arc::new(Mutex::new(String::new())),
list_init: Arc::new(AtomicBool::new(true)),
time_shift: Arc::new(Mutex::new(0.0)),
}
}
}
impl Default for PlayoutStatus {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, Default)]
pub struct ChannelController {
pub channels: Vec<ChannelManager>,
@ -273,22 +222,20 @@ impl ChannelController {
}
}
pub fn start(db_pool: Pool<Sqlite>, channel: ChannelManager) -> Result<(), ProcessError> {
let config = channel.config.lock()?.clone();
pub fn start(db_pool: Pool<Sqlite>, manager: ChannelManager) -> Result<(), ProcessError> {
let config = manager.config.lock()?.clone();
let mode = config.output.mode.clone();
let play_control = PlayerControl::new();
let play_control_clone = play_control.clone();
let play_status = PlayoutStatus::new();
let filler_list = manager.filler_list.clone();
// Fill filler list, can also be a single file.
thread::spawn(move || {
fill_filler_list(&config, Some(play_control_clone));
fill_filler_list(&config, Some(filler_list));
});
match mode {
// write files/playlist to HLS m3u8 playlist
HLS => write_hls(channel, db_pool, play_control, play_status),
HLS => write_hls(manager, db_pool),
// play on desktop or stream to a remote target
_ => player(channel, db_pool, &play_control, play_status),
_ => player(manager, db_pool),
}
}

View File

@ -1,7 +1,4 @@
use std::{
sync::{atomic::AtomicBool, Arc},
thread,
};
use std::thread;
use simplelog::*;
use sqlx::{Pool, Sqlite};
@ -15,19 +12,22 @@ pub use ingest::ingest_server;
pub use playlist::CurrentProgram;
use crate::player::{
controller::{PlayerControl, PlayoutStatus},
controller::ChannelManager,
utils::{folder::FolderSource, Media},
};
use crate::utils::config::{PlayoutConfig, ProcessMode::*};
use crate::utils::config::ProcessMode::*;
/// Create a source iterator from playlist, or from folder.
pub fn source_generator(
config: PlayoutConfig,
manager: ChannelManager,
db_pool: Pool<Sqlite>,
player_control: &PlayerControl,
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
) -> Box<dyn Iterator<Item = Media>> {
let config = manager.config.lock().unwrap().clone();
let is_terminated = manager.is_terminated.clone();
let chain = manager.chain.clone();
let current_list = manager.current_list.clone();
let current_index = manager.current_index.clone();
match config.processing.mode {
Folder => {
info!("Playout in folder mode");
@ -37,23 +37,18 @@ pub fn source_generator(
);
let config_clone = config.clone();
let folder_source = FolderSource::new(&config, playout_stat.chain, player_control);
let node_clone = folder_source.player_control.current_list.clone();
let folder_source =
FolderSource::new(&config, chain, current_list.clone(), current_index);
let list_clone = current_list.clone();
// Spawn a thread to monitor folder for file changes.
thread::spawn(move || watchman(config_clone, is_terminated.clone(), node_clone));
thread::spawn(move || watchman(config_clone, is_terminated.clone(), list_clone));
Box::new(folder_source) as Box<dyn Iterator<Item = Media>>
}
Playlist => {
info!("Playout in playlist mode");
let program = CurrentProgram::new(
&config,
db_pool,
playout_stat,
is_terminated,
player_control,
);
let program = CurrentProgram::new(manager, db_pool);
Box::new(program) as Box<dyn Iterator<Item = Media>>
}

View File

@ -12,7 +12,7 @@ use sqlx::{Pool, Sqlite};
use crate::db::handles;
use crate::player::{
controller::{PlayerControl, PlayoutStatus},
controller::ChannelManager,
utils::{
gen_dummy, get_delta, is_close, is_remote,
json_serializer::{read_json, set_defaults},
@ -27,29 +27,26 @@ use crate::utils::config::{PlayoutConfig, IMAGE_FORMAT};
/// Here we prepare the init clip and build a iterator where we pull our clips.
#[derive(Debug)]
pub struct CurrentProgram {
manager: ChannelManager,
config: PlayoutConfig,
db_pool: Pool<Sqlite>,
start_sec: f64,
end_sec: f64,
json_playlist: JsonPlaylist,
player_control: PlayerControl,
current_node: Media,
is_terminated: Arc<AtomicBool>,
playout_stat: PlayoutStatus,
last_json_path: Option<String>,
last_node_ad: bool,
}
/// Prepare a playlist iterator.
impl CurrentProgram {
pub fn new(
config: &PlayoutConfig,
db_pool: Pool<Sqlite>,
playout_stat: PlayoutStatus,
is_terminated: Arc<AtomicBool>,
player_control: &PlayerControl,
) -> Self {
pub fn new(manager: ChannelManager, db_pool: Pool<Sqlite>) -> Self {
let config = manager.config.lock().unwrap().clone();
let is_terminated = manager.is_terminated.clone();
Self {
manager,
config: config.clone(),
db_pool,
start_sec: config.playlist.start_sec.unwrap(),
@ -58,10 +55,8 @@ impl CurrentProgram {
"1970-01-01".to_string(),
config.playlist.start_sec.unwrap(),
),
player_control: player_control.clone(),
current_node: Media::new(0, "", false),
is_terminated,
playout_stat,
last_json_path: None,
last_node_ad: false,
}
@ -78,7 +73,7 @@ impl CurrentProgram {
&& self.json_playlist.modified != modified_time(&path)
{
info!("Reload playlist <b><magenta>{path}</></b>");
self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.manager.list_init.store(true, Ordering::SeqCst);
get_current = true;
reload = true;
}
@ -89,7 +84,7 @@ impl CurrentProgram {
if get_current {
self.json_playlist = read_json(
&mut self.config,
&self.player_control,
self.manager.current_list.clone(),
self.json_playlist.path.clone(),
self.is_terminated.clone(),
seek,
@ -101,18 +96,27 @@ impl CurrentProgram {
info!("Read playlist: <b><magenta>{file}</></b>");
}
if *self.playout_stat.date.lock().unwrap() != self.json_playlist.date {
if *self
.manager
.channel
.lock()
.unwrap()
.current_date
.clone()
.unwrap_or_default()
!= self.json_playlist.date
{
self.set_status(self.json_playlist.date.clone());
}
self.playout_stat
self.manager
.current_date
.lock()
.unwrap()
.clone_from(&self.json_playlist.date);
}
self.player_control
self.manager
.current_list
.lock()
.unwrap()
@ -122,8 +126,8 @@ impl CurrentProgram {
trace!("missing playlist");
self.current_node = Media::new(0, "", false);
self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.player_control.current_index.store(0, Ordering::SeqCst);
self.manager.list_init.store(true, Ordering::SeqCst);
self.manager.current_index.store(0, Ordering::SeqCst);
}
}
}
@ -145,9 +149,7 @@ impl CurrentProgram {
let mut next_start =
self.current_node.begin.unwrap_or_default() - self.start_sec + duration + delta;
if node_index > 0
&& node_index == self.player_control.current_list.lock().unwrap().len() - 1
{
if node_index > 0 && node_index == self.manager.current_list.lock().unwrap().len() - 1 {
next_start += self.config.general.stop_threshold;
}
@ -168,7 +170,7 @@ impl CurrentProgram {
self.json_playlist = read_json(
&mut self.config,
&self.player_control,
self.manager.current_list.clone(),
None,
self.is_terminated.clone(),
false,
@ -179,15 +181,15 @@ impl CurrentProgram {
info!("Read next playlist: <b><magenta>{file}</></b>");
}
self.playout_stat.list_init.store(false, Ordering::SeqCst);
self.manager.list_init.store(false, Ordering::SeqCst);
self.set_status(self.json_playlist.date.clone());
self.player_control
self.manager
.current_list
.lock()
.unwrap()
.clone_from(&self.json_playlist.program);
self.player_control.current_index.store(0, Ordering::SeqCst);
self.manager.current_index.store(0, Ordering::SeqCst);
} else {
self.load_or_update_playlist(seek)
}
@ -196,18 +198,20 @@ impl CurrentProgram {
}
fn set_status(&mut self, date: String) {
if *self.playout_stat.date.lock().unwrap() != date
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
if self.manager.channel.lock().unwrap().current_date != Some(date.clone())
&& self.manager.channel.lock().unwrap().time_shift != 0.0
{
info!("Reset playout status");
}
self.playout_stat
.current_date
self.manager.current_date.lock().unwrap().clone_from(&date);
self.manager
.channel
.lock()
.unwrap()
.clone_from(&date);
*self.playout_stat.time_shift.lock().unwrap() = 0.0;
.current_date
.clone_from(&Some(date.clone()));
self.manager.channel.lock().unwrap().time_shift = 0.0;
if let Err(e) = executor::block_on(handles::update_stat(
&self.db_pool,
@ -221,8 +225,8 @@ impl CurrentProgram {
// Check if last and/or next clip is a advertisement.
fn last_next_ad(&mut self, node: &mut Media) {
let index = self.player_control.current_index.load(Ordering::SeqCst);
let current_list = self.player_control.current_list.lock().unwrap();
let index = self.manager.current_index.load(Ordering::SeqCst);
let current_list = self.manager.current_list.lock().unwrap();
if index + 1 < current_list.len() && &current_list[index + 1].category == "advertisement" {
node.next_ad = true;
@ -251,7 +255,7 @@ impl CurrentProgram {
// On init or reload we need to seek for the current clip.
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
let shift = *self.playout_stat.time_shift.lock().unwrap();
let shift = self.manager.channel.lock().unwrap().time_shift.clone();
if shift != 0.0 {
info!("Shift playlist start for <yellow>{shift:.3}</> seconds");
@ -265,17 +269,10 @@ impl CurrentProgram {
self.recalculate_begin(true)
}
for (i, item) in self
.player_control
.current_list
.lock()
.unwrap()
.iter()
.enumerate()
{
for (i, item) in self.manager.current_list.lock().unwrap().iter().enumerate() {
if item.begin.unwrap() + item.out - item.seek > time_sec {
self.playout_stat.list_init.store(false, Ordering::SeqCst);
self.player_control.current_index.store(i, Ordering::SeqCst);
self.manager.list_init.store(false, Ordering::SeqCst);
self.manager.current_index.store(i, Ordering::SeqCst);
break;
}
@ -288,10 +285,10 @@ impl CurrentProgram {
self.get_current_clip();
let mut is_filler = false;
if !self.playout_stat.list_init.load(Ordering::SeqCst) {
if !self.manager.list_init.load(Ordering::SeqCst) {
let time_sec = self.get_current_time();
let index = self.player_control.current_index.load(Ordering::SeqCst);
let nodes = self.player_control.current_list.lock().unwrap();
let index = self.manager.current_index.load(Ordering::SeqCst);
let nodes = self.manager.current_list.lock().unwrap();
let last_index = nodes.len() - 1;
// de-instance node to preserve original values in list
@ -303,13 +300,11 @@ impl CurrentProgram {
trace!("Clip from init: {}", node_clone.source);
node_clone.seek += time_sec
- (node_clone.begin.unwrap() - *self.playout_stat.time_shift.lock().unwrap());
- (node_clone.begin.unwrap() - self.manager.channel.lock().unwrap().time_shift);
self.last_next_ad(&mut node_clone);
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
self.manager.current_index.fetch_add(1, Ordering::SeqCst);
self.current_node = handle_list_init(
&self.config,
@ -334,7 +329,7 @@ impl CurrentProgram {
fn fill_end(&mut self, total_delta: f64) {
// Fill end from playlist
let index = self.player_control.current_index.load(Ordering::SeqCst);
let index = self.manager.current_index.load(Ordering::SeqCst);
let mut media = Media::new(index, "", false);
media.begin = Some(time_in_seconds());
media.duration = total_delta;

View File

@ -28,11 +28,11 @@ use std::{
use log::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{config::PlayoutConfig, logging::log_line, task_runner};
use crate::utils::{logging::log_line, task_runner};
use crate::vec_strings;
use crate::{
player::{
controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*},
controller::{ChannelManager, ProcessUnit::*},
input::source_generator,
utils::{
get_delta, prepare_output_cmd, sec_to_time, stderr_reader, test_tcp_port, valid_stream,
@ -43,20 +43,18 @@ use crate::{
};
/// Ingest Server for HLS
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
channel_mgr: ChannelManager,
) -> Result<(), ProcessError> {
let playlist_init = playout_stat.list_init;
fn ingest_to_hls_server(manager: ChannelManager) -> Result<(), ProcessError> {
let config = manager.config.lock().unwrap();
let playlist_init = manager.list_init.clone();
let chain = manager.chain.clone();
let mut server_prefix = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
let stream_input = config.ingest.input_cmd.clone().unwrap();
let mut dummy_media = Media::new(0, "Live Stream", false);
dummy_media.unit = Ingest;
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
let is_terminated = manager.is_terminated.clone();
let ingest_is_running = manager.ingest_is_running.clone();
if let Some(ingest_input_cmd) = &config.advanced.ingest.input_cmd {
server_prefix.append(&mut ingest_input_cmd.clone());
@ -68,15 +66,18 @@ fn ingest_to_hls_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
channel_mgr.stop_all();
manager.stop_all();
exit(1);
}
info!("Start ingest server, listening on: <b><magenta>{url}</></b>");
};
drop(config);
loop {
dummy_media.add_filter(&config, &playout_stat.chain);
let config = manager.config.lock().unwrap().clone();
dummy_media.add_filter(&config, &chain);
let server_cmd = prepare_output_cmd(&config, server_prefix.clone(), &dummy_media.filter);
debug!(
@ -84,7 +85,7 @@ fn ingest_to_hls_server(
server_cmd.join(" ")
);
let proc_ctl = channel_mgr.clone();
let proc_ctl = manager.clone();
let mut server_proc = match Command::new("ffmpeg")
.args(server_cmd.clone())
.stderr(Stdio::piped())
@ -98,7 +99,7 @@ fn ingest_to_hls_server(
};
let server_err = BufReader::new(server_proc.stderr.take().unwrap());
*channel_mgr.ingest.lock().unwrap() = Some(server_proc);
*manager.ingest.lock().unwrap() = Some(server_proc);
is_running = false;
for line in server_err.lines() {
@ -117,7 +118,7 @@ fn ingest_to_hls_server(
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = channel_mgr.stop(Decoder) {
if let Err(e) = manager.stop(Decoder) {
error!("{e}");
}
}
@ -131,7 +132,7 @@ fn ingest_to_hls_server(
ingest_is_running.store(false, Ordering::SeqCst);
if let Err(e) = channel_mgr.wait(Ingest) {
if let Err(e) = manager.wait(Ingest) {
error!("{e}")
}
@ -146,35 +147,24 @@ fn ingest_to_hls_server(
/// HLS Writer
///
/// Write with single ffmpeg instance directly to a HLS playlist.
pub fn write_hls(
channel_mgr: ChannelManager,
db_pool: Pool<Sqlite>,
player_control: PlayerControl,
playout_stat: PlayoutStatus,
) -> Result<(), ProcessError> {
let config = channel_mgr.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let play_stat = playout_stat.clone();
let channel_mgr_2 = channel_mgr.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
pub fn write_hls(manager: ChannelManager, db_pool: Pool<Sqlite>) -> Result<(), ProcessError> {
let config = manager.config.lock()?.clone();
let current_media = manager.current_media.clone();
let get_source = source_generator(
config.clone(),
db_pool,
&player_control,
playout_stat,
is_terminated.clone(),
);
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let channel_mgr_2 = manager.clone();
let ingest_is_running = manager.ingest_is_running.clone();
let get_source = source_generator(manager.clone(), db_pool);
// spawn a thread for ffmpeg ingest server and create a channel for package sending
if config.ingest.enable {
thread::spawn(move || ingest_to_hls_server(config_clone, play_stat, channel_mgr_2));
thread::spawn(move || ingest_to_hls_server(channel_mgr_2));
}
for node in get_source {
*player_control.current_media.lock().unwrap() = Some(node.clone());
*current_media.lock().unwrap() = Some(node.clone());
let ignore = config.logging.ignore_lines.clone();
let mut cmd = match &node.cmd {
@ -194,7 +184,7 @@ pub fn write_hls(
if config.task.enable {
if config.task.path.is_file() {
let channel_mgr_3 = channel_mgr.clone();
let channel_mgr_3 = manager.clone();
thread::spawn(move || task_runner::run(channel_mgr_3));
} else {
@ -250,13 +240,13 @@ pub fn write_hls(
};
let enc_err = BufReader::new(dec_proc.stderr.take().unwrap());
*channel_mgr.decoder.lock().unwrap() = Some(dec_proc);
*manager.decoder.lock().unwrap() = Some(dec_proc);
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, channel_mgr.clone()) {
if let Err(e) = stderr_reader(enc_err, ignore, Decoder, manager.clone()) {
error!("{e:?}")
};
if let Err(e) = channel_mgr.wait(Decoder) {
if let Err(e) = manager.wait(Decoder) {
error!("{e}");
}
@ -267,7 +257,7 @@ pub fn write_hls(
sleep(Duration::from_secs(1));
channel_mgr.stop_all();
manager.stop_all();
Ok(())
}

View File

@ -18,7 +18,7 @@ mod stream;
pub use hls::write_hls;
use crate::player::{
controller::{ChannelManager, PlayerControl, PlayoutStatus, ProcessUnit::*},
controller::{ChannelManager, ProcessUnit::*},
input::{ingest_server, source_generator},
utils::{sec_to_time, stderr_reader},
};
@ -34,31 +34,20 @@ use crate::vec_strings;
/// for getting live feeds.
/// When a live ingest arrive, it stops the current playing and switch to the live source.
/// When ingest stops, it switch back to playlist/folder mode.
pub fn player(
channel_mgr: ChannelManager,
db_pool: Pool<Sqlite>,
play_control: &PlayerControl,
playout_stat: PlayoutStatus,
) -> Result<(), ProcessError> {
let config = channel_mgr.config.lock()?.clone();
pub fn player(manager: ChannelManager, db_pool: Pool<Sqlite>) -> Result<(), ProcessError> {
let config = manager.config.lock()?.clone();
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let ignore_enc = config.logging.ignore_lines.clone();
let mut buffer = [0; 65088];
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
let playlist_init = manager.list_init.clone();
let is_terminated = channel_mgr.is_terminated.clone();
let ingest_is_running = channel_mgr.ingest_is_running.clone();
let is_terminated = manager.is_terminated.clone();
let ingest_is_running = manager.ingest_is_running.clone();
// get source iterator
let node_sources = source_generator(
config.clone(),
db_pool,
play_control,
playout_stat,
is_terminated.clone(),
);
let node_sources = source_generator(manager.clone(), db_pool);
// get ffmpeg output instance
let mut enc_proc = match config.output.mode {
@ -71,14 +60,14 @@ pub fn player(
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
let enc_err = BufReader::new(enc_proc.stderr.take().unwrap());
*channel_mgr.encoder.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = channel_mgr.clone();
*manager.encoder.lock().unwrap() = Some(enc_proc);
let enc_p_ctl = manager.clone();
// spawn a thread to log ffmpeg output error messages
let error_encoder_thread =
thread::spawn(move || stderr_reader(enc_err, ignore_enc, Encoder, enc_p_ctl));
let channel_mgr_2 = channel_mgr.clone();
let channel_mgr_2 = manager.clone();
let mut ingest_receiver = None;
// spawn a thread for ffmpeg ingest server and create a channel for package sending
@ -88,8 +77,12 @@ pub fn player(
thread::spawn(move || ingest_server(config_clone, ingest_sender, channel_mgr_2));
}
drop(config);
'source_iter: for node in node_sources {
*play_control.current_media.lock().unwrap() = Some(node.clone());
let config = manager.config.lock()?.clone();
*manager.current_media.lock().unwrap() = Some(node.clone());
let ignore_dec = config.logging.ignore_lines.clone();
if is_terminated.load(Ordering::SeqCst) {
@ -115,7 +108,7 @@ pub fn player(
format!(
" ({}/{})",
node.index.unwrap() + 1,
play_control.current_list.lock().unwrap().len()
manager.current_list.lock().unwrap().len()
)
} else {
String::new()
@ -130,7 +123,7 @@ pub fn player(
if config.task.enable {
if config.task.path.is_file() {
let channel_mgr_3 = channel_mgr.clone();
let channel_mgr_3 = manager.clone();
thread::spawn(move || task_runner::run(channel_mgr_3));
} else {
@ -180,8 +173,8 @@ pub fn player(
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
let dec_err = BufReader::new(dec_proc.stderr.take().unwrap());
*channel_mgr.clone().decoder.lock().unwrap() = Some(dec_proc);
let channel_mgr_c = channel_mgr.clone();
*manager.clone().decoder.lock().unwrap() = Some(dec_proc);
let channel_mgr_c = manager.clone();
let error_decoder_thread =
thread::spawn(move || stderr_reader(dec_err, ignore_dec, Decoder, channel_mgr_c));
@ -192,7 +185,7 @@ pub fn player(
if !live_on {
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = channel_mgr.stop(Decoder) {
if let Err(e) = manager.stop(Decoder) {
error!("{e}")
}
@ -237,7 +230,7 @@ pub fn player(
}
}
if let Err(e) = channel_mgr.wait(Decoder) {
if let Err(e) = manager.wait(Decoder) {
error!("{e}")
}
@ -250,7 +243,7 @@ pub fn player(
sleep(Duration::from_secs(1));
channel_mgr.stop_all();
manager.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");

View File

@ -1,5 +1,5 @@
use std::sync::{
atomic::Ordering,
atomic::{AtomicUsize, Ordering},
{Arc, Mutex},
};
@ -8,7 +8,6 @@ use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::player::controller::PlayerControl;
use crate::player::utils::{include_file_extension, time_in_seconds, Media, PlayoutConfig};
/// Folder Sources
@ -18,7 +17,8 @@ use crate::player::utils::{include_file_extension, time_in_seconds, Media, Playo
pub struct FolderSource {
config: PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
pub player_control: PlayerControl,
pub current_list: Arc<Mutex<Vec<Media>>>,
pub current_index: Arc<AtomicUsize>,
current_node: Media,
}
@ -26,7 +26,8 @@ impl FolderSource {
pub fn new(
config: &PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
player_control: &PlayerControl,
current_list: Arc<Mutex<Vec<Media>>>,
current_index: Arc<AtomicUsize>,
) -> Self {
let mut path_list = vec![];
let mut media_list = vec![];
@ -77,12 +78,13 @@ impl FolderSource {
index += 1;
}
*player_control.current_list.lock().unwrap() = media_list;
*current_list.lock().unwrap() = media_list;
Self {
config: config.clone(),
filter_chain,
player_control: player_control.clone(),
current_list,
current_index,
current_node: Media::new(0, "", false),
}
}
@ -90,22 +92,24 @@ impl FolderSource {
pub fn from_list(
config: &PlayoutConfig,
filter_chain: Option<Arc<Mutex<Vec<String>>>>,
player_control: &PlayerControl,
current_list: Arc<Mutex<Vec<Media>>>,
current_index: Arc<AtomicUsize>,
list: Vec<Media>,
) -> Self {
*player_control.current_list.lock().unwrap() = list;
*current_list.lock().unwrap() = list;
Self {
config: config.clone(),
filter_chain,
player_control: player_control.clone(),
current_list,
current_index,
current_node: Media::new(0, "", false),
}
}
fn shuffle(&mut self) {
let mut rng = thread_rng();
let mut nodes = self.player_control.current_list.lock().unwrap();
let mut nodes = self.current_list.lock().unwrap();
nodes.shuffle(&mut rng);
@ -115,7 +119,7 @@ impl FolderSource {
}
fn sort(&mut self) {
let mut nodes = self.player_control.current_list.lock().unwrap();
let mut nodes = self.current_list.lock().unwrap();
nodes.sort_by(|d1, d2| d1.source.cmp(&d2.source));
@ -130,19 +134,15 @@ impl Iterator for FolderSource {
type Item = Media;
fn next(&mut self) -> Option<Self::Item> {
if self.player_control.current_index.load(Ordering::SeqCst)
< self.player_control.current_list.lock().unwrap().len()
{
let i = self.player_control.current_index.load(Ordering::SeqCst);
self.current_node = self.player_control.current_list.lock().unwrap()[i].clone();
if self.current_index.load(Ordering::SeqCst) < self.current_list.lock().unwrap().len() {
let i = self.current_index.load(Ordering::SeqCst);
self.current_node = self.current_list.lock().unwrap()[i].clone();
let _ = self.current_node.add_probe(false).ok();
self.current_node
.add_filter(&self.config, &self.filter_chain);
self.current_node.begin = Some(time_in_seconds());
self.player_control
.current_index
.fetch_add(1, Ordering::SeqCst);
self.current_index.fetch_add(1, Ordering::SeqCst);
Some(self.current_node.clone())
} else {
@ -160,13 +160,13 @@ impl Iterator for FolderSource {
self.sort();
}
self.current_node = self.player_control.current_list.lock().unwrap()[0].clone();
self.current_node = self.current_list.lock().unwrap()[0].clone();
let _ = self.current_node.add_probe(false).ok();
self.current_node
.add_filter(&self.config, &self.filter_chain);
self.current_node.begin = Some(time_in_seconds());
self.player_control.current_index.store(1, Ordering::SeqCst);
self.current_index.store(1, Ordering::SeqCst);
Some(self.current_node.clone())
}
@ -175,7 +175,7 @@ impl Iterator for FolderSource {
pub fn fill_filler_list(
config: &PlayoutConfig,
player_control: Option<PlayerControl>,
fillers: Option<Arc<Mutex<Vec<Media>>>>,
) -> Vec<Media> {
let mut filler_list = vec![];
let filler_path = &config.storage.filler;
@ -190,7 +190,7 @@ pub fn fill_filler_list(
{
let mut media = Media::new(index, &entry.path().to_string_lossy(), false);
if player_control.is_none() {
if fillers.is_none() {
if let Err(e) = media.add_probe(false) {
error!("{e:?}");
};
@ -211,13 +211,13 @@ pub fn fill_filler_list(
item.index = Some(index);
}
if let Some(control) = player_control.as_ref() {
control.filler_list.lock().unwrap().clone_from(&filler_list);
if let Some(f) = fillers.as_ref() {
f.lock().unwrap().clone_from(&filler_list);
}
} else if filler_path.is_file() {
let mut media = Media::new(0, &config.storage.filler.to_string_lossy(), false);
if player_control.is_none() {
if fillers.is_none() {
if let Err(e) = media.add_probe(false) {
error!("{e:?}");
};
@ -225,8 +225,8 @@ pub fn fill_filler_list(
filler_list.push(media);
if let Some(control) = player_control.as_ref() {
control.filler_list.lock().unwrap().clone_from(&filler_list);
if let Some(f) = fillers.as_ref() {
f.lock().unwrap().clone_from(&filler_list);
}
}

View File

@ -2,18 +2,15 @@ use serde::{Deserialize, Serialize};
use std::{
fs::File,
path::Path,
sync::{atomic::AtomicBool, Arc},
sync::{atomic::AtomicBool, Arc, Mutex},
thread,
};
use simplelog::*;
use crate::player::{
controller::PlayerControl,
utils::{
get_date, is_remote, json_validate::validate_playlist, modified_time, time_from_header,
Media, PlayoutConfig,
},
use crate::player::utils::{
get_date, is_remote, json_validate::validate_playlist, modified_time, time_from_header, Media,
PlayoutConfig,
};
use crate::utils::config::DUMMY_LEN;
@ -95,14 +92,13 @@ pub fn set_defaults(playlist: &mut JsonPlaylist) {
/// which we need to process.
pub fn read_json(
config: &mut PlayoutConfig,
player_control: &PlayerControl,
current_list: Arc<Mutex<Vec<Media>>>,
path: Option<String>,
is_terminated: Arc<AtomicBool>,
seek: bool,
get_next: bool,
) -> JsonPlaylist {
let config_clone = config.clone();
let control_clone = player_control.clone();
let mut playlist_path = config.playlist.path.clone();
let start_sec = config.playlist.start_sec.unwrap();
let date = get_date(seek, start_sec, get_next);
@ -150,12 +146,7 @@ pub fn read_json(
if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(
config_clone,
control_clone,
list_clone,
is_terminated,
)
validate_playlist(config_clone, current_list, list_clone, is_terminated)
});
}
@ -194,7 +185,7 @@ pub fn read_json(
if !config.general.skip_validation {
thread::spawn(move || {
validate_playlist(config_clone, control_clone, list_clone, is_terminated)
validate_playlist(config_clone, current_list, list_clone, is_terminated)
});
}

View File

@ -3,7 +3,7 @@ use std::{
process::{Command, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
time::Instant,
};
@ -12,9 +12,8 @@ use log::*;
use regex::Regex;
use crate::player::filter::FilterType::Audio;
use crate::player::{
controller::PlayerControl,
utils::{is_close, is_remote, loop_image, sec_to_time, seek_and_length, JsonPlaylist, Media},
use crate::player::utils::{
is_close, is_remote, loop_image, sec_to_time, seek_and_length, JsonPlaylist, Media,
};
use crate::utils::{
config::{OutputMode::Null, PlayoutConfig, FFMPEG_IGNORE_ERRORS, IMAGE_FORMAT},
@ -155,7 +154,7 @@ fn check_media(
/// This function we run in a thread, to don't block the main function.
pub fn validate_playlist(
mut config: PlayoutConfig,
player_control: PlayerControl,
current_list: Arc<Mutex<Vec<Media>>>,
mut playlist: JsonPlaylist,
is_terminated: Arc<AtomicBool>,
) {
@ -206,7 +205,7 @@ pub fn validate_playlist(
sec_to_time(begin),
item.source
)
} else if let Ok(mut list) = player_control.current_list.try_lock() {
} else if let Ok(mut list) = current_list.try_lock() {
// Filter out same item in current playlist, then add the probe to it.
// Check also if duration differs with playlist value, log error if so and adjust that value.
list.iter_mut().filter(|list_item| list_item.source == item.source).for_each(|o| {

View File

@ -1,11 +1,8 @@
use std::fs;
use rand::prelude::*;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::db::{handles, models::Channel};
use crate::utils::{config::PlayoutConfig, errors::ServiceError, playout_config};
use crate::utils::{config::PlayoutConfig, errors::ServiceError};
pub async fn create_channel(
conn: &Pool<Sqlite>,
@ -34,12 +31,10 @@ pub async fn create_channel(
}
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let _channel = handles::select_channel(conn, &id).await?;
let (_config, _) = playout_config(conn, &id).await?;
let channel = handles::select_channel(conn, &id).await?;
// TODO: Remove Channel controller
handles::delete_channel(conn, &id).await?;
handles::delete_channel(conn, &channel.id).await?;
Ok(())
}

View File

@ -189,7 +189,8 @@ pub async fn control_state(
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
manager.channel.lock().unwrap().time_shift = delta;
date.clone_from(&current_date);
handles::update_stat(conn, config.general.channel_id, current_date, delta);
handles::update_stat(conn, config.general.channel_id, current_date, delta)
.await?;
data_map.insert("operation".to_string(), json!("move_to_last"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
@ -225,7 +226,8 @@ pub async fn control_state(
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
manager.channel.lock().unwrap().time_shift = delta;
date.clone_from(&current_date);
handles::update_stat(conn, config.general.channel_id, current_date, delta);
handles::update_stat(conn, config.general.channel_id, current_date, delta)
.await?;
data_map.insert("operation".to_string(), json!("move_to_next"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
@ -254,7 +256,7 @@ pub async fn control_state(
date.clone_from(&current_date);
manager.list_init.store(true, Ordering::SeqCst);
handles::update_stat(conn, config.general.channel_id, current_date, 0.0);
handles::update_stat(conn, config.general.channel_id, current_date, 0.0).await?;
data_map.insert("operation".to_string(), json!("reset_playout_state"));

View File

@ -11,13 +11,13 @@ use lexical_sort::{natural_lexical_cmp, PathSort};
use rand::{distributions::Alphanumeric, Rng};
use relative_path::RelativePath;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::fs;
use simplelog::*;
use log::*;
use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::utils::{file_extension, MediaProbe};
use crate::db::models::Channel;
use crate::player::utils::{file_extension, MediaProbe};
use crate::utils::{config::PlayoutConfig, errors::ServiceError};
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PathObject {
@ -128,18 +128,17 @@ pub fn norm_abs_path(
/// Input should be a relative path segment, but when it is a absolut path, the norm_abs_path function
/// will take care, that user can not break out from given storage path in config.
pub async fn browser(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
channel: &Channel,
path_obj: &PathObject,
) -> Result<PathObject, ServiceError> {
let (config, channel) = playout_config(conn, &id).await?;
let mut channel_extensions = channel
.extra_extensions
.split(',')
.map(|e| e.to_string())
.collect::<Vec<String>>();
let mut parent_folders = vec![];
let mut extensions = config.storage.extensions;
let mut extensions = config.storage.extensions.clone();
extensions.append(&mut channel_extensions);
let (path, parent, path_component) = norm_abs_path(&config.storage.path, &path_obj.source)?;
@ -235,11 +234,9 @@ pub async fn browser(
}
pub async fn create_directory(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
path_obj: &PathObject,
) -> Result<HttpResponse, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (path, _, _) = norm_abs_path(&config.storage.path, &path_obj.source)?;
if let Err(e) = fs::create_dir_all(&path).await {
@ -306,11 +303,9 @@ async fn rename(source: &PathBuf, target: &PathBuf) -> Result<MoveObject, Servic
}
pub async fn rename_file(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
move_object: &MoveObject,
) -> Result<MoveObject, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (source_path, _, _) = norm_abs_path(&config.storage.path, &move_object.source)?;
let (mut target_path, _, _) = norm_abs_path(&config.storage.path, &move_object.target)?;
@ -341,11 +336,9 @@ pub async fn rename_file(
}
pub async fn remove_file_or_folder(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
source_path: &str,
) -> Result<(), ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (source, _, _) = norm_abs_path(&config.storage.path, source_path)?;
if !source.exists() {
@ -377,8 +370,7 @@ pub async fn remove_file_or_folder(
Err(ServiceError::InternalServerError)
}
async fn valid_path(conn: &Pool<Sqlite>, id: i32, path: &str) -> Result<PathBuf, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
async fn valid_path(config: &PlayoutConfig, path: &str) -> Result<PathBuf, ServiceError> {
let (test_path, _, _) = norm_abs_path(&config.storage.path, path)?;
if !test_path.is_dir() {
@ -389,8 +381,7 @@ async fn valid_path(conn: &Pool<Sqlite>, id: i32, path: &str) -> Result<PathBuf,
}
pub async fn upload(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
_size: u64,
mut payload: Multipart,
path: &Path,
@ -411,7 +402,7 @@ pub async fn upload(
let filepath = if abs_path {
path.to_path_buf()
} else {
valid_path(conn, id, &path.to_string_lossy())
valid_path(&config, &path.to_string_lossy())
.await?
.join(filename)
};

View File

@ -16,14 +16,11 @@ use rand::{seq::SliceRandom, thread_rng, Rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::player::{
controller::PlayerControl,
utils::{
folder::{fill_filler_list, FolderSource},
gen_dummy, get_date_range, include_file_extension,
json_serializer::JsonPlaylist,
sum_durations, Media,
},
use crate::player::utils::{
folder::{fill_filler_list, FolderSource},
gen_dummy, get_date_range, include_file_extension,
json_serializer::JsonPlaylist,
sum_durations, Media,
};
use crate::utils::{
config::{PlayoutConfig, Template},

View File

@ -2,8 +2,8 @@ use std::{
env,
error::Error,
fmt,
fs::{self, metadata, File},
io::{stdin, stdout, Read, Write},
fs::{self, metadata},
io::{stdin, stdout, Write},
net::TcpListener,
path::{Path, PathBuf},
str::FromStr,
@ -40,11 +40,11 @@ pub mod task_runner;
use crate::db::{
db_pool,
handles::{insert_user, select_channel, select_global},
models::{Channel, User},
handles::{insert_user, select_global},
models::User,
};
use crate::player::utils::time_to_sec;
use crate::utils::{config::PlayoutConfig, errors::ServiceError, logging::log_file_path};
use crate::utils::{errors::ServiceError, logging::log_file_path};
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub enum Role {
@ -396,35 +396,6 @@ pub async fn run_args() -> Result<(), i32> {
Ok(())
}
pub fn read_playout_config(path: &str) -> Result<PlayoutConfig, Box<dyn Error>> {
let mut file = File::open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let mut config: PlayoutConfig = toml_edit::de::from_str(&contents)?;
config.playlist.start_sec = Some(time_to_sec(&config.playlist.day_start));
config.playlist.length_sec = Some(time_to_sec(&config.playlist.length));
Ok(config)
}
pub async fn playout_config(
conn: &Pool<Sqlite>,
channel_id: &i32,
) -> Result<(PlayoutConfig, Channel), ServiceError> {
if let Ok(channel) = select_channel(conn, channel_id).await {
match read_playout_config(&channel.config_path.clone()) {
Ok(config) => return Ok((config, channel)),
Err(e) => error!("{e}"),
}
}
Err(ServiceError::BadRequest(
"Error in getting config!".to_string(),
))
}
pub async fn read_log_file(channel_id: &i32, date: &str) -> Result<String, ServiceError> {
let mut date_str = "".to_string();

View File

@ -1,20 +1,17 @@
use std::{fs, path::PathBuf};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use log::*;
use crate::player::utils::{json_reader, json_writer, JsonPlaylist};
use crate::utils::{
config::PlayoutConfig, errors::ServiceError, files::norm_abs_path,
generator::generate_playlist as playlist_generator, playout_config,
generator::generate_playlist as playlist_generator,
};
pub async fn read_playlist(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
date: String,
) -> Result<JsonPlaylist, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let (path, _, _) = norm_abs_path(&config.playlist.path, "")?;
let mut playlist_path = path;
let d: Vec<&str> = date.split('-').collect();
@ -31,13 +28,11 @@ pub async fn read_playlist(
}
pub async fn write_playlist(
conn: &Pool<Sqlite>,
id: i32,
config: &PlayoutConfig,
json_data: JsonPlaylist,
) -> Result<String, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
let date = json_data.date.clone();
let mut playlist_path = config.playlist.path;
let mut playlist_path = config.playlist.path.clone();
let d: Vec<&str> = date.split('-').collect();
if !playlist_path
@ -125,12 +120,7 @@ pub async fn generate_playlist(
}
}
pub async fn delete_playlist(
conn: &Pool<Sqlite>,
id: i32,
date: &str,
) -> Result<String, ServiceError> {
let (config, _) = playout_config(conn, &id).await?;
pub async fn delete_playlist(config: &PlayoutConfig, date: &str) -> Result<String, ServiceError> {
let mut playlist_path = PathBuf::from(&config.playlist.path);
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path

View File

@ -63,7 +63,7 @@ CREATE TABLE configurations (
starttls INTEGER NOT NULL DEFAULT 0,
mail_level TEXT NOT NULL DEFAULT "ERROR",
interval INTEGER NOT NULL DEFAULT 120,
log_help TEXT NOT NULL DEFAULT "If 'log_to_file' is true, log to file, when is false log to console. \n'backup_count' says how long log files will be saved in days.\n'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.\n'level' can be DEBUG, INFO, WARNING, ERROR.\n'ffmpeg_level/ingest_level' can be INFO, WARNING, ERROR.\n'detect_silence' logs an error message if the audio line is silent for 15 seconds during the validation process.\n'ignore_lines' makes logging to ignore strings that contains matched lines, in frontend is a semicolon separated list.",
logging_help TEXT NOT NULL DEFAULT "If 'log_to_file' is true, log to file, when is false log to console. \n'backup_count' says how long log files will be saved in days.\n'local_time' to false will set log timestamps to UTC. Path to /var/log/ only if you run this program as daemon.\n'level' can be DEBUG, INFO, WARNING, ERROR.\n'ffmpeg_level/ingest_level' can be INFO, WARNING, ERROR.\n'detect_silence' logs an error message if the audio line is silent for 15 seconds during the validation process.\n'ignore_lines' makes logging to ignore strings that contains matched lines, in frontend is a semicolon separated list.",
log_to_file INTEGER NOT NULL DEFAULT 1,
backup_count INTEGER NOT NULL DEFAULT 7,
local_time INTEGER NOT NULL DEFAULT 1,