rename kill with stop (we are not killing anybody...), stop engine over rpc. #268

This commit is contained in:
jb-alvarado 2023-02-21 14:10:06 +01:00
parent 3220610c3e
commit 6c5264ea5f
13 changed files with 90 additions and 43 deletions

View File

@ -46,11 +46,12 @@ Check the [releases](https://github.com/ffplayout/ffplayout/releases/latest) for
- JSON RPC server, to get information about what is playing and to control it
- [live ingest](/docs/live_ingest.md)
- image source (will loop until out duration is reached)
- extra audio source (experimental *) (has priority over audio from video source)
- extra audio source, has priority over audio from video (experimental *)
- [multiple audio tracks](/docs/multi_audio.md) (experimental *)
- [custom filters](/docs/custom_filters.md) globally in config, or in playlist for specific clips
- import playlist from text or m3u file, with CLI or frontend
- audio only, for radio mode (experimental *)
- [Piggyback Mode](/ffplayout-api/README.md#piggyback-mode), mostly for non Linux systems (experimental *)
For preview stream, read: [/docs/preview_stream.md](/docs/preview_stream.md)

View File

@ -42,3 +42,22 @@ If you plan to run ffpapi with systemd set permission from **/usr/share/ffplayou
**For possible endpoints read: [api endpoints](/docs/api.md)**
ffpapi can also serve the browser based frontend, just run in your browser `127.0.0.1:8787`.
"Piggyback" Mode
-----
ffplayout was originally planned to run under Linux as a SystemD service. It is also designed so that the engine and ffpapi run completely independently of each other. This is to increase flexibility and stability.
Nevertheless, programs compiled in Rust can basically run on all systems supported by the language. And so this repo also offers binaries for other platforms.
In the past, however, it was only possible under Linux to start/stop/restart the ffplayout engine process through ffpapi. This limit no longer exists since v0.17.0, because the "piggyback" mode was introduced here. This means that ffpapi recognizes which platform it is running on, and if it is not on Linux, it starts the engine as a child process. Thus it is now possible to control ffplayout engine completely on all platforms. The disadvantage here is that the engine process is dependent on ffpapi; if it closes or crashes, the engine also closes.
Under Linux, this mode can be simulated by starting ffpapi with the environment variable `PIGGYBACK_MODE=true`. This scenario is also conceivable in container operation, for example.
**Run in piggyback mode:**
```BASH
PIGGYBACK_MODE=True ffpapi -l 127.0.0.1:8787
```
This function is experimental, use it with caution.

View File

@ -1,3 +1,5 @@
use std::env;
use argon2::{
password_hash::{rand_core::OsRng, SaltString},
Argon2, PasswordHasher,
@ -101,6 +103,12 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
None => "http://localhost/live/stream.m3u8".to_string(),
};
let config_path = if env::consts::OS == "linux" {
"/etc/ffplayout/ffplayout.yml"
} else {
"./assets/ffplayout.yml"
};
let query = "CREATE TRIGGER global_row_count
BEFORE INSERT ON global
WHEN (SELECT COUNT(*) FROM global) >= 1
@ -109,7 +117,7 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
END;
INSERT INTO global(secret) VALUES($1);
INSERT INTO channels(name, preview_url, config_path, extra_extensions, service)
VALUES('Channel 1', $2, '/etc/ffplayout/ffplayout.yml', 'jpg,jpeg,png', 'ffplayout.service');
VALUES('Channel 1', $2, $3, 'jpg,jpeg,png', 'ffplayout.service');
INSERT INTO roles(name) VALUES('admin'), ('user'), ('guest');
INSERT INTO presets(name, text, x, y, fontsize, line_spacing, fontcolor, box, boxcolor, boxborderw, alpha, channel_id)
VALUES('Default', 'Wellcome to ffplayout messenger!', '(w-text_w)/2', '(h-text_h)/2', '24', '4', '#ffffff@0xff', '0', '#000000@0x80', '4', '1.0', '1'),
@ -124,6 +132,7 @@ pub async fn db_init(domain: Option<String>) -> Result<&'static str, Box<dyn std
sqlx::query(query)
.bind(secret)
.bind(url)
.bind(config_path)
.execute(&pool)
.await?;

View File

@ -100,7 +100,7 @@ impl ProcessControl {
Ok("Success".to_string())
}
pub async fn kill(&self) -> Result<String, ServiceError> {
pub async fn stop(&self) -> Result<String, ServiceError> {
if let Some(proc) = self.engine_child.lock().await.as_mut() {
if proc.kill().await.is_err() {
return Err(ServiceError::InternalServerError);
@ -114,7 +114,7 @@ impl ProcessControl {
}
pub async fn restart(&self) -> Result<String, ServiceError> {
self.kill().await?;
self.stop().await?;
self.start().await?;
self.is_running.store(true, Ordering::SeqCst);
@ -350,8 +350,20 @@ pub async fn control_service(
if engine.is_some() && engine.as_ref().unwrap().piggyback.load(Ordering::SeqCst) {
match command {
ServiceCmd::Start => engine.unwrap().start().await,
ServiceCmd::Stop => engine.unwrap().kill().await,
ServiceCmd::Restart => engine.unwrap().restart().await,
ServiceCmd::Stop => {
if control_state(conn, id, "stop_all").await.is_ok() {
engine.unwrap().stop().await
} else {
Err(ServiceError::NoContent("Nothing to stop".to_string()))
}
}
ServiceCmd::Restart => {
if control_state(conn, id, "stop_all").await.is_ok() {
engine.unwrap().restart().await
} else {
Err(ServiceError::NoContent("Nothing to stop".to_string()))
}
}
ServiceCmd::Status => engine.unwrap().status(),
_ => Err(ServiceError::Conflict(
"Engine runs in piggyback mode, in this mode this command is not allowed."

View File

@ -20,7 +20,7 @@ use ffplayout_lib::{
fn server_monitor(
level: &str,
buffer: BufReader<ChildStderr>,
mut proc_ctl: ProcessControl,
proc_ctl: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
@ -30,7 +30,7 @@ fn server_monitor(
}
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
@ -39,7 +39,7 @@ fn server_monitor(
.iter()
.any(|i| line.contains(*i))
{
proc_ctl.kill_all();
proc_ctl.stop_all();
}
}
@ -52,7 +52,7 @@ fn server_monitor(
pub fn ingest_server(
config: PlayoutConfig,
ingest_sender: Sender<(usize, [u8; 65088])>,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
let mut buffer: [u8; 65088] = [0; 65088];
let mut server_cmd = vec_strings!["-hide_banner", "-nostats", "-v", "level+info"];
@ -76,7 +76,7 @@ pub fn ingest_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}

View File

@ -81,7 +81,7 @@ fn fake_time(args: &Args) {
}
}
fn main() -> Result<(), ()> {
fn main() {
let args = get_args();
// use fake time function only in debugging mode
@ -167,7 +167,4 @@ fn main() -> Result<(), ()> {
}
drop(msg);
// drop(proc_control);
Ok(())
}

View File

@ -41,7 +41,7 @@ use ffplayout_lib::{
fn ingest_to_hls_server(
config: PlayoutConfig,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
let playlist_init = playout_stat.list_init;
let level = config.logging.ffmpeg_level.clone();
@ -56,7 +56,7 @@ fn ingest_to_hls_server(
if let Some(url) = stream_input.iter().find(|s| s.contains("://")) {
if !test_tcp_port(url) {
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}
@ -93,7 +93,7 @@ fn ingest_to_hls_server(
let line = line?;
if line.contains("rtmp") && line.contains("Unexpected stream") && !valid_stream(&line) {
if let Err(e) = proc_ctl.kill(Ingest) {
if let Err(e) = proc_ctl.stop(Ingest) {
error!("{e}");
};
}
@ -105,7 +105,7 @@ fn ingest_to_hls_server(
info!("Switch from {} to live ingest", config.processing.mode);
if let Err(e) = proc_control.kill(Encoder) {
if let Err(e) = proc_control.stop(Encoder) {
error!("{e}");
}
}
@ -140,7 +140,7 @@ pub fn write_hls(
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -217,5 +217,5 @@ pub fn write_hls(
sleep(Duration::from_secs(1));
proc_control.kill_all();
proc_control.stop_all();
}

View File

@ -36,7 +36,7 @@ pub fn player(
config: &PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let config_clone = config.clone();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
@ -149,7 +149,7 @@ pub fn player(
error!("Encoder error: {e}")
}
if let Err(e) = proc_control.kill(Decoder) {
if let Err(e) = proc_control.stop(Decoder) {
error!("{e}")
}
@ -208,7 +208,7 @@ pub fn player(
sleep(Duration::from_secs(1));
proc_control.kill_all();
proc_control.stop_all();
if let Err(e) = error_encoder_thread.join() {
error!("{e:?}");

View File

@ -149,7 +149,7 @@ pub fn json_rpc_server(
config: PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) {
let addr = config.rpc_server.address.clone();
let auth = config.rpc_server.authorization.clone();
@ -187,7 +187,7 @@ pub fn json_rpc_server(
)) {
return Ok(Value::String(reply));
};
} else if let Err(e) = proc.kill(Ingest) {
} else if let Err(e) = proc.stop(Ingest) {
error!("Ingest {e:?}")
}
}
@ -310,6 +310,13 @@ pub fn json_rpc_server(
return Ok(Value::String("Reset playout state failed".to_string()));
}
// stop playout
if map.contains_key("control") && &map["control"] == "stop_all" {
proc.stop_all();
return Ok(Value::String("Stop playout!".to_string()));
}
// get infos about current clip
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play_control.current_media.lock().unwrap().clone() {
@ -383,7 +390,7 @@ pub fn json_rpc_server(
}
Err(e) => {
error!("Unable to start RPC server: {e}");
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}

@ -1 +1 @@
Subproject commit bd7d2a21e3450b2489717b3e37e016864698e8a0
Subproject commit 80d1fd8d625aafaf64aea586d2e390a87d6b3b81

View File

@ -75,7 +75,7 @@ impl Default for ProcessControl {
}
impl ProcessControl {
pub fn kill(&self, unit: ProcessUnit) -> Result<(), String> {
pub fn stop(&self, unit: ProcessUnit) -> Result<(), String> {
match unit {
Decoder => {
if let Some(proc) = self.decoder_term.lock().unwrap().as_mut() {
@ -136,18 +136,20 @@ impl ProcessControl {
}
/// No matter what is running, terminate them all.
pub fn kill_all(&mut self) {
pub fn stop_all(&self) {
debug!("Stop all child processes");
self.is_terminated.store(true, Ordering::SeqCst);
self.server_is_running.store(false, Ordering::SeqCst);
if self.is_alive.load(Ordering::SeqCst) {
self.is_alive.store(false, Ordering::SeqCst);
if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
rpc.clone().close()
};
// if let Some(rpc) = &*self.rpc_handle.lock().unwrap() {
// rpc.clone().close()
// };
for unit in [Encoder, Decoder, Ingest] {
if let Err(e) = self.kill(unit) {
for unit in [Decoder, Encoder, Ingest] {
if let Err(e) = self.stop(unit) {
if !e.contains("exited process") {
error!("{e}")
}
@ -164,7 +166,7 @@ impl ProcessControl {
// impl Drop for ProcessControl {
// fn drop(&mut self) {
// self.kill_all()
// self.stop_all()
// }
// }

View File

@ -635,7 +635,7 @@ pub fn include_file(config: PlayoutConfig, file_path: &Path) -> bool {
pub fn stderr_reader(
buffer: BufReader<ChildStderr>,
suffix: ProcessUnit,
mut proc_control: ProcessControl,
proc_control: ProcessControl,
) -> Result<(), Error> {
for line in buffer.lines() {
let line = line?;
@ -666,7 +666,7 @@ pub fn stderr_reader(
|| (line.contains("No such file or directory")
&& !line.contains("failed to delete old segment"))
{
proc_control.kill_all();
proc_control.stop_all();
exit(1);
}
}

View File

@ -9,12 +9,12 @@ use simplelog::*;
use ffplayout::output::player;
use ffplayout_lib::{utils::*, vec_strings};
fn timed_kill(sec: u64, mut proc_ctl: ProcessControl) {
fn timed_stop(sec: u64, proc_ctl: ProcessControl) {
sleep(Duration::from_secs(sec));
info!("Timed kill of process");
info!("Timed stop of process");
proc_ctl.kill_all();
proc_ctl.stop_all();
}
#[test]
@ -49,7 +49,7 @@ fn playlist_change_at_midnight() {
mock_time::set_mock_time("2023-02-08T23:59:45");
thread::spawn(move || timed_kill(28, proc_ctl));
thread::spawn(move || timed_stop(28, proc_ctl));
player(&config, play_control, playout_stat.clone(), proc_control);
@ -90,7 +90,7 @@ fn playlist_change_at_six() {
mock_time::set_mock_time("2023-02-09T05:59:45");
thread::spawn(move || timed_kill(28, proc_ctl));
thread::spawn(move || timed_stop(28, proc_ctl));
player(&config, play_control, playout_stat.clone(), proc_control);