Merge pull request #277 from jb-alvarado/nuxt3-frontend

Piggyback Mode
This commit is contained in:
jb-alvarado 2023-02-21 14:30:57 +01:00 committed by GitHub
commit 6adf43c3d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 376 additions and 75 deletions

94
Cargo.lock generated
View File

@ -971,7 +971,7 @@ dependencies = [
[[package]]
name = "ffplayout"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"clap",
@ -991,7 +991,7 @@ dependencies = [
[[package]]
name = "ffplayout-api"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"actix-files",
"actix-multipart",
@ -1019,11 +1019,12 @@ dependencies = [
"serde_yaml",
"simplelog",
"sqlx",
"tokio",
]
[[package]]
name = "ffplayout-lib"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"crossbeam-channel",
@ -2415,6 +2416,27 @@ dependencies = [
"semver",
]
[[package]]
name = "rustls"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.0",
]
[[package]]
name = "ryu"
version = "1.0.12"
@ -2461,6 +2483,16 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.8.2"
@ -2549,8 +2581,7 @@ dependencies = [
[[package]]
name = "serial_test"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "538c30747ae860d6fb88330addbbd3e0ddbe46d662d032855596d8a8ca260611"
source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe"
dependencies = [
"dashmap",
"futures",
@ -2563,8 +2594,7 @@ dependencies = [
[[package]]
name = "serial_test_derive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69"
source = "git+https://github.com/palfrey/serial_test.git?branch=remove-ignore-ignore#fc0497c8e8f08c2fc5c50ffab40a8f51d7c1effe"
dependencies = [
"proc-macro2",
"quote",
@ -2726,6 +2756,8 @@ dependencies = [
"once_cell",
"paste",
"percent-encoding",
"rustls",
"rustls-pemfile",
"sha2",
"smallvec",
"sqlformat",
@ -2734,6 +2766,7 @@ dependencies = [
"thiserror",
"tokio-stream",
"url",
"webpki-roots",
]
[[package]]
@ -2761,10 +2794,9 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396"
dependencies = [
"native-tls",
"once_cell",
"tokio",
"tokio-native-tls",
"tokio-rustls",
]
[[package]]
@ -2825,7 +2857,7 @@ dependencies = [
[[package]]
name = "tests"
version = "0.17.0-beta4"
version = "0.17.0-beta5"
dependencies = [
"chrono",
"crossbeam-channel",
@ -2947,9 +2979,21 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.42.0",
]
[[package]]
name = "tokio-macros"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
@ -2960,6 +3004,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
@ -3255,6 +3310,25 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]]
name = "wepoll-ffi"
version = "0.1.2"

View File

@ -3,7 +3,7 @@ members = ["ffplayout-api", "ffplayout-engine", "lib", "tests"]
default-members = ["ffplayout-api", "ffplayout-engine", "tests"]
[workspace.package]
version = "0.17.0-beta4"
version = "0.17.0-beta5"
license = "GPL-3.0"
repository = "https://github.com/ffplayout/ffplayout"
authors = ["Jonathan Baecker <jonbae77@gmail.com>"]

View File

@ -46,11 +46,12 @@ Check the [releases](https://github.com/ffplayout/ffplayout/releases/latest) for
- JSON RPC server, to get information about what is playing and to control it
- [live ingest](/docs/live_ingest.md)
- image source (will loop until out duration is reached)
- extra audio source (experimental *) (has priority over audio from video source)
- extra audio source, has priority over audio from video (experimental *)
- [multiple audio tracks](/docs/multi_audio.md) (experimental *)
- [custom filters](/docs/custom_filters.md) globally in config, or in playlist for specific clips
- import playlist from text or m3u file, with CLI or frontend
- audio only, for radio mode (experimental *)
- [Piggyback Mode](/ffplayout-api/README.md#piggyback-mode), mostly for non Linux systems (experimental *)
For preview stream, read: [/docs/preview_stream.md](/docs/preview_stream.md)

View File

@ -34,7 +34,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
simplelog = { version = "^0.12", features = ["paris"] }
sqlx = { version = "0.6", features = ["runtime-actix-native-tls", "sqlite"] }
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = { version = "1.25", features = ["full"] }
[[bin]]
name = "ffpapi"

View File

@ -42,3 +42,22 @@ If you plan to run ffpapi with systemd set permission from **/usr/share/ffplayou
**For possible endpoints read: [api endpoints](/docs/api.md)**
ffpapi can also serve the browser based frontend, just run in your browser `127.0.0.1:8787`.
"Piggyback" Mode
-----
ffplayout was originally planned to run under Linux as a SystemD service. It is also designed so that the engine and ffpapi run completely independently of each other. This is to increase flexibility and stability.
Nevertheless, programs compiled in Rust can basically run on all systems supported by the language. And so this repo also offers binaries for other platforms.
In the past, however, it was only possible under Linux to start/stop/restart the ffplayout engine process through ffpapi. This limit no longer exists since v0.17.0, because the "piggyback" mode was introduced here. This means that ffpapi recognizes which platform it is running on, and if it is not on Linux, it starts the engine as a child process. Thus it is now possible to control ffplayout engine completely on all platforms. The disadvantage here is that the engine process is dependent on ffpapi; if it closes or crashes, the engine also closes.
Under Linux, this mode can be simulated by starting ffpapi with the environment variable `PIGGYBACK_MODE=true`. This scenario is also conceivable in container operation, for example.
**Run in piggyback mode:**
```BASH
PIGGYBACK_MODE=True ffpapi -l 127.0.0.1:8787
```
This function is experimental, use it with caution.

View File

@ -23,14 +23,13 @@ use serde::{Deserialize, Serialize};
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::auth::{create_jwt, Claims};
use crate::db::{
handles,
models::{Channel, LoginUser, TextPreset, User},
};
use crate::utils::{
channels::{create_channel, delete_channel},
control::{control_service, control_state, media_info, send_message, Process},
control::{control_service, control_state, media_info, send_message, ControlParams, Process},
errors::ServiceError,
files::{
browser, create_directory, norm_abs_path, remove_file_or_folder, rename_file, upload,
@ -40,6 +39,10 @@ use crate::utils::{
playlist::{delete_playlist, generate_playlist, read_playlist, write_playlist},
playout_config, read_log_file, read_playout_config, Role,
};
use crate::{
auth::{create_jwt, Claims},
utils::control::ProcessControl,
};
use ffplayout_lib::{
utils::{
get_date_range, import::import_file, sec_to_time, time_to_sec, JsonPlaylist, PlayoutConfig,
@ -586,9 +589,9 @@ pub async fn send_text_message(
pub async fn control_playout(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
control: web::Json<Process>,
control: web::Json<ControlParams>,
) -> Result<impl Responder, ServiceError> {
match control_state(&pool.into_inner(), *id, control.command.clone()).await {
match control_state(&pool.into_inner(), *id, &control.control).await {
Ok(res) => Ok(res.text().await.unwrap_or_else(|_| "Success".into())),
Err(e) => Err(e),
}
@ -690,8 +693,9 @@ pub async fn process_control(
pool: web::Data<Pool<Sqlite>>,
id: web::Path<i32>,
proc: web::Json<Process>,
engine_process: web::Data<ProcessControl>,
) -> Result<impl Responder, ServiceError> {
control_service(&pool.into_inner(), *id, &proc.command).await
control_service(&pool.into_inner(), *id, &proc.command, Some(engine_process)).await
}
/// #### ffplayout Playlist Operations

View File

@ -1,3 +1,5 @@
use std::env;
use argon2::{
password_hash::{rand_core::OsRng, SaltString},
Argon2, PasswordHasher,
@ -101,6 +103,12 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
None => "http://localhost/live/stream.m3u8".to_string(),
};
let config_path = if env::consts::OS == "linux" {
"/etc/ffplayout/ffplayout.yml"
} else {
"./assets/ffplayout.yml"
};
let query = "CREATE TRIGGER global_row_count
BEFORE INSERT ON global
WHEN (SELECT COUNT(*) FROM global) >= 1
@ -109,7 +117,7 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
END;
INSERT INTO global(secret) VALUES($1);
INSERT INTO channels(name, preview_url, config_path, extra_extensions, service)
VALUES('Channel 1', $2, '/etc/ffplayout/ffplayout.yml', 'jpg,jpeg,png', 'ffplayout.service');
VALUES('Channel 1', $2, $3, 'jpg,jpeg,png', 'ffplayout.service');
INSERT INTO roles(name) VALUES('admin'), ('user'), ('guest');
INSERT INTO presets(name, text, x, y, fontsize, line_spacing, fontcolor, box, boxcolor, boxborderw, alpha, channel_id)
VALUES('Default', 'Wellcome to ffplayout messenger!', '(w-text_w)/2', '(h-text_h)/2', '24', '4', '#ffffff@0xff', '0', '#000000@0x80', '4', '1.0', '1'),
@ -124,6 +132,7 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
sqlx::query(query)
.bind(secret)
.bind(url)
.bind(config_path)
.execute(&pool)
.await?;

View File

@ -25,7 +25,7 @@ use api::{
},
};
use db::{db_pool, models::LoginUser};
use utils::{args_parse::Args, db_path, init_config, run_args, Role};
use utils::{args_parse::Args, control::ProcessControl, db_path, init_config, run_args, Role};
use ffplayout_lib::utils::{init_logging, PlayoutConfig};
@ -87,6 +87,7 @@ async fn main() -> std::io::Result<()> {
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
let engine_process = web::Data::new(ProcessControl::new());
info!("running ffplayout API, listen on {conn}");
@ -97,6 +98,7 @@ async fn main() -> std::io::Result<()> {
App::new()
.app_data(db_pool)
.app_data(engine_process.clone())
.wrap(middleware::Logger::default())
.service(login)
.service(

View File

@ -3,7 +3,10 @@ use std::fs;
use simplelog::*;
use sqlx::{Pool, Sqlite};
use crate::utils::{control::control_service, errors::ServiceError};
use crate::utils::{
control::{control_service, ServiceCmd},
errors::ServiceError,
};
use crate::db::{handles, models::Channel};
@ -25,15 +28,15 @@ pub async fn create_channel(
)?;
let new_channel = handles::insert_channel(conn, target_channel).await?;
control_service(conn, new_channel.id, "enable").await?;
control_service(conn, new_channel.id, &ServiceCmd::Enable, None).await?;
Ok(new_channel)
}
pub async fn delete_channel(conn: &Pool<Sqlite>, id: i32) -> Result<(), ServiceError> {
let channel = handles::select_channel(conn, &id).await?;
control_service(conn, channel.id, "stop").await?;
control_service(conn, channel.id, "disable").await?;
control_service(conn, channel.id, &ServiceCmd::Stop, None).await?;
control_service(conn, channel.id, &ServiceCmd::Disable, None).await?;
if let Err(e) = fs::remove_file(channel.config_path) {
error!("{e}");

View File

@ -1,11 +1,21 @@
use std::{collections::HashMap, process::Command};
use std::{
collections::HashMap,
env, fmt,
str::FromStr,
sync::atomic::{AtomicBool, Ordering},
};
use actix_web::web;
use reqwest::{
header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE},
Client, Response,
};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::{
process::{Child, Command},
sync::Mutex,
};
use crate::db::handles::select_channel;
use crate::utils::{errors::ServiceError, playout_config};
@ -26,8 +36,8 @@ struct TextParams {
}
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ControlParams {
control: String,
pub struct ControlParams {
pub control: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
@ -46,9 +56,142 @@ impl<T> RpcObj<T> {
}
}
/// ffplayout engine process
///
/// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`,
/// the engine get startet and controlled from ffpapi
pub struct ProcessControl {
pub engine_child: Mutex<Option<Child>>,
pub is_running: AtomicBool,
pub piggyback: AtomicBool,
}
impl ProcessControl {
pub fn new() -> Self {
let piggyback = if env::consts::OS != "linux" || env::var("PIGGYBACK_MODE").is_ok() {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
};
Self {
engine_child: Mutex::new(None),
is_running: AtomicBool::new(false),
piggyback,
}
}
}
impl ProcessControl {
pub async fn start(&self) -> Result<String, ServiceError> {
#[cfg(not(debug_assertions))]
let engine_path = "ffplayout";
#[cfg(debug_assertions)]
let engine_path = "./target/debug/ffplayout";
match Command::new(engine_path).kill_on_drop(true).spawn() {
Ok(proc) => *self.engine_child.lock().await = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
pub async fn stop(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.kill().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
self.wait().await?;
self.is_running.store(false, Ordering::SeqCst);
Ok("Success".to_string())
}
pub async fn restart(&self) -> Result<String, ServiceError> {
self.stop().await?;
self.start().await?;
self.is_running.store(true, Ordering::SeqCst);
Ok("Success".to_string())
}
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub async fn wait(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.wait().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
Ok("Success".to_string())
}
pub fn status(&self) -> Result<String, ServiceError> {
if self.is_running.load(Ordering::SeqCst) {
Ok("active".to_string())
} else {
Ok("not running".to_string())
}
}
}
impl Default for ProcessControl {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum ServiceCmd {
Enable,
Disable,
Start,
Stop,
Restart,
Status,
}
impl FromStr for ServiceCmd {
type Err = String;
fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.to_lowercase().as_str() {
"enable" => Ok(Self::Enable),
"disable" => Ok(Self::Disable),
"start" => Ok(Self::Start),
"stop" => Ok(Self::Stop),
"restart" => Ok(Self::Restart),
"status" => Ok(Self::Status),
_ => Err(format!("Command '{input}' not found!")),
}
}
}
impl fmt::Display for ServiceCmd {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Enable => write!(f, "enable"),
Self::Disable => write!(f, "disable"),
Self::Start => write!(f, "start"),
Self::Stop => write!(f, "stop"),
Self::Restart => write!(f, "restart"),
Self::Status => write!(f, "status"),
}
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Process {
pub command: String,
pub command: ServiceCmd,
}
struct SystemD {
@ -110,11 +253,11 @@ impl SystemD {
Ok("Success".to_string())
}
fn status(mut self) -> Result<String, ServiceError> {
async fn status(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["is-active".to_string(), self.service]);
let output = Command::new("sudo").args(self.cmd).output()?;
let output = Command::new("sudo").args(self.cmd).output().await?;
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
@ -175,9 +318,15 @@ pub async fn send_message(
pub async fn control_state(
conn: &Pool<Sqlite>,
id: i32,
command: String,
command: &str,
) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), ControlParams { control: command });
let json_obj = RpcObj::new(
id,
"player".into(),
ControlParams {
control: command.to_owned(),
},
);
post_request(conn, id, json_obj).await
}
@ -195,17 +344,42 @@ pub async fn media_info(
pub async fn control_service(
conn: &Pool<Sqlite>,
id: i32,
command: &str,
command: &ServiceCmd,
engine: Option<web::Data<ProcessControl>>,
) -> Result<String, ServiceError> {
let system_d = SystemD::new(conn, id).await?;
if engine.is_some() && engine.as_ref().unwrap().piggyback.load(Ordering::SeqCst) {
match command {
ServiceCmd::Start => engine.unwrap().start().await,
ServiceCmd::Stop => {
if control_state(conn, id, "stop_all").await.is_ok() {
engine.unwrap().stop().await
} else {
Err(ServiceError::NoContent("Nothing to stop".to_string()))
}
}
ServiceCmd::Restart => {
if control_state(conn, id, "stop_all").await.is_ok() {
engine.unwrap().restart().await
} else {
Err(ServiceError::NoContent("Nothing to stop".to_string()))
}
}
ServiceCmd::Status => engine.unwrap().status(),
_ => Err(ServiceError::Conflict(
"Engine runs in piggyback mode, in this mode this command is not allowed."
.to_string(),
)),
}
} else {
let system_d = SystemD::new(conn, id).await?;
match command {
"enable" => system_d.enable(),
"disable" => system_d.disable(),
"start" => system_d.start(),
"stop" => system_d.stop(),
"restart" => system_d.restart(),
"status" => system_d.status(),
_ => Err(ServiceError::BadRequest("Command not found!".to_string())),
match command {
ServiceCmd::Enable => system_d.enable(),
ServiceCmd::Disable => system_d.disable(),
ServiceCmd::Start => system_d.start(),
ServiceCmd::Stop => system_d.stop(),
ServiceCmd::Restart => system_d.restart(),
ServiceCmd::Status => system_d.status().await,
}
}
}

View File

@ -20,7 +20,7 @@ use ffplayout_lib::{
fn server_monitor(
level: &str,
buffer: BufReader<ChildStderr>,
mut proc_ctl: ProcessControl,
proc_ctl: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
@ -30,7 +30,7 @@ fn server_monitor(
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
@ -39,7 +39,7 @@ fn server_monitor(
.iter()
.any(|i| line.contains(*i))
{
proc_ctl.kill_all();
proc_ctl.stop_all();
}
}
@ -52,7 +52,7 @@ fn server_monitor(
pub fn ingest_server(
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
@ -76,7 +76,7 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}

View File

@ -41,7 +41,7 @@ use ffplayout_lib::{
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
let playlist_init = playout_stat.list_init;
let level = config.logging.ffmpeg_level.clone();
@ -56,7 +56,7 @@ fn ingest_to_hls_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}
@ -93,7 +93,7 @@ fn ingest_to_hls_server(
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
@ -105,7 +105,7 @@ fn ingest_to_hls_server(
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.kill(Encoder) {
if let Err(e) = proc_control.stop(Encoder) {
error!("{e}");
}
}
@ -140,7 +140,7 @@ pub fn write_hls(
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -217,5 +217,5 @@ pub fn write_hls(
sleep(Duration::from_secs(1));
proc_control.kill_all();
proc_control.stop_all();
}

View File

@ -36,7 +36,7 @@ pub fn player(
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -149,7 +149,7 @@ pub fn player(
error!("Encoder error: {e}")
}
if let Err(e) = proc_control.kill(Decoder) {
if let Err(e) = proc_control.stop(Decoder) {
error!("{e}")
}
@ -208,7 +208,7 @@ pub fn player(
sleep(Duration::from_secs(1));
proc_control.kill_all();
proc_control.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");

View File

@ -149,7 +149,7 @@ pub fn json_rpc_server(
config: PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let addr = config.rpc_server.address.clone();
let auth = config.rpc_server.authorization.clone();
@ -187,7 +187,7 @@ pub fn json_rpc_server(
)) {
return Ok(Value::String(reply));
};
} else if let Err(e) = proc.kill(Ingest) {
} else if let Err(e) = proc.stop(Ingest) {
error!("Ingest {e:?}")
}
}
@ -310,6 +310,13 @@ pub fn json_rpc_server(
return Ok(Value::String("Reset playout state failed".to_string()));
}
// stop playout
if map.contains_key("control") && &map["control"] == "stop_all" {
proc.stop_all();
return Ok(Value::String("Stop playout!".to_string()));
}
// get infos about current clip
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play_control.current_media.lock().unwrap().clone() {
@ -383,7 +390,7 @@ pub fn json_rpc_server(
}
Err(e) => {
error!("Unable to start RPC server: {e}");
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}

@ -1 +1 @@
Subproject commit 590026119cbeb1c7b166f7f45b6dfa44628f6597
Subproject commit 80d1fd8d625aafaf64aea586d2e390a87d6b3b81

View File

@ -75,7 +75,7 @@ impl Default for ProcessControl {
}
impl ProcessControl {
pub fn kill(&self, unit: ProcessUnit) -> Result<(), String> {
pub fn stop(&self, unit: ProcessUnit) -> Result<(), String> {
match unit {
Decoder => {
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
@ -136,18 +136,25 @@ impl ProcessControl {
}
/// No matter what is running, terminate them all.
pub fn kill_all(&mut self) {
pub fn stop_all(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst);
self.server_is_running.store(false, Ordering::SeqCst);
if self.is_alive.load(Ordering::SeqCst) {
self.is_alive.store(false, Ordering::SeqCst);
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
rpc.clone().close()
};
// if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
// rpc.clone().close()
// };
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.kill(unit) {
if let Err(e) = self.stop(unit) {
if !e.contains("exited process") {
error!("{e}")
}
}
if let Err(e) = self.wait(unit) {
if !e.contains("exited process") {
error!("{e}")
}
@ -159,7 +166,7 @@ impl ProcessControl {
// impl Drop for ProcessControl {
// fn drop(&mut self) {
// self.kill_all()
// self.stop_all()
// }
// }

View File

@ -635,7 +635,7 @@ pub fn include_file(config: PlayoutConfig, file_path: &Path) -> bool {
pub fn stderr_reader(
buffer: BufReader<ChildStderr>,
suffix: ProcessUnit,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
@ -666,7 +666,7 @@ pub fn stderr_reader(
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
{
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}
}

View File

@ -26,7 +26,7 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
serial_test = "1.0"
serial_test = { git = "https://github.com/palfrey/serial_test.git", branch = "remove-ignore-ignore" }
shlex = "1.1"
simplelog = { version = "^0.12", features = ["paris"] }
time = { version = "0.3", features = ["formatting", "macros"] }

View File

@ -9,12 +9,12 @@ use simplelog::*;
use ffplayout::output::player;
use ffplayout_lib::{utils::*, vec_strings};
fn timed_kill(sec: u64, mut proc_ctl: ProcessControl) {
fn timed_stop(sec: u64, proc_ctl: ProcessControl) {
sleep(Duration::from_secs(sec));
info!("Timed kill of process");
info!("Timed stop of process");
proc_ctl.kill_all();
proc_ctl.stop_all();
}
#[test]
@ -49,7 +49,7 @@ fn playlist_change_at_midnight() {
mock_time::set_mock_time("2023-02-08T23:59:45");
thread::spawn(move || timed_kill(28, proc_ctl));
thread::spawn(move || timed_stop(28, proc_ctl));
player(&config, play_control, playout_stat.clone(), proc_control);
@ -90,7 +90,7 @@ fn playlist_change_at_six() {
mock_time::set_mock_time("2023-02-09T05:59:45");
thread::spawn(move || timed_kill(28, proc_ctl));
thread::spawn(move || timed_stop(28, proc_ctl));
player(&config, play_control, playout_stat.clone(), proc_control);