From 30435d3adaafcd67b9e38bf701712201a4e078d9 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 2 Jun 2022 15:41:20 +0200 Subject: [PATCH 1/3] update Readme --- README.md | 1 - docs/multiple_outputs.md | 46 ------------------------ docs/output.md | 75 ++++++++++++++++++++++------------------ 3 files changed, 41 insertions(+), 81 deletions(-) delete mode 100644 docs/multiple_outputs.md diff --git a/README.md b/README.md index 92c215f3..0add514b 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,6 @@ The main purpose of ffplayout is to provide a 24/7 broadcasting solution that pl - **HLS** - JSON RPC server, for getting infos about current playing and controlling - [live ingest](/docs/live_ingest.md) -- [multiple outputs](/docs/multiple_outputs.md) Requirements ----- diff --git a/docs/multiple_outputs.md b/docs/multiple_outputs.md deleted file mode 100644 index f63dbc5d..00000000 --- a/docs/multiple_outputs.md +++ /dev/null @@ -1,46 +0,0 @@ -### Multiple Outputs - -ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings. - -For example you want to stream different resolutions, you could apply this output parameters: - -```YAML - ... - - output_param: >- - -c:v libx264 - -crf 23 - -x264-params keyint=50:min-keyint=25:scenecut=-1 - -maxrate 1300k - -bufsize 2600k - -preset faster - -tune zerolatency - -profile:v Main - -level 3.1 - -c:a aac - -ar 44100 - -b:a 128k - -flags +global_header - -f flv rtmp://example.org/live/stream-high - -s 960x540 - -c:v libx264 - -crf 23 - -x264-params keyint=50:min-keyint=25:scenecut=-1 - -maxrate 1000k - -bufsize 1800k - -preset faster - -tune zerolatency - -profile:v Main - -level 3.1 - -c:a aac - -ar 44100 - -b:a 128k - -flags +global_header - -f flv rtmp://example.org/live/stream-low -``` - -When you are using the text overlay filter, it will apply to all outputs. - -The same works to for HLS output. - -If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`. diff --git a/docs/output.md b/docs/output.md index 117461a4..6733cb9f 100644 --- a/docs/output.md +++ b/docs/output.md @@ -6,44 +6,51 @@ The streaming output can be used for ever kind of classical streaming. For examp ### Multiple Outputs: -If you would like to have multiple outputs, you can add you settings to `output_param:` like: +ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings. -```yam -... -output_param: >- +For example you want to stream different resolutions, you could apply this output parameters: + +```YAML ... - -flags +global_header - -f flv rtmp://127.0.0.1/live/big - -s 1280x720 - -c:v libx264 - -crf 23 - -x264-params keyint=50:min-keyint=25:scenecut=-1 - -maxrate 2400k - -bufsize 4800k - -preset medium - -profile:v Main - -level 3.1 - -c:a aac - -ar 44100 - -b:a 128k - -flags +global_header - -f flv rtmp://127.0.0.1/live/middle - -s 640x360 - -c:v libx264 - -crf 23 - -x264-params keyint=50:min-keyint=25:scenecut=-1 - -maxrate 600k - -bufsize 1200k - -preset medium - -profile:v Main - -level 3.1 - -c:a aac - -ar 44100 - -b:a 128k - -flags +global_header - -f flv rtmp://127.0.0.1/live/small + + output_param: >- + -c:v libx264 + -crf 23 + -x264-params keyint=50:min-keyint=25:scenecut=-1 + -maxrate 1300k + -bufsize 2600k + -preset faster + -tune zerolatency + -profile:v Main + -level 3.1 + -c:a aac + -ar 44100 + -b:a 128k + -flags +global_header + -f flv rtmp://example.org/live/stream-high + -s 960x540 + -c:v libx264 + -crf 23 + -x264-params keyint=50:min-keyint=25:scenecut=-1 + -maxrate 1000k + -bufsize 1800k + -preset faster + -tune zerolatency + -profile:v Main + -level 3.1 + -c:a aac + -ar 44100 + -b:a 128k + -flags +global_header + -f flv rtmp://example.org/live/stream-low ``` +When you are using the text overlay filter, it will apply to all outputs. + +The same works to for HLS output. + +If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`. + ## Desktop In desktop mode you will get your picture on screen. For this you need a desktop system, theoretical all platforms should work here. ffplayout will need for that **ffplay**. From 704053e5994f17abe0d858fbb48377238cfbb119 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 2 Jun 2022 18:21:43 +0200 Subject: [PATCH 2/3] simplify remote playlist code (WIP) --- src/input/playlist.rs | 69 +++++++++--------------------------- src/utils/json_serializer.rs | 59 ++++++++++++------------------ src/utils/mod.rs | 32 +++++++++++++++-- 3 files changed, 68 insertions(+), 92 deletions(-) diff --git a/src/input/playlist.rs b/src/input/playlist.rs index e7ba909c..3393a28a 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -80,42 +80,9 @@ impl CurrentProgram { } else if Path::new(&self.json_path.clone().unwrap()).is_file() || is_remote(&self.json_path.clone().unwrap()) { - let mut is_playlist_changed = false; + let mod_time = modified_time(&self.json_path.clone().unwrap()); - if is_remote(&self.json_path.clone().unwrap()) { - let resp = reqwest::blocking::Client::new() - .head(self.json_path.clone().unwrap()) - .send(); - match resp { - Ok(resp) => { - if resp.status().is_success() { - match resp.headers().get(reqwest::header::LAST_MODIFIED) { - Some(last_modified) => { - if !last_modified - .to_str() - .unwrap() - .eq(&self.json_mod.clone().unwrap()) - { - is_playlist_changed = true - } - } - None => {} - } - } - } - Err(_) => self.on_check_update_error(), - }; - } else { - let mod_time = modified_time(&self.json_path.clone().unwrap()); - - if let Some(m) = mod_time { - if !m.to_string().eq(&self.json_mod.clone().unwrap()) { - is_playlist_changed = true; - } - } - } - - if is_playlist_changed { + if self.json_mod != mod_time { // when playlist has changed, reload it info!( "Reload playlist {}", @@ -137,27 +104,23 @@ impl CurrentProgram { self.index.fetch_add(1, Ordering::SeqCst); } } else { - self.on_check_update_error(); + error!( + "Playlist {} not exists!", + self.json_path.clone().unwrap() + ); + let mut media = Media::new(0, String::new(), false); + media.begin = Some(get_sec()); + media.duration = DUMMY_LEN; + media.out = DUMMY_LEN; + + self.json_path = None; + *self.nodes.lock().unwrap() = vec![media.clone()]; + self.current_node = media; + self.playout_stat.list_init.store(true, Ordering::SeqCst); + self.index.store(0, Ordering::SeqCst); } } - fn on_check_update_error(&mut self) { - error!( - "Playlist {} not exists!", - self.json_path.clone().unwrap() - ); - let mut media = Media::new(0, String::new(), false); - media.begin = Some(get_sec()); - media.duration = DUMMY_LEN; - media.out = DUMMY_LEN; - - self.json_path = None; - *self.nodes.lock().unwrap() = vec![media.clone()]; - self.current_node = media; - self.playout_stat.list_init.store(true, Ordering::SeqCst); - self.index.store(0, Ordering::SeqCst); - } - // Check if day is past and it is time for a new playlist. fn check_for_next_playlist(&mut self) { let current_time = get_sec(); diff --git a/src/utils/json_serializer.rs b/src/utils/json_serializer.rs index 369d82b1..fbc2931e 100644 --- a/src/utils/json_serializer.rs +++ b/src/utils/json_serializer.rs @@ -8,7 +8,9 @@ use std::{ use simplelog::*; -use crate::utils::{get_date, is_remote, modified_time, validate_playlist, GlobalConfig, Media}; +use crate::utils::{ + get_date, is_remote, modified_time, time_from_header, validate_playlist, GlobalConfig, Media, +}; pub const DUMMY_LEN: f64 = 60.0; @@ -39,7 +41,7 @@ impl Playlist { date, start_sec: Some(start), current_file: None, - modified: Some(String::new()), + modified: None, program: vec![media], } } @@ -59,7 +61,7 @@ pub fn read_json( let mut start_sec = config.playlist.start_sec.unwrap(); let date = get_date(seek, start_sec, next_start); - if playlist_path.is_dir() { + if playlist_path.is_dir() || is_remote(&config.playlist.path) { let d: Vec<&str> = date.split('-').collect(); playlist_path = playlist_path .join(d[0]) @@ -75,48 +77,36 @@ pub fn read_json( current_file = p } - let mut playlist: Playlist; + let mut playlist = Playlist::new(date, start_sec); if is_remote(¤t_file) { - let resp = reqwest::blocking::Client::new().get(¤t_file).send(); + let response = reqwest::blocking::Client::new().get(¤t_file).send(); - match resp { - Ok(resp) => { - if resp.status().is_success() { - info!("Read Remote Playlist: {current_file}"); + if let Ok(resp) = response { + if resp.status().is_success() { + info!("Read Remote Playlist: {current_file}"); - let headers = resp.headers().clone(); - let body = resp.text().unwrap(); + let headers = resp.headers().clone(); + if let Ok(body) = resp.text() { playlist = - serde_json::from_str(&body).expect("Could not read json playlist str."); + serde_json::from_str(&body).expect("Could't read remote json playlist."); - match headers.get(reqwest::header::LAST_MODIFIED) { - Some(t) => { - playlist.modified = Some(t.to_str().unwrap().to_string()); - } - None => {} + if let Some(time) = time_from_header(&headers) { + playlist.modified = Some(time.to_string()); } - } else { - error!( - "Get Remote Playlist {current_file} not success!: {}", - resp.text().unwrap() - ); - - return Playlist::new(date, start_sec); } } - Err(e) => { - error!("Remote Playlist {current_file}: {}", e); + } else { + error!("Can't read remote playlist {current_file}"); - return Playlist::new(date, start_sec); - } - }; + return playlist; + } } else { if !playlist_path.is_file() { error!("Playlist {current_file} not exists!"); - return Playlist::new(date, start_sec); + return playlist; } info!("Read Playlist: {current_file}"); @@ -126,13 +116,8 @@ pub fn read_json( .write(false) .open(¤t_file) .expect("Could not open json playlist file."); - playlist = serde_json::from_reader(f).expect("Could not read json playlist file."); - - let modify = modified_time(¤t_file); - - if let Some(modi) = modify { - playlist.modified = Some(modi.to_string()); - } + playlist = serde_json::from_reader(f).expect("Could't read json playlist file."); + playlist.modified = modified_time(¤t_file); } playlist.current_file = Some(current_file); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c7a2b8a1..5e01c6f7 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,7 +8,9 @@ use std::{ use chrono::{prelude::*, Duration}; use ffprobe::{ffprobe, Format, Stream}; +use jsonrpc_http_server::hyper::HeaderMap; use regex::Regex; +use reqwest::header; use serde::{Deserialize, Serialize}; use serde_json::json; use simplelog::*; @@ -233,11 +235,37 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String { local.format("%Y-%m-%d").to_string() } +pub fn time_from_header(headers: &HeaderMap) -> Option> { + if let Some(time) = headers.get(header::LAST_MODIFIED) { + if let Ok(t) = time.to_str() { + let time = DateTime::parse_from_rfc2822(t); + let date_time: DateTime = time.unwrap().into(); + return Some(date_time); + }; + } + + None +} + /// Get file modification time. -pub fn modified_time(path: &str) -> Option> { +pub fn modified_time(path: &str) -> Option { + if is_remote(path) { + let response = reqwest::blocking::Client::new().head(path).send(); + + if let Ok(resp) = response { + if resp.status().is_success() { + if let Some(time) = time_from_header(resp.headers()) { + return Some(time.to_string()); + } + } + } + + return None; + } + if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) { let date_time: DateTime = time.into(); - return Some(date_time); + return Some(date_time.to_string()); } None From 63d88fbf97cf9d517f399784f9195c878a4c299a Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Thu, 2 Jun 2022 22:08:02 +0200 Subject: [PATCH 3/3] simplify json reader, fix playlist update check --- src/input/playlist.rs | 35 +++++--- src/utils/json_serializer.rs | 155 ++++++++++++++++++----------------- 2 files changed, 101 insertions(+), 89 deletions(-) diff --git a/src/input/playlist.rs b/src/input/playlist.rs index 3393a28a..c67e71ac 100644 --- a/src/input/playlist.rs +++ b/src/input/playlist.rs @@ -42,6 +42,10 @@ impl CurrentProgram { ) -> Self { let json = read_json(config, None, is_terminated.clone(), true, 0.0); + if let Some(file) = &json.current_file { + info!("Read Playlist: {}", file); + } + *current_list.lock().unwrap() = json.program; *playout_stat.current_date.lock().unwrap() = json.date.clone(); @@ -74,6 +78,10 @@ impl CurrentProgram { if self.json_path.is_none() { let json = read_json(&self.config, None, self.is_terminated.clone(), seek, 0.0); + if let Some(file) = &json.current_file { + info!("Read Playlist: {}", file); + } + self.json_path = json.current_file; self.json_mod = json.modified; *self.nodes.lock().unwrap() = json.program; @@ -100,8 +108,7 @@ impl CurrentProgram { self.json_mod = json.modified; *self.nodes.lock().unwrap() = json.program; - self.get_current_clip(); - self.index.fetch_add(1, Ordering::SeqCst); + self.playout_stat.list_init.store(true, Ordering::SeqCst); } } else { error!( @@ -151,6 +158,10 @@ impl CurrentProgram { next_start, ); + if let Some(file) = &json.current_file { + info!("Read Playlist: {}", file); + } + let data = json!({ "time_shift": 0.0, "date": json.date, @@ -251,9 +262,9 @@ impl Iterator for CurrentProgram { type Item = Media; fn next(&mut self) -> Option { - if self.playout_stat.list_init.load(Ordering::SeqCst) { - self.check_update(true); + self.check_update(self.playout_stat.list_init.load(Ordering::SeqCst)); + if self.playout_stat.list_init.load(Ordering::SeqCst) { if self.json_path.is_some() { self.init_clip(); } @@ -294,9 +305,9 @@ impl Iterator for CurrentProgram { media.out = duration; self.current_node = gen_source(&self.config, media); - self.nodes.lock().unwrap().push(self.current_node.clone()); - self.index - .store(self.nodes.lock().unwrap().len(), Ordering::SeqCst); + let mut nodes = self.nodes.lock().unwrap(); + nodes.push(self.current_node.clone()); + self.index.store(nodes.len(), Ordering::SeqCst); } } @@ -309,23 +320,23 @@ impl Iterator for CurrentProgram { self.check_for_next_playlist(); let mut is_last = false; let index = self.index.load(Ordering::SeqCst); + let nodes = self.nodes.lock().unwrap(); - if index == self.nodes.lock().unwrap().len() - 1 { + if index == nodes.len() - 1 { is_last = true } self.current_node = timed_source( - self.nodes.lock().unwrap()[index].clone(), + nodes[index].clone(), &self.config, is_last, &self.playout_stat, ); + + drop(nodes); self.last_next_ad(); self.index.fetch_add(1, Ordering::SeqCst); - // update playlist should happen after current clip, - // to prevent unknown behaviors. - self.check_update(false); Some(self.current_node.clone()) } else { let last_playlist = self.json_path.clone(); diff --git a/src/utils/json_serializer.rs b/src/utils/json_serializer.rs index fbc2931e..5408c6c4 100644 --- a/src/utils/json_serializer.rs +++ b/src/utils/json_serializer.rs @@ -47,79 +47,7 @@ impl Playlist { } } -/// Read json playlist file, fills Playlist struct and set some extra values, -/// which we need to process. -pub fn read_json( - config: &GlobalConfig, - path: Option, - is_terminated: Arc, - seek: bool, - next_start: f64, -) -> Playlist { - let config_clone = config.clone(); - let mut playlist_path = Path::new(&config.playlist.path).to_owned(); - let mut start_sec = config.playlist.start_sec.unwrap(); - let date = get_date(seek, start_sec, next_start); - - if playlist_path.is_dir() || is_remote(&config.playlist.path) { - let d: Vec<&str> = date.split('-').collect(); - playlist_path = playlist_path - .join(d[0]) - .join(d[1]) - .join(date.clone()) - .with_extension("json"); - } - - let mut current_file: String = playlist_path.as_path().display().to_string(); - - if let Some(p) = path { - playlist_path = Path::new(&p).to_owned(); - current_file = p - } - - let mut playlist = Playlist::new(date, start_sec); - - if is_remote(¤t_file) { - let response = reqwest::blocking::Client::new().get(¤t_file).send(); - - if let Ok(resp) = response { - if resp.status().is_success() { - info!("Read Remote Playlist: {current_file}"); - - let headers = resp.headers().clone(); - - if let Ok(body) = resp.text() { - playlist = - serde_json::from_str(&body).expect("Could't read remote json playlist."); - - if let Some(time) = time_from_header(&headers) { - playlist.modified = Some(time.to_string()); - } - } - } - } else { - error!("Can't read remote playlist {current_file}"); - - return playlist; - } - } else { - if !playlist_path.is_file() { - error!("Playlist {current_file} not exists!"); - - return playlist; - } - - info!("Read Playlist: {current_file}"); - - let f = File::options() - .read(true) - .write(false) - .open(¤t_file) - .expect("Could not open json playlist file."); - playlist = serde_json::from_reader(f).expect("Could't read json playlist file."); - playlist.modified = modified_time(¤t_file); - } - +fn set_defaults(mut playlist: Playlist, current_file: String, mut start_sec: f64) -> Playlist { playlist.current_file = Some(current_file); playlist.start_sec = Some(start_sec); @@ -135,9 +63,82 @@ pub fn read_json( start_sec += item.out - item.seek; } - let list_clone = playlist.clone(); - - thread::spawn(move || validate_playlist(list_clone, is_terminated, config_clone)); - playlist } + +/// Read json playlist file, fills Playlist struct and set some extra values, +/// which we need to process. +pub fn read_json( + config: &GlobalConfig, + path: Option, + is_terminated: Arc, + seek: bool, + next_start: f64, +) -> Playlist { + let config_clone = config.clone(); + let mut playlist_path = Path::new(&config.playlist.path).to_owned(); + let start_sec = config.playlist.start_sec.unwrap(); + let date = get_date(seek, start_sec, next_start); + + if playlist_path.is_dir() || is_remote(&config.playlist.path) { + let d: Vec<&str> = date.split('-').collect(); + playlist_path = playlist_path + .join(d[0]) + .join(d[1]) + .join(date.clone()) + .with_extension("json"); + } + + let mut current_file = playlist_path.as_path().display().to_string(); + + if let Some(p) = path { + playlist_path = Path::new(&p).to_owned(); + current_file = p + } + + if is_remote(¤t_file) { + let response = reqwest::blocking::Client::new().get(¤t_file).send(); + + if let Ok(resp) = response { + if resp.status().is_success() { + let headers = resp.headers().clone(); + + if let Ok(body) = resp.text() { + let mut playlist: Playlist = + serde_json::from_str(&body).expect("Could't read remote json playlist."); + + if let Some(time) = time_from_header(&headers) { + playlist.modified = Some(time.to_string()); + } + + let list_clone = playlist.clone(); + + thread::spawn(move || { + validate_playlist(list_clone, is_terminated, config_clone) + }); + + return set_defaults(playlist, current_file, start_sec); + } + } + } + } else if playlist_path.is_file() { + let f = File::options() + .read(true) + .write(false) + .open(¤t_file) + .expect("Could not open json playlist file."); + let mut playlist: Playlist = + serde_json::from_reader(f).expect("Could't read json playlist file."); + playlist.modified = modified_time(¤t_file); + + let list_clone = playlist.clone(); + + thread::spawn(move || validate_playlist(list_clone, is_terminated, config_clone)); + + return set_defaults(playlist, current_file, start_sec); + } + + error!("Read playlist error, on: {current_file}!"); + + Playlist::new(date, start_sec) +}