Merge pull request #134 from jb-alvarado/master

simplify json reader, fix playlist update check
This commit is contained in:
jb-alvarado 2022-06-02 22:12:01 +02:00 committed by GitHub
commit ed749b8b2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 192 additions and 244 deletions

View File

@ -42,7 +42,6 @@ The main purpose of ffplayout is to provide a 24/7 broadcasting solution that pl
- **HLS** - **HLS**
- JSON RPC server, for getting infos about current playing and controlling - JSON RPC server, for getting infos about current playing and controlling
- [live ingest](/docs/live_ingest.md) - [live ingest](/docs/live_ingest.md)
- [multiple outputs](/docs/multiple_outputs.md)
Requirements Requirements
----- -----

View File

@ -1,46 +0,0 @@
### Multiple Outputs
ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings.
For example you want to stream different resolutions, you could apply this output parameters:
```YAML
...
output_param: >-
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 1300k
-bufsize 2600k
-preset faster
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://example.org/live/stream-high
-s 960x540
-c:v libx264
-crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 1000k
-bufsize 1800k
-preset faster
-tune zerolatency
-profile:v Main
-level 3.1
-c:a aac
-ar 44100
-b:a 128k
-flags +global_header
-f flv rtmp://example.org/live/stream-low
```
When you are using the text overlay filter, it will apply to all outputs.
The same works to for HLS output.
If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`.

View File

@ -6,44 +6,51 @@ The streaming output can be used for ever kind of classical streaming. For examp
### Multiple Outputs: ### Multiple Outputs:
If you would like to have multiple outputs, you can add you settings to `output_param:` like: ffplayout supports multiple outputs in a way, that it can output the same stream to multiple targets with different encoding settings.
```yam For example you want to stream different resolutions, you could apply this output parameters:
...
output_param: >- ```YAML
... ...
-flags +global_header
-f flv rtmp://127.0.0.1/live/big output_param: >-
-s 1280x720 -c:v libx264
-c:v libx264 -crf 23
-crf 23 -x264-params keyint=50:min-keyint=25:scenecut=-1
-x264-params keyint=50:min-keyint=25:scenecut=-1 -maxrate 1300k
-maxrate 2400k -bufsize 2600k
-bufsize 4800k -preset faster
-preset medium -tune zerolatency
-profile:v Main -profile:v Main
-level 3.1 -level 3.1
-c:a aac -c:a aac
-ar 44100 -ar 44100
-b:a 128k -b:a 128k
-flags +global_header -flags +global_header
-f flv rtmp://127.0.0.1/live/middle -f flv rtmp://example.org/live/stream-high
-s 640x360 -s 960x540
-c:v libx264 -c:v libx264
-crf 23 -crf 23
-x264-params keyint=50:min-keyint=25:scenecut=-1 -x264-params keyint=50:min-keyint=25:scenecut=-1
-maxrate 600k -maxrate 1000k
-bufsize 1200k -bufsize 1800k
-preset medium -preset faster
-profile:v Main -tune zerolatency
-level 3.1 -profile:v Main
-c:a aac -level 3.1
-ar 44100 -c:a aac
-b:a 128k -ar 44100
-flags +global_header -b:a 128k
-f flv rtmp://127.0.0.1/live/small -flags +global_header
-f flv rtmp://example.org/live/stream-low
``` ```
When you are using the text overlay filter, it will apply to all outputs.
The same works to for HLS output.
If you want to use different resolution, you should apply them in order from biggest to smallest. Use the biggest resolution in config under `processing:` and the smaller ones in `output_params:`.
## Desktop ## Desktop
In desktop mode you will get your picture on screen. For this you need a desktop system, theoretical all platforms should work here. ffplayout will need for that **ffplay**. In desktop mode you will get your picture on screen. For this you need a desktop system, theoretical all platforms should work here. ffplayout will need for that **ffplay**.

View File

@ -42,6 +42,10 @@ impl CurrentProgram {
) -> Self { ) -> Self {
let json = read_json(config, None, is_terminated.clone(), true, 0.0); let json = read_json(config, None, is_terminated.clone(), true, 0.0);
if let Some(file) = &json.current_file {
info!("Read Playlist: <b><magenta>{}</></b>", file);
}
*current_list.lock().unwrap() = json.program; *current_list.lock().unwrap() = json.program;
*playout_stat.current_date.lock().unwrap() = json.date.clone(); *playout_stat.current_date.lock().unwrap() = json.date.clone();
@ -74,48 +78,19 @@ impl CurrentProgram {
if self.json_path.is_none() { if self.json_path.is_none() {
let json = read_json(&self.config, None, self.is_terminated.clone(), seek, 0.0); let json = read_json(&self.config, None, self.is_terminated.clone(), seek, 0.0);
if let Some(file) = &json.current_file {
info!("Read Playlist: <b><magenta>{}</></b>", file);
}
self.json_path = json.current_file; self.json_path = json.current_file;
self.json_mod = json.modified; self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program; *self.nodes.lock().unwrap() = json.program;
} else if Path::new(&self.json_path.clone().unwrap()).is_file() } else if Path::new(&self.json_path.clone().unwrap()).is_file()
|| is_remote(&self.json_path.clone().unwrap()) || is_remote(&self.json_path.clone().unwrap())
{ {
let mut is_playlist_changed = false; let mod_time = modified_time(&self.json_path.clone().unwrap());
if is_remote(&self.json_path.clone().unwrap()) { if self.json_mod != mod_time {
let resp = reqwest::blocking::Client::new()
.head(self.json_path.clone().unwrap())
.send();
match resp {
Ok(resp) => {
if resp.status().is_success() {
match resp.headers().get(reqwest::header::LAST_MODIFIED) {
Some(last_modified) => {
if !last_modified
.to_str()
.unwrap()
.eq(&self.json_mod.clone().unwrap())
{
is_playlist_changed = true
}
}
None => {}
}
}
}
Err(_) => self.on_check_update_error(),
};
} else {
let mod_time = modified_time(&self.json_path.clone().unwrap());
if let Some(m) = mod_time {
if !m.to_string().eq(&self.json_mod.clone().unwrap()) {
is_playlist_changed = true;
}
}
}
if is_playlist_changed {
// when playlist has changed, reload it // when playlist has changed, reload it
info!( info!(
"Reload playlist <b><magenta>{}</></b>", "Reload playlist <b><magenta>{}</></b>",
@ -133,31 +108,26 @@ impl CurrentProgram {
self.json_mod = json.modified; self.json_mod = json.modified;
*self.nodes.lock().unwrap() = json.program; *self.nodes.lock().unwrap() = json.program;
self.get_current_clip(); self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.index.fetch_add(1, Ordering::SeqCst);
} }
} else { } else {
self.on_check_update_error(); error!(
"Playlist <b><magenta>{}</></b> not exists!",
self.json_path.clone().unwrap()
);
let mut media = Media::new(0, String::new(), false);
media.begin = Some(get_sec());
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
self.json_path = None;
*self.nodes.lock().unwrap() = vec![media.clone()];
self.current_node = media;
self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.index.store(0, Ordering::SeqCst);
} }
} }
fn on_check_update_error(&mut self) {
error!(
"Playlist <b><magenta>{}</></b> not exists!",
self.json_path.clone().unwrap()
);
let mut media = Media::new(0, String::new(), false);
media.begin = Some(get_sec());
media.duration = DUMMY_LEN;
media.out = DUMMY_LEN;
self.json_path = None;
*self.nodes.lock().unwrap() = vec![media.clone()];
self.current_node = media;
self.playout_stat.list_init.store(true, Ordering::SeqCst);
self.index.store(0, Ordering::SeqCst);
}
// Check if day is past and it is time for a new playlist. // Check if day is past and it is time for a new playlist.
fn check_for_next_playlist(&mut self) { fn check_for_next_playlist(&mut self) {
let current_time = get_sec(); let current_time = get_sec();
@ -188,6 +158,10 @@ impl CurrentProgram {
next_start, next_start,
); );
if let Some(file) = &json.current_file {
info!("Read Playlist: <b><magenta>{}</></b>", file);
}
let data = json!({ let data = json!({
"time_shift": 0.0, "time_shift": 0.0,
"date": json.date, "date": json.date,
@ -288,9 +262,9 @@ impl Iterator for CurrentProgram {
type Item = Media; type Item = Media;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.playout_stat.list_init.load(Ordering::SeqCst) { self.check_update(self.playout_stat.list_init.load(Ordering::SeqCst));
self.check_update(true);
if self.playout_stat.list_init.load(Ordering::SeqCst) {
if self.json_path.is_some() { if self.json_path.is_some() {
self.init_clip(); self.init_clip();
} }
@ -331,9 +305,9 @@ impl Iterator for CurrentProgram {
media.out = duration; media.out = duration;
self.current_node = gen_source(&self.config, media); self.current_node = gen_source(&self.config, media);
self.nodes.lock().unwrap().push(self.current_node.clone()); let mut nodes = self.nodes.lock().unwrap();
self.index nodes.push(self.current_node.clone());
.store(self.nodes.lock().unwrap().len(), Ordering::SeqCst); self.index.store(nodes.len(), Ordering::SeqCst);
} }
} }
@ -346,23 +320,23 @@ impl Iterator for CurrentProgram {
self.check_for_next_playlist(); self.check_for_next_playlist();
let mut is_last = false; let mut is_last = false;
let index = self.index.load(Ordering::SeqCst); let index = self.index.load(Ordering::SeqCst);
let nodes = self.nodes.lock().unwrap();
if index == self.nodes.lock().unwrap().len() - 1 { if index == nodes.len() - 1 {
is_last = true is_last = true
} }
self.current_node = timed_source( self.current_node = timed_source(
self.nodes.lock().unwrap()[index].clone(), nodes[index].clone(),
&self.config, &self.config,
is_last, is_last,
&self.playout_stat, &self.playout_stat,
); );
drop(nodes);
self.last_next_ad(); self.last_next_ad();
self.index.fetch_add(1, Ordering::SeqCst); self.index.fetch_add(1, Ordering::SeqCst);
// update playlist should happen after current clip,
// to prevent unknown behaviors.
self.check_update(false);
Some(self.current_node.clone()) Some(self.current_node.clone())
} else { } else {
let last_playlist = self.json_path.clone(); let last_playlist = self.json_path.clone();

View File

@ -8,7 +8,9 @@ use std::{
use simplelog::*; use simplelog::*;
use crate::utils::{get_date, is_remote, modified_time, validate_playlist, GlobalConfig, Media}; use crate::utils::{
get_date, is_remote, modified_time, time_from_header, validate_playlist, GlobalConfig, Media,
};
pub const DUMMY_LEN: f64 = 60.0; pub const DUMMY_LEN: f64 = 60.0;
@ -39,102 +41,13 @@ impl Playlist {
date, date,
start_sec: Some(start), start_sec: Some(start),
current_file: None, current_file: None,
modified: Some(String::new()), modified: None,
program: vec![media], program: vec![media],
} }
} }
} }
/// Read json playlist file, fills Playlist struct and set some extra values, fn set_defaults(mut playlist: Playlist, current_file: String, mut start_sec: f64) -> Playlist {
/// which we need to process.
pub fn read_json(
config: &GlobalConfig,
path: Option<String>,
is_terminated: Arc<AtomicBool>,
seek: bool,
next_start: f64,
) -> Playlist {
let config_clone = config.clone();
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
let mut start_sec = config.playlist.start_sec.unwrap();
let date = get_date(seek, start_sec, next_start);
if playlist_path.is_dir() {
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
}
let mut current_file: String = playlist_path.as_path().display().to_string();
if let Some(p) = path {
playlist_path = Path::new(&p).to_owned();
current_file = p
}
let mut playlist: Playlist;
if is_remote(&current_file) {
let resp = reqwest::blocking::Client::new().get(&current_file).send();
match resp {
Ok(resp) => {
if resp.status().is_success() {
info!("Read Remote Playlist: <b><magenta>{current_file}</></b>");
let headers = resp.headers().clone();
let body = resp.text().unwrap();
playlist =
serde_json::from_str(&body).expect("Could not read json playlist str.");
match headers.get(reqwest::header::LAST_MODIFIED) {
Some(t) => {
playlist.modified = Some(t.to_str().unwrap().to_string());
}
None => {}
}
} else {
error!(
"Get Remote Playlist <b><magenta>{current_file}</></b> not success!: {}",
resp.text().unwrap()
);
return Playlist::new(date, start_sec);
}
}
Err(e) => {
error!("Remote Playlist <b><magenta>{current_file}</></b>: {}", e);
return Playlist::new(date, start_sec);
}
};
} else {
if !playlist_path.is_file() {
error!("Playlist <b><magenta>{current_file}</></b> not exists!");
return Playlist::new(date, start_sec);
}
info!("Read Playlist: <b><magenta>{current_file}</></b>");
let f = File::options()
.read(true)
.write(false)
.open(&current_file)
.expect("Could not open json playlist file.");
playlist = serde_json::from_reader(f).expect("Could not read json playlist file.");
let modify = modified_time(&current_file);
if let Some(modi) = modify {
playlist.modified = Some(modi.to_string());
}
}
playlist.current_file = Some(current_file); playlist.current_file = Some(current_file);
playlist.start_sec = Some(start_sec); playlist.start_sec = Some(start_sec);
@ -150,9 +63,82 @@ pub fn read_json(
start_sec += item.out - item.seek; start_sec += item.out - item.seek;
} }
let list_clone = playlist.clone();
thread::spawn(move || validate_playlist(list_clone, is_terminated, config_clone));
playlist playlist
} }
/// Read json playlist file, fills Playlist struct and set some extra values,
/// which we need to process.
pub fn read_json(
config: &GlobalConfig,
path: Option<String>,
is_terminated: Arc<AtomicBool>,
seek: bool,
next_start: f64,
) -> Playlist {
let config_clone = config.clone();
let mut playlist_path = Path::new(&config.playlist.path).to_owned();
let start_sec = config.playlist.start_sec.unwrap();
let date = get_date(seek, start_sec, next_start);
if playlist_path.is_dir() || is_remote(&config.playlist.path) {
let d: Vec<&str> = date.split('-').collect();
playlist_path = playlist_path
.join(d[0])
.join(d[1])
.join(date.clone())
.with_extension("json");
}
let mut current_file = playlist_path.as_path().display().to_string();
if let Some(p) = path {
playlist_path = Path::new(&p).to_owned();
current_file = p
}
if is_remote(&current_file) {
let response = reqwest::blocking::Client::new().get(&current_file).send();
if let Ok(resp) = response {
if resp.status().is_success() {
let headers = resp.headers().clone();
if let Ok(body) = resp.text() {
let mut playlist: Playlist =
serde_json::from_str(&body).expect("Could't read remote json playlist.");
if let Some(time) = time_from_header(&headers) {
playlist.modified = Some(time.to_string());
}
let list_clone = playlist.clone();
thread::spawn(move || {
validate_playlist(list_clone, is_terminated, config_clone)
});
return set_defaults(playlist, current_file, start_sec);
}
}
}
} else if playlist_path.is_file() {
let f = File::options()
.read(true)
.write(false)
.open(&current_file)
.expect("Could not open json playlist file.");
let mut playlist: Playlist =
serde_json::from_reader(f).expect("Could't read json playlist file.");
playlist.modified = modified_time(&current_file);
let list_clone = playlist.clone();
thread::spawn(move || validate_playlist(list_clone, is_terminated, config_clone));
return set_defaults(playlist, current_file, start_sec);
}
error!("Read playlist error, on: <b><magenta>{current_file}</></b>!");
Playlist::new(date, start_sec)
}

View File

@ -8,7 +8,9 @@ use std::{
use chrono::{prelude::*, Duration}; use chrono::{prelude::*, Duration};
use ffprobe::{ffprobe, Format, Stream}; use ffprobe::{ffprobe, Format, Stream};
use jsonrpc_http_server::hyper::HeaderMap;
use regex::Regex; use regex::Regex;
use reqwest::header;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use simplelog::*; use simplelog::*;
@ -233,11 +235,37 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
local.format("%Y-%m-%d").to_string() local.format("%Y-%m-%d").to_string()
} }
pub fn time_from_header(headers: &HeaderMap) -> Option<DateTime<Local>> {
if let Some(time) = headers.get(header::LAST_MODIFIED) {
if let Ok(t) = time.to_str() {
let time = DateTime::parse_from_rfc2822(t);
let date_time: DateTime<Local> = time.unwrap().into();
return Some(date_time);
};
}
None
}
/// Get file modification time. /// Get file modification time.
pub fn modified_time(path: &str) -> Option<DateTime<Local>> { pub fn modified_time(path: &str) -> Option<String> {
if is_remote(path) {
let response = reqwest::blocking::Client::new().head(path).send();
if let Ok(resp) = response {
if resp.status().is_success() {
if let Some(time) = time_from_header(resp.headers()) {
return Some(time.to_string());
}
}
}
return None;
}
if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) { if let Ok(time) = metadata(path).and_then(|metadata| metadata.modified()) {
let date_time: DateTime<Local> = time.into(); let date_time: DateTime<Local> = time.into();
return Some(date_time); return Some(date_time.to_string());
} }
None None