commit
3e884bc346
21
Cargo.lock
generated
21
Cargo.lock
generated
@ -143,7 +143,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ffplayout-rs"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
@ -160,6 +160,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
"shlex",
|
||||
"simplelog",
|
||||
"tokio",
|
||||
"walkdir",
|
||||
@ -450,9 +451,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.120"
|
||||
version = "0.2.121"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09"
|
||||
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
|
||||
|
||||
[[package]]
|
||||
name = "linked-hash-map"
|
||||
@ -462,9 +463,9 @@ checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
|
||||
|
||||
[[package]]
|
||||
name = "log"
|
||||
version = "0.4.14"
|
||||
version = "0.4.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
|
||||
checksum = "1c4dcd960cc540667f619483fc99102f88d6118b87730e24e8fbe8054b7445e4"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
]
|
||||
@ -771,9 +772,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.15"
|
||||
version = "1.0.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145"
|
||||
checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
@ -940,6 +941,12 @@ dependencies = [
|
||||
"yaml-rust",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||
|
||||
[[package]]
|
||||
name = "simplelog"
|
||||
version = "0.11.2"
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ffplayout-rs"
|
||||
version = "0.5.0"
|
||||
version = "0.6.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
@ -11,13 +11,14 @@ file-rotate = "0.6.0"
|
||||
lettre = "0.10.0-rc.4"
|
||||
log = "0.4.14"
|
||||
notify = "4.0.17"
|
||||
rand = "0.8.5"
|
||||
regex = "1"
|
||||
once_cell = "1.10"
|
||||
process_control = "3.3"
|
||||
rand = "0.8.5"
|
||||
regex = "1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_yaml = "0.8"
|
||||
shlex = "1.1"
|
||||
simplelog = { version = "^0.11.2", features = ["paris"] }
|
||||
tokio = { version = "1.16.1", features = ["rt-multi-thread"] }
|
||||
walkdir = "2"
|
||||
|
@ -65,7 +65,7 @@ ingest:
|
||||
There is no authentication, this is up to you. The recommend way is to set address to
|
||||
localhost, stream to a local server with authentication and from there stream to this app.
|
||||
enable: false
|
||||
stream_input: [-f, live_flv, -listen, 1, -i, rtmp://localhost:1936/live/stream]
|
||||
input_param: -f live_flv -listen 1 -i rtmp://localhost:1936/live/stream
|
||||
|
||||
playlist:
|
||||
helptext: >
|
||||
@ -116,34 +116,34 @@ out:
|
||||
inside. 'preview' works only in streaming output and creates a separate preview stream.
|
||||
mode: 'stream'
|
||||
preview: false
|
||||
preview_param:
|
||||
[-s, 512x288,
|
||||
-c:v, libx264,
|
||||
-crf, 24,
|
||||
-x264-params, keyint=50:min-keyint=25:scenecut=-1,
|
||||
-maxrate, 800k,
|
||||
-bufsize, 1600k,
|
||||
-preset, ultrafast,
|
||||
-tune, zerolatency,
|
||||
-profile:v, Main,
|
||||
-level, 3.1,
|
||||
-c:a, aac,
|
||||
-ar, 44100,
|
||||
-b:a, 128k,
|
||||
-flags, +global_header,
|
||||
-f, flv, rtmp://preview.local/live/stream]
|
||||
stream_param:
|
||||
[-c:v, libx264,
|
||||
-crf, 23,
|
||||
-x264-params, keyint=50:min-keyint=25:scenecut=-1,
|
||||
-maxrate, 1300k,
|
||||
-bufsize, 2600k,
|
||||
-preset, faster,
|
||||
-tune, zerolatency,
|
||||
-profile:v, Main,
|
||||
-level, 3.1,
|
||||
-c:a, aac,
|
||||
-ar, 44100,
|
||||
-b:a, 128k,
|
||||
-flags, +global_header,
|
||||
-f, flv, rtmp://localhost/live/stream]
|
||||
preview_param: >-
|
||||
-s 512x288
|
||||
-c:v libx264
|
||||
-crf 24
|
||||
-x264-params keyint=50:min-keyint=25:scenecut=-1
|
||||
-maxrate 800k
|
||||
-bufsize 1600k
|
||||
-preset ultrafast
|
||||
-tune zerolatency
|
||||
-profile:v Main
|
||||
-level 3.1
|
||||
-c:a aac
|
||||
-ar 44100
|
||||
-b:a 128k
|
||||
-flags +global_header
|
||||
-f flv rtmp://preview.local/live/stream
|
||||
output_param: >-
|
||||
-c:v libx264
|
||||
-crf 23
|
||||
-x264-params keyint=50:min-keyint=25:scenecut=-1
|
||||
-maxrate 1300k
|
||||
-bufsize 2600k
|
||||
-preset faster
|
||||
-tune zerolatency
|
||||
-profile:v Main
|
||||
-level 3.1
|
||||
-c:a aac
|
||||
-ar 44100
|
||||
-b:a 128k
|
||||
-flags +global_header
|
||||
-f flv rtmp://localhost/live/stream
|
||||
|
@ -1,57 +0,0 @@
|
||||
use std::{
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
struct List {
|
||||
arr: Vec<u8>,
|
||||
msg: String,
|
||||
i: usize,
|
||||
}
|
||||
|
||||
impl List {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
arr: (0..10).collect(),
|
||||
msg: "fist init".to_string(),
|
||||
i: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn fill(&mut self, val: String) {
|
||||
println!("{val}");
|
||||
self.msg = "new fill".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for List {
|
||||
type Item = u8;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.i == 0 {
|
||||
println!("{}", self.msg);
|
||||
}
|
||||
if self.i < self.arr.len() {
|
||||
let current = self.arr[self.i];
|
||||
self.i += 1;
|
||||
|
||||
Some(current)
|
||||
} else {
|
||||
self.i = 1;
|
||||
let current = self.arr[0];
|
||||
self.fill("pass to function".to_string());
|
||||
println!("{}", self.msg);
|
||||
|
||||
Some(current)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let list = List::new();
|
||||
|
||||
for i in list {
|
||||
println!("{i}");
|
||||
sleep(Duration::from_millis(300));
|
||||
}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
use chrono::prelude::*;
|
||||
use std::{time, time::UNIX_EPOCH};
|
||||
|
||||
pub fn get_sec() -> f64 {
|
||||
let local: DateTime<Local> = Local::now();
|
||||
|
||||
(local.hour() * 3600 + local.minute() * 60 + local.second()) as f64
|
||||
+ (local.nanosecond() as f64 / 1000000000.0)
|
||||
}
|
||||
|
||||
pub fn sec_to_time(sec: f64) -> String {
|
||||
let d = UNIX_EPOCH + time::Duration::from_secs(sec as u64);
|
||||
// Create DateTime from SystemTime
|
||||
let date_time = DateTime::<Utc>::from(d);
|
||||
|
||||
date_time.format("%H:%M:%S").to_string()
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let current_time = get_sec();
|
||||
let start = 21600.0;
|
||||
let target_length = 86400.0;
|
||||
let total_delta;
|
||||
|
||||
if current_time < start {
|
||||
total_delta = start - current_time;
|
||||
} else {
|
||||
total_delta = target_length + start - current_time;
|
||||
}
|
||||
|
||||
println!("Total Seconds: {total_delta}");
|
||||
println!("Total Time: {}", sec_to_time(total_delta));
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_yaml::{self};
|
||||
use std::{fs::File};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Config {
|
||||
pub general: General,
|
||||
pub mail: Mail,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct General {
|
||||
pub stop_threshold: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Mail {
|
||||
pub subject: String,
|
||||
pub smtp_server: String,
|
||||
pub starttls: bool,
|
||||
pub sender_addr: String,
|
||||
pub sender_pass: String,
|
||||
pub recipient: String,
|
||||
pub mail_level: String,
|
||||
}
|
||||
|
||||
static INSTANCE: OnceCell<Config> = OnceCell::new();
|
||||
|
||||
impl Config {
|
||||
fn new() -> Self {
|
||||
let config_path = "/etc/ffplayout/ffplayout.yml".to_string();
|
||||
let f = File::open(&config_path).unwrap();
|
||||
|
||||
let config: Config = serde_yaml::from_reader(f).expect("Could not read config file.");
|
||||
|
||||
config
|
||||
}
|
||||
|
||||
pub fn init() -> &'static Config {
|
||||
INSTANCE.get().expect("Config is not initialized")
|
||||
}
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let config = Config::new();
|
||||
INSTANCE.set(config).unwrap();
|
||||
let config = Config::init();
|
||||
|
||||
println!("{:#?}", config);
|
||||
}
|
@ -1,121 +0,0 @@
|
||||
extern crate log;
|
||||
extern crate simplelog;
|
||||
|
||||
use std::{thread::sleep, time::Duration};
|
||||
|
||||
use simplelog::*;
|
||||
|
||||
use file_rotate::{compression::Compression, suffix::AppendCount, ContentLimit, FileRotate};
|
||||
use log::{Level, LevelFilter, Log, Metadata, Record};
|
||||
|
||||
pub struct LogMailer {
|
||||
level: LevelFilter,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
impl LogMailer {
|
||||
pub fn new(log_level: LevelFilter, config: Config) -> Box<LogMailer> {
|
||||
Box::new(LogMailer {
|
||||
level: log_level,
|
||||
config,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Log for LogMailer {
|
||||
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
|
||||
metadata.level() <= self.level
|
||||
}
|
||||
|
||||
fn log(&self, record: &Record<'_>) {
|
||||
if self.enabled(record.metadata()) {
|
||||
match record.level() {
|
||||
Level::Error => {
|
||||
println!("Send Error Mail: {:?}", record.args())
|
||||
}
|
||||
Level::Warn => {
|
||||
println!("Send Warn Mail: {:?}", record.args())
|
||||
}
|
||||
Level::Info => {
|
||||
println!("Send Info Mail: {:?}", record.args())
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&self) {}
|
||||
}
|
||||
|
||||
impl SharedLogger for LogMailer {
|
||||
fn level(&self) -> LevelFilter {
|
||||
self.level
|
||||
}
|
||||
|
||||
fn config(&self) -> Option<&Config> {
|
||||
Some(&self.config)
|
||||
}
|
||||
|
||||
fn as_log(self: Box<Self>) -> Box<dyn Log> {
|
||||
Box::new(*self)
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let log = || {
|
||||
FileRotate::new(
|
||||
"logs/ffplayout.log",
|
||||
AppendCount::new(7),
|
||||
ContentLimit::Lines(1000),
|
||||
Compression::None,
|
||||
)
|
||||
};
|
||||
|
||||
let def_config = simplelog::ConfigBuilder::new()
|
||||
.set_target_level(LevelFilter::Off)
|
||||
.set_thread_level(LevelFilter::Off)
|
||||
.set_level_padding(LevelPadding::Left)
|
||||
.set_time_to_local(true)
|
||||
.clone();
|
||||
|
||||
let term_config = def_config
|
||||
.clone()
|
||||
.set_level_color(Level::Debug, Some(Color::Ansi256(12)))
|
||||
.set_level_color(Level::Info, Some(Color::Ansi256(10)))
|
||||
.set_level_color(Level::Warn, Some(Color::Ansi256(208)))
|
||||
.set_level_color(Level::Error, Some(Color::Ansi256(9)))
|
||||
.set_time_format_str("\x1b[30;1m[%Y-%m-%d %H:%M:%S%.3f]\x1b[0m")
|
||||
.build();
|
||||
|
||||
let file_config = def_config
|
||||
.clone()
|
||||
.set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]")
|
||||
.build();
|
||||
|
||||
let mail_config = def_config
|
||||
.clone()
|
||||
.set_time_format_str("[%Y-%m-%d %H:%M:%S%.3f]")
|
||||
.build();
|
||||
|
||||
CombinedLogger::init(vec![
|
||||
TermLogger::new(
|
||||
LevelFilter::Debug,
|
||||
term_config,
|
||||
TerminalMode::Stderr,
|
||||
ColorChoice::Auto,
|
||||
),
|
||||
WriteLogger::new(LevelFilter::Debug, file_config, log()),
|
||||
LogMailer::new(LevelFilter::Info, mail_config),
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
debug!("this is a <b>debug</> message");
|
||||
info!("this is a info message");
|
||||
warn!("this is a warning message");
|
||||
error!("this is a error message");
|
||||
|
||||
for idx in 1..10 {
|
||||
info!("{idx}");
|
||||
sleep(Duration::from_secs(2));
|
||||
}
|
||||
}
|
@ -1,320 +0,0 @@
|
||||
use std::{
|
||||
io::{prelude::*, BufReader, Error, Read},
|
||||
process::{ChildStderr, Command, Stdio},
|
||||
sync::{
|
||||
mpsc::{channel, Receiver, Sender},
|
||||
Arc, Mutex,
|
||||
},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use process_control::{ChildExt, Terminator};
|
||||
use tokio::runtime::{Handle, Runtime};
|
||||
|
||||
async fn ingest_server(
|
||||
dec_setting: Vec<&str>,
|
||||
ingest_sender: Sender<[u8; 65424]>,
|
||||
proc_terminator: Arc<Mutex<Option<Terminator>>>,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
rt_handle: Handle,
|
||||
) -> Result<(), Error> {
|
||||
let mut buffer: [u8; 65424] = [0; 65424];
|
||||
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
|
||||
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
|
||||
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "level+error"];
|
||||
let mut stream_input = vec![
|
||||
"-f",
|
||||
"live_flv",
|
||||
"-listen",
|
||||
"1",
|
||||
"-i",
|
||||
"rtmp://localhost:1936/live/stream",
|
||||
];
|
||||
|
||||
server_cmd.append(&mut stream_input);
|
||||
server_cmd.append(&mut filter_list);
|
||||
server_cmd.append(&mut dec_setting.clone());
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => {
|
||||
panic!("couldn't spawn ingest server: {}", e)
|
||||
}
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let serv_terminator = server_proc.terminator()?;
|
||||
*proc_terminator.lock().unwrap() = Some(serv_terminator);
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
server_proc.stderr.take().unwrap(),
|
||||
"Server".to_string(),
|
||||
proc_terminator.clone(),
|
||||
is_terminated.clone(),
|
||||
|
||||
));
|
||||
|
||||
let ingest_reader = server_proc.stdout.as_mut().unwrap();
|
||||
|
||||
loop {
|
||||
match ingest_reader.read_exact(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(_) => break,
|
||||
};
|
||||
|
||||
if let Err(e) = ingest_sender.send(buffer) {
|
||||
println!("Ingest server error: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
if let Err(e) = server_proc.wait() {
|
||||
panic!("Server error: {:?}", e)
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stderr_reader(
|
||||
std_errors: ChildStderr,
|
||||
suffix: String,
|
||||
server_term: Arc<Mutex<Option<Terminator>>>,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
) -> Result<(), Error> {
|
||||
// read ffmpeg stderr decoder and encoder instance
|
||||
// and log the output
|
||||
|
||||
fn format_line(line: String, level: String) -> String {
|
||||
line.replace(&format!("[{}] ", level), "")
|
||||
}
|
||||
|
||||
let buffer = BufReader::new(std_errors);
|
||||
|
||||
for line in buffer.lines() {
|
||||
let line = line?;
|
||||
|
||||
if line.contains("[info]") {
|
||||
println!("[{suffix}] {}", format_line(line, "info".to_string()))
|
||||
} else if line.contains("[warning]") {
|
||||
println!("[{suffix}] {}", format_line(line, "warning".to_string()))
|
||||
} else {
|
||||
if suffix != "server" && !line.contains("Input/output error") {
|
||||
println!(
|
||||
"[{suffix}] {}",
|
||||
format_line(line.clone(), "level+error".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
if line.contains("Error closing file pipe:: Broken pipe") {
|
||||
*is_terminated.lock().unwrap() = true;
|
||||
|
||||
match &*server_term.lock().unwrap() {
|
||||
Some(serv) => unsafe {
|
||||
if let Ok(_) = serv.terminate() {
|
||||
println!("Terminate server done");
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
|
||||
let runtime = Runtime::new().unwrap();
|
||||
let rt_handle = runtime.handle();
|
||||
|
||||
let dec_setting: Vec<&str> = vec![
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-c:v",
|
||||
"mpeg2video",
|
||||
"-g",
|
||||
"1",
|
||||
"-b:v",
|
||||
"50000k",
|
||||
"-minrate",
|
||||
"50000k",
|
||||
"-maxrate",
|
||||
"50000k",
|
||||
"-bufsize",
|
||||
"25000k",
|
||||
"-c:a",
|
||||
"s302m",
|
||||
"-strict",
|
||||
"-2",
|
||||
"-ar",
|
||||
"48000",
|
||||
"-ac",
|
||||
"2",
|
||||
"-f",
|
||||
"mpegts",
|
||||
"-",
|
||||
];
|
||||
|
||||
let mut player_proc = match Command::new("ffplay")
|
||||
.args([
|
||||
"-v",
|
||||
"level+error",
|
||||
"-hide_banner",
|
||||
"-nostats",
|
||||
"-i",
|
||||
"pipe:0",
|
||||
])
|
||||
.stdin(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => panic!("couldn't spawn ffplay: {}", e),
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
player_proc.stderr.take().unwrap(),
|
||||
"Player".to_string(),
|
||||
server_term.clone(),
|
||||
is_terminated.clone(),
|
||||
));
|
||||
|
||||
let player_terminator = match player_proc.terminator() {
|
||||
Ok(proc) => Some(proc),
|
||||
Err(_) => None,
|
||||
};
|
||||
*player_term.lock().unwrap() = player_terminator;
|
||||
|
||||
let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
|
||||
|
||||
rt_handle.spawn(ingest_server(
|
||||
dec_setting.clone(),
|
||||
ingest_sender,
|
||||
server_term.clone(),
|
||||
is_terminated.clone(),
|
||||
rt_handle.clone(),
|
||||
));
|
||||
|
||||
let mut buffer: [u8; 65424] = [0; 65424];
|
||||
let mut dec_cmd = vec![
|
||||
"-v",
|
||||
"level+error",
|
||||
"-hide_banner",
|
||||
"-nostats",
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"testsrc=duration=20:size=1024x576:rate=25",
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"anoisesrc=d=20:c=pink:r=48000:a=0.5",
|
||||
];
|
||||
|
||||
dec_cmd.append(&mut dec_setting.clone());
|
||||
|
||||
let mut dec_proc = match Command::new("ffmpeg")
|
||||
.args(dec_cmd)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => panic!("couldn't spawn ffmpeg: {}", e),
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
dec_proc.stderr.take().unwrap(),
|
||||
"Decoder".to_string(),
|
||||
server_term.clone(),
|
||||
is_terminated.clone(),
|
||||
));
|
||||
|
||||
let dec_terminator = match dec_proc.terminator() {
|
||||
Ok(proc) => Some(proc),
|
||||
Err(_) => None,
|
||||
};
|
||||
*decoder_term.lock().unwrap() = dec_terminator;
|
||||
|
||||
let mut player_writer = player_proc.stdin.as_ref().unwrap();
|
||||
let dec_reader = dec_proc.stdout.as_mut().unwrap();
|
||||
|
||||
loop {
|
||||
let bytes_len = match dec_reader.read(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(e) => panic!("Reading error from decoder: {:?}", e),
|
||||
};
|
||||
|
||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||
if let Err(e) = player_writer.write_all(&receive) {
|
||||
panic!("Err: {:?}", e)
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
|
||||
panic!("Err: {:?}", e)
|
||||
};
|
||||
|
||||
if bytes_len == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*is_terminated.lock().unwrap() = true;
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
println!("Terminate decoder...");
|
||||
|
||||
match &*decoder_term.lock().unwrap() {
|
||||
Some(dec) => unsafe {
|
||||
if let Ok(_) = dec.terminate() {
|
||||
println!("Terminate decoder done");
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
|
||||
println!("Terminate encoder...");
|
||||
|
||||
match &*player_term.lock().unwrap() {
|
||||
Some(enc) => unsafe {
|
||||
if let Ok(_) = enc.terminate() {
|
||||
println!("Terminate encoder done");
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
|
||||
println!("Terminate server...");
|
||||
|
||||
match &*server_term.lock().unwrap() {
|
||||
Some(serv) => unsafe {
|
||||
if let Ok(_) = serv.terminate() {
|
||||
println!("Terminate server done");
|
||||
}
|
||||
},
|
||||
None => (),
|
||||
}
|
||||
|
||||
println!("Terminate done...");
|
||||
}
|
@ -1,262 +0,0 @@
|
||||
use std::{
|
||||
io::{prelude::*, BufReader, Error, Read},
|
||||
process::{Command, Stdio},
|
||||
sync::{
|
||||
mpsc::{sync_channel, Receiver, SyncSender},
|
||||
Arc, Mutex,
|
||||
},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use process_control::{ChildExt, Terminator};
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
async fn ingest_server(
|
||||
dec_setting: Vec<&str>,
|
||||
ingest_sender: SyncSender<(usize, [u8; 65088])>,
|
||||
proc_terminator: Arc<Mutex<Option<Terminator>>>,
|
||||
is_terminated: Arc<Mutex<bool>>,
|
||||
server_is_running: Arc<Mutex<bool>>,
|
||||
) -> Result<(), Error> {
|
||||
let mut buffer: [u8; 65088] = [0; 65088];
|
||||
let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
|
||||
let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
|
||||
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"];
|
||||
|
||||
let mut stream_input = vec![
|
||||
"-f",
|
||||
"live_flv",
|
||||
"-listen",
|
||||
"1",
|
||||
"-i",
|
||||
"rtmp://localhost:1936/live/stream",
|
||||
];
|
||||
|
||||
server_cmd.append(&mut stream_input);
|
||||
server_cmd.append(&mut filter_list);
|
||||
server_cmd.append(&mut dec_setting.clone());
|
||||
|
||||
let mut is_running;
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut server_proc = match Command::new("ffmpeg")
|
||||
.args(server_cmd.clone())
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => {
|
||||
panic!("couldn't spawn ingest server: {}", e)
|
||||
}
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let serv_terminator = server_proc.terminator()?;
|
||||
*proc_terminator.lock().unwrap() = Some(serv_terminator);
|
||||
let ingest_reader = server_proc.stdout.as_mut().unwrap();
|
||||
is_running = false;
|
||||
|
||||
loop {
|
||||
let bytes_len = match ingest_reader.read(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(e) => {
|
||||
println!("Reading error from ingest server: {:?}", e);
|
||||
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if !is_running {
|
||||
*server_is_running.lock().unwrap() = true;
|
||||
is_running = true;
|
||||
}
|
||||
|
||||
if bytes_len > 0 {
|
||||
if let Err(e) = ingest_sender.send((bytes_len, buffer)) {
|
||||
println!("Ingest server write error: {:?}", e);
|
||||
|
||||
*is_terminated.lock().unwrap() = true;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*server_is_running.lock().unwrap() = false;
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
if let Err(e) = server_proc.kill() {
|
||||
print!("Ingest server {:?}", e)
|
||||
};
|
||||
|
||||
if let Err(e) = server_proc.wait() {
|
||||
panic!("Decoder error: {:?}", e)
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn main() {
|
||||
let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
|
||||
let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
let server_is_running: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
|
||||
|
||||
let dec_setting: Vec<&str> = vec![
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-c:v",
|
||||
"mpeg2video",
|
||||
"-g",
|
||||
"1",
|
||||
"-b:v",
|
||||
"50000k",
|
||||
"-minrate",
|
||||
"50000k",
|
||||
"-maxrate",
|
||||
"50000k",
|
||||
"-bufsize",
|
||||
"25000k",
|
||||
"-c:a",
|
||||
"s302m",
|
||||
"-strict",
|
||||
"-2",
|
||||
"-ar",
|
||||
"48000",
|
||||
"-ac",
|
||||
"2",
|
||||
"-f",
|
||||
"mpegts",
|
||||
"-",
|
||||
];
|
||||
|
||||
let mut player_proc = match Command::new("ffplay")
|
||||
.args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
|
||||
.stdin(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => panic!("couldn't spawn ffplay: {}", e),
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let (ingest_sender, ingest_receiver): (
|
||||
SyncSender<(usize, [u8; 65088])>,
|
||||
Receiver<(usize, [u8; 65088])>,
|
||||
) = sync_channel(1);
|
||||
let runtime = Runtime::new().unwrap();
|
||||
|
||||
runtime.spawn(ingest_server(
|
||||
dec_setting.clone(),
|
||||
ingest_sender,
|
||||
server_term.clone(),
|
||||
is_terminated.clone(),
|
||||
server_is_running.clone(),
|
||||
));
|
||||
|
||||
let mut buffer: [u8; 65088] = [0; 65088];
|
||||
|
||||
let mut dec_cmd = vec![
|
||||
"-v",
|
||||
"error",
|
||||
"-hide_banner",
|
||||
"-nostats",
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"testsrc=duration=120:size=1024x576:rate=25",
|
||||
"-f",
|
||||
"lavfi",
|
||||
"-i",
|
||||
"anoisesrc=d=120:c=pink:r=48000:a=0.5",
|
||||
];
|
||||
|
||||
dec_cmd.append(&mut dec_setting.clone());
|
||||
|
||||
let mut dec_proc = match Command::new("ffmpeg")
|
||||
.args(dec_cmd)
|
||||
.stdout(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Err(e) => panic!("couldn't spawn ffmpeg: {}", e),
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let mut player_writer = player_proc.stdin.as_ref().unwrap();
|
||||
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
||||
|
||||
let mut live_on = false;
|
||||
|
||||
let mut count = 0;
|
||||
|
||||
loop {
|
||||
count += 1;
|
||||
|
||||
if *server_is_running.lock().unwrap() {
|
||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||
if let Err(e) = player_writer.write(&receive.1[..receive.0]) {
|
||||
println!("Ingest receiver error: {:?}", e);
|
||||
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
if !live_on {
|
||||
println!("Switch from offline source to live");
|
||||
|
||||
live_on = true;
|
||||
}
|
||||
} else {
|
||||
println!("{count}");
|
||||
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(e) => {
|
||||
println!("Reading error from decoder: {:?}", e);
|
||||
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if dec_bytes_len > 0 {
|
||||
if let Err(e) = player_writer.write(&buffer[..dec_bytes_len]) {
|
||||
println!("Encoder write error: {:?}", e);
|
||||
|
||||
break;
|
||||
};
|
||||
} else {
|
||||
if live_on {
|
||||
println!("Switch from live ingest to offline source");
|
||||
|
||||
live_on = false;
|
||||
}
|
||||
|
||||
player_writer.flush().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*is_terminated.lock().unwrap() = true;
|
||||
|
||||
if let Some(server) = &*server_term.lock().unwrap() {
|
||||
unsafe {
|
||||
if let Ok(_) = server.terminate() {
|
||||
println!("Terminate ingest server done");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
match player_proc.kill() {
|
||||
Ok(_) => println!("Playout done..."),
|
||||
Err(e) => panic!("Encoder error: {:?}", e),
|
||||
}
|
||||
|
||||
if let Err(e) = player_proc.wait() {
|
||||
println!("Encoder: {e}")
|
||||
};
|
||||
}
|
@ -1,141 +0,0 @@
|
||||
use notify::DebouncedEvent::{Create, Remove, Rename};
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
use std::{
|
||||
ffi::OsStr,
|
||||
path::Path,
|
||||
sync::{
|
||||
mpsc::{channel, Receiver},
|
||||
{Arc, Mutex},
|
||||
},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Source {
|
||||
nodes: Arc<Mutex<Vec<String>>>,
|
||||
index: usize,
|
||||
}
|
||||
|
||||
impl Source {
|
||||
pub fn new(path: String) -> Self {
|
||||
let mut file_list = vec![];
|
||||
|
||||
for entry in WalkDir::new(path.clone())
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
{
|
||||
if entry.path().is_file() {
|
||||
let ext = file_extension(entry.path());
|
||||
|
||||
if ext.is_some()
|
||||
&& ["mp4".to_string(), "mkv".to_string()]
|
||||
.clone()
|
||||
.contains(&ext.unwrap().to_lowercase())
|
||||
{
|
||||
file_list.push(entry.path().display().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self {
|
||||
nodes: Arc::new(Mutex::new(file_list)),
|
||||
index: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Source {
|
||||
type Item = String;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.index < self.nodes.lock().unwrap().len() {
|
||||
let current_file = self.nodes.lock().unwrap()[self.index].clone();
|
||||
self.index += 1;
|
||||
|
||||
Some(current_file)
|
||||
} else {
|
||||
let current_file = self.nodes.lock().unwrap()[0].clone();
|
||||
|
||||
self.index = 1;
|
||||
|
||||
Some(current_file)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn watch(receiver: Receiver<notify::DebouncedEvent>, sources: Arc<Mutex<Vec<String>>>) {
|
||||
while let Ok(res) = receiver.recv() {
|
||||
match res {
|
||||
Create(new_path) => {
|
||||
sources.lock().unwrap().push(new_path.display().to_string());
|
||||
println!("Create new file: {:?}", new_path);
|
||||
}
|
||||
Remove(old_path) => {
|
||||
sources
|
||||
.lock()
|
||||
.unwrap()
|
||||
.retain(|x| x != &old_path.display().to_string());
|
||||
println!("Remove file: {:?}", old_path);
|
||||
}
|
||||
Rename(old_path, new_path) => {
|
||||
let i = sources
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.position(|x| *x == old_path.display().to_string())
|
||||
.unwrap();
|
||||
sources.lock().unwrap()[i] = new_path.display().to_string();
|
||||
println!("Rename file: {:?} to {:?}", old_path, new_path);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn file_extension(filename: &Path) -> Option<&str> {
|
||||
filename.extension().and_then(OsStr::to_str)
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let path = "/home/jb/Videos/tv-media/ADtv/01 - Intro".to_string();
|
||||
let sources = Source::new(path.clone());
|
||||
|
||||
let (sender, receiver) = channel();
|
||||
let runtime = Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.thread_name("file_watcher")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Creating Tokio runtime");
|
||||
|
||||
let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
|
||||
|
||||
watcher
|
||||
.watch(path.clone(), RecursiveMode::Recursive)
|
||||
.unwrap();
|
||||
|
||||
runtime.spawn(watch(
|
||||
receiver,
|
||||
Arc::clone(&sources.nodes),
|
||||
));
|
||||
|
||||
let mut count = 0;
|
||||
|
||||
for node in sources {
|
||||
println!("task: {:?}", node);
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
count += 1;
|
||||
|
||||
if count == 5 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
println!("after loop");
|
||||
}
|
@ -1,79 +0,0 @@
|
||||
use notify::DebouncedEvent::{Create, Remove, Rename};
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
use std::{
|
||||
sync::{
|
||||
mpsc::{channel, Receiver},
|
||||
{Arc, Mutex},
|
||||
},
|
||||
thread::sleep,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::runtime::Builder;
|
||||
|
||||
async fn watch(receiver: Receiver<notify::DebouncedEvent>, stop: Arc<Mutex<bool>>) {
|
||||
loop {
|
||||
if *stop.lock().unwrap() {
|
||||
break;
|
||||
}
|
||||
|
||||
match receiver.recv() {
|
||||
Ok(event) => match event {
|
||||
Create(new_path) => {
|
||||
println!("Create new file: {:?}", new_path);
|
||||
}
|
||||
Remove(old_path) => {
|
||||
println!("Remove file: {:?}", old_path);
|
||||
}
|
||||
Rename(old_path, new_path) => {
|
||||
println!("Rename file: {:?} to {:?}", old_path, new_path);
|
||||
}
|
||||
_ => (),
|
||||
},
|
||||
Err(e) => {
|
||||
println!("{:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let path = "/home/jb/Videos/tv-media/ADtv/01 - Intro".to_string();
|
||||
let stop = Arc::new(Mutex::new(false));
|
||||
|
||||
let (sender, receiver) = channel();
|
||||
let mut watcher = watcher(sender, Duration::from_secs(2)).unwrap();
|
||||
watcher.watch(path.clone(), RecursiveMode::Recursive).unwrap();
|
||||
|
||||
let runtime = Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.thread_name("file_watcher")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Creating Tokio runtime");
|
||||
|
||||
if true {
|
||||
runtime.spawn(watch(receiver, Arc::clone(&stop)));
|
||||
}
|
||||
|
||||
let mut count = 0;
|
||||
|
||||
loop {
|
||||
println!("task: {count}");
|
||||
sleep(Duration::from_secs(1));
|
||||
|
||||
count += 1;
|
||||
|
||||
if count == 5 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
*stop.lock().unwrap() = true;
|
||||
|
||||
watcher.unwatch(path).unwrap();
|
||||
|
||||
println!("after loop");
|
||||
}
|
@ -84,7 +84,7 @@ pub async fn ingest_server(
|
||||
];
|
||||
|
||||
let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", log_format.as_str()];
|
||||
let stream_input = config.ingest.stream_input.clone();
|
||||
let stream_input = config.ingest.input_cmd.clone().unwrap();
|
||||
let stream_settings = config.processing.settings.clone().unwrap();
|
||||
|
||||
server_cmd.append(&mut stream_input.iter().map(String::as_str).collect());
|
||||
@ -98,7 +98,7 @@ pub async fn ingest_server(
|
||||
stream_input.last().unwrap()
|
||||
);
|
||||
|
||||
debug!("Server CMD: <bright-blue>{:?}</>", server_cmd);
|
||||
debug!("Server CMD: <bright-blue>\"ffmpeg {}\"</>", server_cmd.join(" "));
|
||||
|
||||
loop {
|
||||
if *is_terminated.lock().unwrap() {
|
||||
|
@ -295,8 +295,8 @@ fn timed_source(node: Media, config: &GlobalConfig, last: bool) -> Media {
|
||||
new_node.process = Some(false);
|
||||
|
||||
if config.playlist.length.contains(":") {
|
||||
debug!("Delta: <yellow>{delta}</>");
|
||||
debug!("Total delta: <yellow>{total_delta}</>");
|
||||
debug!("Delta: <yellow>{delta:.3}</>");
|
||||
debug!("Total delta: <yellow>{total_delta:.3}</>");
|
||||
let sync = check_sync(delta);
|
||||
|
||||
if !sync {
|
||||
|
@ -5,8 +5,8 @@ use std::{
|
||||
|
||||
use simplelog::*;
|
||||
|
||||
use crate::utils::{GlobalConfig, Media};
|
||||
use crate::filter::v_drawtext;
|
||||
use crate::utils::{GlobalConfig, Media};
|
||||
|
||||
pub fn output(log_format: String) -> process::Child {
|
||||
let config = GlobalConfig::global();
|
||||
@ -35,7 +35,7 @@ pub fn output(log_format: String) -> process::Child {
|
||||
|
||||
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
|
||||
|
||||
debug!("Encoder CMD: <bright-blue>{:?}</>", enc_cmd);
|
||||
debug!("Encoder CMD: <bright-blue>\"ffplay {}\"</>", enc_cmd.join(" "));
|
||||
|
||||
let enc_proc = match Command::new("ffplay")
|
||||
.args(enc_cmd)
|
||||
|
@ -1,6 +1,6 @@
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
use std::{
|
||||
io::{prelude::*, BufReader, Read},
|
||||
io::{prelude::*, BufReader, BufWriter, Read},
|
||||
path::Path,
|
||||
process,
|
||||
process::{Command, Stdio},
|
||||
@ -77,6 +77,8 @@ pub fn play(rt_handle: &Handle) {
|
||||
_ => panic!("Output mode doesn't exists!"),
|
||||
};
|
||||
|
||||
let mut enc_writer = BufWriter::new(enc_proc.stdin.take().unwrap());
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
enc_proc.stderr.take().unwrap(),
|
||||
"Encoder".to_string(),
|
||||
@ -85,7 +87,7 @@ pub fn play(rt_handle: &Handle) {
|
||||
let (ingest_sender, ingest_receiver): (
|
||||
SyncSender<(usize, [u8; 65088])>,
|
||||
Receiver<(usize, [u8; 65088])>,
|
||||
) = sync_channel(4);
|
||||
) = sync_channel(8);
|
||||
|
||||
if config.ingest.enable {
|
||||
rt_handle.spawn(ingest_server(
|
||||
@ -114,9 +116,9 @@ pub fn play(rt_handle: &Handle) {
|
||||
node.source
|
||||
);
|
||||
|
||||
let mut kill_dec = true;
|
||||
let filter = node.filter.unwrap();
|
||||
let mut dec_cmd = vec!["-hide_banner", "-nostats", "-v", ff_log_format.as_str()];
|
||||
|
||||
dec_cmd.append(&mut cmd.iter().map(String::as_str).collect());
|
||||
|
||||
if filter.len() > 1 {
|
||||
@ -124,7 +126,8 @@ pub fn play(rt_handle: &Handle) {
|
||||
}
|
||||
|
||||
dec_cmd.append(&mut dec_settings.iter().map(String::as_str).collect());
|
||||
debug!("Decoder CMD: <bright-blue>{:?}</>", dec_cmd);
|
||||
|
||||
debug!("Decoder CMD: <bright-blue>\"ffmpeg {}\"</>", dec_cmd.join(" "));
|
||||
|
||||
let mut dec_proc = match Command::new("ffmpeg")
|
||||
.args(dec_cmd)
|
||||
@ -139,7 +142,6 @@ pub fn play(rt_handle: &Handle) {
|
||||
Ok(proc) => proc,
|
||||
};
|
||||
|
||||
let mut enc_writer = enc_proc.stdin.as_ref().unwrap();
|
||||
let mut dec_reader = BufReader::new(dec_proc.stdout.take().unwrap());
|
||||
|
||||
rt_handle.spawn(stderr_reader(
|
||||
@ -147,23 +149,15 @@ pub fn play(rt_handle: &Handle) {
|
||||
"Decoder".to_string(),
|
||||
));
|
||||
|
||||
let mut kill_dec = true;
|
||||
|
||||
loop {
|
||||
if *server_is_running.lock().unwrap() {
|
||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
|
||||
error!("Ingest receiver error: {:?}", e);
|
||||
|
||||
break 'source_iter;
|
||||
};
|
||||
}
|
||||
|
||||
live_on = true;
|
||||
|
||||
if kill_dec {
|
||||
info!("Switch from {} to live ingest", config.processing.mode);
|
||||
|
||||
if let Err(e) = enc_writer.flush() {
|
||||
error!("Encoder error: {e}")
|
||||
}
|
||||
|
||||
if let Err(e) = dec_proc.kill() {
|
||||
error!("Decoder error: {e}")
|
||||
};
|
||||
@ -173,12 +167,31 @@ pub fn play(rt_handle: &Handle) {
|
||||
};
|
||||
|
||||
kill_dec = false;
|
||||
live_on = true;
|
||||
|
||||
if let Some(init) = &init_playlist {
|
||||
*init.lock().unwrap() = true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(receive) = ingest_receiver.try_recv() {
|
||||
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
|
||||
error!("Ingest receiver error: {:?}", e);
|
||||
|
||||
break 'source_iter;
|
||||
};
|
||||
}
|
||||
} else {
|
||||
if live_on {
|
||||
info!("Switch from live ingest to {}", config.processing.mode);
|
||||
|
||||
if let Err(e) = enc_writer.flush() {
|
||||
error!("Encoder error: {e}")
|
||||
}
|
||||
|
||||
live_on = false;
|
||||
}
|
||||
|
||||
let dec_bytes_len = match dec_reader.read(&mut buffer[..]) {
|
||||
Ok(length) => length,
|
||||
Err(e) => {
|
||||
@ -195,14 +208,6 @@ pub fn play(rt_handle: &Handle) {
|
||||
break 'source_iter;
|
||||
};
|
||||
} else {
|
||||
if live_on {
|
||||
info!("Switch from live ingest to {}", config.processing.mode);
|
||||
|
||||
live_on = false;
|
||||
}
|
||||
|
||||
enc_writer.flush().unwrap();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -5,13 +5,15 @@ use std::{
|
||||
|
||||
use simplelog::*;
|
||||
|
||||
use crate::utils::{GlobalConfig, Media};
|
||||
use crate::filter::v_drawtext;
|
||||
use crate::utils::{GlobalConfig, Media};
|
||||
|
||||
pub fn output(log_format: String) -> process::Child {
|
||||
let config = GlobalConfig::global();
|
||||
let mut enc_filter: Vec<String> = vec![];
|
||||
let mut preview: Vec<&str> = vec![];
|
||||
let preview_cmd = config.out.preview_cmd.as_ref().unwrap().clone();
|
||||
let output_cmd = config.out.output_cmd.as_ref().unwrap().clone();
|
||||
|
||||
let mut enc_cmd = vec![
|
||||
"-hide_banner",
|
||||
@ -36,20 +38,20 @@ pub fn output(log_format: String) -> process::Child {
|
||||
filter.push_str(",split=2[v_out1][v_out2]");
|
||||
|
||||
preview = vec!["-map", "[v_out1]", "-map", "0:a"];
|
||||
preview.append(&mut config.out.preview_param.iter().map(String::as_str).collect());
|
||||
preview.append(&mut preview_cmd.iter().map(String::as_str).collect());
|
||||
preview.append(&mut vec!["-map", "[v_out2]", "-map", "0:a"]);
|
||||
}
|
||||
|
||||
enc_filter = vec!["-filter_complex".to_string(), filter];
|
||||
} else if config.out.preview {
|
||||
preview = config.out.preview_param.iter().map(String::as_str).collect()
|
||||
preview = preview_cmd.iter().map(String::as_str).collect()
|
||||
}
|
||||
|
||||
enc_cmd.append(&mut enc_filter.iter().map(String::as_str).collect());
|
||||
enc_cmd.append(&mut preview);
|
||||
enc_cmd.append(&mut config.out.stream_param.iter().map(String::as_str).collect());
|
||||
enc_cmd.append(&mut output_cmd.iter().map(String::as_str).collect());
|
||||
|
||||
debug!("Encoder CMD: <bright-blue>{:?}</>", enc_cmd);
|
||||
debug!("Encoder CMD: <bright-blue>\"ffmpeg {}\"</>", enc_cmd.join(" "));
|
||||
|
||||
let enc_proc = match Command::new("ffmpeg")
|
||||
.args(enc_cmd)
|
||||
|
@ -8,6 +8,8 @@ use std::{
|
||||
process,
|
||||
};
|
||||
|
||||
use shlex::split;
|
||||
|
||||
use crate::utils::{get_args, time_to_sec};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
@ -73,7 +75,8 @@ pub struct Processing {
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct Ingest {
|
||||
pub enable: bool,
|
||||
pub stream_input: Vec<String>,
|
||||
input_param: String,
|
||||
pub input_cmd: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
@ -109,8 +112,10 @@ pub struct Text {
|
||||
pub struct Out {
|
||||
pub mode: String,
|
||||
pub preview: bool,
|
||||
pub preview_param: Vec<String>,
|
||||
pub stream_param: Vec<String>,
|
||||
preview_param: String,
|
||||
pub preview_cmd: Option<Vec<String>>,
|
||||
output_param: String,
|
||||
pub output_cmd: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
static INSTANCE: OnceCell<GlobalConfig> = OnceCell::new();
|
||||
@ -184,6 +189,10 @@ impl GlobalConfig {
|
||||
|
||||
config.processing.settings = Some(settings);
|
||||
|
||||
config.ingest.input_cmd = split(config.ingest.input_param.as_str());
|
||||
config.out.preview_cmd = split(config.out.preview_param.as_str());
|
||||
config.out.output_cmd = split(config.out.output_param.as_str());
|
||||
|
||||
if args.log.is_some() {
|
||||
config.logging.log_path = args.log.unwrap();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user