use async process::Command

This commit is contained in:
jb-alvarado 2023-02-20 22:12:33 +01:00
parent 7e5a391e3d
commit b4004d7d8b
6 changed files with 139 additions and 47 deletions

80
Cargo.lock generated
View File

@ -1019,6 +1019,7 @@ dependencies = [
"serde_yaml",
"simplelog",
"sqlx",
"tokio",
]
[[package]]
@ -2415,6 +2416,27 @@ dependencies = [
"semver",
]
[[package]]
name = "rustls"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.21.0",
]
[[package]]
name = "ryu"
version = "1.0.12"
@ -2461,6 +2483,16 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.8.2"
@ -2724,6 +2756,8 @@ dependencies = [
"once_cell",
"paste",
"percent-encoding",
"rustls",
"rustls-pemfile",
"sha2",
"smallvec",
"sqlformat",
@ -2732,6 +2766,7 @@ dependencies = [
"thiserror",
"tokio-stream",
"url",
"webpki-roots",
]
[[package]]
@ -2759,10 +2794,9 @@ version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396"
dependencies = [
"native-tls",
"once_cell",
"tokio",
"tokio-native-tls",
"tokio-rustls",
]
[[package]]
@ -2945,9 +2979,21 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.42.0",
]
[[package]]
name = "tokio-macros"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
@ -2958,6 +3004,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
@ -3253,6 +3310,25 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]]
name = "wepoll-ffi"
version = "0.1.2"

View File

@ -34,7 +34,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
simplelog = { version = "^0.12", features = ["paris"] }
sqlx = { version = "0.6", features = ["runtime-actix-native-tls", "sqlite"] }
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = { version = "1.25", features = ["full"] }
[[bin]]
name = "ffpapi"

View File

@ -87,6 +87,7 @@ async fn main() -> std::io::Result<()> {
let ip_port = conn.split(':').collect::<Vec<&str>>();
let addr = ip_port[0];
let port = ip_port[1].parse::<u16>().unwrap();
let engine_process = web::Data::new(ProcessControl::new());
info!("running ffplayout API, listen on {conn}");
@ -94,11 +95,10 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
let auth = HttpAuthentication::bearer(validator);
let db_pool = web::Data::new(pool.clone());
let engine_process = web::Data::new(ProcessControl::new());
App::new()
.app_data(db_pool)
.app_data(engine_process)
.app_data(engine_process.clone())
.wrap(middleware::Logger::default())
.service(login)
.service(

View File

@ -1,13 +1,8 @@
use std::{
collections::HashMap,
env, fmt,
process::Child,
process::Command,
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Mutex,
},
sync::atomic::{AtomicBool, Ordering},
};
use actix_web::web;
@ -17,6 +12,10 @@ use reqwest::{
};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::{
process::{Child, Command},
sync::Mutex,
};
use crate::db::handles::select_channel;
use crate::utils::{errors::ServiceError, playout_config};
@ -60,25 +59,39 @@ impl<T> RpcObj<T> {
/// ffplayout engine process
///
/// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`,
/// the engine get startet and conntrolled from ffpapi
/// the engine get startet and controlled from ffpapi
pub struct ProcessControl {
pub engine_child: Mutex<Option<Child>>,
pub is_running: AtomicBool,
pub piggyback: AtomicBool,
}
impl ProcessControl {
pub fn new() -> Self {
let piggyback = if env::consts::OS != "linux" || env::var("PIGGYBACK_MODE").is_ok() {
AtomicBool::new(true)
} else {
AtomicBool::new(false)
};
Self {
engine_child: Mutex::new(None),
is_running: AtomicBool::new(false),
piggyback,
}
}
}
impl ProcessControl {
pub fn start(&self) -> Result<String, ServiceError> {
match Command::new("ffplayout").spawn() {
Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc),
pub async fn start(&self) -> Result<String, ServiceError> {
#[cfg(not(debug_assertions))]
let engine_path = "ffplayout";
#[cfg(debug_assertions)]
let engine_path = "./target/debug/ffplayout";
match Command::new(engine_path).kill_on_drop(true).spawn() {
Ok(proc) => *self.engine_child.lock().await = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
@ -87,26 +100,22 @@ impl ProcessControl {
Ok("Success".to_string())
}
pub fn kill(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().unwrap().as_mut() {
if proc.kill().is_err() {
pub async fn kill(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.kill().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
self.wait()?;
self.wait().await?;
self.is_running.store(false, Ordering::SeqCst);
Ok("Success".to_string())
}
pub fn restart(&self) -> Result<String, ServiceError> {
self.kill()?;
match Command::new("ffplayout").spawn() {
Ok(proc) => *self.engine_child.lock().unwrap() = Some(proc),
Err(_) => return Err(ServiceError::InternalServerError),
};
pub async fn restart(&self) -> Result<String, ServiceError> {
self.kill().await?;
self.start().await?;
self.is_running.store(true, Ordering::SeqCst);
@ -115,9 +124,9 @@ impl ProcessControl {
/// Wait for process to proper close.
/// This prevents orphaned/zombi processes in system
pub fn wait(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().unwrap().as_mut() {
if proc.wait().is_err() {
pub async fn wait(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.wait().await.is_err() {
return Err(ServiceError::InternalServerError);
};
}
@ -244,11 +253,11 @@ impl SystemD {
Ok("Success".to_string())
}
fn status(mut self) -> Result<String, ServiceError> {
async fn status(mut self) -> Result<String, ServiceError> {
self.cmd
.append(&mut vec!["is-active".to_string(), self.service]);
let output = Command::new("sudo").args(self.cmd).output()?;
let output = Command::new("sudo").args(self.cmd).output().await?;
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
}
@ -338,13 +347,11 @@ pub async fn control_service(
command: &ServiceCmd,
engine: Option<web::Data<ProcessControl>>,
) -> Result<String, ServiceError> {
let piggyback = env::var("PIGGYBACK_MODE");
if (env::consts::OS != "linux" || piggyback.is_ok()) && engine.is_some() {
if engine.is_some() && engine.as_ref().unwrap().piggyback.load(Ordering::SeqCst) {
match command {
ServiceCmd::Start => engine.unwrap().start(),
ServiceCmd::Stop => engine.unwrap().kill(),
ServiceCmd::Restart => engine.unwrap().restart(),
ServiceCmd::Start => engine.unwrap().start().await,
ServiceCmd::Stop => engine.unwrap().kill().await,
ServiceCmd::Restart => engine.unwrap().restart().await,
ServiceCmd::Status => engine.unwrap().status(),
_ => Err(ServiceError::Conflict(
"Engine runs in piggyback mode, in this mode this command is not allowed."
@ -360,7 +367,7 @@ pub async fn control_service(
ServiceCmd::Start => system_d.start(),
ServiceCmd::Stop => system_d.stop(),
ServiceCmd::Restart => system_d.restart(),
ServiceCmd::Status => system_d.status(),
ServiceCmd::Status => system_d.status().await,
}
}
}

View File

@ -81,7 +81,7 @@ fn fake_time(args: &Args) {
}
}
fn main() {
fn main() -> Result<(), ()> {
let args = get_args();
// use fake time function only in debugging mode
@ -153,9 +153,9 @@ fn main() {
match config.out.mode {
// write files/playlist to HLS m3u8 playlist
HLS => write_hls(&config, play_control, playout_stat, proc_control),
HLS => write_hls(&config, play_control, playout_stat, proc_control.clone()),
// play on desktop or stream to a remote target
_ => player(&config, play_control, playout_stat, proc_control),
_ => player(&config, play_control, playout_stat, proc_control.clone()),
}
info!("Playout done...");
@ -167,4 +167,7 @@ fn main() {
}
drop(msg);
drop(proc_control);
Ok(())
}

View File

@ -146,22 +146,27 @@ impl ProcessControl {
rpc.clone().close()
};
for unit in [Decoder, Encoder, Ingest] {
for unit in [Encoder, Decoder, Ingest] {
if let Err(e) = self.kill(unit) {
if !e.contains("exited process") {
error!("{e}")
}
}
if let Err(e) = self.wait(unit) {
if !e.contains("exited process") {
error!("{e}")
}
}
}
}
}
}
// impl Drop for ProcessControl {
// fn drop(&mut self) {
// self.kill_all()
// }
// }
impl Drop for ProcessControl {
fn drop(&mut self) {
self.kill_all()
}
}
/// Global player control, to get infos about current clip etc.
#[derive(Clone)]