change sync_channel to crossbeam-channel

flush channel after switching
This commit is contained in:
jb-alvarado 2022-04-18 13:31:43 +02:00
parent 423c8f996f
commit 790e22a50d
4 changed files with 32 additions and 11 deletions

21
Cargo.lock generated
View File

@ -147,6 +147,26 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
[[package]]
name = "crossbeam-channel"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
]
[[package]] [[package]]
name = "email-encoding" name = "email-encoding"
version = "0.1.0" version = "0.1.0"
@ -171,6 +191,7 @@ version = "0.9.3"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
"crossbeam-channel",
"ffprobe", "ffprobe",
"file-rotate", "file-rotate",
"jsonrpc-http-server", "jsonrpc-http-server",

View File

@ -10,6 +10,7 @@ edition = "2021"
[dependencies] [dependencies]
chrono = "0.4" chrono = "0.4"
clap = { version = "3.1", features = ["derive"] } clap = { version = "3.1", features = ["derive"] }
crossbeam-channel = "0.5"
ffprobe = "0.3" ffprobe = "0.3"
file-rotate = "0.6" file-rotate = "0.6"
jsonrpc-http-server = "18.0" jsonrpc-http-server = "18.0"

View File

@ -2,11 +2,11 @@ use std::{
io::{BufReader, Error, Read}, io::{BufReader, Error, Read},
path::Path, path::Path,
process::{Command, Stdio}, process::{Command, Stdio},
sync::mpsc::SyncSender,
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
use crossbeam_channel::Sender;
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -54,7 +54,7 @@ fn audio_filter(config: &GlobalConfig) -> String {
pub async fn ingest_server( pub async fn ingest_server(
log_format: String, log_format: String,
ingest_sender: SyncSender<(usize, [u8; 65088])>, ingest_sender: Sender<(usize, [u8; 65088])>,
rt_handle: Handle, rt_handle: Handle,
mut proc_control: ProcessControl, mut proc_control: ProcessControl,
) -> Result<(), Error> { ) -> Result<(), Error> {

View File

@ -1,11 +1,11 @@
use std::{ use std::{
io::{prelude::*, BufReader, BufWriter, Read}, io::{prelude::*, BufReader, BufWriter, Read},
process::{Command, Stdio}, process::{Command, Stdio},
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread::sleep, thread::sleep,
time::Duration, time::Duration,
}; };
use crossbeam_channel::bounded;
use simplelog::*; use simplelog::*;
use tokio::runtime::Handle; use tokio::runtime::Handle;
@ -30,7 +30,7 @@ pub fn player(
let config = GlobalConfig::global(); let config = GlobalConfig::global();
let dec_settings = config.processing.clone().settings.unwrap(); let dec_settings = config.processing.clone().settings.unwrap();
let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase()); let ff_log_format = format!("level+{}", config.logging.ffmpeg_level.to_lowercase());
let mut buffer: [u8; 65088] = [0; 65088]; let mut buffer = [0; 65088];
let mut live_on = false; let mut live_on = false;
let playlist_init = playout_stat.list_init.clone(); let playlist_init = playout_stat.list_init.clone();
@ -53,17 +53,13 @@ pub fn player(
rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder")); rt_handle.spawn(stderr_reader(enc_proc.stderr.take().unwrap(), "Encoder"));
*proc_control.decoder_term.lock().unwrap() = Some(enc_proc); *proc_control.decoder_term.lock().unwrap() = Some(enc_proc);
// too small value for sync_channel size increases CPU load, // too small value for channel size increases CPU load,
// large values leave packets in queue which creates artifacts let (ingest_sender, ingest_receiver) = bounded(56);
let (ingest_sender, ingest_receiver): (
SyncSender<(usize, [u8; 65088])>,
Receiver<(usize, [u8; 65088])>,
) = sync_channel(16);
if config.ingest.enable { if config.ingest.enable {
rt_handle.spawn(ingest_server( rt_handle.spawn(ingest_server(
ff_log_format.clone(), ff_log_format.clone(),
ingest_sender, ingest_sender.clone(),
rt_handle.clone(), rt_handle.clone(),
proc_control.clone(), proc_control.clone(),
)); ));
@ -152,6 +148,9 @@ pub fn player(
error!("Encoder error: {e}") error!("Encoder error: {e}")
} }
let trashcan: Vec<_> = ingest_receiver.try_iter().collect();
drop(trashcan);
live_on = false; live_on = false;
} }