work on "piggyback" mode

This commit is contained in:
jb-alvarado 2023-02-20 17:28:33 +01:00
parent d9952c88fc
commit 7e5a391e3d
8 changed files with 199 additions and 37 deletions

14
Cargo.lock generated
View File

@ -971,7 +971,7 @@ dependencies = [
[[package]]
name = "ffplayout"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"clap",
@ -991,7 +991,7 @@ dependencies = [
[[package]]
name = "ffplayout-api"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"actix-files",
"actix-multipart",
@ -1023,7 +1023,7 @@ dependencies = [
[[package]]
name = "ffplayout-lib"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"crossbeam-channel",
@ -2549,8 +2549,7 @@ dependencies = [
[[package]]
name = "serial_test"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "538c30747ae860d6fb88330addbbd3e0ddbe46d662d032855596d8a8ca260611"
source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe"
dependencies = [
"dashmap",
"futures",
@ -2563,8 +2562,7 @@ dependencies = [
[[package]]
name = "serial_test_derive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69"
source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe"
dependencies = [
"proc-macro2",
"quote",
@ -2825,7 +2823,7 @@ dependencies = [
[[package]]
name = "tests"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"crossbeam-channel",

View File

@ -3,7 +3,7 @@ members = ["ffplayout-api", "ffplayout-engine", "lib", "tests"]
default-members = ["ffplayout-api", "ffplayout-engine", "tests"]
[workspace.package]
version = "0.17.0-beta4"
version = "0.17.0-beta5"
license = "GPL-3.0"
repository = "https://github.com/ffplayout/ffplayout"
authors = ["Jonathan Baecker <jonbae77@gmail.com>"]

View File

@ -23,14 +23,13 @@ use serde::{Deserialize, Serialize};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::auth::{create_jwt, Claims};
use crate::db::{
handles,
models::{Channel, LoginUser, TextPreset, User},
};
use crate::utils::{
channels::{create_channel, delete_channel},
control::{control_service, control_state, media_info, send_message, Process},
control::{control_service, control_state, media_info, send_message, ControlParams, Process},
errors::ServiceError,
files::{
browser, create_directory, norm_abs_path, remove_file_or_folder, rename_file, upload,
@ -40,6 +39,10 @@ use crate::utils::{
playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist},
playout_config, read_log_file, read_playout_config, Role,
};
use crate::{
auth::{create_jwt, Claims},
utils::control::ProcessControl,
};
use ffplayout_lib::{
utils::{
get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, PlayoutConfig,
@ -586,9 +589,9 @@ pub async fn send_text_message(
pub async fn control_playout(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
control: web::Json<Process>,
control: web::Json<ControlParams>,
) -> Result<impl Responder, ServiceError> {
match control_state(&pool.into_inner(), *id, control.command.clone()).await {
match control_state(&pool.into_inner(), *id, &control.control).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -690,8 +693,9 @@ pub async fn process_control(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
proc: web::Json<Process>,
engine_process: web::Data<ProcessControl>,
) -> Result<impl Responder, ServiceError> {
control_service(&pool.into_inner(), *id, &proc.command).await
control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await
}
/// #### ffplayout Playlist Operations

View File

@ -25,7 +25,7 @@ use api::{
},
};
use db::{db_pool, models::LoginUser};
use utils::{args_parse::Args, db_path, init_config, run_args, Role};
use utils::{args_parse::Args, control::ProcessControl, db_path, init_config, run_args, Role};
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
@ -94,9 +94,11 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
let auth = HttpAuthentication::bearer(validator);
let db_pool = web::Data::new(pool.clone());
let engine_process = web::Data::new(ProcessControl::new());
App::new()
.app_data(db_pool)
.app_data(engine_process)
.wrap(middleware::Logger::default())
.service(login)
.service(

View File

@ -3,7 +3,10 @@ use std::fs;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{control::control_service, errors::ServiceError};
use crate::utils::{
control::{control_service, ServiceCmd},
errors::ServiceError,
};
use crate::db::{handles, models::Channel};
@ -25,15 +28,15 @@ pub async fn create_channel(
)?;
let new_channel = handles::insert_channel(conn, target_channel).await?;
control_service(conn, new_channel.id, "enable").await?;
control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?;
Ok(new_channel)
}
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?;
control_service(conn, channel.id, "stop").await?;
control_service(conn, channel.id, "disable").await?;
control_service(conn, channel.id, &ServiceCmd::Stop, None).await?;
control_service(conn, channel.id, &ServiceCmd::Disable, None).await?;
if let Err(e) = fs::remove_file(channel.config_path) {
error!("{e}");

View File

@ -1,5 +1,16 @@
use std::{collections::HashMap, process::Command};
use std::{
collections::HashMap,
env, fmt,
process::Child,
process::Command,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Mutex,
},
};
use actix_web::web;
use reqwest::{
header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE},
Client, Response,
@ -26,8 +37,8 @@ struct TextParams {
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ControlParams {
control: String,
pub struct ControlParams {
pub control: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
@ -46,9 +57,132 @@ impl<T> RpcObj<T> {
}
}
/// ffplayout engine process
///
/// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`,
/// the engine get startet and conntrolled from ffpapi
pub struct ProcessControl {
pub engine_child: Mutex<Option<Child>>,
pub is_running: AtomicBool,
}
impl ProcessControl {
pub fn new() -> Self {
Self {
engine_child: Mutex::new(None),
is_running: AtomicBool::new(false),
}
}
}
impl ProcessControl {
pub fn start(&self) -> Result<String, ServiceError> {
match Command::new("ffplayout").spawn() {
Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
pub fn kill(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().unwrap().as_mut() {
if proc.kill().is_err() {
return Err(ServiceError::InternalServerError);
};
}
self.wait()?;
self.is_running.store(false, Ordering::SeqCst);
Ok("Success".to_string())
}
pub fn restart(&self) -> Result<String, ServiceError> {
self.kill()?;
match Command::new("ffplayout").spawn() {
Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().unwrap().as_mut() {
if proc.wait().is_err() {
return Err(ServiceError::InternalServerError);
};
}
Ok("Success".to_string())
}
pub fn status(&self) -> Result<String, ServiceError> {
if self.is_running.load(Ordering::SeqCst) {
Ok("active".to_string())
} else {
Ok("not running".to_string())
}
}
}
impl Default for ProcessControl {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServiceCmd {
Enable,
Disable,
Start,
Stop,
Restart,
Status,
}
impl FromStr for ServiceCmd {
type Err = String;
fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.to_lowercase().as_str() {
"enable" => Ok(Self::Enable),
"disable" => Ok(Self::Disable),
"start" => Ok(Self::Start),
"stop" => Ok(Self::Stop),
"restart" => Ok(Self::Restart),
"status" => Ok(Self::Status),
_ => Err(format!("Command '{input}' not found!")),
}
}
}
impl fmt::Display for ServiceCmd {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Enable => write!(f, "enable"),
Self::Disable => write!(f, "disable"),
Self::Start => write!(f, "start"),
Self::Stop => write!(f, "stop"),
Self::Restart => write!(f, "restart"),
Self::Status => write!(f, "status"),
}
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Process {
pub command: String,
pub command: ServiceCmd,
}
struct SystemD {
@ -175,9 +309,15 @@ pub async fn send_message(
pub async fn control_state(
conn: &Pool<Sqlite>,
id: i32,
command: String,
command: &str,
) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), ControlParams { control: command });
let json_obj = RpcObj::new(
id,
"player".into(),
ControlParams {
control: command.to_owned(),
},
);
post_request(conn, id, json_obj).await
}
@ -195,17 +335,32 @@ pub async fn media_info(
pub async fn control_service(
conn: &Pool<Sqlite>,
id: i32,
command: &str,
command: &ServiceCmd,
engine: Option<web::Data<ProcessControl>>,
) -> Result<String, ServiceError> {
let piggyback = env::var("PIGGYBACK_MODE");
if (env::consts::OS != "linux" || piggyback.is_ok()) && engine.is_some() {
match command {
ServiceCmd::Start => engine.unwrap().start(),
ServiceCmd::Stop => engine.unwrap().kill(),
ServiceCmd::Restart => engine.unwrap().restart(),
ServiceCmd::Status => engine.unwrap().status(),
_ => Err(ServiceError::Conflict(
"Engine runs in piggyback mode, in this mode this command is not allowed."
.to_string(),
)),
}
} else {
let system_d = SystemD::new(conn, id).await?;
match command {
"enable" => system_d.enable(),
"disable" => system_d.disable(),
"start" => system_d.start(),
"stop" => system_d.stop(),
"restart" => system_d.restart(),
"status" => system_d.status(),
_ => Err(ServiceError::BadRequest("Command not found!".to_string())),
ServiceCmd::Enable => system_d.enable(),
ServiceCmd::Disable => system_d.disable(),
ServiceCmd::Start => system_d.start(),
ServiceCmd::Stop => system_d.stop(),
ServiceCmd::Restart => system_d.restart(),
ServiceCmd::Status => system_d.status(),
}
}
}

@ -1 +1 @@
Subproject commit 590026119cbeb1c7b166f7f45b6dfa44628f6597
Subproject commit bd7d2a21e3450b2489717b3e37e016864698e8a0

View File

@ -26,7 +26,7 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
serial_test = "1.0"
serial_test = { git = "https://github.com/palfrey/serial_test.git", branch = "remove-ignore-ignore" }
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }