From 790e22a50d7e354ba5921626eec1fc148ab19012 Mon Sep 17 00:00:00 2001 From: jb-alvarado Date: Mon, 18 Apr 2022 13:31:43 +0200 Subject: [PATCH] change sync_channel to crossbeam-channel flush channel after switching --- Cargo.lock | 21 +++++++++++++++++++++ Cargo.toml | 1 + src/input/ingest.rs | 4 ++-- src/output/mod.rs | 17 ++++++++--------- 4 files changed, 32 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5a46d11..5e65c567 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,6 +147,26 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if 1.0.0", + "lazy_static", +] + [[package]] name = "email-encoding" version = "0.1.0" @@ -171,6 +191,7 @@ version = "0.9.3" dependencies = [ "chrono", "clap", + "crossbeam-channel", "ffprobe", "file-rotate", "jsonrpc-http-server", diff --git a/Cargo.toml b/Cargo.toml index de6f1b5c..ca634de8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" [dependencies] chrono = "0.4" clap = { version = "3.1", features = ["derive"] } +crossbeam-channel = "0.5" ffprobe = "0.3" file-rotate = "0.6" jsonrpc-http-server = "18.0" diff --git a/src/input/ingest.rs b/src/input/ingest.rs index 8dbbedb2..23348462 100644 --- a/src/input/ingest.rs +++ b/src/input/ingest.rs @@ -2,11 +2,11 @@ use std::{ io::{BufReader, Error, Read}, path::Path, process::{Command, Stdio}, - sync::mpsc::SyncSender, thread::sleep, time::Duration, }; +use crossbeam_channel::Sender; use simplelog::*; use tokio::runtime::Handle; @@ -54,7 +54,7 @@ fn audio_filter(config: &GlobalConfig) -> String { pub async fn ingest_server( log_format: String, - ingest_sender: SyncSender<(usize, [u8; 65088])>, + ingest_sender: Sender<(usize, [u8; 65088])>, rt_handle: Handle, mut proc_control: ProcessControl, ) -> Result<(), Error> { diff --git a/src/output/mod.rs b/src/output/mod.rs index 2ff59b93..19b309be 100644 --- a/src/output/mod.rs +++ b/src/output/mod.rs @@ -1,11 +1,11 @@ use std::{ io::{prelude::*, BufReader, BufWriter, Read}, process::{Command, Stdio}, - sync::mpsc::{sync_channel, Receiver, SyncSender}, thread::sleep, time::Duration, }; +use crossbeam_channel::bounded; use simplelog::*; use tokio::runtime::Handle; @@ -30,7 +30,7 @@ pub fn player( let config = GlobalConfig::global(); let dec_settings = config.processing.clone().settings.unwrap(); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); - let mut buffer: [u8; 65088] = [0; 65088]; + let mut buffer = [0; 65088]; let mut live_on = false; let playlist_init = playout_stat.list_init.clone(); @@ -53,17 +53,13 @@ pub fn player( rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc); - // too small value for sync_channel size increases CPU load, - // large values leave packets in queue which creates artifacts - let (ingest_sender, ingest_receiver): ( - SyncSender<(usize, [u8; 65088])>, - Receiver<(usize, [u8; 65088])>, - ) = sync_channel(16); + // too small value for channel size increases CPU load, + let (ingest_sender, ingest_receiver) = bounded(56); if config.ingest.enable { rt_handle.spawn(ingest_server( ff_log_format.clone(), - ingest_sender, + ingest_sender.clone(), rt_handle.clone(), proc_control.clone(), )); @@ -152,6 +148,9 @@ pub fn player( error!("Encoder error: {e}") } + let trashcan: Vec<_> = ingest_receiver.try_iter().collect(); + drop(trashcan); + live_on = false; }