Merge pull request #119 from jb-alvarado/master

change to crossbeam channel
This commit is contained in:
jb-alvarado 2022-04-19 22:23:23 +02:00 committed by GitHub
commit 2d561400ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 44 deletions

57
Cargo.lock generated
View File

@ -94,16 +94,16 @@ dependencies = [
[[package]]
name = "clap"
version = "3.1.8"
version = "3.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71c47df61d9e16dc010b55dba1952a57d8c215dbb533fd13cdd13369aac73b1c"
checksum = "6aad2534fad53df1cc12519c5cda696dd3e20e6118a027e24054aea14a0bdcbe"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim",
"termcolor",
"textwrap",
@ -122,6 +122,15 @@ dependencies = [
"syn",
]
[[package]]
name = "clap_lex"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "core-foundation"
version = "0.9.3"
@ -147,6 +156,26 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
name = "email-encoding"
version = "0.1.0"
@ -167,10 +196,11 @@ dependencies = [
[[package]]
name = "ffplayout-engine"
version = "0.9.3"
version = "0.9.4"
dependencies = [
"chrono",
"clap",
"crossbeam-channel",
"ffprobe",
"file-rotate",
"jsonrpc-http-server",
@ -212,9 +242,9 @@ dependencies = [
[[package]]
name = "filetime"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98"
checksum = "c0408e2626025178a6a7f7ffc05a25bc47103229f19c113755de7bf63816290c"
dependencies = [
"cfg-if 1.0.0",
"libc",
@ -651,9 +681,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.123"
version = "0.2.124"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb691a747a7ab48abc15c5b42066eaafde10dc427e3b6ee2a1cf43db04c763bd"
checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50"
[[package]]
name = "linked-hash-map"
@ -934,9 +964,6 @@ name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
dependencies = [
"memchr",
]
[[package]]
name = "paris"
@ -1363,9 +1390,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.33"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80b9fa4360528139bc96100c160b7ae879f5567f49f1782b0b02035b0358ebf3"
checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
@ -1374,9 +1401,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.25"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dfce9f3241b150f36e8e54bb561a742d5daa1a47b5dd9a5ce369fd4a4db2210"
checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
dependencies = [
"lazy_static",
]

View File

@ -4,12 +4,13 @@ description = "24/7 playout based on rust and ffmpeg"
license = "GPL-3.0"
authors = ["Jonathan Baecker jonbae77@gmail.com"]
readme = "README.md"
version = "0.9.3"
version = "0.9.4"
edition = "2021"
[dependencies]
chrono = "0.4"
clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
ffprobe = "0.3"
file-rotate = "0.6"
jsonrpc-http-server = "18.0"

View File

@ -10,7 +10,7 @@ general:
rpc_server:
help_text: Run a JSON RPC server, for getting infos about current playing, and
control for some functions.
enable: true
enable: false
address: 127.0.0.1:7070
authorization: av2Kx8g67lF9qj5wEH3ym1bI4cCs

View File

@ -1,13 +1,7 @@
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use std::{
ffi::OsStr,
path::Path,
process::exit,
sync::{
mpsc::channel,
{Arc, Mutex},
@ -16,6 +10,12 @@ use std::{
time::Duration,
};
use notify::{
DebouncedEvent::{Create, Remove, Rename},
{watcher, RecursiveMode, Watcher},
};
use rand::{seq::SliceRandom, thread_rng};
use simplelog::*;
use walkdir::WalkDir;
use crate::utils::{get_sec, GlobalConfig, Media};
@ -34,6 +34,14 @@ impl Source {
let mut media_list = vec![];
let mut index: usize = 0;
if !Path::new(&config.storage.path).is_dir() {
error!(
"Path not exists: <b><magenta>{}</></b>",
config.storage.path
);
exit(1);
}
for entry in WalkDir::new(config.storage.path.clone())
.into_iter()
.filter_map(|e| e.ok())
@ -150,10 +158,7 @@ fn file_extension(filename: &Path) -> Option<&str> {
filename.extension().and_then(OsStr::to_str)
}
pub async fn watchman(
sources: Arc<Mutex<Vec<Media>>>,
is_terminated: Arc<Mutex<bool>>,
) {
pub async fn watchman(sources: Arc<Mutex<Vec<Media>>>, is_terminated: Arc<Mutex<bool>>) {
let config = GlobalConfig::global();
let (tx, rx) = channel();
@ -169,7 +174,7 @@ pub async fn watchman(
loop {
if *is_terminated.lock().unwrap() {
break
break;
}
if let Ok(res) = rx.try_recv() {

View File

@ -2,11 +2,11 @@ use std::{
io::{BufReader, Error, Read},
path::Path,
process::{Command, Stdio},
sync::mpsc::SyncSender,
thread::sleep,
time::Duration,
};
use crossbeam_channel::Sender;
use simplelog::*;
use tokio::runtime::Handle;
@ -54,7 +54,7 @@ fn audio_filter(config: &GlobalConfig) -> String {
pub async fn ingest_server(
log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>,
ingest_sender: Sender<(usize, [u8; 65088])>,
rt_handle: Handle,
mut proc_control: ProcessControl,
) -> Result<(), Error> {

View File

@ -1,11 +1,11 @@
use std::{
io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio},
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::sleep,
time::Duration,
};
use crossbeam_channel::bounded;
use simplelog::*;
use tokio::runtime::Handle;
@ -30,7 +30,7 @@ pub fn player(
let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut buffer: [u8; 65088] = [0; 65088];
let mut buffer = [0; 65088];
let mut live_on = false;
let playlist_init = playout_stat.list_init.clone();
@ -53,15 +53,12 @@ pub fn player(
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(8);
let (ingest_sender, ingest_receiver) = bounded(96);
if config.ingest.enable {
rt_handle.spawn(ingest_server(
ff_log_format.clone(),
ingest_sender,
ingest_sender.clone(),
rt_handle.clone(),
proc_control.clone(),
));
@ -131,13 +128,12 @@ pub fn player(
}
live_on = true;
*playlist_init.lock().unwrap() = true;
}
if let Ok(receive) = ingest_receiver.try_recv() {
if let Err(e) = enc_writer.write(&receive.1[..receive.0]) {
error!("Ingest receiver error: {:?}", e);
for rx in ingest_receiver.try_iter() {
if let Err(e) = enc_writer.write(&rx.1[..rx.0]) {
error!("Encoder write error: {:?}", e);
break 'source_iter;
};

View File

@ -2,10 +2,11 @@ use clap::Parser;
#[derive(Parser, Debug)]
#[clap(version,
about = "ffplayout, Rust based 24/7 playout solution.\n\nRun without any command to use config file only, or with commands to override parameters.",
about = "ffplayout, Rust based 24/7 playout solution.",
override_usage = "Run without any command to use config file only, or with commands to override parameters:\n\n ffplayout [OPTIONS]",
long_about = None)]
pub struct Args {
#[clap(short, long, help = "File path to ffplayout.conf")]
#[clap(short, long, help = "File path to ffplayout.yml")]
pub config: Option<String>,
#[clap(short, long, help = "File path for logging")]