fix time shift from forward and backward jumping, optimize code

This commit is contained in:
jb-alvarado 2022-04-08 12:18:29 +02:00
parent c2eb9ae746
commit 15a74155f6
19 changed files with 357 additions and 329 deletions

2
Cargo.lock generated
View File

@ -167,7 +167,7 @@ dependencies = [
[[package]]
name = "ffplayout-rs"
version = "0.9.0"
version = "0.9.1"
dependencies = [
"chrono",
"clap",

View File

@ -1,6 +1,6 @@
[package]
name = "ffplayout-rs"
version = "0.9.0"
version = "0.9.1"
edition = "2021"
[dependencies]

View File

@ -47,6 +47,10 @@ Requirements
-----
- RAM and CPU depends on video resolution, minimum 4 threads and 3GB RAM for 720p are recommend
- **ffmpeg** v4.2+ and **ffprobe** (**ffplay** if you want to play on desktop)
- if you want to overlay text, ffmpeg needs to have **libzmq**
-----
JSON Playlist Example
-----
@ -107,6 +111,8 @@ But be careful with it, better test it multiple times!
More informations in [Wiki](https://github.com/ffplayout/ffplayout_engine/wiki/Remote-URL-Source)
-----
HLS output
-----
@ -127,6 +133,8 @@ out:
-hls_segment_filename /var/www/html/live/stream-%09d.ts /var/www/html/live/stream.m3u8
```
-----
JSON RPC
-----
@ -142,9 +150,11 @@ At the moment this comments are possible:
```Bash
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"current"}, "id":1 }' # get infos about current clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":1 }' # jump to next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"back"}, "id":1 }' # jump to last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"reset"}, "id":1 }' # reset playlist to old state
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"next"}, "id":2 }' # get infos about next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"media":"last"}, "id":3 }' # get infos about last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"next"}, "id":4 }' # jump to next clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"back"}, "id":5 }' # jump to last clip
'{"jsonrpc": "2.0", "method": "player", "params":{"control":"reset"}, "id":6 }' # reset playlist to old state
```
@ -170,9 +180,10 @@ Output from `{"media":"current"}` show:
},
"id": 1
}
```
When you are in playlist mode and jumping forward or backwards in time, the time shift will be saved so the playlist is still in sync. But have in mind, that then maybe your playlist gets to short. When you are not resetting the state, it will reset on the next day automatically.
-----
Installation
-----

View File

@ -24,19 +24,19 @@ impl Filters {
}
}
fn add_filter(&mut self, filter: String, codec_type: String) {
match codec_type.as_str() {
fn add_filter(&mut self, filter: &str, codec_type: &str) {
match codec_type {
"audio" => match &self.audio_chain {
Some(ac) => {
if filter.starts_with(";") || filter.starts_with("[") {
self.audio_chain = Some(format!("{}{}", ac, filter))
self.audio_chain = Some(format!("{ac}{filter}"))
} else {
self.audio_chain = Some(format!("{},{}", ac, filter))
self.audio_chain = Some(format!("{ac},{filter}"))
}
}
None => {
if filter.contains("aevalsrc") || filter.contains("anoisesrc") {
self.audio_chain = Some(filter);
self.audio_chain = Some(filter.to_string());
} else {
self.audio_chain =
Some(format!("[{}]{filter}", self.audio_map.clone().unwrap()));
@ -47,9 +47,9 @@ impl Filters {
"video" => match &self.video_chain {
Some(vc) => {
if filter.starts_with(";") || filter.starts_with("[") {
self.video_chain = Some(format!("{}{}", vc, filter))
self.video_chain = Some(format!("{vc}{filter}"))
} else {
self.video_chain = Some(format!("{},{}", vc, filter))
self.video_chain = Some(format!("{vc},{filter}"))
}
}
None => {
@ -63,8 +63,10 @@ impl Filters {
}
fn deinterlace(field_order: Option<String>, chain: &mut Filters) {
if field_order.is_some() && field_order.unwrap() != "progressive".to_string() {
chain.add_filter("yadif=0:-1:0".into(), "video".into())
if let Some(order) = field_order {
if &order != "progressive" {
chain.add_filter("yadif=0:-1:0", "video")
}
}
}
@ -72,21 +74,19 @@ fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
if !is_close(aspect, config.processing.aspect, 0.03) {
if aspect < config.processing.aspect {
chain.add_filter(
format!(
&format!(
"pad=ih*{}/{}/sar:ih:(ow-iw)/2:(oh-ih)/2",
config.processing.width, config.processing.height
)
.into(),
"video".into(),
),
"video",
)
} else if aspect > config.processing.aspect {
chain.add_filter(
format!(
&format!(
"pad=iw:iw*{}/{}/sar:(ow-iw)/2:(oh-ih)/2",
config.processing.width, config.processing.height
)
.into(),
"video".into(),
),
"video",
)
}
}
@ -95,8 +95,8 @@ fn pad(aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
if fps != config.processing.fps {
chain.add_filter(
format!("fps={}", config.processing.fps).into(),
"video".into(),
&format!("fps={}", config.processing.fps),
"video",
)
}
}
@ -104,37 +104,36 @@ fn fps(fps: f64, chain: &mut Filters, config: &GlobalConfig) {
fn scale(width: i64, height: i64, aspect: f64, chain: &mut Filters, config: &GlobalConfig) {
if width != config.processing.width || height != config.processing.height {
chain.add_filter(
format!(
&format!(
"scale={}:{}",
config.processing.width, config.processing.height
)
.into(),
"video".into(),
),
"video",
)
}
if !is_close(aspect, config.processing.aspect, 0.03) {
chain.add_filter(
format!("setdar=dar={}", config.processing.aspect).into(),
"video".into(),
&format!("setdar=dar={}", config.processing.aspect),
"video"
)
}
}
fn fade(node: &mut Media, chain: &mut Filters, codec_type: String) {
let mut t = String::new();
fn fade(node: &mut Media, chain: &mut Filters, codec_type: &str) {
let mut t = "";
if codec_type == "audio".to_string() {
t = "a".to_string()
if codec_type == "audio" {
t = "a"
}
if node.seek > 0.0 {
chain.add_filter(format!("{t}fade=in:st=0:d=0.5"), codec_type.clone())
chain.add_filter(&format!("{t}fade=in:st=0:d=0.5"), codec_type)
}
if node.out != node.duration && node.out - node.seek - 1.0 > 0.0 {
chain.add_filter(
format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0)).into(),
&format!("{t}fade=out:st={}:d=1.0", (node.out - node.seek - 1.0)),
codec_type,
)
}
@ -168,7 +167,7 @@ fn overlay(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
logo_chain
.push_str(format!("[l];[v][l]{}:shortest=1", config.processing.logo_filter).as_str());
chain.add_filter(logo_chain, "video".into());
chain.add_filter(&logo_chain, "video");
}
}
@ -182,11 +181,11 @@ fn extend_video(node: &mut Media, chain: &mut Filters) {
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
format!(
&format!(
"tpad=stop_mode=add:stop_duration={}",
(node.out - node.seek) - (duration_float - node.seek)
),
"video".into(),
"video",
)
}
}
@ -199,7 +198,7 @@ fn add_text(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
if config.text.add_text && config.text.over_pre {
let filter = v_drawtext::filter_node(node);
chain.add_filter(filter, "video".into());
chain.add_filter(&filter, "video");
if let Some(filters) = &chain.video_chain {
for (i, f) in filters.split(",").enumerate() {
@ -220,7 +219,7 @@ fn add_audio(node: &mut Media, chain: &mut Filters) {
"aevalsrc=0:channel_layout=stereo:duration={}:sample_rate=48000",
node.out - node.seek
);
chain.add_filter(audio, "audio".into());
chain.add_filter(&audio, "audio");
}
}
@ -234,8 +233,8 @@ fn extend_audio(node: &mut Media, chain: &mut Filters) {
if node.out - node.seek > duration_float - node.seek + 0.1 {
chain.add_filter(
format!("apad=whole_dur={}", node.out - node.seek),
"audio".into(),
&format!("apad=whole_dur={}", node.out - node.seek),
"audio",
)
}
}
@ -254,15 +253,15 @@ fn add_loudnorm(node: &mut Media, chain: &mut Filters, config: &GlobalConfig) {
config.processing.loud_i, config.processing.loud_tp, config.processing.loud_lra
);
chain.add_filter(loud_filter, "audio".into());
chain.add_filter(&loud_filter, "audio");
}
}
fn audio_volume(chain: &mut Filters, config: &GlobalConfig) {
if config.processing.volume != 1.0 {
chain.add_filter(
format!("volume={}", config.processing.volume),
"audio".into(),
&format!("volume={}", config.processing.volume),
"audio",
)
}
}
@ -289,17 +288,17 @@ fn realtime_filter(
node: &mut Media,
chain: &mut Filters,
config: &GlobalConfig,
codec_type: String,
codec_type: &str,
) {
// this realtime filter is important for HLS output to stay in sync
let mut t = String::new();
let mut t = "";
if codec_type == "audio".to_string() {
t = "a".to_string()
if codec_type == "audio" {
t = "a"
}
if config.out.mode.to_lowercase() == "hls".to_string() {
if &config.out.mode.to_lowercase() == "hls" {
let mut speed_filter = format!("{t}realtime=speed=1");
let (delta, _) = get_delta(&node.begin.unwrap());
let duration = node.out - node.seek;
@ -312,7 +311,7 @@ fn realtime_filter(
}
}
chain.add_filter(speed_filter, codec_type);
chain.add_filter(&speed_filter, codec_type);
}
}

View File

@ -30,9 +30,8 @@ pub fn filter_node(node: &mut Media) -> String {
filter = format!("drawtext=text='{escape}':{}{font}", config.text.style)
} else {
filter = format!(
"zmq=b=tcp\\\\://'{}',drawtext=text=''{}",
config.text.bind_address.replace(":", "\\:"),
font
"zmq=b=tcp\\\\://'{}',drawtext=text=''{font}",
config.text.bind_address.replace(":", "\\:")
)
}
}

View File

@ -152,14 +152,14 @@ pub async fn file_worker(
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap().push(media);
info!("Create new file: {:?}", new_path);
info!("Create new file: {new_path:?}");
}
Remove(old_path) => {
sources
.lock()
.unwrap()
.retain(|x| x.source != old_path.display().to_string());
info!("Remove file: {:?}", old_path);
info!("Remove file: {old_path:?}");
}
Rename(old_path, new_path) => {
let index = sources
@ -172,7 +172,7 @@ pub async fn file_worker(
let media = Media::new(index, new_path.display().to_string(), false);
sources.lock().unwrap()[index] = media;
info!("Rename file: {:?} to {:?}", old_path, new_path);
info!("Rename file: {old_path:?} to {new_path:?}");
}
_ => (),
}

View File

@ -109,8 +109,8 @@ pub async fn ingest_server(
.spawn()
{
Err(e) => {
error!("couldn't spawn ingest server: {}", e);
panic!("couldn't spawn ingest server: {}", e)
error!("couldn't spawn ingest server: {e}");
panic!("couldn't spawn ingest server: {e}")
}
Ok(proc) => proc,
};
@ -120,7 +120,7 @@ pub async fn ingest_server(
rt_handle.spawn(stderr_reader(
server_proc.stderr.take().unwrap(),
"Server".to_string(),
"Server",
));
let mut ingest_reader = BufReader::new(server_proc.stdout.take().unwrap());
@ -130,7 +130,7 @@ pub async fn ingest_server(
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
debug!("Ingest server read {:?}", e);
debug!("Ingest server read {e:?}");
break;
}
@ -143,7 +143,7 @@ pub async fn ingest_server(
if bytes_len > 0 {
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
error!("Ingest server write error: {:?}", e);
error!("Ingest server write error: {e:?}");
*proc_control.is_terminated.lock().unwrap() = true;
break;
@ -158,11 +158,11 @@ pub async fn ingest_server(
sleep(Duration::from_secs(1));
if let Err(e) = server_proc.kill() {
error!("Ingest server {:?}", e)
error!("Ingest server {e:?}")
};
if let Err(e) = server_proc.wait() {
error!("Ingest server {:?}", e)
error!("Ingest server {e:?}")
};
}

View File

@ -2,8 +2,6 @@ use std::{
fs,
path::Path,
sync::{Arc, Mutex},
thread::sleep,
time::Duration,
};
use serde_json::json;
@ -180,15 +178,13 @@ impl CurrentProgram {
let index = *self.index.lock().unwrap();
let current_list = self.nodes.lock().unwrap();
if index + 1 < current_list.len()
&& current_list[index + 1].category == "advertisement".to_string()
{
if index + 1 < current_list.len() && &current_list[index + 1].category == "advertisement" {
self.current_node.next_ad = Some(true);
}
if index > 0
&& index < current_list.len()
&& current_list[index - 1].category == "advertisement".to_string()
&& &current_list[index - 1].category == "advertisement"
{
self.current_node.last_ad = Some(true);
}
@ -207,7 +203,8 @@ impl CurrentProgram {
fn get_current_clip(&mut self) {
let mut time_sec = self.get_current_time();
if *self.playout_stat.current_date.lock().unwrap() == *self.playout_stat.date.lock().unwrap()
if *self.playout_stat.current_date.lock().unwrap()
== *self.playout_stat.date.lock().unwrap()
&& *self.playout_stat.time_shift.lock().unwrap() != 0.0
{
let shift = *self.playout_stat.time_shift.lock().unwrap();
@ -355,8 +352,7 @@ impl Iterator for CurrentProgram {
}
*self.index.lock().unwrap() = 0;
self.current_node =
gen_source(self.nodes.lock().unwrap()[0].clone());
self.current_node = gen_source(self.nodes.lock().unwrap()[0].clone());
self.last_next_ad();
self.current_node.last_ad = last_ad;
@ -383,11 +379,12 @@ fn timed_source(
new_node.process = Some(false);
if config.playlist.length.contains(":") {
let time_shift = playout_stat.time_shift.lock().unwrap();
if *playout_stat.current_date.lock().unwrap() == *playout_stat.date.lock().unwrap()
&& *playout_stat.time_shift.lock().unwrap() != 0.0
&& *time_shift != 0.0
{
sleep(Duration::from_millis(300));
shifted_delta = delta - *playout_stat.time_shift.lock().unwrap();
shifted_delta = delta - *time_shift;
debug!("Delta: <yellow>{shifted_delta:.3}</>, shifted: <yellow>{delta:.3}</>");
} else {
@ -483,10 +480,7 @@ fn handle_list_end(mut node: Media, total_delta: f64) -> Media {
if out > node.duration {
out = node.duration
} else {
warn!(
"Clip length is not in time, new duration is: <yellow>{:.2}</>",
total_delta
)
warn!("Clip length is not in time, new duration is: <yellow>{total_delta:.2}</>")
}
if node.duration > total_delta && total_delta > 1.0 && node.duration - node.seek >= total_delta
@ -509,10 +503,7 @@ fn handle_list_end(mut node: Media, total_delta: f64) -> Media {
return node;
} else {
error!(
"Playlist is not long enough: <yellow>{:.2}</> seconds needed",
total_delta
);
error!("Playlist is not long enough: <yellow>{total_delta:.2}</> seconds needed");
}
node.process = Some(true);

View File

@ -14,13 +14,15 @@ use tokio::runtime::Builder;
mod filter;
mod input;
mod output;
mod rpc;
mod utils;
use crate::output::{player, write_hls};
use crate::utils::{
init_config, init_logging, run_rpc, validate_ffmpeg, GlobalConfig, PlayerControl,
PlayoutStatus, ProcessControl,
init_config, init_logging, validate_ffmpeg, GlobalConfig, PlayerControl, PlayoutStatus,
ProcessControl,
};
use rpc::json_rpc_server;
#[derive(Serialize, Deserialize)]
struct StatusData {
@ -66,14 +68,14 @@ fn main() {
validate_ffmpeg();
if config.rpc_server.enable {
rt_handle.spawn(run_rpc(
rt_handle.spawn(json_rpc_server(
play_control.clone(),
playout_stat.clone(),
proc_control.clone(),
));
}
if config.out.mode.to_lowercase() == "hls".to_string() {
if &config.out.mode.to_lowercase() == "hls" {
write_hls(rt_handle, play_control, playout_stat, proc_control);
} else {
player(rt_handle, play_control, playout_stat, proc_control);

View File

@ -8,7 +8,7 @@ use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
pub fn output(log_format: String) -> process::Child {
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();
let mut enc_filter: Vec<String> = vec![];
@ -17,7 +17,7 @@ pub fn output(log_format: String) -> process::Child {
"-hide_banner",
"-nostats",
"-v",
log_format.as_str(),
log_format,
"-i",
"pipe:0",
];
@ -44,8 +44,8 @@ pub fn output(log_format: String) -> process::Child {
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {}", e);
panic!("couldn't spawn encoder process: {}", e)
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};

View File

@ -85,15 +85,15 @@ pub fn write_hls(
.spawn()
{
Err(e) => {
error!("couldn't spawn decoder process: {}", e);
panic!("couldn't spawn decoder process: {}", e)
error!("couldn't spawn decoder process: {e}");
panic!("couldn't spawn decoder process: {e}")
}
Ok(proc) => proc,
};
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Writer".to_string(),
"Writer",
));
if let Err(e) = dec_proc.wait() {

View File

@ -103,8 +103,8 @@ pub fn player(
);
let mut enc_proc = match config.out.mode.as_str() {
"desktop" => desktop::output(ff_log_format.clone()),
"stream" => stream::output(ff_log_format.clone()),
"desktop" => desktop::output(&ff_log_format),
"stream" => stream::output(&ff_log_format),
_ => panic!("Output mode doesn't exists!"),
};
@ -112,7 +112,7 @@ pub fn player(
rt_handle.spawn(stderr_reader(
enc_proc.stderr.take().unwrap(),
"Encoder".to_string(),
"Encoder",
));
let (ingest_sender, ingest_receiver): (
@ -179,7 +179,7 @@ pub fn player(
rt_handle.spawn(stderr_reader(
dec_proc.stderr.take().unwrap(),
"Decoder".to_string(),
"Decoder",
));
if let Ok(dec_terminator) = dec_proc.terminator() {
@ -229,7 +229,7 @@ pub fn player(
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
Ok(length) => length,
Err(e) => {
error!("Reading error from decoder: {:?}", e);
error!("Reading error from decoder: {e:?}");
break 'source_iter;
}
@ -237,7 +237,7 @@ pub fn player(
if dec_bytes_len > 0 {
if let Err(e) = enc_writer.write(&buffer[..dec_bytes_len]) {
error!("Encoder write error: {:?}", e);
error!("Encoder write error: {e:?}");
break 'source_iter;
};
@ -248,17 +248,17 @@ pub fn player(
}
if let Err(e) = dec_proc.wait() {
panic!("Decoder error: {:?}", e)
panic!("Decoder error: {e:?}")
};
}
sleep(Duration::from_secs(1));
if let Err(e) = enc_proc.kill() {
panic!("Encoder error: {:?}", e)
panic!("Encoder error: {e:?}")
};
if let Err(e) = enc_proc.wait() {
panic!("Encoder error: {:?}", e)
panic!("Encoder error: {e:?}")
};
}

View File

@ -8,7 +8,7 @@ use simplelog::*;
use crate::filter::v_drawtext;
use crate::utils::{GlobalConfig, Media};
pub fn output(log_format: String) -> process::Child {
pub fn output(log_format: &str) -> process::Child {
let config = GlobalConfig::global();
let mut enc_filter: Vec<String> = vec![];
let mut preview: Vec<&str> = vec![];
@ -19,7 +19,7 @@ pub fn output(log_format: String) -> process::Child {
"-hide_banner",
"-nostats",
"-v",
log_format.as_str(),
log_format,
"-re",
"-i",
"pipe:0",
@ -60,8 +60,8 @@ pub fn output(log_format: String) -> process::Child {
.spawn()
{
Err(e) => {
error!("couldn't spawn encoder process: {}", e);
panic!("couldn't spawn encoder process: {}", e)
error!("couldn't spawn encoder process: {e}");
panic!("couldn't spawn encoder process: {e}")
}
Ok(proc) => proc,
};

211
src/rpc/mod.rs Normal file
View File

@ -0,0 +1,211 @@
use std::sync::{Arc, Mutex};
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use process_control::Terminator;
use serde_json::{json, Map};
use simplelog::*;
use crate::utils::{
get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl,
PlayoutStatus, ProcessControl,
};
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
fn kill_decoder(terminator: Arc<Mutex<Option<Terminator>>>) -> Result<(), String> {
match &*terminator.lock().unwrap() {
Some(decoder) => unsafe {
if let Err(e) = decoder.terminate() {
return Err(format!("Terminate decoder: {e}"));
}
},
None => return Err("No decoder terminator found".to_string()),
}
Ok(())
}
pub async fn json_rpc_server(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let play = play_control.clone();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
if let Params::Map(map) = params {
let mut time_shift = playout_stat.time_shift.lock().unwrap();
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
if map.contains_key("control") && &map["control"] == "next" {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
info!("Move to next clip");
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_next"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && &map["control"] == "back" {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
let index = *play.index.lock().unwrap();
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
*play.index.lock().unwrap() = index - 2;
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_last"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && &map["control"] == "reset" {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
info!("Reset playout to original state");
let mut data_map = Map::new();
*time_shift = 0.0;
*date = current_date.clone();
*playout_stat.list_init.lock().unwrap() = true;
write_status(&current_date, 0.0);
data_map.insert("operation".to_string(), json!("reset_playout_state"));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Reset playout state failed".to_string()));
}
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
};
}
if map.contains_key("media") && &map["media"] == "next" {
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index].clone();
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no next clip".to_string()));
}
if map.contains_key("media") && &map["media"] == "last" {
let index = *play.index.lock().unwrap();
if index > 1 && index - 2 < play.current_list.lock().unwrap().len() {
let media = play.current_list.lock().unwrap()[index - 2].clone();
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no last clip".to_string()));
}
}
Ok(Value::String("No, or wrong parameters set!".to_string()))
});
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
server.wait();
}

View File

@ -148,8 +148,7 @@ impl GlobalConfig {
Ok(file) => file,
Err(err) => {
println!(
"{:?} doesn't exists!\n{}\n\nSystem error: {err}",
config_path,
"{config_path:?} doesn't exists!\n{}\n\nSystem error: {err}",
"Put \"ffplayout.yml\" in \"/etc/playout/\" or beside the executable!"
);
process::exit(0x0100);
@ -182,11 +181,11 @@ impl GlobalConfig {
"-g",
"1",
"-b:v",
format!("{}k", bitrate).as_str(),
format!("{bitrate}k").as_str(),
"-minrate",
format!("{}k", bitrate).as_str(),
format!("{bitrate}k").as_str(),
"-maxrate",
format!("{}k", bitrate).as_str(),
format!("{bitrate}k").as_str(),
"-bufsize",
format!("{}k", bitrate / 2).as_str(),
]

View File

@ -67,12 +67,12 @@ pub fn read_json(
}
if !playlist_path.is_file() {
error!("Playlist <b><magenta>{}</></b> not exists!", current_file);
error!("Playlist <b><magenta>{current_file}</></b> not exists!");
return Playlist::new(date, start_sec);
}
info!("Read Playlist: <b><magenta>{}</></b>", &current_file);
info!("Read Playlist: <b><magenta>{current_file}</></b>");
let f = File::options()
.read(true)

View File

@ -29,7 +29,7 @@ fn send_mail(msg: String) {
.to(config.mail.recipient.parse().unwrap())
.subject(config.mail.subject.clone())
.header(header::ContentType::TEXT_PLAIN)
.body(clean_string(msg.clone()))
.body(clean_string(&msg))
.unwrap();
let credentials = Credentials::new(
@ -109,7 +109,7 @@ impl Log for LogMailer {
fn log(&self, record: &Record<'_>) {
if self.enabled(record.metadata()) {
let local: DateTime<Local> = Local::now();
let time_stamp: String = local.format("[%Y-%m-%d %H:%M:%S%.3f]").to_string();
let time_stamp = local.format("[%Y-%m-%d %H:%M:%S%.3f]");
let level = record.level().to_string().to_uppercase();
let rec = record.args().to_string();
let full_line: String = format!("{time_stamp} [{level: >5}] {rec}");
@ -135,10 +135,10 @@ impl SharedLogger for LogMailer {
}
}
fn clean_string(text: String) -> String {
fn clean_string(text: &str) -> String {
let regex: Regex = Regex::new(r"\x1b\[[0-9;]*[mGKF]").unwrap();
regex.replace_all(text.as_str(), "").to_string()
regex.replace_all(text, "").to_string()
}
pub fn init_logging(

View File

@ -16,23 +16,21 @@ use std::{
use jsonrpc_http_server::CloseHandle;
use process_control::Terminator;
use regex::Regex;
use simplelog::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use simplelog::*;
mod arg_parse;
mod config;
pub mod json_reader;
mod json_validate;
mod logging;
mod rpc_server;
pub use arg_parse::get_args;
pub use config::{init_config, GlobalConfig};
pub use json_reader::{read_json, Playlist, DUMMY_LEN};
pub use json_validate::validate_playlist;
pub use logging::init_logging;
pub use rpc_server::run_rpc;
use crate::filter::filter_chains;
@ -75,7 +73,7 @@ impl ProcessControl {
if let Some(server) = &*self.server_term.lock().unwrap() {
unsafe {
if let Err(e) = server.terminate() {
error!("Ingest server: {:?}", e);
error!("Ingest server: {e:?}");
}
}
};
@ -83,7 +81,7 @@ impl ProcessControl {
if let Some(decoder) = &*self.decoder_term.lock().unwrap() {
unsafe {
if let Err(e) = decoder.terminate() {
error!("Decoder: {:?}", e);
error!("Decoder: {e:?}");
}
}
};
@ -91,7 +89,7 @@ impl ProcessControl {
if let Some(encoder) = &*self.encoder_term.lock().unwrap() {
unsafe {
if let Err(e) = encoder.terminate() {
error!("Encoder: {:?}", e);
error!("Encoder: {e:?}");
}
}
};
@ -238,7 +236,7 @@ impl MediaProbe {
}
_ => {
error!("No codec type found for stream: {:?}", &stream)
error!("No codec type found for stream: {stream:?}")
}
}
}
@ -257,11 +255,9 @@ impl MediaProbe {
},
}
}
Err(err) => {
Err(e) => {
error!(
"Can't read source '{}' with ffprobe, source is probably damaged! Error is: {:?}",
input,
err
"Can't read source '{input}' with ffprobe, source is probably damaged! Error is: {e:?}"
);
MediaProbe {
@ -274,7 +270,7 @@ impl MediaProbe {
}
}
pub fn write_status(date: String, shift: f64) {
pub fn write_status(date: &str, shift: f64) {
let config = GlobalConfig::global();
let stat_file = config.general.stat_file.clone();
@ -283,10 +279,10 @@ pub fn write_status(date: String, shift: f64) {
"date": date,
});
let status_data: String = serde_json::to_string(&data)
.expect("Serialize status data failed");
fs::write(stat_file, &status_data)
.expect("Unable to write file");
let status_data: String = serde_json::to_string(&data).expect("Serialize status data failed");
if let Err(e) = fs::write(stat_file, &status_data) {
error!("Unable to write file: {e:?}")
};
}
// pub fn get_timestamp() -> i64 {
@ -316,7 +312,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
local.format("%Y-%m-%d").to_string()
}
pub fn modified_time(path: &String) -> Option<DateTime<Local>> {
pub fn modified_time(path: &str) -> Option<DateTime<Local>> {
let metadata = metadata(path).unwrap();
if let Ok(time) = metadata.modified() {
@ -327,8 +323,8 @@ pub fn modified_time(path: &String) -> Option<DateTime<Local>> {
None
}
pub fn time_to_sec(time_str: &String) -> f64 {
if ["now", "", "none"].contains(&time_str.as_str()) || !time_str.contains(":") {
pub fn time_to_sec(time_str: &str) -> f64 {
if ["now", "", "none"].contains(&time_str) || !time_str.contains(":") {
return get_sec();
}
@ -427,10 +423,7 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
let mut source_cmd: Vec<String> = vec![];
if seek > 0.0 {
source_cmd.append(&mut vec![
"-ss".to_string(),
format!("{}", seek).to_string(),
])
source_cmd.append(&mut vec!["-ss".to_string(), format!("{seek}")])
}
source_cmd.append(&mut vec!["-i".to_string(), src]);
@ -445,12 +438,12 @@ pub fn seek_and_length(src: String, seek: f64, out: f64, duration: f64) -> Vec<S
source_cmd
}
pub async fn stderr_reader(std_errors: ChildStderr, suffix: String) -> Result<(), Error> {
pub async fn stderr_reader(std_errors: ChildStderr, suffix: &str) -> Result<(), Error> {
// read ffmpeg stderr decoder, encoder and server instance
// and log the output
fn format_line(line: String, level: String) -> String {
line.replace(&format!("[{}] ", level), "")
fn format_line(line: String, level: &str) -> String {
line.replace(&format!("[{level: >5}] "), "")
}
let buffer = BufReader::new(std_errors);
@ -459,14 +452,11 @@ pub async fn stderr_reader(std_errors: ChildStderr, suffix: String) -> Result<()
let line = line?;
if line.contains("[info]") {
info!(
"<bright black>[{suffix}]</> {}",
format_line(line, "info".to_string())
)
info!("<bright black>[{suffix}]</> {}", format_line(line, "info"))
} else if line.contains("[warning]") {
warn!(
"<bright black>[{suffix}]</> {}",
format_line(line, "warning".to_string())
format_line(line, "warning")
)
} else {
if suffix != "server"
@ -475,7 +465,7 @@ pub async fn stderr_reader(std_errors: ChildStderr, suffix: String) -> Result<()
{
error!(
"<bright black>[{suffix}]</> {}",
format_line(line.clone(), "error".to_string())
format_line(line.clone(), "error")
);
}
}
@ -491,10 +481,10 @@ fn is_in_system(name: &str) {
.spawn()
{
if let Err(e) = proc.wait() {
error!("{:?}", e)
error!("{e:?}")
};
} else {
error!("{} not found on system!", name);
error!("{name} not found on system!");
exit(0x0100);
}
}

View File

@ -1,174 +0,0 @@
use std::sync::{Arc, Mutex};
use jsonrpc_http_server::jsonrpc_core::{IoHandler, Params, Value};
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use process_control::Terminator;
use serde_json::{json, Map};
use simplelog::*;
use crate::utils::{
get_delta, get_sec, sec_to_time, write_status, GlobalConfig, Media, PlayerControl,
PlayoutStatus, ProcessControl,
};
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
fn get_data_map(config: &GlobalConfig, media: Media) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
fn kill_decoder(terminator: Arc<Mutex<Option<Terminator>>>) -> Result<(), String> {
match &*terminator.lock().unwrap() {
Some(decoder) => unsafe {
if let Err(e) = decoder.terminate() {
return Err(format!("Terminate decoder: {e}"));
}
},
None => return Err("No decoder terminator found".to_string()),
}
Ok(())
}
pub async fn run_rpc(
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let config = GlobalConfig::global();
let mut io = IoHandler::default();
let play = play_control.clone();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
match params {
Params::Map(map) => {
if map.contains_key("control") && map["control"] == "next".to_string() {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
info!("Move to next clip");
let index = *play.index.lock().unwrap();
if index < play.current_list.lock().unwrap().len() {
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index].clone();
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*playout_stat.time_shift.lock().unwrap() = delta;
write_status(playout_stat.current_date.lock().unwrap().clone(), delta);
data_map.insert("operation".to_string(), json!("Move to next clip"));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && map["control"] == "back".to_string() {
if let Ok(_) = kill_decoder(proc.decoder_term.clone()) {
let index = *play.index.lock().unwrap();
if index > 1 && play.current_list.lock().unwrap().len() > 1 {
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = play.current_list.lock().unwrap()[index - 2].clone();
*play.index.lock().unwrap() = index - 2;
media.add_probe();
let (delta, _) = get_delta(&media.begin.unwrap_or(0.0));
*playout_stat.time_shift.lock().unwrap() = delta;
write_status(playout_stat.current_date.lock().unwrap().clone(), delta);
data_map.insert("operation".to_string(), json!("Move to last clip"));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
}
return Ok(Value::String("Move failed".to_string()));
}
if map.contains_key("control") && map["control"] == "reset".to_string() {
*playout_stat.date.lock().unwrap() = String::new();
*playout_stat.time_shift.lock().unwrap() = 0.0;
*playout_stat.list_init.lock().unwrap() = true;
write_status(String::new().clone(), 0.0);
if let Err(e) = kill_decoder(proc.decoder_term.clone()) {
error!("{e}");
}
return Ok(Value::String("Reset playout to original state".to_string()));
}
if map.contains_key("media") && map["media"] == "current".to_string() {
if let Some(media) = play.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media);
return Ok(Value::Object(data_map));
};
}
}
_ => return Ok(Value::String(format!("Wrong parameters..."))),
}
Ok(Value::String(format!("no parameters set...")))
});
let server = ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
.request_middleware(|request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == config.rpc_server.authorization
{
if request.uri() == "/status" {
println!("{:?}", request.headers().contains_key("authorization"));
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&config.rpc_server.address.parse().unwrap())
.expect("Unable to start RPC server");
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle().clone());
server.wait();
}