switch jsonrpc-http-server to tiny_http, update clap to next major version

The jsonrpc-http-server don't get any updates anymore and some libs are already unmaintained. Migration to the new jsonrpsee makes not so much sense, because its features are not needed. For our needs tiny_http is absolut enough.
This commit is contained in:
jb-alvarado 2023-06-19 16:57:25 +02:00
parent 5502c45420
commit 8eb5c2ba02
14 changed files with 737 additions and 877 deletions

552
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -146,27 +146,27 @@ out:
## JSON RPC ## JSON RPC
The ffplayout engine can run a JSON RPC server. A request looks like: The ffplayout engine can run a simple RPC server. A request looks like:
```Bash ```Bash
curl -X POST -H "Content-Type: application/json" -H "Authorization: ---auth-key---" \ curl -X POST -H "Content-Type: application/json" -H "Authorization: ---auth-key---" \
-d '{"jsonrpc": "2.0", "id":1, "method": "player", "params":{"control":"next"}}' \ -d '{"control":"next"}' \
127.0.0.1:7070 127.0.0.1:7070
``` ```
At the moment this commends are possible: At the moment this commends are possible:
```Bash ```Bash
'{"jsonrpc": "2.0", "id":1, "method": "player", "params":{"media":"current"}}' # get infos about current clip '{"media":"current"}' # get infos about current clip
'{"jsonrpc": "2.0", "id":2, "method": "player", "params":{"media":"next"}}' # get infos about next clip '{"media":"next"}' # get infos about next clip
'{"jsonrpc": "2.0", "id":3, "method": "player", "params":{"media":"last"}}' # get infos about last clip '{"media":"last"}' # get infos about last clip
'{"jsonrpc": "2.0", "id":4, "method": "player", "params":{"control":"next"}}' # jump to next clip '{"control":"next"}' # jump to next clip
'{"jsonrpc": "2.0", "id":5, "method": "player", "params":{"control":"back"}}' # jump to last clip '{"control":"back"}' # jump to last clip
'{"jsonrpc": "2.0", "id":6, "method": "player", "params":{"control":"reset"}}' # reset playlist to old state '{"control":"reset"}' # reset playlist to old state
'{"jsonrpc": "2.0", "id":7, "method": "player", "params":{"control":"text", \ '{"control":"text", \
"message": {"text": "Hello from ffplayout", "x": "(w-text_w)/2", "y": "(h-text_h)/2", \ "message": {"text": "Hello from ffplayout", "x": "(w-text_w)/2", "y": "(h-text_h)/2", \
"fontsize": 24, "line_spacing": 4, "fontcolor": "#ffffff", "box": 1, \ "fontsize": 24, "line_spacing": 4, "fontcolor": "#ffffff", "box": 1, \
"boxcolor": "#000000", "boxborderw": 4, "alpha": 1.0}}}' # send text to drawtext filter from ffmpeg "boxcolor": "#000000", "boxborderw": 4, "alpha": 1.0}}' # send text to drawtext filter from ffmpeg
``` ```
Output from `{"media":"current"}` show: Output from `{"media":"current"}` show:

View File

@ -17,7 +17,7 @@ actix-web-grants = "3"
actix-web-httpauth = "0.6" actix-web-httpauth = "0.6"
argon2 = "0.4" argon2 = "0.4"
chrono = "0.4" chrono = "0.4"
clap = { version = "3.2", features = ["derive"] } clap = { version = "4.3", features = ["derive"] }
derive_more = "0.99" derive_more = "0.99"
faccess = "0.2" faccess = "0.2"
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }

View File

@ -6,10 +6,7 @@ use std::{
}; };
use actix_web::web; use actix_web::web;
use reqwest::{ use reqwest::{header::AUTHORIZATION, Client, Response};
header::{HeaderMap, AUTHORIZATION},
Client, Response,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite}; use sqlx::{Pool, Sqlite};
use tokio::{ use tokio::{
@ -21,14 +18,6 @@ use crate::db::handles::select_channel;
use crate::utils::{errors::ServiceError, playout_config}; use crate::utils::{errors::ServiceError, playout_config};
use ffplayout_lib::vec_strings; use ffplayout_lib::vec_strings;
#[derive(Debug, Deserialize, Serialize, Clone)]
struct RpcObj<T> {
jsonrpc: String,
id: i32,
method: String,
params: T,
}
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
struct TextParams { struct TextParams {
control: String, control: String,
@ -45,17 +34,6 @@ struct MediaParams {
media: String, media: String,
} }
impl<T> RpcObj<T> {
fn new(id: i32, method: String, params: T) -> Self {
Self {
jsonrpc: "2.0".into(),
id,
method,
params,
}
}
}
/// ffplayout engine process /// ffplayout engine process
/// ///
/// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`, /// When running not on Linux, or with environment variable `PIGGYBACK_MODE=true`,
@ -263,18 +241,7 @@ impl SystemD {
} }
} }
fn create_header(auth: &str) -> HeaderMap { async fn post_request<T>(conn: &Pool<Sqlite>, id: i32, obj: T) -> Result<Response, ServiceError>
let mut headers = HeaderMap::new();
headers.insert(AUTHORIZATION, auth.parse().unwrap());
headers
}
async fn post_request<T>(
conn: &Pool<Sqlite>,
id: i32,
obj: RpcObj<T>,
) -> Result<Response, ServiceError>
where where
T: Serialize, T: Serialize,
{ {
@ -284,7 +251,7 @@ where
match client match client
.post(&url) .post(&url)
.headers(create_header(&config.rpc_server.authorization)) .header(AUTHORIZATION, &config.rpc_server.authorization)
.json(&obj) .json(&obj)
.send() .send()
.await .await
@ -299,14 +266,10 @@ pub async fn send_message(
id: i32, id: i32,
message: HashMap<String, String>, message: HashMap<String, String>,
) -> Result<Response, ServiceError> { ) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new( let json_obj = TextParams {
id, control: "text".into(),
"player".into(), message,
TextParams { };
control: "text".into(),
message,
},
);
post_request(conn, id, json_obj).await post_request(conn, id, json_obj).await
} }
@ -316,13 +279,9 @@ pub async fn control_state(
id: i32, id: i32,
command: &str, command: &str,
) -> Result<Response, ServiceError> { ) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new( let json_obj = ControlParams {
id, control: command.to_owned(),
"player".into(), };
ControlParams {
control: command.to_owned(),
},
);
post_request(conn, id, json_obj).await post_request(conn, id, json_obj).await
} }
@ -332,7 +291,7 @@ pub async fn media_info(
id: i32, id: i32,
command: String, command: String,
) -> Result<Response, ServiceError> { ) -> Result<Response, ServiceError> {
let json_obj = RpcObj::new(id, "player".into(), MediaParams { media: command }); let json_obj = MediaParams { media: command };
post_request(conn, id, json_obj).await post_request(conn, id, json_obj).await
} }

View File

@ -13,10 +13,9 @@ default-run = "ffplayout"
[dependencies] [dependencies]
ffplayout-lib = { path = "../lib" } ffplayout-lib = { path = "../lib" }
chrono = "0.4" chrono = "0.4"
clap = { version = "3.2", features = ["derive"] } clap = { version = "4.3", features = ["derive"] }
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
futures = "0.3" futures = "0.3"
jsonrpc-http-server = "18.0"
notify = "6.0" notify = "6.0"
notify-debouncer-full = { version = "*", default-features = false } notify-debouncer-full = { version = "*", default-features = false }
regex = "1" regex = "1"
@ -24,6 +23,7 @@ reqwest = { version = "0.11", features = ["blocking", "json"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
simplelog = { version = "^0.12", features = ["paris"] } simplelog = { version = "^0.12", features = ["paris"] }
tiny_http = { version = "0.12", default-features = false }
zeromq = { git = "https://github.com/zeromq/zmq.rs.git", default-features = false, features = [ zeromq = { git = "https://github.com/zeromq/zmq.rs.git", default-features = false, features = [
"async-std-runtime", "async-std-runtime",
"tcp-transport", "tcp-transport",

View File

@ -14,7 +14,7 @@ use simplelog::*;
use ffplayout::{ use ffplayout::{
output::{player, write_hls}, output::{player, write_hls},
rpc::json_rpc_server, rpc::run_server,
utils::{arg_parse::get_args, get_config}, utils::{arg_parse::get_args, get_config},
}; };
@ -155,7 +155,7 @@ fn main() {
if config.rpc_server.enable { if config.rpc_server.enable {
// If RPC server is enable we also fire up a JSON RPC server. // If RPC server is enable we also fire up a JSON RPC server.
thread::spawn(move || json_rpc_server(config_clone, play_ctl, play_stat, proc_ctl2)); thread::spawn(move || run_server(config_clone, play_ctl, play_stat, proc_ctl2));
} }
status_file(&config.general.stat_file, &playout_stat); status_file(&config.general.stat_file, &playout_stat);

View File

@ -1,407 +1,5 @@
use std::{fmt, process::exit, sync::atomic::Ordering}; mod server;
mod zmq_cmd; mod zmq_cmd;
use futures::executor::block_on; pub use server::run_server;
use jsonrpc_http_server::{ pub use zmq_cmd::zmq_send;
hyper,
jsonrpc_core::{IoHandler, Params, Value},
AccessControlAllowOrigin, DomainsValidation, Response, RestApi, ServerBuilder,
};
use serde::Deserialize;
use serde_json::{json, Map};
use simplelog::*;
use ffplayout_lib::utils::{
get_delta, get_sec, sec_to_time, write_status, Ingest, Media, OutputMode::*, PlayerControl,
PlayoutConfig, PlayoutStatus, ProcessControl,
};
use zmq_cmd::zmq_send;
#[derive(Default, Deserialize, Clone)]
struct TextFilter {
text: Option<String>,
x: Option<String>,
y: Option<String>,
fontsize: Option<String>,
line_spacing: Option<String>,
fontcolor: Option<String>,
alpha: Option<String>,
r#box: Option<String>,
boxcolor: Option<String>,
boxborderw: Option<String>,
}
impl fmt::Display for TextFilter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let escaped_text = self
.text
.clone()
.unwrap_or_default()
.replace('\'', "'\\\\\\''")
.replace('\\', "\\\\\\\\")
.replace('%', "\\\\\\%")
.replace(':', "\\:");
let mut s = format!("text='{escaped_text}'");
if let Some(v) = &self.x {
if !v.is_empty() {
s.push_str(&format!(":x='{v}'"));
}
}
if let Some(v) = &self.y {
if !v.is_empty() {
s.push_str(&format!(":y='{v}'"));
}
}
if let Some(v) = &self.fontsize {
if !v.is_empty() {
s.push_str(&format!(":fontsize={v}"));
}
}
if let Some(v) = &self.line_spacing {
if !v.is_empty() {
s.push_str(&format!(":line_spacing={v}"));
}
}
if let Some(v) = &self.fontcolor {
if !v.is_empty() {
s.push_str(&format!(":fontcolor={v}"));
}
}
if let Some(v) = &self.alpha {
if !v.is_empty() {
s.push_str(&format!(":alpha='{v}'"));
}
}
if let Some(v) = &self.r#box {
if !v.is_empty() {
s.push_str(&format!(":box={v}"));
}
}
if let Some(v) = &self.boxcolor {
if !v.is_empty() {
s.push_str(&format!(":boxcolor={v}"));
}
}
if let Some(v) = &self.boxborderw {
if !v.is_empty() {
s.push_str(&format!(":boxborderw={v}"));
}
}
write!(f, "{s}")
}
}
/// Covert JSON string to ffmpeg filter command.
fn filter_from_json(raw_text: serde_json::Value) -> String {
let filter: TextFilter = serde_json::from_value(raw_text).unwrap_or_default();
filter.to_string()
}
/// map media struct to json object
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
/// prepare json object for response
fn get_data_map(
config: &PlayoutConfig,
media: Media,
server_is_running: bool,
) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("ingest_runs".to_string(), json!(server_is_running));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
/// JSON RPC Server
///
/// A simple rpc server for getting status information and controlling player:
///
/// - current clip information
/// - jump to next clip
/// - get last clip
/// - reset player state to original clip
pub fn json_rpc_server(
config: PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let addr = config.rpc_server.address.clone();
let auth = config.rpc_server.authorization.clone();
let mut io = IoHandler::default();
let proc = proc_control.clone();
io.add_sync_method("player", move |params: Params| {
if let Params::Map(map) = params {
let mut time_shift = playout_stat.time_shift.lock().unwrap();
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
let current_list = play_control.current_list.lock().unwrap();
// forward text message to ffmpeg
if map.contains_key("control")
&& &map["control"] == "text"
&& map.contains_key("message")
{
let filter = filter_from_json(map["message"].clone());
debug!("Got drawtext command: <bright-blue>\"{filter}\"</>");
// TODO: in Rust 1.66 use let_chains instead
if !filter.is_empty() && config.text.zmq_stream_socket.is_some() {
if let Some(clips_filter) = playout_stat.chain.clone() {
*clips_filter.lock().unwrap() = vec![filter.clone()];
}
if config.out.mode == HLS {
if proc.server_is_running.load(Ordering::SeqCst) {
let filter_server = format!("drawtext@dyntext reinit {filter}");
if let Ok(reply) = block_on(zmq_send(
&filter_server,
&config.text.zmq_server_socket.clone().unwrap(),
)) {
return Ok(Value::String(reply));
};
} else if let Err(e) = proc.stop(Ingest) {
error!("Ingest {e:?}")
}
}
if config.out.mode != HLS || !proc.server_is_running.load(Ordering::SeqCst) {
let filter_stream = format!("drawtext@dyntext reinit {filter}");
if let Ok(reply) = block_on(zmq_send(
&filter_stream,
&config.text.zmq_stream_socket.clone().unwrap(),
)) {
return Ok(Value::String(reply));
};
}
}
return Ok(Value::String("Last clip can not be skipped".to_string()));
}
// get next clip
if map.contains_key("control") && &map["control"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < current_list.len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = current_list[index].clone();
media.add_probe();
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&config, &current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_next"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Move failed".to_string()));
}
return Ok(Value::String("Last clip can not be skipped".to_string()));
}
// get last clip
if map.contains_key("control") && &map["control"] == "back" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && current_list.len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = current_list[index - 2].clone();
play_control.index.fetch_sub(2, Ordering::SeqCst);
media.add_probe();
let (delta, _) = get_delta(&config, &media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(&config, &current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_last"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Move failed".to_string()));
}
return Ok(Value::String("Clip index out of range".to_string()));
}
// reset player state
if map.contains_key("control") && &map["control"] == "reset" {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Reset playout to original state");
let mut data_map = Map::new();
*time_shift = 0.0;
*date = current_date.clone();
playout_stat.list_init.store(true, Ordering::SeqCst);
write_status(&config, &current_date, 0.0);
data_map.insert("operation".to_string(), json!("reset_playout_state"));
return Ok(Value::Object(data_map));
}
return Ok(Value::String("Reset playout state failed".to_string()));
}
// stop playout
if map.contains_key("control") && &map["control"] == "stop_all" {
proc.stop_all();
return Ok(Value::String("Stop playout!".to_string()));
}
// get infos about current clip
if map.contains_key("media") && &map["media"] == "current" {
if let Some(media) = play_control.current_media.lock().unwrap().clone() {
let data_map = get_data_map(
&config,
media,
proc.server_is_running.load(Ordering::SeqCst),
);
return Ok(Value::Object(data_map));
};
}
// get infos about next clip
if map.contains_key("media") && &map["media"] == "next" {
let index = play_control.index.load(Ordering::SeqCst);
if index < current_list.len() {
let media = current_list[index].clone();
let data_map = get_data_map(&config, media, false);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no next clip".to_string()));
}
// get infos about last clip
if map.contains_key("media") && &map["media"] == "last" {
let index = play_control.index.load(Ordering::SeqCst);
if index > 1 && index - 2 < current_list.len() {
let media = current_list[index - 2].clone();
let data_map = get_data_map(&config, media, false);
return Ok(Value::Object(data_map));
}
return Ok(Value::String("There is no last clip".to_string()));
}
}
Ok(Value::String("No, or wrong parameters set!".to_string()))
});
info!("Run JSON RPC server, listening on: <b><magenta>http://{addr}</></b>");
// build rpc server
match ServerBuilder::new(io)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Null,
]))
// add middleware, for authentication
.request_middleware(move |request: hyper::Request<hyper::Body>| {
if request.headers().contains_key("authorization")
&& request.headers()["authorization"] == auth
{
if request.uri() == "/status" {
Response::ok("Server running OK.").into()
} else {
request.into()
}
} else {
Response::bad_request("No authorization header or valid key found!").into()
}
})
.rest_api(RestApi::Secure)
.start_http(&addr.parse().unwrap())
{
Ok(server) => {
*proc_control.rpc_handle.lock().unwrap() = Some(server.close_handle());
server.wait();
}
Err(e) => {
error!("Unable to start RPC server: {e}");
proc_control.stop_all();
exit(1);
}
};
}

View File

@ -0,0 +1,541 @@
use std::{fmt, sync::atomic::Ordering};
extern crate serde;
extern crate serde_json;
extern crate tiny_http;
use futures::executor::block_on;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use simplelog::*;
use std::collections::HashMap;
use std::io::{Cursor, Error as IoError};
use tiny_http::{Header, Method, Request, Response, Server};
use crate::rpc::zmq_send;
use ffplayout_lib::utils::{
get_delta, get_sec, sec_to_time, write_status, Ingest, Media, OutputMode::*, PlayerControl,
PlayoutConfig, PlayoutStatus, ProcessControl,
};
#[derive(Default, Deserialize, Clone)]
struct TextFilter {
text: Option<String>,
x: Option<String>,
y: Option<String>,
fontsize: Option<String>,
line_spacing: Option<String>,
fontcolor: Option<String>,
alpha: Option<String>,
r#box: Option<String>,
boxcolor: Option<String>,
boxborderw: Option<String>,
}
impl fmt::Display for TextFilter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let escaped_text = self
.text
.clone()
.unwrap_or_default()
.replace('\'', "'\\\\\\''")
.replace('\\', "\\\\\\\\")
.replace('%', "\\\\\\%")
.replace(':', "\\:");
let mut s = format!("text='{escaped_text}'");
if let Some(v) = &self.x {
if !v.is_empty() {
s.push_str(&format!(":x='{v}'"));
}
}
if let Some(v) = &self.y {
if !v.is_empty() {
s.push_str(&format!(":y='{v}'"));
}
}
if let Some(v) = &self.fontsize {
if !v.is_empty() {
s.push_str(&format!(":fontsize={v}"));
}
}
if let Some(v) = &self.line_spacing {
if !v.is_empty() {
s.push_str(&format!(":line_spacing={v}"));
}
}
if let Some(v) = &self.fontcolor {
if !v.is_empty() {
s.push_str(&format!(":fontcolor={v}"));
}
}
if let Some(v) = &self.alpha {
if !v.is_empty() {
s.push_str(&format!(":alpha='{v}'"));
}
}
if let Some(v) = &self.r#box {
if !v.is_empty() {
s.push_str(&format!(":box={v}"));
}
}
if let Some(v) = &self.boxcolor {
if !v.is_empty() {
s.push_str(&format!(":boxcolor={v}"));
}
}
if let Some(v) = &self.boxborderw {
if !v.is_empty() {
s.push_str(&format!(":boxborderw={v}"));
}
}
write!(f, "{s}")
}
}
/// Covert JSON string to ffmpeg filter command.
fn filter_from_json(raw_text: serde_json::Value) -> String {
let filter: TextFilter = serde_json::from_value(raw_text).unwrap_or_default();
filter.to_string()
}
/// map media struct to json object
fn get_media_map(media: Media) -> Value {
json!({
"seek": media.seek,
"out": media.out,
"duration": media.duration,
"category": media.category,
"source": media.source,
})
}
/// prepare json object for response
fn get_data_map(
config: &PlayoutConfig,
media: Media,
server_is_running: bool,
) -> Map<String, Value> {
let mut data_map = Map::new();
let begin = media.begin.unwrap_or(0.0);
data_map.insert("play_mode".to_string(), json!(config.processing.mode));
data_map.insert("ingest_runs".to_string(), json!(server_is_running));
data_map.insert("index".to_string(), json!(media.index));
data_map.insert("start_sec".to_string(), json!(begin));
if begin > 0.0 {
let played_time = get_sec() - begin;
let remaining_time = media.out - played_time;
data_map.insert("start_time".to_string(), json!(sec_to_time(begin)));
data_map.insert("played_sec".to_string(), json!(played_time));
data_map.insert("remaining_sec".to_string(), json!(remaining_time));
}
data_map.insert("current_media".to_string(), get_media_map(media));
data_map
}
#[derive(Debug, Serialize, Deserialize)]
struct ResponseData {
message: String,
}
fn read_request_body(request: &mut Request) -> Result<String, IoError> {
let mut buffer = String::new();
let body = request.as_reader();
match body.read_to_string(&mut buffer) {
Ok(_) => Ok(buffer),
Err(error) => Err(error),
}
}
fn json_response(data: serde_json::Map<String, serde_json::Value>) -> Response<Cursor<Vec<u8>>> {
let response_body = serde_json::to_string(&data).unwrap();
// create HTTP-Response
Response::from_string(response_body)
.with_status_code(200)
.with_header(Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap())
}
fn error_response(answer: &str, code: i32) -> Response<Cursor<Vec<u8>>> {
error!("RPC: {answer}");
Response::from_string(answer)
.with_status_code(code)
.with_header(Header::from_bytes(&b"Content-Type"[..], &b"text/plain"[..]).unwrap())
}
fn control_back(
config: &PlayoutConfig,
play_control: &PlayerControl,
playout_stat: &PlayoutStatus,
proc: &ProcessControl,
) -> Response<Cursor<Vec<u8>>> {
let current_date = playout_stat.current_date.lock().unwrap().clone();
let current_list = play_control.current_list.lock().unwrap();
let mut date = playout_stat.date.lock().unwrap();
let index = play_control.index.load(Ordering::SeqCst);
let mut time_shift = playout_stat.time_shift.lock().unwrap();
if index > 1 && current_list.len() > 1 {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to last clip");
let mut data_map = Map::new();
let mut media = current_list[index - 2].clone();
play_control.index.fetch_sub(2, Ordering::SeqCst);
media.add_probe();
let (delta, _) = get_delta(config, &media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(config, &current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_last"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return json_response(data_map);
}
return error_response("Jump to last clip failed!", 500);
}
error_response("Clip index out of range!", 400)
}
fn control_next(
config: &PlayoutConfig,
play_control: &PlayerControl,
playout_stat: &PlayoutStatus,
proc: &ProcessControl,
) -> Response<Cursor<Vec<u8>>> {
let current_date = playout_stat.current_date.lock().unwrap().clone();
let current_list = play_control.current_list.lock().unwrap();
let mut date = playout_stat.date.lock().unwrap();
let index = play_control.index.load(Ordering::SeqCst);
let mut time_shift = playout_stat.time_shift.lock().unwrap();
if index < current_list.len() {
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Move to next clip");
let mut data_map = Map::new();
let mut media = current_list[index].clone();
media.add_probe();
let (delta, _) = get_delta(config, &media.begin.unwrap_or(0.0));
*time_shift = delta;
*date = current_date.clone();
write_status(config, &current_date, delta);
data_map.insert("operation".to_string(), json!("move_to_next"));
data_map.insert("shifted_seconds".to_string(), json!(delta));
data_map.insert("media".to_string(), get_media_map(media));
return json_response(data_map);
}
return error_response("Jump to next clip failed!", 500);
}
error_response("Last clip can not be skipped!", 400)
}
fn control_reset(
config: &PlayoutConfig,
playout_stat: &PlayoutStatus,
proc: &ProcessControl,
) -> Response<Cursor<Vec<u8>>> {
let current_date = playout_stat.current_date.lock().unwrap().clone();
let mut date = playout_stat.date.lock().unwrap();
let mut time_shift = playout_stat.time_shift.lock().unwrap();
if let Some(proc) = proc.decoder_term.lock().unwrap().as_mut() {
if let Err(e) = proc.kill() {
error!("Decoder {e:?}")
};
if let Err(e) = proc.wait() {
error!("Decoder {e:?}")
};
info!("Reset playout to original state");
let mut data_map = Map::new();
*time_shift = 0.0;
*date = current_date.clone();
playout_stat.list_init.store(true, Ordering::SeqCst);
write_status(config, &current_date, 0.0);
data_map.insert("operation".to_string(), json!("reset_playout_state"));
return json_response(data_map);
}
error_response("Reset playout state failed!", 400)
}
fn control_stop(proc: &ProcessControl) -> Response<Cursor<Vec<u8>>> {
proc.stop_all();
let mut data_map = Map::new();
data_map.insert("message".to_string(), json!("Stop playout!"));
// Ok(Value::String("Stop playout!".to_string()));
json_response(data_map)
}
fn control_text(
data: HashMap<String, serde_json::Value>,
config: &PlayoutConfig,
playout_stat: &PlayoutStatus,
proc: &ProcessControl,
) -> Response<Cursor<Vec<u8>>> {
if data.contains_key("message") {
let filter = filter_from_json(data["message"].clone());
debug!("Got drawtext command: <bright-blue>\"{filter}\"</>");
let mut data_map = Map::new();
if !filter.is_empty() && config.text.zmq_stream_socket.is_some() {
if let Some(clips_filter) = playout_stat.chain.clone() {
*clips_filter.lock().unwrap() = vec![filter.clone()];
}
if config.out.mode == HLS {
if proc.server_is_running.load(Ordering::SeqCst) {
let filter_server = format!("drawtext@dyntext reinit {filter}");
if let Ok(reply) = block_on(zmq_send(
&filter_server,
&config.text.zmq_server_socket.clone().unwrap(),
)) {
data_map.insert("message".to_string(), json!(reply));
return json_response(data_map);
};
} else if let Err(e) = proc.stop(Ingest) {
error!("Ingest {e:?}")
}
}
if config.out.mode != HLS || !proc.server_is_running.load(Ordering::SeqCst) {
let filter_stream = format!("drawtext@dyntext reinit {filter}");
if let Ok(reply) = block_on(zmq_send(
&filter_stream,
&config.text.zmq_stream_socket.clone().unwrap(),
)) {
data_map.insert("message".to_string(), json!(reply));
return json_response(data_map);
};
}
}
}
error_response("text message missing!", 400)
}
fn media_current(
config: &PlayoutConfig,
play_control: &PlayerControl,
proc: &ProcessControl,
) -> Response<Cursor<Vec<u8>>> {
if let Some(media) = play_control.current_media.lock().unwrap().clone() {
let data_map = get_data_map(config, media, proc.server_is_running.load(Ordering::SeqCst));
return json_response(data_map);
};
error_response("No current clip...", 204)
}
fn media_next(config: &PlayoutConfig, play_control: &PlayerControl) -> Response<Cursor<Vec<u8>>> {
let index = play_control.index.load(Ordering::SeqCst);
let current_list = play_control.current_list.lock().unwrap();
if index < current_list.len() {
let media = current_list[index].clone();
let data_map = get_data_map(config, media, false);
return json_response(data_map);
}
error_response("There is no next clip", 500)
}
fn media_last(config: &PlayoutConfig, play_control: &PlayerControl) -> Response<Cursor<Vec<u8>>> {
let index = play_control.index.load(Ordering::SeqCst);
let current_list = play_control.current_list.lock().unwrap();
if index > 1 && index - 2 < current_list.len() {
let media = current_list[index - 2].clone();
let data_map = get_data_map(config, media, false);
return json_response(data_map);
}
error_response("There is no last clip", 500)
}
fn build_response(
mut request: Request,
config: &PlayoutConfig,
play_control: &PlayerControl,
playout_stat: &PlayoutStatus,
proc_control: &ProcessControl,
) {
if let Ok(body) = read_request_body(&mut request) {
if let Ok(data) = serde_json::from_str::<HashMap<String, serde_json::Value>>(&body) {
debug!("Received JSON request: {:?}", data);
if let Some(control_value) = data.get("control").and_then(|c| c.as_str()) {
match control_value {
"back" => {
let _ = request.respond(control_back(
config,
play_control,
playout_stat,
proc_control,
));
}
"next" => {
let _ = request.respond(control_next(
config,
play_control,
playout_stat,
proc_control,
));
}
"reset" => {
let _ = request.respond(control_reset(config, playout_stat, proc_control));
}
"stop_all" => {
let _ = request.respond(control_stop(proc_control));
}
"text" => {
let _ =
request.respond(control_text(data, config, playout_stat, proc_control));
}
_ => (),
}
} else if let Some(media_value) = data.get("media").and_then(|m| m.as_str()) {
match media_value {
"current" => {
let _ = request.respond(media_current(config, play_control, proc_control));
}
"next" => {
let _ = request.respond(media_next(config, play_control));
}
"last" => {
let _ = request.respond(media_last(config, play_control));
}
_ => (),
}
}
} else {
error!("Error parsing JSON request.");
let _ = request.respond(error_response("Invalid JSON request", 400));
}
} else {
error!("Error reading request body.");
let _ = request.respond(error_response("Invalid JSON request", 500));
}
}
fn handle_request(
request: Request,
config: &PlayoutConfig,
play_control: &PlayerControl,
playout_stat: &PlayoutStatus,
proc_control: &ProcessControl,
) {
// Check Authorization-Header
match request
.headers()
.iter()
.find(|h| h.field.equiv("Authorization"))
{
Some(header) => {
let auth_value = header.value.as_str();
if auth_value == config.rpc_server.authorization {
// create and send response
build_response(request, config, play_control, playout_stat, proc_control)
} else {
let _ = request.respond(error_response("Unauthorized", 401));
}
}
None => {
let _ = request.respond(error_response("Missing authorization", 401));
}
}
}
/// JSON RPC Server
///
/// A simple rpc server for getting status information and controlling player:
///
/// - current clip information
/// - jump to next clip
/// - get last clip
/// - reset player state to original clip
pub fn run_server(
config: PlayoutConfig,
play_control: PlayerControl,
playout_stat: PlayoutStatus,
proc_control: ProcessControl,
) {
let addr = config.rpc_server.address.clone();
// info!("Server listening on {addr}");
let server = Server::http(addr).expect("Failed to start server");
for request in server.incoming_requests() {
match request.method() {
Method::Post => handle_request(
request,
&config,
&play_control,
&playout_stat,
&proc_control,
),
_ => {
// Method not allowed
let response = Response::from_string("Method not allowed")
.with_status_code(405)
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"text/plain"[..]).unwrap(),
);
let _ = request.respond(response);
}
}
}
}

View File

@ -23,15 +23,11 @@ pub struct Args {
long, long,
help = "Generate playlist for dates, like: 2022-01-01 - 2022-01-10", help = "Generate playlist for dates, like: 2022-01-01 - 2022-01-10",
name = "YYYY-MM-DD", name = "YYYY-MM-DD",
multiple_values = true num_args = 0..=3,
)] )]
pub generate: Option<Vec<String>>, pub generate: Option<Vec<String>>,
#[clap( #[clap(long, help = "Optional path list for playlist generations", num_args = 0..=20)]
long,
help = "Optional path list for playlist generations",
multiple_values = true
)]
pub paths: Option<Vec<String>>, pub paths: Option<Vec<String>>,
#[clap(short = 'm', long, help = "Playing mode: folder, playlist")] #[clap(short = 'm', long, help = "Playing mode: folder, playlist")]

@ -1 +1 @@
Subproject commit 00c0ab661199b9a08532821e724592ecbdbe5170 Subproject commit 75515da508578700c82cd7ed916b9d3f2e4f1fdc

View File

@ -13,10 +13,8 @@ chrono = "0.4"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
ffprobe = "0.3" ffprobe = "0.3"
file-rotate = "0.7.0" file-rotate = "0.7.0"
jsonrpc-http-server = "18.0"
lettre = "0.10" lettre = "0.10"
log = "0.4" log = "0.4"
notify = "4.0"
rand = "0.8" rand = "0.8"
regex = "1" regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] } reqwest = { version = "0.11", features = ["blocking", "json"] }

View File

@ -7,7 +7,6 @@ use std::{
}, },
}; };
use jsonrpc_http_server::CloseHandle;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use simplelog::*; use simplelog::*;
@ -44,7 +43,6 @@ pub struct ProcessControl {
pub encoder_term: Arc<Mutex<Option<Child>>>, pub encoder_term: Arc<Mutex<Option<Child>>>,
pub server_term: Arc<Mutex<Option<Child>>>, pub server_term: Arc<Mutex<Option<Child>>>,
pub server_is_running: Arc<AtomicBool>, pub server_is_running: Arc<AtomicBool>,
pub rpc_handle: Arc<Mutex<Option<CloseHandle>>>,
pub is_terminated: Arc<AtomicBool>, pub is_terminated: Arc<AtomicBool>,
pub is_alive: Arc<AtomicBool>, pub is_alive: Arc<AtomicBool>,
} }
@ -56,7 +54,6 @@ impl ProcessControl {
encoder_term: Arc::new(Mutex::new(None)), encoder_term: Arc::new(Mutex::new(None)),
server_term: Arc::new(Mutex::new(None)), server_term: Arc::new(Mutex::new(None)),
server_is_running: Arc::new(AtomicBool::new(false)), server_is_running: Arc::new(AtomicBool::new(false)),
rpc_handle: Arc::new(Mutex::new(None)),
is_terminated: Arc::new(AtomicBool::new(false)), is_terminated: Arc::new(AtomicBool::new(false)),
is_alive: Arc::new(AtomicBool::new(true)), is_alive: Arc::new(AtomicBool::new(true)),
} }

View File

@ -14,7 +14,6 @@ use std::env;
use chrono::{prelude::*, Duration}; use chrono::{prelude::*, Duration};
use ffprobe::{ffprobe, Format, Stream}; use ffprobe::{ffprobe, Format, Stream};
use jsonrpc_http_server::hyper::HeaderMap;
use rand::prelude::*; use rand::prelude::*;
use regex::Regex; use regex::Regex;
use reqwest::header; use reqwest::header;
@ -332,7 +331,7 @@ pub fn get_date(seek: bool, start: f64, next_start: f64) -> String {
local.format("%Y-%m-%d").to_string() local.format("%Y-%m-%d").to_string()
} }
pub fn time_from_header(headers: &HeaderMap) -> Option<DateTime<Local>> { pub fn time_from_header(headers: &header::HeaderMap) -> Option<DateTime<Local>> {
if let Some(time) = headers.get(header::LAST_MODIFIED) { if let Some(time) = headers.get(header::LAST_MODIFIED) {
if let Ok(t) = time.to_str() { if let Ok(t) = time.to_str() {
let time = DateTime::parse_from_rfc2822(t); let time = DateTime::parse_from_rfc2822(t);

View File

@ -16,10 +16,8 @@ chrono = "0.4"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
ffprobe = "0.3" ffprobe = "0.3"
file-rotate = "0.7.0" file-rotate = "0.7.0"
jsonrpc-http-server = "18.0"
lettre = "0.10" lettre = "0.10"
log = "0.4" log = "0.4"
notify = "4.0"
rand = "0.8" rand = "0.8"
regex = "1" regex = "1"
reqwest = { version = "0.11", features = ["blocking", "json"] } reqwest = { version = "0.11", features = ["blocking", "json"] }